cli.py 60 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331
  1. """Command-line interface settings source."""
  2. from __future__ import annotations as _annotations
  3. import json
  4. import re
  5. import shlex
  6. import sys
  7. import typing
  8. from argparse import (
  9. SUPPRESS,
  10. ArgumentParser,
  11. BooleanOptionalAction,
  12. Namespace,
  13. RawDescriptionHelpFormatter,
  14. _SubParsersAction,
  15. )
  16. from collections import defaultdict
  17. from collections.abc import Mapping, Sequence
  18. from enum import Enum
  19. from functools import cached_property
  20. from textwrap import dedent
  21. from types import SimpleNamespace
  22. from typing import (
  23. TYPE_CHECKING,
  24. Annotated,
  25. Any,
  26. Callable,
  27. Generic,
  28. NoReturn,
  29. Optional,
  30. TypeVar,
  31. Union,
  32. cast,
  33. overload,
  34. )
  35. import typing_extensions
  36. from pydantic import AliasChoices, AliasPath, BaseModel, Field, PrivateAttr
  37. from pydantic._internal._repr import Representation
  38. from pydantic._internal._utils import is_model_class
  39. from pydantic.dataclasses import is_pydantic_dataclass
  40. from pydantic.fields import FieldInfo
  41. from pydantic_core import PydanticUndefined
  42. from typing_extensions import get_args, get_origin
  43. from typing_inspection import typing_objects
  44. from typing_inspection.introspection import is_union_origin
  45. from ...exceptions import SettingsError
  46. from ...utils import _lenient_issubclass, _WithArgsTypes
  47. from ..types import (
  48. ForceDecode,
  49. NoDecode,
  50. PydanticModel,
  51. _CliExplicitFlag,
  52. _CliImplicitFlag,
  53. _CliPositionalArg,
  54. _CliSubCommand,
  55. _CliUnknownArgs,
  56. )
  57. from ..utils import (
  58. _annotation_contains_types,
  59. _annotation_enum_val_to_name,
  60. _get_alias_names,
  61. _get_model_fields,
  62. _is_function,
  63. _strip_annotated,
  64. parse_env_vars,
  65. )
  66. from .env import EnvSettingsSource
  67. if TYPE_CHECKING:
  68. from pydantic_settings.main import BaseSettings
  69. class _CliInternalArgParser(ArgumentParser):
  70. def __init__(self, cli_exit_on_error: bool = True, **kwargs: Any) -> None:
  71. super().__init__(**kwargs)
  72. self._cli_exit_on_error = cli_exit_on_error
  73. def error(self, message: str) -> NoReturn:
  74. if not self._cli_exit_on_error:
  75. raise SettingsError(f'error parsing CLI: {message}')
  76. super().error(message)
  # Empty `BaseModel` subclass: it declares no fields or behaviour of its own.
  # NOTE(review): presumably models deriving from it are rendered as a mutually exclusive
  # argument group by the parser-building code, which is outside this excerpt -- confirm there.
  class CliMutuallyExclusiveGroup(BaseModel):
      pass
  class _CliArg(BaseModel):
      # Internal record describing how one settings field is exposed on the command line.
      # Constructing an instance also registers it (or copies of it) in the shared `parser_map`.
      model: Any
      field_name: str
      arg_prefix: str
      case_sensitive: bool
      hide_none_type: bool
      kebab_case: bool
      enable_decoding: Optional[bool]
      env_prefix_len: int
      args: list[str] = []
      kwargs: dict[str, Any] = {}

      _alias_names: tuple[str, ...] = PrivateAttr(())
      _alias_paths: dict[str, Optional[int]] = PrivateAttr({})
      _is_alias_path_only: bool = PrivateAttr(False)
      _field_info: FieldInfo = PrivateAttr()

      def __init__(
          self,
          field_info: FieldInfo,
          parser_map: defaultdict[str | FieldInfo, dict[Optional[int] | str, _CliArg]],
          **values: Any,
      ) -> None:
          # Validate the public fields first, then attach the private state derived from `field_info`.
          super().__init__(**values)
          self._field_info = field_info
          # `_get_alias_names` fills `self._alias_paths` in place through `alias_path_args`.
          self._alias_names, self._is_alias_path_only = _get_alias_names(
              self.field_name,
              self.field_info,
              alias_path_args=self._alias_paths,
              case_sensitive=self.case_sensitive,
          )

          alias_path_dests = {
              f'{self.arg_prefix}{alias_name}': path_index for alias_name, path_index in self._alias_paths.items()
          }
          if self.subcommand_dest:
              # One entry per sub-model, reachable both by the subcommand dest and by the field info.
              for sub_model in self.sub_models:
                  alias = self.subcommand_alias(sub_model)
                  subcommand_arg = self.model_copy(update={'args': [], 'kwargs': {}})
                  parser_map[self.subcommand_dest][alias] = subcommand_arg
                  parser_map[self.field_info][alias] = subcommand_arg
          elif self.dest not in alias_path_dests:
              parser_map[self.dest][None] = self
              parser_map[self.field_info][None] = self
          # Alias-path destinations are always registered, each with its own copy keyed by path index.
          for alias_path_dest, path_index in alias_path_dests.items():
              alias_path_arg = self.model_copy(update={'args': [], 'kwargs': {}})
              parser_map[alias_path_dest][path_index] = alias_path_arg
              parser_map[self.field_info][path_index] = alias_path_arg

      @classmethod
      def get_kebab_case(cls, name: str, kebab_case: Optional[bool]) -> str:
          # Underscores become hyphens only when kebab case is enabled.
          if kebab_case:
              return name.replace('_', '-')
          return name

      def subcommand_alias(self, sub_model: type[BaseModel]) -> str:
          # With several candidate sub-models the class name disambiguates; otherwise use the field alias.
          if len(self.sub_models) > 1:
              alias = sub_model.__name__
          else:
              alias = self.preferred_alias
          return self.get_kebab_case(alias, self.kebab_case)

      @cached_property
      def field_info(self) -> FieldInfo:
          return self._field_info

      @cached_property
      def subcommand_dest(self) -> Optional[str]:
          # Only fields carrying the `_CliSubCommand` marker have a subcommand destination.
          if _CliSubCommand in self.field_info.metadata:
              return f'{self.arg_prefix}:subcommand'
          return None

      @cached_property
      def dest(self) -> str:
          strip_env_prefix = (
              not self.subcommand_dest
              and self.arg_prefix
              and self.field_info.validation_alias is not None
              and not self.is_parser_submodel
          )
          full_dest = f'{self.arg_prefix}{self.preferred_alias}'
          if strip_env_prefix:
              # Strip prefix if validation alias is set and value is not complex.
              # Related https://github.com/pydantic/pydantic-settings/pull/25
              return full_dest[self.env_prefix_len :]
          return full_dest

      @cached_property
      def preferred_arg_name(self) -> str:
          first_arg = self.args[0]
          return first_arg.replace('_', '-') if self.kebab_case else first_arg

      @cached_property
      def sub_models(self) -> list[type[BaseModel]]:
          annotation = self.field_info.annotation
          type_args = get_args(annotation)
          field_types: tuple[Any, ...] = type_args if type_args else (annotation,)
          if self.hide_none_type:
              field_types = tuple(type_ for type_ in field_types if type_ is not type(None))

          found: list[type[BaseModel]] = []
          for type_ in field_types:
              # The CLI markers are only valid as the outermost annotation of the field.
              if _annotation_contains_types(type_, (_CliSubCommand,), is_include_origin=False):
                  raise SettingsError(
                      f'CliSubCommand is not outermost annotation for {self.model.__name__}.{self.field_name}'
                  )
              elif _annotation_contains_types(type_, (_CliPositionalArg,), is_include_origin=False):
                  raise SettingsError(
                      f'CliPositionalArg is not outermost annotation for {self.model.__name__}.{self.field_name}'
                  )
              stripped = _strip_annotated(type_)
              if is_model_class(stripped) or is_pydantic_dataclass(stripped):
                  found.append(stripped)
          return found

      @cached_property
      def alias_names(self) -> tuple[str, ...]:
          return self._alias_names

      @cached_property
      def alias_paths(self) -> dict[str, Optional[int]]:
          return self._alias_paths

      @cached_property
      def preferred_alias(self) -> str:
          return self._alias_names[0]

      @cached_property
      def is_alias_path_only(self) -> bool:
          return self._is_alias_path_only

      @cached_property
      def is_append_action(self) -> bool:
          # Subcommands never append; otherwise container-like annotations do.
          if self.subcommand_dest:
              return False
          return _annotation_contains_types(
              self.field_info.annotation, (list, set, dict, Sequence, Mapping), is_strip_annotated=True
          )

      @cached_property
      def is_parser_submodel(self) -> bool:
          if self.subcommand_dest:
              return False
          if not self.sub_models:
              return False
          return not self.is_append_action

      @cached_property
      def is_no_decode(self) -> bool:
          info = self.field_info
          if info is None:
              return False
          if NoDecode in info.metadata:
              return True
          # Decoding disabled globally, unless this field explicitly forces it.
          return self.enable_decoding is False and ForceDecode not in info.metadata
  # Generic placeholder for the wrapped type in the CLI annotation aliases below.
  T = TypeVar('T')
  # Subcommand field: the wrapped type is made optional (`T | None`) and tagged with `_CliSubCommand`.
  CliSubCommand = Annotated[Union[T, None], _CliSubCommand]
  # Field tagged with the `_CliPositionalArg` marker.
  CliPositionalArg = Annotated[T, _CliPositionalArg]
  # Type variable restricted to `bool` (and subclasses) for the flag aliases.
  _CliBoolFlag = TypeVar('_CliBoolFlag', bound=bool)
  # Boolean field tagged with the `_CliImplicitFlag` marker.
  CliImplicitFlag = Annotated[_CliBoolFlag, _CliImplicitFlag]
  # Boolean field tagged with the `_CliExplicitFlag` marker.
  CliExplicitFlag = Annotated[_CliBoolFlag, _CliExplicitFlag]
  # Re-export of `argparse.SUPPRESS` under a CLI-specific name.
  CLI_SUPPRESS = SUPPRESS
  # Field tagged with `CLI_SUPPRESS` (the argparse suppression sentinel).
  CliSuppress = Annotated[T, CLI_SUPPRESS]
  # `list[str]` field defaulting to an empty list, tagged with `_CliUnknownArgs` and `NoDecode`.
  CliUnknownArgs = Annotated[list[str], Field(default=[]), _CliUnknownArgs, NoDecode]
  202. class CliSettingsSource(EnvSettingsSource, Generic[T]):
  203. """
  204. Source class for loading settings values from CLI.
  205. Note:
  206. A `CliSettingsSource` connects with a `root_parser` object by using the parser methods to add
  207. `settings_cls` fields as command line arguments. The `CliSettingsSource` internal parser representation
  208. is based upon the `argparse` parsing library, and therefore, requires the parser methods to support
  209. the same attributes as their `argparse` library counterparts.
  210. Args:
  211. cli_prog_name: The CLI program name to display in help text. Defaults to `None` if cli_parse_args is `None`.
  212. Otherwise, defaults to sys.argv[0].
  213. cli_parse_args: The list of CLI arguments to parse. Defaults to None.
  214. If set to `True`, defaults to sys.argv[1:].
  215. cli_parse_none_str: The CLI string value that should be parsed (e.g. "null", "void", "None", etc.) into `None`
  216. type(None). Defaults to "null" if cli_avoid_json is `False`, and "None" if cli_avoid_json is `True`.
  217. cli_hide_none_type: Hide `None` values in CLI help text. Defaults to `False`.
  218. cli_avoid_json: Avoid complex JSON objects in CLI help text. Defaults to `False`.
  219. cli_enforce_required: Enforce required fields at the CLI. Defaults to `False`.
  220. cli_use_class_docs_for_groups: Use class docstrings in CLI group help text instead of field descriptions.
  221. Defaults to `False`.
  222. cli_exit_on_error: Determines whether or not the internal parser exits with error info when an error occurs.
  223. Defaults to `True`.
  224. cli_prefix: Prefix for command line arguments added under the root parser. Defaults to "".
  225. cli_flag_prefix_char: The flag prefix character to use for CLI optional arguments. Defaults to '-'.
  226. cli_implicit_flags: Whether `bool` fields should be implicitly converted into CLI boolean flags.
  227. (e.g. --flag, --no-flag). Defaults to `False`.
  228. cli_ignore_unknown_args: Whether to ignore unknown CLI args and parse only known ones. Defaults to `False`.
  229. cli_kebab_case: CLI args use kebab case. Defaults to `False`.
  230. cli_shortcuts: Mapping of target field name to alias names. Defaults to `None`.
  231. case_sensitive: Whether CLI "--arg" names should be read with case-sensitivity. Defaults to `True`.
  232. Note: Case-insensitive matching is only supported on the internal root parser and does not apply to CLI
  233. subcommands.
  234. root_parser: The root parser object.
  235. parse_args_method: The root parser parse args method. Defaults to `argparse.ArgumentParser.parse_args`.
  236. add_argument_method: The root parser add argument method. Defaults to `argparse.ArgumentParser.add_argument`.
  237. add_argument_group_method: The root parser add argument group method.
  238. Defaults to `argparse.ArgumentParser.add_argument_group`.
  239. add_parser_method: The root parser add new parser (sub-command) method.
  240. Defaults to `argparse._SubParsersAction.add_parser`.
  241. add_subparsers_method: The root parser add subparsers (sub-commands) method.
  242. Defaults to `argparse.ArgumentParser.add_subparsers`.
  243. formatter_class: A class for customizing the root parser help text. Defaults to `argparse.RawDescriptionHelpFormatter`.
  244. """
  def __init__(
      self,
      settings_cls: type[BaseSettings],
      cli_prog_name: str | None = None,
      cli_parse_args: bool | list[str] | tuple[str, ...] | None = None,
      cli_parse_none_str: str | None = None,
      cli_hide_none_type: bool | None = None,
      cli_avoid_json: bool | None = None,
      cli_enforce_required: bool | None = None,
      cli_use_class_docs_for_groups: bool | None = None,
      cli_exit_on_error: bool | None = None,
      cli_prefix: str | None = None,
      cli_flag_prefix_char: str | None = None,
      cli_implicit_flags: bool | None = None,
      cli_ignore_unknown_args: bool | None = None,
      cli_kebab_case: bool | None = None,
      cli_shortcuts: Mapping[str, str | list[str]] | None = None,
      case_sensitive: bool | None = True,
      root_parser: Any = None,
      parse_args_method: Callable[..., Any] | None = None,
      add_argument_method: Callable[..., Any] | None = ArgumentParser.add_argument,
      add_argument_group_method: Callable[..., Any] | None = ArgumentParser.add_argument_group,
      add_parser_method: Callable[..., Any] | None = _SubParsersAction.add_parser,
      add_subparsers_method: Callable[..., Any] | None = ArgumentParser.add_subparsers,
      formatter_class: Any = RawDescriptionHelpFormatter,
  ) -> None:
      """
      Resolve the CLI options, connect the root parser and optionally parse arguments.

      Each `cli_*` option uses the explicit argument when it is not `None`, otherwise the entry of the
      same name in `settings_cls.model_config`, otherwise a built-in default. The individual
      parameters are described in the class docstring.

      Raises:
          SettingsError: If the resolved CLI prefix is invalid, if case-insensitive matching is
              requested together with an external `root_parser`, or if `cli_parse_args` is neither
              a bool, `None`, a list nor a tuple.
      """
      self.cli_prog_name = (
          cli_prog_name if cli_prog_name is not None else settings_cls.model_config.get('cli_prog_name', sys.argv[0])
      )
      self.cli_hide_none_type = (
          cli_hide_none_type
          if cli_hide_none_type is not None
          else settings_cls.model_config.get('cli_hide_none_type', False)
      )
      self.cli_avoid_json = (
          cli_avoid_json if cli_avoid_json is not None else settings_cls.model_config.get('cli_avoid_json', False)
      )
      # The default "none" spelling depends on whether JSON is being avoided.
      if not cli_parse_none_str:
          cli_parse_none_str = 'None' if self.cli_avoid_json is True else 'null'
      self.cli_parse_none_str = cli_parse_none_str
      self.cli_enforce_required = (
          cli_enforce_required
          if cli_enforce_required is not None
          else settings_cls.model_config.get('cli_enforce_required', False)
      )
      self.cli_use_class_docs_for_groups = (
          cli_use_class_docs_for_groups
          if cli_use_class_docs_for_groups is not None
          else settings_cls.model_config.get('cli_use_class_docs_for_groups', False)
      )
      self.cli_exit_on_error = (
          cli_exit_on_error
          if cli_exit_on_error is not None
          else settings_cls.model_config.get('cli_exit_on_error', True)
      )
      self.cli_prefix = cli_prefix if cli_prefix is not None else settings_cls.model_config.get('cli_prefix', '')
      self.cli_flag_prefix_char = (
          cli_flag_prefix_char
          if cli_flag_prefix_char is not None
          else settings_cls.model_config.get('cli_flag_prefix_char', '-')
      )
      self._cli_flag_prefix = self.cli_flag_prefix_char * 2
      if self.cli_prefix:
          # Validate the *resolved* prefix: the `cli_prefix` argument is None when the value was
          # taken from `model_config`, and calling `.startswith` on it would raise AttributeError.
          resolved_prefix = self.cli_prefix
          if (
              resolved_prefix.startswith('.')
              or resolved_prefix.endswith('.')
              or not resolved_prefix.replace('.', '').isidentifier()
          ):
              raise SettingsError(f'CLI settings source prefix is invalid: {resolved_prefix}')
          # The stored prefix carries the trailing nesting delimiter.
          self.cli_prefix += '.'
      self.cli_implicit_flags = (
          cli_implicit_flags
          if cli_implicit_flags is not None
          else settings_cls.model_config.get('cli_implicit_flags', False)
      )
      self.cli_ignore_unknown_args = (
          cli_ignore_unknown_args
          if cli_ignore_unknown_args is not None
          else settings_cls.model_config.get('cli_ignore_unknown_args', False)
      )
      self.cli_kebab_case = (
          cli_kebab_case if cli_kebab_case is not None else settings_cls.model_config.get('cli_kebab_case', False)
      )
      self.cli_shortcuts = (
          cli_shortcuts if cli_shortcuts is not None else settings_cls.model_config.get('cli_shortcuts', None)
      )

      case_sensitive = case_sensitive if case_sensitive is not None else True
      if not case_sensitive and root_parser is not None:
          raise SettingsError('Case-insensitive matching is only supported on the internal root parser')

      super().__init__(
          settings_cls,
          env_nested_delimiter='.',
          env_parse_none_str=self.cli_parse_none_str,
          env_parse_enums=True,
          env_prefix=self.cli_prefix,
          case_sensitive=case_sensitive,
      )

      # Fall back to the internal argparse-based parser when the caller supplies none.
      root_parser = (
          _CliInternalArgParser(
              cli_exit_on_error=self.cli_exit_on_error,
              prog=self.cli_prog_name,
              description=None if settings_cls.__doc__ is None else dedent(settings_cls.__doc__),
              formatter_class=formatter_class,
              prefix_chars=self.cli_flag_prefix_char,
              allow_abbrev=False,
          )
          if root_parser is None
          else root_parser
      )
      self._connect_root_parser(
          root_parser=root_parser,
          parse_args_method=parse_args_method,
          add_argument_method=add_argument_method,
          add_argument_group_method=add_argument_group_method,
          add_parser_method=add_parser_method,
          add_subparsers_method=add_subparsers_method,
          formatter_class=formatter_class,
      )

      if cli_parse_args not in (None, False):
          if cli_parse_args is True:
              cli_parse_args = sys.argv[1:]
          elif not isinstance(cli_parse_args, (list, tuple)):
              raise SettingsError(
                  f'cli_parse_args must be a list or tuple of strings, received {type(cli_parse_args)}'
              )
          self._load_env_vars(parsed_args=self._parse_args(self.root_parser, cli_parse_args))
  @overload
  def __call__(self) -> dict[str, Any]: ...

  @overload
  def __call__(self, *, args: list[str] | tuple[str, ...] | bool) -> CliSettingsSource[T]:
      """
      Parse a command line argument list and load the result into this source.

      Args:
          args:
              The arguments to parse. `True` means `sys.argv[1:]`; `False` loads an empty set of
              parsed arguments without parsing anything.

      Returns:
          CliSettingsSource: The object instance itself.
      """
      ...

  @overload
  def __call__(self, *, parsed_args: Namespace | SimpleNamespace | dict[str, Any]) -> CliSettingsSource[T]:
      """
      Load already-parsed command line arguments into this source.

      Note:
          The parsed args must be an `argparse.Namespace`, a `SimpleNamespace`, or a vars dictionary
          (e.g., vars(argparse.Namespace)).

      Args:
          parsed_args: The parsed args to load.

      Returns:
          CliSettingsSource: The object instance itself.
      """
      ...

  def __call__(
      self,
      *,
      args: list[str] | tuple[str, ...] | bool | None = None,
      parsed_args: Namespace | SimpleNamespace | dict[str, list[str] | str] | None = None,
  ) -> dict[str, Any] | CliSettingsSource[T]:
      """Dispatch to parsing, loading, or the inherited source call depending on the keywords given."""
      if args is not None:
          if parsed_args is not None:
              raise SettingsError('`args` and `parsed_args` are mutually exclusive')
          if args is False:
              return self._load_env_vars(parsed_args={})
          cli_args = sys.argv[1:] if args is True else args
          return self._load_env_vars(parsed_args=self._parse_args(self.root_parser, cli_args))

      if parsed_args is not None:
          return self._load_env_vars(parsed_args=parsed_args)

      # Neither keyword given: behave like the parent settings source.
      return super().__call__()
  @overload
  def _load_env_vars(self) -> Mapping[str, str | None]: ...

  @overload
  def _load_env_vars(self, *, parsed_args: Namespace | SimpleNamespace | dict[str, Any]) -> CliSettingsSource[T]:
      """
      Load parsed command line arguments into the CLI environment settings variables.

      Note:
          The parsed args must be an `argparse.Namespace`, a `SimpleNamespace`, or a vars dictionary
          (e.g., vars(argparse.Namespace)).

      Args:
          parsed_args: The parsed args to load.

      Returns:
          CliSettingsSource: The object instance itself.
      """
      ...

  def _load_env_vars(
      self, *, parsed_args: Namespace | SimpleNamespace | dict[str, list[str] | str] | None = None
  ) -> Mapping[str, str | None] | CliSettingsSource[T]:
      """Normalise `parsed_args` into flat string values and store them in `self.env_vars`."""
      if parsed_args is None:
          return {}

      if isinstance(parsed_args, (Namespace, SimpleNamespace)):
          parsed_args = vars(parsed_args)

      subcommand_suffix = ':subcommand'
      selected_subcommands: list[str] = []
      # Iterate over a snapshot because entries are rewritten (and sometimes removed) in the loop.
      for field_name, val in list(parsed_args.items()):
          if not isinstance(val, list):
              if field_name.endswith(subcommand_suffix) and val is not None:
                  selected_subcommands.append(self._parser_map[field_name][val].dest)
              continue

          if self._is_nested_alias_path_only_workaround(parsed_args, field_name, val):
              # Workaround for nested alias path environment variables not being handled.
              # See https://github.com/pydantic/pydantic-settings/issues/670
              continue

          cli_arg = self._parser_map.get(field_name, {}).get(None)
          if cli_arg and cli_arg.is_no_decode:
              parsed_args[field_name] = ','.join(val)
          else:
              parsed_args[field_name] = self._merge_parsed_list(val, field_name)

      # Every subcommand that was not selected is set to the "none" string.
      for arg_dest, arg_map in self._parser_map.items():
          if not (isinstance(arg_dest, str) and arg_dest.endswith(subcommand_suffix)):
              continue
          for subcommand_dest in [arg.dest for arg in arg_map.values()]:
              if subcommand_dest not in selected_subcommands:
                  parsed_args[subcommand_dest] = self.cli_parse_none_str

      # Drop the subcommand selector entries and anything left undefined.
      filtered_args = {}
      for key, val in parsed_args.items():
          if key.endswith(subcommand_suffix) or val is PydanticUndefined:
              continue
          filtered_args[key] = val
      parsed_args = filtered_args

      if selected_subcommands:
          last_selected_subcommand = max(selected_subcommands, key=len)
          nested_marker = f'{last_selected_subcommand}.'
          # A selected subcommand with no nested values still needs an (empty) object value.
          if not any(field_name for field_name in parsed_args if nested_marker in field_name):
              parsed_args[last_selected_subcommand] = '{}'

      parsed_args.update(self._cli_unknown_args)

      self.env_vars = parse_env_vars(
          cast(Mapping[str, str], parsed_args),
          self.case_sensitive,
          self.env_ignore_empty,
          self.cli_parse_none_str,
      )

      return self
  471. def _is_nested_alias_path_only_workaround(
  472. self, parsed_args: dict[str, list[str] | str], field_name: str, val: list[str]
  473. ) -> bool:
  474. """
  475. Workaround for nested alias path environment variables not being handled.
  476. See https://github.com/pydantic/pydantic-settings/issues/670
  477. """
  478. known_arg = self._parser_map.get(field_name, {}).values()
  479. if not known_arg:
  480. return False
  481. arg = next(iter(known_arg))
  482. if arg.is_alias_path_only and arg.arg_prefix.endswith('.'):
  483. del parsed_args[field_name]
  484. nested_dest = arg.arg_prefix[:-1]
  485. nested_val = f'"{arg.preferred_alias}": {self._merge_parsed_list(val, field_name)}'
  486. parsed_args[nested_dest] = (
  487. f'{{{nested_val}}}'
  488. if nested_dest not in parsed_args
  489. else f'{parsed_args[nested_dest][:-1]}, {nested_val}}}'
  490. )
  491. return True
  492. return False
  493. def _get_merge_parsed_list_types(
  494. self, parsed_list: list[str], field_name: str
  495. ) -> tuple[Optional[type], Optional[type]]:
  496. merge_type = self._cli_dict_args.get(field_name, list)
  497. if (
  498. merge_type is list
  499. or not is_union_origin(get_origin(merge_type))
  500. or not any(
  501. type_
  502. for type_ in get_args(merge_type)
  503. if type_ is not type(None) and get_origin(type_) not in (dict, Mapping)
  504. )
  505. ):
  506. inferred_type = merge_type
  507. else:
  508. inferred_type = list if parsed_list and (len(parsed_list) > 1 or parsed_list[0].startswith('[')) else str
  509. return merge_type, inferred_type
  510. def _merged_list_to_str(self, merged_list: list[str], field_name: str) -> str:
  511. decode_list: list[str] = []
  512. is_use_decode: Optional[bool] = None
  513. cli_arg_map = self._parser_map.get(field_name, {})
  514. for index, item in enumerate(merged_list):
  515. cli_arg = cli_arg_map.get(index)
  516. is_decode = cli_arg is None or not cli_arg.is_no_decode
  517. if is_use_decode is None:
  518. is_use_decode = is_decode
  519. elif is_use_decode != is_decode:
  520. raise SettingsError('Mixing Decode and NoDecode across different AliasPath fields is not allowed')
  521. if is_use_decode:
  522. item = item.replace('\\', '\\\\')
  523. elif item.startswith('"') and item.endswith('"'):
  524. item = item[1:-1]
  525. decode_list.append(item)
  526. merged_list_str = ','.join(decode_list)
  527. return f'[{merged_list_str}]' if is_use_decode else merged_list_str
  def _merge_parsed_list(self, parsed_list: list[str], field_name: str) -> str:
      """
      Merge the raw CLI values collected for `field_name` into one string value.

      Each value is scanned left to right: commas separate items, `{...}` / `[...]` runs are taken
      verbatim, and everything else is consumed as a string, number or `key=val` pair. The result is
      the first item (merge type `str`), a joined list string (merge type `list`), or a JSON object
      built by merging every item (any other merge type).

      Raises:
          SettingsError: On any parsing failure; the original exception is attached as `__cause__`.
      """
      try:
          merged_list: list[str] = []
          is_last_consumed_a_value = False
          merge_type, inferred_type = self._get_merge_parsed_list_types(parsed_list, field_name)
          for val in parsed_list:
              if not isinstance(val, str):
                  # If val is not a string, it's from an external parser and we can ignore parsing the rest of the
                  # list.
                  break
              val = val.strip()
              if val.startswith('[') and val.endswith(']'):
                  val = val[1:-1].strip()
              while val:
                  val = val.strip()
                  if val.startswith(','):
                      val = self._consume_comma(val, merged_list, is_last_consumed_a_value)
                      is_last_consumed_a_value = False
                  else:
                      if val.startswith('{') or val.startswith('['):
                          val = self._consume_object_or_array(val, merged_list)
                      else:
                          try:
                              val = self._consume_string_or_number(val, merged_list, merge_type)
                          except ValueError:
                              # Retry once with the inferred type; if there is no alternative, give up.
                              if merge_type is inferred_type:
                                  raise
                              merge_type = inferred_type
                              val = self._consume_string_or_number(val, merged_list, merge_type)
                      is_last_consumed_a_value = True
              # A value that ended without a trailing item contributes an empty string.
              if not is_last_consumed_a_value:
                  val = self._consume_comma(val, merged_list, is_last_consumed_a_value)

          if merge_type is str:
              return merged_list[0]
          elif merge_type is list:
              return self._merged_list_to_str(merged_list, field_name)
          else:
              merged_dict: dict[str, str] = {}
              for item in merged_list:
                  merged_dict.update(json.loads(item))
              return json.dumps(merged_dict)
      except Exception as e:
          # Chain explicitly so the root cause stays attached to the user-facing error.
          raise SettingsError(f'Parsing error encountered for {field_name}: {e}') from e
  571. def _consume_comma(self, item: str, merged_list: list[str], is_last_consumed_a_value: bool) -> str:
  572. if not is_last_consumed_a_value:
  573. merged_list.append('""')
  574. return item[1:]
  575. def _consume_object_or_array(self, item: str, merged_list: list[str]) -> str:
  576. count = 1
  577. close_delim = '}' if item.startswith('{') else ']'
  578. in_str = False
  579. for consumed in range(1, len(item)):
  580. if item[consumed] == '"' and item[consumed - 1] != '\\':
  581. in_str = not in_str
  582. elif in_str:
  583. continue
  584. elif item[consumed] in ('{', '['):
  585. count += 1
  586. elif item[consumed] in ('}', ']'):
  587. count -= 1
  588. if item[consumed] == close_delim and count == 0:
  589. merged_list.append(item[: consumed + 1])
  590. return item[consumed + 1 :]
  591. raise SettingsError(f'Missing end delimiter "{close_delim}"')
  592. def _consume_string_or_number(self, item: str, merged_list: list[str], merge_type: type[Any] | None) -> str:
  593. consumed = 0 if merge_type is not str else len(item)
  594. is_find_end_quote = False
  595. while consumed < len(item):
  596. if item[consumed] == '"' and (consumed == 0 or item[consumed - 1] != '\\'):
  597. is_find_end_quote = not is_find_end_quote
  598. if not is_find_end_quote and item[consumed] == ',':
  599. break
  600. consumed += 1
  601. if is_find_end_quote:
  602. raise SettingsError('Mismatched quotes')
  603. val_string = item[:consumed].strip()
  604. if merge_type in (list, str):
  605. try:
  606. float(val_string)
  607. except ValueError:
  608. if val_string == self.cli_parse_none_str:
  609. val_string = 'null'
  610. if val_string not in ('true', 'false', 'null') and not val_string.startswith('"'):
  611. val_string = f'"{val_string}"'
  612. merged_list.append(val_string)
  613. else:
  614. key, val = (kv for kv in val_string.split('=', 1))
  615. if key.startswith('"') and not key.endswith('"') and not val.startswith('"') and val.endswith('"'):
  616. raise ValueError(f'Dictionary key=val parameter is a quoted string: {val_string}')
  617. key, val = key.strip('"'), val.strip('"')
  618. merged_list.append(json.dumps({key: val}))
  619. return item[consumed:]
  620. def _verify_cli_flag_annotations(self, model: type[BaseModel], field_name: str, field_info: FieldInfo) -> None:
  621. if _CliImplicitFlag in field_info.metadata:
  622. cli_flag_name = 'CliImplicitFlag'
  623. elif _CliExplicitFlag in field_info.metadata:
  624. cli_flag_name = 'CliExplicitFlag'
  625. else:
  626. return
  627. if field_info.annotation is not bool:
  628. raise SettingsError(f'{cli_flag_name} argument {model.__name__}.{field_name} is not of type bool')
  629. def _sort_arg_fields(self, model: type[BaseModel]) -> list[tuple[str, FieldInfo]]:
  630. positional_variadic_arg = []
  631. positional_args, subcommand_args, optional_args = [], [], []
  632. for field_name, field_info in _get_model_fields(model).items():
  633. if _CliSubCommand in field_info.metadata:
  634. if not field_info.is_required():
  635. raise SettingsError(f'subcommand argument {model.__name__}.{field_name} has a default value')
  636. else:
  637. alias_names, *_ = _get_alias_names(field_name, field_info)
  638. if len(alias_names) > 1:
  639. raise SettingsError(f'subcommand argument {model.__name__}.{field_name} has multiple aliases')
  640. field_types = [type_ for type_ in get_args(field_info.annotation) if type_ is not type(None)]
  641. for field_type in field_types:
  642. if not (is_model_class(field_type) or is_pydantic_dataclass(field_type)):
  643. raise SettingsError(
  644. f'subcommand argument {model.__name__}.{field_name} has type not derived from BaseModel'
  645. )
  646. subcommand_args.append((field_name, field_info))
  647. elif _CliPositionalArg in field_info.metadata:
  648. alias_names, *_ = _get_alias_names(field_name, field_info)
  649. if len(alias_names) > 1:
  650. raise SettingsError(f'positional argument {model.__name__}.{field_name} has multiple aliases')
  651. is_append_action = _annotation_contains_types(
  652. field_info.annotation, (list, set, dict, Sequence, Mapping), is_strip_annotated=True
  653. )
  654. if not is_append_action:
  655. positional_args.append((field_name, field_info))
  656. else:
  657. positional_variadic_arg.append((field_name, field_info))
  658. else:
  659. self._verify_cli_flag_annotations(model, field_name, field_info)
  660. optional_args.append((field_name, field_info))
  661. if positional_variadic_arg:
  662. if len(positional_variadic_arg) > 1:
  663. field_names = ', '.join([name for name, info in positional_variadic_arg])
  664. raise SettingsError(f'{model.__name__} has multiple variadic positional arguments: {field_names}')
  665. elif subcommand_args:
  666. field_names = ', '.join([name for name, info in positional_variadic_arg + subcommand_args])
  667. raise SettingsError(
  668. f'{model.__name__} has variadic positional arguments and subcommand arguments: {field_names}'
  669. )
  670. return positional_args + positional_variadic_arg + subcommand_args + optional_args
