gcp.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. from __future__ import annotations as _annotations
  2. from collections.abc import Iterator, Mapping
  3. from functools import cached_property
  4. from typing import TYPE_CHECKING, Optional
  5. from .env import EnvSettingsSource
  6. if TYPE_CHECKING:
  7. from google.auth import default as google_auth_default
  8. from google.auth.credentials import Credentials
  9. from google.cloud.secretmanager import SecretManagerServiceClient
  10. from pydantic_settings.main import BaseSettings
  11. else:
  12. Credentials = None
  13. SecretManagerServiceClient = None
  14. google_auth_default = None
  15. def import_gcp_secret_manager() -> None:
  16. global Credentials
  17. global SecretManagerServiceClient
  18. global google_auth_default
  19. try:
  20. from google.auth import default as google_auth_default
  21. from google.auth.credentials import Credentials
  22. from google.cloud.secretmanager import SecretManagerServiceClient
  23. except ImportError as e: # pragma: no cover
  24. raise ImportError(
  25. 'GCP Secret Manager dependencies are not installed, run `pip install pydantic-settings[gcp-secret-manager]`'
  26. ) from e
  27. class GoogleSecretManagerMapping(Mapping[str, Optional[str]]):
  28. _loaded_secrets: dict[str, str | None]
  29. _secret_client: SecretManagerServiceClient
  30. def __init__(self, secret_client: SecretManagerServiceClient, project_id: str, case_sensitive: bool) -> None:
  31. self._loaded_secrets = {}
  32. self._secret_client = secret_client
  33. self._project_id = project_id
  34. self._case_sensitive = case_sensitive
  35. @property
  36. def _gcp_project_path(self) -> str:
  37. return self._secret_client.common_project_path(self._project_id)
  38. @cached_property
  39. def _secret_names(self) -> list[str]:
  40. rv: list[str] = []
  41. secrets = self._secret_client.list_secrets(parent=self._gcp_project_path)
  42. for secret in secrets:
  43. name = self._secret_client.parse_secret_path(secret.name).get('secret', '')
  44. if not self._case_sensitive:
  45. name = name.lower()
  46. rv.append(name)
  47. return rv
  48. def _secret_version_path(self, key: str, version: str = 'latest') -> str:
  49. return self._secret_client.secret_version_path(self._project_id, key, version)
  50. def __getitem__(self, key: str) -> str | None:
  51. if not self._case_sensitive:
  52. key = key.lower()
  53. if key not in self._loaded_secrets:
  54. # If we know the key isn't available in secret manager, raise a key error
  55. if key not in self._secret_names:
  56. raise KeyError(key)
  57. try:
  58. self._loaded_secrets[key] = self._secret_client.access_secret_version(
  59. name=self._secret_version_path(key)
  60. ).payload.data.decode('UTF-8')
  61. except Exception:
  62. raise KeyError(key)
  63. return self._loaded_secrets[key]
  64. def __len__(self) -> int:
  65. return len(self._secret_names)
  66. def __iter__(self) -> Iterator[str]:
  67. return iter(self._secret_names)
  68. class GoogleSecretManagerSettingsSource(EnvSettingsSource):
  69. _credentials: Credentials
  70. _secret_client: SecretManagerServiceClient
  71. _project_id: str
  72. def __init__(
  73. self,
  74. settings_cls: type[BaseSettings],
  75. credentials: Credentials | None = None,
  76. project_id: str | None = None,
  77. env_prefix: str | None = None,
  78. env_parse_none_str: str | None = None,
  79. env_parse_enums: bool | None = None,
  80. secret_client: SecretManagerServiceClient | None = None,
  81. case_sensitive: bool | None = True,
  82. ) -> None:
  83. # Import Google Packages if they haven't already been imported
  84. if SecretManagerServiceClient is None or Credentials is None or google_auth_default is None:
  85. import_gcp_secret_manager()
  86. # If credentials or project_id are not passed, then
  87. # try to get them from the default function
  88. if not credentials or not project_id:
  89. _creds, _project_id = google_auth_default() # type: ignore[no-untyped-call]
  90. # Set the credentials and/or project id if they weren't specified
  91. if credentials is None:
  92. credentials = _creds
  93. if project_id is None:
  94. if isinstance(_project_id, str):
  95. project_id = _project_id
  96. else:
  97. raise AttributeError(
  98. 'project_id is required to be specified either as an argument or from the google.auth.default. See https://google-auth.readthedocs.io/en/master/reference/google.auth.html#google.auth.default'
  99. )
  100. self._credentials: Credentials = credentials
  101. self._project_id: str = project_id
  102. if secret_client:
  103. self._secret_client = secret_client
  104. else:
  105. self._secret_client = SecretManagerServiceClient(credentials=self._credentials)
  106. super().__init__(
  107. settings_cls,
  108. case_sensitive=case_sensitive,
  109. env_prefix=env_prefix,
  110. env_ignore_empty=False,
  111. env_parse_none_str=env_parse_none_str,
  112. env_parse_enums=env_parse_enums,
  113. )
  114. def _load_env_vars(self) -> Mapping[str, Optional[str]]:
  115. return GoogleSecretManagerMapping(
  116. self._secret_client, project_id=self._project_id, case_sensitive=self.case_sensitive
  117. )
  118. def __repr__(self) -> str:
  119. return f'{self.__class__.__name__}(project_id={self._project_id!r}, env_nested_delimiter={self.env_nested_delimiter!r})'
  120. __all__ = ['GoogleSecretManagerSettingsSource', 'GoogleSecretManagerMapping']