win32util.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. import sys
  2. import dns._features
  3. # pylint: disable=W0612,W0613,C0301
  4. if sys.platform == "win32":
  5. import ctypes
  6. import ctypes.wintypes as wintypes
  7. import winreg # pylint: disable=import-error
  8. from enum import IntEnum
  9. import dns.name
  10. # Keep pylint quiet on non-windows.
  11. try:
  12. _ = WindowsError # pylint: disable=used-before-assignment
  13. except NameError:
  14. WindowsError = Exception
  15. class ConfigMethod(IntEnum):
  16. Registry = 1
  17. WMI = 2
  18. Win32 = 3
  19. class DnsInfo:
  20. def __init__(self):
  21. self.domain = None
  22. self.nameservers = []
  23. self.search = []
  24. _config_method = ConfigMethod.Registry
  25. if dns._features.have("wmi"):
  26. import threading
  27. import pythoncom # pylint: disable=import-error
  28. import wmi # pylint: disable=import-error
  29. # Prefer WMI by default if wmi is installed.
  30. _config_method = ConfigMethod.WMI
  31. class _WMIGetter(threading.Thread):
  32. # pylint: disable=possibly-used-before-assignment
  33. def __init__(self):
  34. super().__init__()
  35. self.info = DnsInfo()
  36. def run(self):
  37. pythoncom.CoInitialize()
  38. try:
  39. system = wmi.WMI()
  40. for interface in system.Win32_NetworkAdapterConfiguration():
  41. if interface.IPEnabled and interface.DNSServerSearchOrder:
  42. self.info.nameservers = list(interface.DNSServerSearchOrder)
  43. if interface.DNSDomain:
  44. self.info.domain = _config_domain(interface.DNSDomain)
  45. if interface.DNSDomainSuffixSearchOrder:
  46. self.info.search = [
  47. _config_domain(x)
  48. for x in interface.DNSDomainSuffixSearchOrder
  49. ]
  50. break
  51. finally:
  52. pythoncom.CoUninitialize()
  53. def get(self):
  54. # We always run in a separate thread to avoid any issues with
  55. # the COM threading model.
  56. self.start()
  57. self.join()
  58. return self.info
  59. else:
  60. class _WMIGetter: # type: ignore
  61. pass
  62. def _config_domain(domain):
  63. # Sometimes DHCP servers add a '.' prefix to the default domain, and
  64. # Windows just stores such values in the registry (see #687).
  65. # Check for this and fix it.
  66. if domain.startswith("."):
  67. domain = domain[1:]
  68. return dns.name.from_text(domain)
  69. class _RegistryGetter:
  70. def __init__(self):
  71. self.info = DnsInfo()
  72. def _split(self, text):
  73. # The windows registry has used both " " and "," as a delimiter, and while
  74. # it is currently using "," in Windows 10 and later, updates can seemingly
  75. # leave a space in too, e.g. "a, b". So we just convert all commas to
  76. # spaces, and use split() in its default configuration, which splits on
  77. # all whitespace and ignores empty strings.
  78. return text.replace(",", " ").split()
  79. def _config_nameservers(self, nameservers):
  80. for ns in self._split(nameservers):
  81. if ns not in self.info.nameservers:
  82. self.info.nameservers.append(ns)
  83. def _config_search(self, search):
  84. for s in self._split(search):
  85. s = _config_domain(s)
  86. if s not in self.info.search:
  87. self.info.search.append(s)
  88. def _config_fromkey(self, key, always_try_domain):
  89. try:
  90. servers, _ = winreg.QueryValueEx(key, "NameServer")
  91. except WindowsError:
  92. servers = None
  93. if servers:
  94. self._config_nameservers(servers)
  95. if servers or always_try_domain:
  96. try:
  97. dom, _ = winreg.QueryValueEx(key, "Domain")
  98. if dom:
  99. self.info.domain = _config_domain(dom)
  100. except WindowsError:
  101. pass
  102. else:
  103. try:
  104. servers, _ = winreg.QueryValueEx(key, "DhcpNameServer")
  105. except WindowsError:
  106. servers = None
  107. if servers:
  108. self._config_nameservers(servers)
  109. try:
  110. dom, _ = winreg.QueryValueEx(key, "DhcpDomain")
  111. if dom:
  112. self.info.domain = _config_domain(dom)
  113. except WindowsError:
  114. pass
  115. try:
  116. search, _ = winreg.QueryValueEx(key, "SearchList")
  117. except WindowsError:
  118. search = None
  119. if search is None:
  120. try:
  121. search, _ = winreg.QueryValueEx(key, "DhcpSearchList")
  122. except WindowsError:
  123. search = None
  124. if search:
  125. self._config_search(search)
  126. def _is_nic_enabled(self, lm, guid):
  127. # Look in the Windows Registry to determine whether the network
  128. # interface corresponding to the given guid is enabled.
  129. #
  130. # (Code contributed by Paul Marks, thanks!)
  131. #
  132. try:
  133. # This hard-coded location seems to be consistent, at least
  134. # from Windows 2000 through Vista.
  135. connection_key = winreg.OpenKey(
  136. lm,
  137. r"SYSTEM\CurrentControlSet\Control\Network"
  138. r"\{4D36E972-E325-11CE-BFC1-08002BE10318}"
  139. rf"\{guid}\Connection",
  140. )
  141. try:
  142. # The PnpInstanceID points to a key inside Enum
  143. (pnp_id, ttype) = winreg.QueryValueEx(
  144. connection_key, "PnpInstanceID"
  145. )
  146. if ttype != winreg.REG_SZ:
  147. raise ValueError # pragma: no cover
  148. device_key = winreg.OpenKey(
  149. lm, rf"SYSTEM\CurrentControlSet\Enum\{pnp_id}"
  150. )
  151. try:
  152. # Get ConfigFlags for this device
  153. (flags, ttype) = winreg.QueryValueEx(device_key, "ConfigFlags")
  154. if ttype != winreg.REG_DWORD:
  155. raise ValueError # pragma: no cover
  156. # Based on experimentation, bit 0x1 indicates that the
  157. # device is disabled.
  158. #
  159. # XXXRTH I suspect we really want to & with 0x03 so
  160. # that CONFIGFLAGS_REMOVED devices are also ignored,
  161. # but we're shifting to WMI as ConfigFlags is not
  162. # supposed to be used.
  163. return not flags & 0x1
  164. finally:
  165. device_key.Close()
  166. finally:
  167. connection_key.Close()
  168. except Exception: # pragma: no cover
  169. return False
  170. def get(self):
  171. """Extract resolver configuration from the Windows registry."""
  172. lm = winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE)
  173. try:
  174. tcp_params = winreg.OpenKey(
  175. lm, r"SYSTEM\CurrentControlSet\Services\Tcpip\Parameters"
  176. )
  177. try:
  178. self._config_fromkey(tcp_params, True)
  179. finally:
  180. tcp_params.Close()
  181. interfaces = winreg.OpenKey(
  182. lm,
  183. r"SYSTEM\CurrentControlSet\Services\Tcpip\Parameters\Interfaces",
  184. )
  185. try:
  186. i = 0
  187. while True:
  188. try:
  189. guid = winreg.EnumKey(interfaces, i)
  190. i += 1
  191. key = winreg.OpenKey(interfaces, guid)
  192. try:
  193. if not self._is_nic_enabled(lm, guid):
  194. continue
  195. self._config_fromkey(key, False)
  196. finally:
  197. key.Close()
  198. except OSError:
  199. break
  200. finally:
  201. interfaces.Close()
  202. finally:
  203. lm.Close()
  204. return self.info
  205. class _Win32Getter(_RegistryGetter):
  206. def get(self):
  207. """Get the attributes using the Windows API."""
  208. # Load the IP Helper library
  209. # # https://learn.microsoft.com/en-us/windows/win32/api/iphlpapi/nf-iphlpapi-getadaptersaddresses
  210. IPHLPAPI = ctypes.WinDLL("Iphlpapi.dll")
  211. # Constants
  212. AF_UNSPEC = 0
  213. ERROR_SUCCESS = 0
  214. GAA_FLAG_INCLUDE_PREFIX = 0x00000010
  215. AF_INET = 2
  216. AF_INET6 = 23
  217. IF_TYPE_SOFTWARE_LOOPBACK = 24
  218. # Define necessary structures
  219. class SOCKADDRV4(ctypes.Structure):
  220. _fields_ = [
  221. ("sa_family", wintypes.USHORT),
  222. ("sa_data", ctypes.c_ubyte * 14),
  223. ]
  224. class SOCKADDRV6(ctypes.Structure):
  225. _fields_ = [
  226. ("sa_family", wintypes.USHORT),
  227. ("sa_data", ctypes.c_ubyte * 26),
  228. ]
  229. class SOCKET_ADDRESS(ctypes.Structure):
  230. _fields_ = [
  231. ("lpSockaddr", ctypes.POINTER(SOCKADDRV4)),
  232. ("iSockaddrLength", wintypes.INT),
  233. ]
  234. class IP_ADAPTER_DNS_SERVER_ADDRESS(ctypes.Structure):
  235. pass # Forward declaration
  236. IP_ADAPTER_DNS_SERVER_ADDRESS._fields_ = [
  237. ("Length", wintypes.ULONG),
  238. ("Reserved", wintypes.DWORD),
  239. ("Next", ctypes.POINTER(IP_ADAPTER_DNS_SERVER_ADDRESS)),
  240. ("Address", SOCKET_ADDRESS),
  241. ]
  242. class IF_LUID(ctypes.Structure):
  243. _fields_ = [("Value", ctypes.c_ulonglong)]
  244. class NET_IF_NETWORK_GUID(ctypes.Structure):
  245. _fields_ = [("Value", ctypes.c_ubyte * 16)]
  246. class IP_ADAPTER_PREFIX_XP(ctypes.Structure):
  247. pass # Left undefined here for simplicity
  248. class IP_ADAPTER_GATEWAY_ADDRESS_LH(ctypes.Structure):
  249. pass # Left undefined here for simplicity
  250. class IP_ADAPTER_DNS_SUFFIX(ctypes.Structure):
  251. _fields_ = [
  252. ("String", ctypes.c_wchar * 256),
  253. ("Next", ctypes.POINTER(ctypes.c_void_p)),
  254. ]
  255. class IP_ADAPTER_UNICAST_ADDRESS_LH(ctypes.Structure):
  256. pass # Left undefined here for simplicity
  257. class IP_ADAPTER_MULTICAST_ADDRESS_XP(ctypes.Structure):
  258. pass # Left undefined here for simplicity
  259. class IP_ADAPTER_ANYCAST_ADDRESS_XP(ctypes.Structure):
  260. pass # Left undefined here for simplicity
  261. class IP_ADAPTER_DNS_SERVER_ADDRESS_XP(ctypes.Structure):
  262. pass # Left undefined here for simplicity
  263. class IP_ADAPTER_ADDRESSES(ctypes.Structure):
  264. pass # Forward declaration
  265. IP_ADAPTER_ADDRESSES._fields_ = [
  266. ("Length", wintypes.ULONG),
  267. ("IfIndex", wintypes.DWORD),
  268. ("Next", ctypes.POINTER(IP_ADAPTER_ADDRESSES)),
  269. ("AdapterName", ctypes.c_char_p),
  270. ("FirstUnicastAddress", ctypes.POINTER(SOCKET_ADDRESS)),
  271. ("FirstAnycastAddress", ctypes.POINTER(SOCKET_ADDRESS)),
  272. ("FirstMulticastAddress", ctypes.POINTER(SOCKET_ADDRESS)),
  273. (
  274. "FirstDnsServerAddress",
  275. ctypes.POINTER(IP_ADAPTER_DNS_SERVER_ADDRESS),
  276. ),
  277. ("DnsSuffix", wintypes.LPWSTR),
  278. ("Description", wintypes.LPWSTR),
  279. ("FriendlyName", wintypes.LPWSTR),
  280. ("PhysicalAddress", ctypes.c_ubyte * 8),
  281. ("PhysicalAddressLength", wintypes.ULONG),
  282. ("Flags", wintypes.ULONG),
  283. ("Mtu", wintypes.ULONG),
  284. ("IfType", wintypes.ULONG),
  285. ("OperStatus", ctypes.c_uint),
  286. # Remaining fields removed for brevity
  287. ]
  288. def format_ipv4(sockaddr_in):
  289. return ".".join(map(str, sockaddr_in.sa_data[2:6]))
  290. def format_ipv6(sockaddr_in6):
  291. # The sa_data is:
  292. #
  293. # USHORT sin6_port;
  294. # ULONG sin6_flowinfo;
  295. # IN6_ADDR sin6_addr;
  296. # ULONG sin6_scope_id;
  297. #
  298. # which is 2 + 4 + 16 + 4 = 26 bytes, and we need the plus 6 below
  299. # to be in the sin6_addr range.
  300. parts = [
  301. sockaddr_in6.sa_data[i + 6] << 8 | sockaddr_in6.sa_data[i + 6 + 1]
  302. for i in range(0, 16, 2)
  303. ]
  304. return ":".join(f"{part:04x}" for part in parts)
  305. buffer_size = ctypes.c_ulong(15000)
  306. while True:
  307. buffer = ctypes.create_string_buffer(buffer_size.value)
  308. ret_val = IPHLPAPI.GetAdaptersAddresses(
  309. AF_UNSPEC,
  310. GAA_FLAG_INCLUDE_PREFIX,
  311. None,
  312. buffer,
  313. ctypes.byref(buffer_size),
  314. )
  315. if ret_val == ERROR_SUCCESS:
  316. break
  317. elif ret_val != 0x6F: # ERROR_BUFFER_OVERFLOW
  318. print(f"Error retrieving adapter information: {ret_val}")
  319. return
  320. adapter_addresses = ctypes.cast(
  321. buffer, ctypes.POINTER(IP_ADAPTER_ADDRESSES)
  322. )
  323. current_adapter = adapter_addresses
  324. while current_adapter:
  325. # Skip non-operational adapters.
  326. oper_status = current_adapter.contents.OperStatus
  327. if oper_status != 1:
  328. current_adapter = current_adapter.contents.Next
  329. continue
  330. # Exclude loopback adapters.
  331. if current_adapter.contents.IfType == IF_TYPE_SOFTWARE_LOOPBACK:
  332. current_adapter = current_adapter.contents.Next
  333. continue
  334. # Get the domain from the DnsSuffix attribute.
  335. dns_suffix = current_adapter.contents.DnsSuffix
  336. if dns_suffix:
  337. self.info.domain = dns.name.from_text(dns_suffix)
  338. current_dns_server = current_adapter.contents.FirstDnsServerAddress
  339. while current_dns_server:
  340. sockaddr = current_dns_server.contents.Address.lpSockaddr
  341. sockaddr_family = sockaddr.contents.sa_family
  342. ip = None
  343. if sockaddr_family == AF_INET: # IPv4
  344. ip = format_ipv4(sockaddr.contents)
  345. elif sockaddr_family == AF_INET6: # IPv6
  346. sockaddr = ctypes.cast(sockaddr, ctypes.POINTER(SOCKADDRV6))
  347. ip = format_ipv6(sockaddr.contents)
  348. if ip:
  349. if ip not in self.info.nameservers:
  350. self.info.nameservers.append(ip)
  351. current_dns_server = current_dns_server.contents.Next
  352. current_adapter = current_adapter.contents.Next
  353. # Use the registry getter to get the search info, since it is set at the system level.
  354. registry_getter = _RegistryGetter()
  355. info = registry_getter.get()
  356. self.info.search = info.search
  357. return self.info
  358. def set_config_method(method: ConfigMethod) -> None:
  359. global _config_method
  360. _config_method = method
  361. def get_dns_info() -> DnsInfo:
  362. """Extract resolver configuration."""
  363. if _config_method == ConfigMethod.Win32:
  364. getter = _Win32Getter()
  365. elif _config_method == ConfigMethod.WMI:
  366. getter = _WMIGetter()
  367. else:
  368. getter = _RegistryGetter()
  369. return getter.get()