@property
def root_parser(self) -> T:
    """The connected root parser instance (read-only view of `self._root_parser`)."""
    return self._root_parser
  675. def _connect_parser_method(
  676. self, parser_method: Callable[..., Any] | None, method_name: str, *args: Any, **kwargs: Any
  677. ) -> Callable[..., Any]:
  678. if (
  679. parser_method is not None
  680. and self.case_sensitive is False
  681. and method_name == 'parse_args_method'
  682. and isinstance(self._root_parser, _CliInternalArgParser)
  683. ):
  684. def parse_args_insensitive_method(
  685. root_parser: _CliInternalArgParser,
  686. args: list[str] | tuple[str, ...] | None = None,
  687. namespace: Namespace | None = None,
  688. ) -> Any:
  689. insensitive_args = []
  690. for arg in shlex.split(shlex.join(args)) if args else []:
  691. flag_prefix = rf'\{self.cli_flag_prefix_char}{{1,2}}'
  692. matched = re.match(rf'^({flag_prefix}[^\s=]+)(.*)', arg)
  693. if matched:
  694. arg = matched.group(1).lower() + matched.group(2)
  695. insensitive_args.append(arg)
  696. return parser_method(root_parser, insensitive_args, namespace)
  697. return parse_args_insensitive_method
  698. elif parser_method is None:
  699. def none_parser_method(*args: Any, **kwargs: Any) -> Any:
  700. raise SettingsError(
  701. f'cannot connect CLI settings source root parser: {method_name} is set to `None` but is needed for connecting'
  702. )
  703. return none_parser_method
  704. else:
  705. return parser_method
  706. def _connect_group_method(self, add_argument_group_method: Callable[..., Any] | None) -> Callable[..., Any]:
  707. add_argument_group = self._connect_parser_method(add_argument_group_method, 'add_argument_group_method')
  708. def add_group_method(parser: Any, **kwargs: Any) -> Any:
  709. if not kwargs.pop('_is_cli_mutually_exclusive_group'):
  710. kwargs.pop('required')
  711. return add_argument_group(parser, **kwargs)
  712. else:
  713. main_group_kwargs = {arg: kwargs.pop(arg) for arg in ['title', 'description'] if arg in kwargs}
  714. main_group_kwargs['title'] += ' (mutually exclusive)'
  715. group = add_argument_group(parser, **main_group_kwargs)
  716. if not hasattr(group, 'add_mutually_exclusive_group'):
  717. raise SettingsError(
  718. 'cannot connect CLI settings source root parser: '
  719. 'group object is missing add_mutually_exclusive_group but is needed for connecting'
  720. )
  721. return group.add_mutually_exclusive_group(**kwargs)
  722. return add_group_method
