dotenv.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. """Dotenv file settings source."""
  2. from __future__ import annotations as _annotations
  3. import os
  4. import warnings
  5. from collections.abc import Mapping
  6. from pathlib import Path
  7. from typing import TYPE_CHECKING, Any
  8. from dotenv import dotenv_values
  9. from pydantic._internal._typing_extra import ( # type: ignore[attr-defined]
  10. get_origin,
  11. )
  12. from typing_inspection.introspection import is_union_origin
  13. from ..types import ENV_FILE_SENTINEL, DotenvType
  14. from ..utils import (
  15. _annotation_is_complex,
  16. _union_is_complex,
  17. parse_env_vars,
  18. )
  19. from .env import EnvSettingsSource
  20. if TYPE_CHECKING:
  21. from pydantic_settings.main import BaseSettings
  class DotEnvSettingsSource(EnvSettingsSource):
      """
      Settings source that takes its values from one or more dotenv files.

      The variables are read from the configured file(s) instead of the process
      environment; everything else is delegated to `EnvSettingsSource`.
      """

      def __init__(
          self,
          settings_cls: type[BaseSettings],
          env_file: DotenvType | None = ENV_FILE_SENTINEL,
          env_file_encoding: str | None = None,
          case_sensitive: bool | None = None,
          env_prefix: str | None = None,
          env_nested_delimiter: str | None = None,
          env_nested_max_split: int | None = None,
          env_ignore_empty: bool | None = None,
          env_parse_none_str: str | None = None,
          env_parse_enums: bool | None = None,
      ) -> None:
          """
          Resolve the dotenv file settings, then initialise the parent source.

          Args:
              settings_cls: The settings class this source provides values for.
              env_file: Dotenv file(s) to read. When left at `ENV_FILE_SENTINEL`,
                  the `env_file` entry of the settings class' `model_config` is used.
              env_file_encoding: Encoding of the dotenv file(s). When `None`, the
                  `env_file_encoding` entry of `model_config` is used.

          The remaining arguments are passed on, positionally and unchanged, to
          `EnvSettingsSource.__init__`.
          """
          # Both attributes are assigned before the parent initialiser runs.
          if env_file != ENV_FILE_SENTINEL:
              self.env_file = env_file
          else:
              self.env_file = settings_cls.model_config.get('env_file')

          if env_file_encoding is not None:
              self.env_file_encoding = env_file_encoding
          else:
              self.env_file_encoding = settings_cls.model_config.get('env_file_encoding')

          super().__init__(
              settings_cls,
              case_sensitive,
              env_prefix,
              env_nested_delimiter,
              env_nested_max_split,
              env_ignore_empty,
              env_parse_none_str,
              env_parse_enums,
          )

      def _load_env_vars(self) -> Mapping[str, str | None]:
          """Return the variables read from the configured dotenv file(s)."""
          return self._read_env_files()

      @staticmethod
      def _static_read_env_file(
          file_path: Path,
          *,
          encoding: str | None = None,
          case_sensitive: bool = False,
          ignore_empty: bool = False,
          parse_none_str: str | None = None,
      ) -> Mapping[str, str | None]:
          """
          Read a single dotenv file and post-process it with `parse_env_vars`.

          Args:
              file_path: The dotenv file to read.
              encoding: File encoding; a falsy value means `'utf8'`.
              case_sensitive: Forwarded to `parse_env_vars`.
              ignore_empty: Forwarded to `parse_env_vars`.
              parse_none_str: Forwarded to `parse_env_vars`.

          Returns:
              The mapping produced by `parse_env_vars`.
          """
          effective_encoding = encoding or 'utf8'
          raw_vars: dict[str, str | None] = dotenv_values(file_path, encoding=effective_encoding)
          return parse_env_vars(raw_vars, case_sensitive, ignore_empty, parse_none_str)

      def _read_env_file(
          self,
          file_path: Path,
      ) -> Mapping[str, str | None]:
          """Read `file_path` using the options stored on this source."""
          return self._static_read_env_file(
              file_path,
              encoding=self.env_file_encoding,
              case_sensitive=self.case_sensitive,
              ignore_empty=self.env_ignore_empty,
              parse_none_str=self.env_parse_none_str,
          )

      def _read_env_files(self) -> Mapping[str, str | None]:
          """
          Read every configured dotenv file and merge the results.

          Returns an empty mapping when no file is configured. Paths that are not
          existing files are skipped; files are applied in order, so values from
          later files replace those from earlier ones.
          """
          configured = self.env_file
          if configured is None:
              return {}

          # A single path is handled as a one-element list.
          candidates = [configured] if isinstance(configured, (str, os.PathLike)) else configured

          merged: dict[str, str | None] = {}
          for candidate in candidates:
              resolved = Path(candidate).expanduser()
              if not resolved.is_file():
                  continue
              merged.update(self._read_env_file(resolved))
          return merged

      def __call__(self) -> dict[str, Any]:
          """
          Build the settings data, including extra variables from the dotenv file.

          Starts from the parent source's result and then adds every dotenv
          variable that is non-empty, not already present, and not matched by any
          model field.
          """
          data: dict[str, Any] = super().__call__()
          extra_allowed = self.config.get('extra') != 'forbid'

          # As `extra` config is allowed in dotenv settings source, we have to
          # update data with extra env variables from the dotenv file.
          for env_name, env_value in self.env_vars.items():
              # Nothing to add for empty values or names that are already in the data.
              if not env_value or env_name in data:
                  continue
              # With a prefix configured, a variable named exactly like a field is skipped.
              if self.env_prefix and env_name in self.settings_cls.model_fields:
                  continue

              # A variable counts as used when it equals a field's env name, or when
              # the field is complex and the variable starts with that env name.
              # `any` stops at the first match.
              matches_a_field = any(
                  env_name == field_env_name
                  or (
                      (
                          _annotation_is_complex(field.annotation, field.metadata)
                          or (
                              is_union_origin(get_origin(field.annotation))
                              and _union_is_complex(field.annotation, field.metadata)
                          )
                      )
                      and env_name.startswith(field_env_name)
                  )
                  for field_name, field in self.settings_cls.model_fields.items()
                  for _, field_env_name, _ in self._extract_field_info(field, field_name)
              )
              if matches_a_field:
                  continue

              if extra_allowed and env_name.startswith(self.env_prefix):
                  # env_prefix should be respected and removed from the env_name
                  data[env_name[len(self.env_prefix) :]] = env_value
              else:
                  data[env_name] = env_value

          return data

      def __repr__(self) -> str:
          """Return a summary of the file, encoding, nested delimiter and prefix length."""
          file_part = (
              f'{self.__class__.__name__}(env_file={self.env_file!r}, env_file_encoding={self.env_file_encoding!r}, '
          )
          env_part = f'env_nested_delimiter={self.env_nested_delimiter!r}, env_prefix_len={self.env_prefix_len!r})'
          return file_part + env_part
  def read_env_file(
      file_path: Path,
      *,
      encoding: str | None = None,
      case_sensitive: bool = False,
      ignore_empty: bool = False,
      parse_none_str: str | None = None,
  ) -> Mapping[str, str | None]:
      """
      Deprecated wrapper around `DotEnvSettingsSource._static_read_env_file`.

      Emits a `DeprecationWarning` and then forwards all arguments unchanged.

      Args:
          file_path: The dotenv file to read.
          encoding: Encoding used to read the file.
          case_sensitive: Forwarded to `_static_read_env_file`.
          ignore_empty: Forwarded to `_static_read_env_file`.
          parse_none_str: Forwarded to `_static_read_env_file`.

      Returns:
          Whatever `DotEnvSettingsSource._static_read_env_file` returns.
      """
      warnings.warn(
          'read_env_file will be removed in the next version, use DotEnvSettingsSource._static_read_env_file if you must',
          DeprecationWarning,
          # Attribute the warning to the caller's line rather than to this wrapper,
          # so it points at the code that needs to change.
          stacklevel=2,
      )
      return DotEnvSettingsSource._static_read_env_file(
          file_path,
          encoding=encoding,
          case_sensitive=case_sensitive,
          ignore_empty=ignore_empty,
          parse_none_str=parse_none_str,
      )
  # Names exported by `from <this module> import *`: the dotenv settings source
  # and the module-level `read_env_file` wrapper (which emits a DeprecationWarning).
  __all__ = ['DotEnvSettingsSource', 'read_env_file']