def _connect_root_parser(
    self,
    root_parser: T,
    parse_args_method: Callable[..., Any] | None,
    add_argument_method: Callable[..., Any] | None = ArgumentParser.add_argument,
    add_argument_group_method: Callable[..., Any] | None = ArgumentParser.add_argument_group,
    add_parser_method: Callable[..., Any] | None = _SubParsersAction.add_parser,
    add_subparsers_method: Callable[..., Any] | None = ArgumentParser.add_subparsers,
    formatter_class: Any = RawDescriptionHelpFormatter,
) -> None:
    """Attach `root_parser` to this source and populate it with the settings arguments.

    Each parser operation is routed through `_connect_parser_method` (or
    `_connect_group_method` for groups) and stored on the instance. The per-connection
    bookkeeping (`_cli_unknown_args`, `_cli_dict_args`, `_parser_map`) is reset, and
    `_add_parser_args` is then run for `self.settings_cls` on the root parser.

    If `parse_args_method` is `None`, it defaults to a `parse_known_args` wrapper when
    `self.cli_ignore_unknown_args` is truthy, and to `ArgumentParser.parse_args` otherwise.
    """
    self._cli_unknown_args: dict[str, list[str]] = {}

    def _parse_known_args(*args: Any, **kwargs: Any) -> Namespace:
        # Parse leniently, then hand the leftover (unknown) args to every registered
        # unknown-args destination.
        args, unknown_args = ArgumentParser.parse_known_args(*args, **kwargs)
        for dest in self._cli_unknown_args:
            self._cli_unknown_args[dest] = unknown_args
        return cast(Namespace, args)

    self._root_parser = root_parser
    if parse_args_method is None:
        parse_args_method = _parse_known_args if self.cli_ignore_unknown_args else ArgumentParser.parse_args
    self._parse_args = self._connect_parser_method(parse_args_method, 'parse_args_method')
    self._add_argument = self._connect_parser_method(add_argument_method, 'add_argument_method')
    self._add_group = self._connect_group_method(add_argument_group_method)
    self._add_parser = self._connect_parser_method(add_parser_method, 'add_parser_method')
    self._add_subparsers = self._connect_parser_method(add_subparsers_method, 'add_subparsers_method')
    self._formatter_class = formatter_class
    self._cli_dict_args: dict[str, type[Any] | None] = {}
    self._parser_map: defaultdict[str | FieldInfo, dict[Optional[int] | str, _CliArg]] = defaultdict(dict)
    # Build the argument tree for the settings class, starting at the root parser.
    self._add_parser_args(
        parser=self.root_parser,
        model=self.settings_cls,
        added_args=[],
        arg_prefix=self.env_prefix,
        subcommand_prefix=self.env_prefix,
        group=None,
        alias_prefixes=[],
        model_default=PydanticUndefined,
    )
def _add_parser_args(
    self,
    parser: Any,
    model: type[BaseModel],
    added_args: list[str],
    arg_prefix: str,
    subcommand_prefix: str,
    group: Any,
    alias_prefixes: list[str],
    model_default: Any,
    is_model_suppressed: bool = False,
) -> ArgumentParser:
    """Register the fields of `model` on `parser` and return `parser`.

    Fields are visited in the order produced by `_sort_arg_fields`. Subcommand fields get a
    sub-parser per sub-model (populated recursively); every other field becomes an argument,
    a nested sub-model group (via `_add_parser_submodels`), or an unknown-args destination.
    Alias-path arguments collected along the way are added last via
    `_add_parser_alias_paths`.

    `added_args` is mutated in place with the names that were added. `group` may be `None`,
    an existing group object, or a dict of group kwargs; a dict is turned into a real group
    the first time an argument needs it.
    """
    subparsers: Any = None
    alias_path_args: dict[str, Optional[int]] = {}
    # Ignore model default if the default is a model and not a subclass of the current model.
    model_default = (
        None
        if (
            (is_model_class(type(model_default)) or is_pydantic_dataclass(type(model_default)))
            and not issubclass(type(model_default), model)
        )
        else model_default
    )
    for field_name, field_info in self._sort_arg_fields(model):
        arg = _CliArg(
            field_info=field_info,
            parser_map=self._parser_map,
            model=model,
            field_name=field_name,
            arg_prefix=arg_prefix,
            case_sensitive=self.case_sensitive,
            hide_none_type=self.cli_hide_none_type,
            kebab_case=self.cli_kebab_case,
            enable_decoding=self.config.get('enable_decoding'),
            env_prefix_len=self.env_prefix_len,
        )
        alias_path_args.update(arg.alias_paths)

        if arg.subcommand_dest:
            for sub_model in arg.sub_models:
                subcommand_alias = arg.subcommand_alias(sub_model)
                subcommand_arg = self._parser_map[arg.subcommand_dest][subcommand_alias]
                subcommand_arg.args = [subcommand_alias]
                subcommand_arg.kwargs['allow_abbrev'] = False
                subcommand_arg.kwargs['formatter_class'] = self._formatter_class
                subcommand_arg.kwargs['description'] = (
                    None if sub_model.__doc__ is None else dedent(sub_model.__doc__)
                )
                subcommand_arg.kwargs['help'] = None if len(arg.sub_models) > 1 else field_info.description
                if self.cli_use_class_docs_for_groups:
                    subcommand_arg.kwargs['help'] = None if sub_model.__doc__ is None else dedent(sub_model.__doc__)

                # The subparsers object is created once per parser and reused afterwards.
                subparsers = (
                    self._add_subparsers(
                        parser,
                        title='subcommands',
                        dest=f'{arg_prefix}:subcommand',
                        description=field_info.description if len(arg.sub_models) > 1 else None,
                    )
                    if subparsers is None
                    else subparsers
                )

                # Extend the "{a,b,...}" metavar with this alias: drop the closing brace,
                # append the alias, and close it again.
                if hasattr(subparsers, 'metavar'):
                    subparsers.metavar = (
                        f'{subparsers.metavar[:-1]},{subcommand_alias}}}'
                        if subparsers.metavar
                        else f'{{{subcommand_alias}}}'
                    )

                # Recurse into the sub-model with fresh added_args / group state.
                self._add_parser_args(
                    parser=self._add_parser(subparsers, *subcommand_arg.args, **subcommand_arg.kwargs),
                    model=sub_model,
                    added_args=[],
                    arg_prefix=f'{arg.dest}.',
                    subcommand_prefix=f'{subcommand_prefix}{arg.preferred_alias}.',
                    group=None,
                    alias_prefixes=[],
                    model_default=PydanticUndefined,
                )
        else:
            flag_prefix: str = self._cli_flag_prefix
            arg.kwargs['dest'] = arg.dest
            arg.kwargs['default'] = CLI_SUPPRESS
            arg.kwargs['help'] = self._help_format(field_name, field_info, model_default, is_model_suppressed)
            arg.kwargs['metavar'] = self._metavar_format(field_info.annotation)
            arg.kwargs['required'] = (
                self.cli_enforce_required and field_info.is_required() and model_default is PydanticUndefined
            )

            arg_names = self._get_arg_names(
                arg_prefix, subcommand_prefix, alias_prefixes, arg.alias_names, added_args
            )
            # Skip fields with no remaining names or whose dest was already added.
            if not arg_names or (arg.kwargs['dest'] in added_args):
                continue

            self._convert_append_action(arg.kwargs, field_info, arg.is_append_action)

            if _CliPositionalArg in field_info.metadata:
                arg_names, flag_prefix = self._convert_positional_arg(
                    arg.kwargs, field_info, arg.preferred_alias, model_default
                )

            self._convert_bool_flag(arg.kwargs, field_info, model_default)

            if arg.is_parser_submodel and not getattr(field_info.annotation, '__pydantic_root_model__', False):
                self._add_parser_submodels(
                    parser,
                    model,
                    arg.sub_models,
                    added_args,
                    arg_prefix,
                    subcommand_prefix,
                    flag_prefix,
                    arg_names,
                    arg.kwargs,
                    field_name,
                    field_info,
                    arg.alias_names,
                    model_default=model_default,
                    is_model_suppressed=is_model_suppressed,
                )
            elif _CliUnknownArgs in field_info.metadata:
                # Unknown-args fields get no parser argument; only a destination slot is registered.
                self._cli_unknown_args[arg.kwargs['dest']] = []
            elif not arg.is_alias_path_only:
                # Lazily materialise the group from its kwargs on first use.
                if isinstance(group, dict):
                    group = self._add_group(parser, **group)
                context = parser if group is None else group
                # Truncate the flag prefix to at most len(name) characters for each name.
                arg.args = [f'{flag_prefix[: len(name)]}{name}' for name in arg_names]
                self._add_argument(context, *arg.args, **arg.kwargs)
                added_args += list(arg_names)

    self._add_parser_alias_paths(parser, alias_path_args, added_args, arg_prefix, subcommand_prefix, group)
    return parser
  884. def _convert_append_action(self, kwargs: dict[str, Any], field_info: FieldInfo, is_append_action: bool) -> None:
  885. if is_append_action:
  886. kwargs['action'] = 'append'
  887. if _annotation_contains_types(field_info.annotation, (dict, Mapping), is_strip_annotated=True):
  888. self._cli_dict_args[kwargs['dest']] = field_info.annotation
  889. def _convert_bool_flag(self, kwargs: dict[str, Any], field_info: FieldInfo, model_default: Any) -> None:
  890. if kwargs['metavar'] == 'bool':
  891. if (self.cli_implicit_flags or _CliImplicitFlag in field_info.metadata) and (
  892. _CliExplicitFlag not in field_info.metadata
  893. ):
  894. del kwargs['metavar']
  895. kwargs['action'] = BooleanOptionalAction
  896. def _convert_positional_arg(
  897. self, kwargs: dict[str, Any], field_info: FieldInfo, preferred_alias: str, model_default: Any
  898. ) -> tuple[list[str], str]:
  899. flag_prefix = ''
  900. arg_names = [kwargs['dest']]
  901. kwargs['default'] = PydanticUndefined
  902. kwargs['metavar'] = _CliArg.get_kebab_case(preferred_alias.upper(), self.cli_kebab_case)
  903. # Note: CLI positional args are always strictly required at the CLI. Therefore, use field_info.is_required in
  904. # conjunction with model_default instead of the derived kwargs['required'].
  905. is_required = field_info.is_required() and model_default is PydanticUndefined
  906. if kwargs.get('action') == 'append':
  907. del kwargs['action']
  908. kwargs['nargs'] = '+' if is_required else '*'
  909. elif not is_required:
  910. kwargs['nargs'] = '?'
  911. del kwargs['dest']
  912. del kwargs['required']
  913. return arg_names, flag_prefix
  914. def _get_arg_names(
  915. self,
  916. arg_prefix: str,
  917. subcommand_prefix: str,
  918. alias_prefixes: list[str],
  919. alias_names: tuple[str, ...],
  920. added_args: list[str],
  921. ) -> list[str]:
  922. arg_names: list[str] = []
  923. for prefix in [arg_prefix] + alias_prefixes:
  924. for name in alias_names:
  925. arg_name = _CliArg.get_kebab_case(
  926. f'{prefix}{name}'
  927. if subcommand_prefix == self.env_prefix
  928. else f'{prefix.replace(subcommand_prefix, "", 1)}{name}',
  929. self.cli_kebab_case,
  930. )
  931. if arg_name not in added_args:
  932. arg_names.append(arg_name)
  933. if self.cli_shortcuts:
  934. for target, aliases in self.cli_shortcuts.items():
  935. if target in arg_names:
  936. alias_list = [aliases] if isinstance(aliases, str) else aliases
  937. arg_names.extend(alias for alias in alias_list if alias not in added_args)
  938. return arg_names
def _add_parser_submodels(
    self,
    parser: Any,
    model: type[BaseModel],
    sub_models: list[type[BaseModel]],
    added_args: list[str],
    arg_prefix: str,
    subcommand_prefix: str,
    flag_prefix: str,
    arg_names: list[str],
    kwargs: dict[str, Any],
    field_name: str,
    field_info: FieldInfo,
    alias_names: tuple[str, ...],
    model_default: Any,
    is_model_suppressed: bool,
) -> None:
    """Register a nested-model field: its group, optional JSON argument, and sub-model fields.

    Group kwargs (title, description, required, mutual-exclusion marker) are assembled first.
    Unless `self.cli_avoid_json` is set, the group is created immediately and an argument
    accepting the whole model as a JSON string is added to it. Each sub-model's fields are
    then added through `_add_parser_args`, receiving either the created group or the group
    kwargs.

    Raises:
        SettingsError: If `model` is a `CliMutuallyExclusiveGroup` (nested models are not
            allowed there), or if a `CliMutuallyExclusiveGroup` sub-model is part of a union.
    """
    if issubclass(model, CliMutuallyExclusiveGroup):
        # Argparse has deprecated "calling add_argument_group() or add_mutually_exclusive_group() on a
        # mutually exclusive group" (https://docs.python.org/3/library/argparse.html#mutual-exclusion).
        # Since nested models result in a group add, raise an exception for nested models in a mutually
        # exclusive group.
        raise SettingsError('cannot have nested models in a CliMutuallyExclusiveGroup')

    model_group: Any = None
    model_group_kwargs: dict[str, Any] = {}
    model_group_kwargs['title'] = f'{arg_names[0]} options'
    model_group_kwargs['description'] = field_info.description
    model_group_kwargs['required'] = kwargs['required']
    model_group_kwargs['_is_cli_mutually_exclusive_group'] = any(
        issubclass(model, CliMutuallyExclusiveGroup) for model in sub_models
    )
    if model_group_kwargs['_is_cli_mutually_exclusive_group'] and len(sub_models) > 1:
        raise SettingsError('cannot use union with CliMutuallyExclusiveGroup')
    if self.cli_use_class_docs_for_groups and len(sub_models) == 1:
        model_group_kwargs['description'] = None if sub_models[0].__doc__ is None else dedent(sub_models[0].__doc__)

    # Resolve the default for the nested model: take it from the parent's default instance
    # when there is one, otherwise fall back to the field's own default / default factory.
    if model_default is not PydanticUndefined:
        if is_model_class(type(model_default)) or is_pydantic_dataclass(type(model_default)):
            model_default = getattr(model_default, field_name)
    else:
        if field_info.default is not PydanticUndefined:
            model_default = field_info.default
        elif field_info.default_factory is not None:
            model_default = field_info.default_factory
    if model_default is None:
        # Make the "None" default visible at the top of the group description.
        desc_header = f'default: {self.cli_parse_none_str} (undefined)'
        if model_group_kwargs['description'] is not None:
            model_group_kwargs['description'] = dedent(f'{desc_header}\n{model_group_kwargs["description"]}')
        else:
            model_group_kwargs['description'] = desc_header

    preferred_alias = alias_names[0]
    is_model_suppressed = self._is_field_suppressed(field_info) or is_model_suppressed
    if is_model_suppressed:
        model_group_kwargs['description'] = CLI_SUPPRESS
    if not self.cli_avoid_json:
        # Add an optional argument that accepts the whole model as a JSON string
        # ('{}' when the flag is given without a value).
        added_args.append(arg_names[0])
        kwargs['required'] = False
        kwargs['nargs'] = '?'
        kwargs['const'] = '{}'
        kwargs['help'] = (
            CLI_SUPPRESS if is_model_suppressed else f'set {arg_names[0]} from JSON string (default: {{}})'
        )
        model_group = self._add_group(parser, **model_group_kwargs)
        self._add_argument(model_group, *(f'{flag_prefix}{name}' for name in arg_names), **kwargs)
    # Note: the loop variable rebinds the `model` parameter.
    for model in sub_models:
        self._add_parser_args(
            parser=parser,
            model=model,
            added_args=added_args,
            arg_prefix=f'{arg_prefix}{preferred_alias}.',
            subcommand_prefix=subcommand_prefix,
            # Pass the real group if one was created; otherwise pass its kwargs so the callee
            # can create it on demand.
            group=model_group if model_group else model_group_kwargs,
            alias_prefixes=[f'{arg_prefix}{name}.' for name in alias_names[1:]],
            model_default=model_default,
            is_model_suppressed=is_model_suppressed,
        )
  1014. def _add_parser_alias_paths(
  1015. self,
  1016. parser: Any,
  1017. alias_path_args: dict[str, Optional[int]],
  1018. added_args: list[str],
  1019. arg_prefix: str,
  1020. subcommand_prefix: str,
  1021. group: Any,
  1022. ) -> None:
  1023. if alias_path_args:
  1024. context = parser
  1025. if group is not None:
  1026. context = self._add_group(parser, **group) if isinstance(group, dict) else group
  1027. for name, index in alias_path_args.items():
  1028. arg_name = (
  1029. f'{arg_prefix}{name}'
  1030. if subcommand_prefix == self.env_prefix
  1031. else f'{arg_prefix.replace(subcommand_prefix, "", 1)}{name}'
  1032. )
  1033. kwargs: dict[str, Any] = {}
  1034. kwargs['default'] = CLI_SUPPRESS
  1035. kwargs['help'] = 'pydantic alias path'
  1036. kwargs['action'] = 'append'
  1037. kwargs['metavar'] = 'list'
  1038. if index is None:
  1039. kwargs['metavar'] = 'dict'
  1040. self._cli_dict_args[arg_name] = dict
  1041. args = [f'{self._cli_flag_prefix}{arg_name}']
  1042. for key, arg in self._parser_map[arg_name].items():
  1043. arg.args, arg.kwargs = args, kwargs
  1044. self._add_argument(context, *args, **kwargs)
  1045. added_args.append(arg_name)
  1046. def _get_modified_args(self, obj: Any) -> tuple[str, ...]:
  1047. if not self.cli_hide_none_type:
  1048. return get_args(obj)
  1049. else:
  1050. return tuple([type_ for type_ in get_args(obj) if type_ is not type(None)])
  1051. def _metavar_format_choices(self, args: list[str], obj_qualname: str | None = None) -> str:
  1052. if 'JSON' in args:
  1053. args = args[: args.index('JSON') + 1] + [arg for arg in args[args.index('JSON') + 1 :] if arg != 'JSON']
  1054. metavar = ','.join(args)
  1055. if obj_qualname:
  1056. return f'{obj_qualname}[{metavar}]'
  1057. else:
  1058. return metavar if len(args) == 1 else f'{{{metavar}}}'
def _metavar_format_recurse(self, obj: Any) -> str:
    """Pretty metavar representation of a type. Adapts logic from `pydantic._repr.display_as_type`.

    The checks below are ordered; the first one that matches `obj` (after stripping
    `Annotated`) decides the representation. Unions, literals, enums and parametrised
    generics are rendered through `_metavar_format_choices`.
    """
    obj = _strip_annotated(obj)
    if _is_function(obj):
        # If function is locally defined use __name__ instead of __qualname__
        return obj.__name__ if '<locals>' in obj.__qualname__ else obj.__qualname__
    elif obj is ...:
        return '...'
    elif isinstance(obj, Representation):
        return repr(obj)
    elif typing_objects.is_typealiastype(obj):
        return str(obj)

    origin = get_origin(obj)
    # A plain value (not a type, forward reference or parametrised generic) is described by its class.
    if origin is None and not isinstance(obj, (type, typing.ForwardRef, typing_extensions.ForwardRef)):
        obj = obj.__class__

    if is_union_origin(origin):
        return self._metavar_format_choices(list(map(self._metavar_format_recurse, self._get_modified_args(obj))))
    elif typing_objects.is_literal(origin):
        return self._metavar_format_choices(list(map(str, self._get_modified_args(obj))))
    elif _lenient_issubclass(obj, Enum):
        # Enums are shown by member name.
        return self._metavar_format_choices([val.name for val in obj])
    elif isinstance(obj, _WithArgsTypes):
        return self._metavar_format_choices(
            list(map(self._metavar_format_recurse, self._get_modified_args(obj))),
            obj_qualname=obj.__qualname__ if hasattr(obj, '__qualname__') else str(obj),
        )
    elif obj is type(None):
        return self.cli_parse_none_str
    elif is_model_class(obj) or is_pydantic_dataclass(obj):
        # Root models are shown as their `root` field's type; any other model is shown as JSON.
        return (
            self._metavar_format_recurse(_get_model_fields(obj)['root'].annotation)
            if getattr(obj, '__pydantic_root_model__', False)
            else 'JSON'
        )
    elif isinstance(obj, type):
        return obj.__qualname__
    else:
        return repr(obj).replace('typing.', '').replace('typing_extensions.', '')
  1097. def _metavar_format(self, obj: Any) -> str:
  1098. return self._metavar_format_recurse(obj).replace(', ', ',')
def _help_format(
    self, field_name: str, field_info: FieldInfo, model_default: Any, is_model_suppressed: bool
) -> str:
    """Build the help text for a field: its description plus a required/default suffix.

    Returns `CLI_SUPPRESS` when the field or its enclosing model is suppressed. When the root
    parser is an `ArgumentParser`, `%` is doubled in the returned text.
    """
    _help = field_info.description if field_info.description else ''
    if is_model_suppressed or self._is_field_suppressed(field_info):
        return CLI_SUPPRESS

    if field_info.is_required() and model_default in (PydanticUndefined, None):
        # Positional args get no "(required)" suffix.
        if _CliPositionalArg not in field_info.metadata:
            # A `None` model default marks the field as "ifdef: required".
            ifdef = 'ifdef: ' if model_default is None else ''
            _help += f' ({ifdef}required)' if _help else f'({ifdef}required)'
    else:
        # Default text, in priority order: the value on a model-instance default, a callable
        # model default, the field default, then the field default factory.
        default = f'(default: {self.cli_parse_none_str})'
        if is_model_class(type(model_default)) or is_pydantic_dataclass(type(model_default)):
            default = f'(default: {getattr(model_default, field_name)})'
        elif model_default not in (PydanticUndefined, None) and _is_function(model_default):
            default = f'(default factory: {self._metavar_format(model_default)})'
        elif field_info.default not in (PydanticUndefined, None):
            enum_name = _annotation_enum_val_to_name(field_info.annotation, field_info.default)
            default = f'(default: {field_info.default if enum_name is None else enum_name})'
        elif field_info.default_factory is not None:
            default = f'(default factory: {self._metavar_format(field_info.default_factory)})'
        _help += f' {default}' if _help else default
    # argparse %-formats help strings, so a literal '%' must be written as '%%'.
    return _help.replace('%', '%%') if issubclass(type(self._root_parser), ArgumentParser) else _help
  1122. def _is_field_suppressed(self, field_info: FieldInfo) -> bool:
  1123. _help = field_info.description if field_info.description else ''
  1124. return _help == CLI_SUPPRESS or CLI_SUPPRESS in field_info.metadata
  1125. def _update_alias_path_only_default(
  1126. self, arg_name: str, value: Any, field_info: FieldInfo, alias_path_only_defaults: dict[str, Any]
  1127. ) -> list[Any] | dict[str, Any]:
  1128. alias_path: AliasPath = [
  1129. alias if isinstance(alias, AliasPath) else cast(AliasPath, alias.choices[0])
  1130. for alias in (field_info.alias, field_info.validation_alias)
  1131. if isinstance(alias, (AliasPath, AliasChoices))
  1132. ][0]
  1133. alias_nested_paths: list[str] = alias_path.path[1:-1] # type: ignore
  1134. if not alias_nested_paths:
  1135. alias_path_only_defaults.setdefault(arg_name, [])
  1136. alias_default = alias_path_only_defaults[arg_name]
  1137. else:
  1138. alias_path_only_defaults.setdefault(arg_name, {})
  1139. current_path = alias_path_only_defaults[arg_name]
  1140. for nested_path in alias_nested_paths[:-1]:
  1141. current_path.setdefault(nested_path, {})
  1142. current_path = current_path[nested_path]
  1143. current_path.setdefault(alias_nested_paths[-1], [])
  1144. alias_default = current_path[alias_nested_paths[-1]]
  1145. alias_path_index = cast(int, alias_path.path[-1])
  1146. alias_default.extend([''] * max(alias_path_index + 1 - len(alias_default), 0))
  1147. alias_default[alias_path_index] = value
  1148. return alias_path_only_defaults[arg_name]
def _serialized_args(self, model: PydanticModel, _is_submodel: bool = False) -> list[str]:
    """Serialize the non-default field values of `model` into a list of CLI argument strings.

    The result is ordered as optional args, then positional args, then subcommand args.
    Nested models and subcommands are serialized recursively with `_is_submodel=True`.
    """
    alias_path_only_defaults: dict[str, Any] = {}
    optional_args: list[str | list[Any] | dict[str, Any]] = []
    positional_args: list[str | list[Any] | dict[str, Any]] = []
    subcommand_args: list[str] = []
    for field_name, field_info in _get_model_fields(type(model) if _is_submodel else self.settings_cls).items():
        model_default = getattr(model, field_name)
        # Fields still at their declared default are not emitted.
        if field_info.default == model_default:
            continue
        if _CliSubCommand in field_info.metadata and model_default is None:
            continue
        arg = next(iter(self._parser_map[field_info].values()))
        if arg.subcommand_dest:
            subcommand_args.append(arg.subcommand_alias(type(model_default)))
            subcommand_args += self._serialized_args(model_default, _is_submodel=True)
            continue
        if is_model_class(type(model_default)) or is_pydantic_dataclass(type(model_default)):
            # The nested model's serialized args are accumulated in the positional list.
            positional_args += self._serialized_args(model_default, _is_submodel=True)
            continue

        # Split the preferred arg name into its leading dashes and the bare name.
        matched = re.match(r'(-*)(.+)', arg.preferred_arg_name)
        flag_chars, arg_name = matched.groups() if matched else ('', '')
        # NOTE(review): json.dumps raises TypeError for a `set`; confirm set-valued fields
        # cannot reach this branch.
        value: str | list[Any] | dict[str, Any] = (
            json.dumps(model_default) if isinstance(model_default, (dict, list, set)) else str(model_default)
        )

        if arg.is_alias_path_only:
            # For alias path only, we wont know the complete value until we've finished parsing the entire class. In
            # this case, insert value as a non-string reference pointing to the relevant alias_path_only_defaults
            # entry and convert into completed string value later.
            value = self._update_alias_path_only_default(arg_name, value, field_info, alias_path_only_defaults)

        if _CliPositionalArg in field_info.metadata:
            # A list value yields one positional per element; anything else yields a single positional.
            for value in model_default if isinstance(model_default, list) else [model_default]:
                value = json.dumps(value) if isinstance(value, (dict, list, set)) else str(value)
                positional_args.append(value)
            continue

        # Note: prepend 'no-' for boolean optional action flag if model_default value is False and flag is not a short option
        if arg.kwargs.get('action') == BooleanOptionalAction and model_default is False and flag_chars == '--':
            flag_chars += 'no-'

        optional_args.append(f'{flag_chars}{arg_name}')

        # If implicit bool flag, do not add a value
        if arg.kwargs.get('action') != BooleanOptionalAction:
            optional_args.append(value)

    # Non-string entries (the alias-path containers) are JSON-encoded only now that they are complete.
    serialized_args: list[str] = []
    serialized_args += [json.dumps(value) if not isinstance(value, str) else value for value in optional_args]
    serialized_args += [json.dumps(value) if not isinstance(value, str) else value for value in positional_args]
    return serialized_args + subcommand_args