mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-06-27 18:50:41 +00:00
# What does this PR do?

- **chore: mypy for strong_typing**
- **chore: mypy for remote::vllm**
- **chore: mypy for remote::ollama**
- **chore: mypy for providers.datatype**

---------

Signed-off-by: Ihar Hrachyshka <ihar.hrachyshka@gmail.com>
1035 lines
32 KiB
Python
1035 lines
32 KiB
Python
# Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
# All rights reserved.
|
|
#
|
|
# This source code is licensed under the terms described in the LICENSE file in
|
|
# the root directory of this source tree.
|
|
|
|
"""
|
|
Type-safe data interchange for Python data classes.
|
|
|
|
:see: https://github.com/hunyadi/strong_typing
|
|
"""
|
|
|
|
import dataclasses
|
|
import datetime
|
|
import enum
|
|
import importlib
|
|
import importlib.machinery
|
|
import importlib.util
|
|
import inspect
|
|
import re
|
|
import sys
|
|
import types
|
|
import typing
|
|
import uuid
|
|
from typing import (
|
|
Any,
|
|
Callable,
|
|
Dict,
|
|
Iterable,
|
|
List,
|
|
Literal,
|
|
NamedTuple,
|
|
Optional,
|
|
Protocol,
|
|
Set,
|
|
Tuple,
|
|
Type,
|
|
TypeVar,
|
|
Union,
|
|
runtime_checkable,
|
|
)
|
|
|
|
if sys.version_info >= (3, 9):
|
|
from typing import Annotated
|
|
else:
|
|
from typing_extensions import Annotated
|
|
|
|
if sys.version_info >= (3, 10):
|
|
from typing import TypeGuard
|
|
else:
|
|
from typing_extensions import TypeGuard
|
|
|
|
S = TypeVar("S")
|
|
T = TypeVar("T")
|
|
K = TypeVar("K")
|
|
V = TypeVar("V")
|
|
|
|
|
|
def _is_type_like(data_type: object) -> bool:
    """
    Tells whether an object can stand in where a type is expected.

    The following objects qualify: a plain class, a parameterized generic (anything for which
    `typing.get_origin` reports an origin), a forward reference (recognized by its `__forward_arg__`
    attribute), and the special form `Any`.

    :param data_type: The object to validate.
    :returns: True if the object is a type or type-like object.
    """

    return (
        isinstance(data_type, type)  # a standard type
        or typing.get_origin(data_type) is not None  # a generic type such as `List[int]`
        or hasattr(data_type, "__forward_arg__")  # an instance of `ForwardRef`
        or data_type is Any  # the special form `Any`
    )
|
|
|
|
|
|
if sys.version_info < (3, 9):
    # `types.GenericAlias` does not exist before Python 3.9, so fall back to the widest possible type
    TypeLike = object

    def is_type_like(
        data_type: object,
    ) -> bool:
        "Checks if the object is a type or type-like object (e.g. generic type)."

        return _is_type_like(data_type)

else:
    TypeLike = Union[type, types.GenericAlias, typing.ForwardRef, Any]

    def is_type_like(
        data_type: object,
    ) -> TypeGuard[TypeLike]:
        """
        Tells whether an object is a type or a type-like object, narrowing it to `TypeLike` for type checkers.

        :param data_type: The object to validate.
        :returns: True if the object is a type or type-like object.
        """

        return _is_type_like(data_type)
|
|
|
|
|
|
def evaluate_member_type(typ: Any, cls: type) -> Any:
    """
    Resolves the type annotation of a dataclass member, which may be a forward reference.

    The annotation is evaluated in the namespace of the module in which the class is defined.

    :param typ: The dataclass member type to convert.
    :param cls: The dataclass in which the member is defined.
    :returns: The evaluated type.
    """

    defining_module = sys.modules[cls.__module__]
    return evaluate_type(typ, defining_module)
|
|
|
|
|
|
def evaluate_type(typ: Any, module: types.ModuleType) -> Any:
    """
    Evaluates a forward reference type.

    String annotations and `ForwardRef` objects are resolved against the namespace of the given module;
    any other object is returned unchanged.

    :param typ: The type to convert, typically a dataclass member type.
    :param module: The context for the type, i.e. the module in which the member is defined.
    :returns: The evaluated type.
    """

    # Evaluate with an empty local namespace. Passing `locals()` would expose this function's own
    # variables (`typ` and `module`) to the expression, shadowing module-level names spelled the same.
    local_ns: Dict[str, Any] = {}

    if isinstance(typ, str):
        # evaluate data-class field whose type annotation is a string
        # NOTE(review): `eval` runs arbitrary expressions; annotations must originate from trusted code
        return eval(typ, module.__dict__, local_ns)
    if isinstance(typ, typing.ForwardRef):
        if sys.version_info >= (3, 9):
            return typ._evaluate(module.__dict__, local_ns, recursive_guard=frozenset())
        else:
            return typ._evaluate(module.__dict__, local_ns)
    else:
        return typ
|
|
|
|
|
|
@runtime_checkable
class DataclassInstance(Protocol):
    "Structural type for objects that expose a `__dataclass_fields__` class attribute."

    # maps names to `dataclasses.Field` objects
    __dataclass_fields__: typing.ClassVar[Dict[str, dataclasses.Field]]
|
|
|
|
|
|
def is_dataclass_type(typ: Any) -> TypeGuard[Type[DataclassInstance]]:
    "Tells whether the argument (optionally wrapped in `Annotated`) is a data class itself, not an instance of one."

    unwrapped = unwrap_annotated_type(typ)
    if not isinstance(unwrapped, type):
        return False
    return dataclasses.is_dataclass(unwrapped)
|
|
|
|
|
|
def is_dataclass_instance(obj: Any) -> TypeGuard[DataclassInstance]:
    "Tells whether the argument is an instance of a data class, not the data class itself."

    if isinstance(obj, type):
        return False
    return dataclasses.is_dataclass(obj)
|
|
|
|
|
|
@dataclasses.dataclass
class DataclassField:
    "Describes a field of a data class by its name, its type and its default value."

    name: str
    type: Any
    default: Any

    def __init__(self, name: str, type: Any, default: Any = dataclasses.MISSING) -> None:
        # hand-written initializer so that `default` may be omitted, in which case it is `dataclasses.MISSING`
        self.name, self.type, self.default = name, type, default
|
|
|
|
|
|
def dataclass_fields(cls: Type[DataclassInstance]) -> Iterable[DataclassField]:
    "Yields a descriptor for each field of a data-class, with forward references in field types resolved."

    for member in dataclasses.fields(cls):
        member_type = evaluate_member_type(member.type, cls)
        yield DataclassField(member.name, member_type, member.default)
|
|
|
|
|
|
def dataclass_field_by_name(cls: Type[DataclassInstance], name: str) -> DataclassField:
    """
    Looks up a field in a data-class by its field name.

    :param cls: The data-class type to search.
    :param name: The name of the field to find.
    :returns: The field with its type evaluated and its default value (`dataclasses.MISSING` if it has none).
    :raises LookupError: The class has no field with the given name.
    """

    for field in dataclasses.fields(cls):
        if field.name == name:
            # carry the default value along, consistently with `dataclass_fields`
            return DataclassField(field.name, evaluate_member_type(field.type, cls), field.default)

    raise LookupError(f"field `{name}` missing from class `{cls.__name__}`")
|
|
|
|
|
|
def is_named_tuple_instance(obj: Any) -> TypeGuard[NamedTuple]:
    "Tells whether the argument is an instance of a named tuple type."

    obj_type = type(obj)
    return is_named_tuple_type(obj_type)
|
|
|
|
|
|
def is_named_tuple_type(typ: Any) -> TypeGuard[Type[NamedTuple]]:
    """
    True if the argument corresponds to a named tuple type.

    Calling the function `collections.namedtuple` gives a new type that is a subclass of `tuple` (and no other classes)
    with a member named `_fields` that is a tuple whose items are all strings.

    :param typ: The type to inspect, optionally wrapped in `Annotated[...]`.
    :returns: True if the (unwrapped) type has the shape of a named tuple type.
    """

    # Strip `Annotated[...]` first: an annotated alias is not an instance of `type`, so testing for a class
    # before unwrapping would reject every annotated named tuple.
    typ = unwrap_annotated_type(typ)
    if not isinstance(typ, type):
        return False

    # a named tuple type derives directly from `tuple`, and from nothing else
    if typ.__bases__ != (tuple,):
        return False

    fields = getattr(typ, "_fields", None)
    if not isinstance(fields, tuple):
        return False

    return all(isinstance(name, str) for name in fields)
|
|
|
|
|
|
def is_type_enum(typ: object) -> TypeGuard[Type[enum.Enum]]:
    "Tells whether the specified type (optionally wrapped in `Annotated`) is an enumeration type."

    unwrapped = unwrap_annotated_type(typ)

    if sys.version_info >= (3, 11):
        return isinstance(unwrapped, enum.EnumType)
    else:
        # use an explicit isinstance(..., type) check to filter out special forms like generics
        if not isinstance(unwrapped, type):
            return False
        return issubclass(unwrapped, enum.Enum)
|
|
|
|
|
|
def enum_value_types(enum_type: Type[enum.Enum]) -> List[type]:
    """
    Lists the distinct types of the member values of an `enum.Enum` type, in order of first appearance.
    """

    # a dictionary keeps insertion order, so each type stays where it was first encountered
    seen: Dict[type, None] = {}
    for member in enum_type:
        seen.setdefault(type(member.value))
    return list(seen)
|
|
|
|
|
|
def extend_enum(
    source: Type[enum.Enum],
) -> Callable[[Type[enum.Enum]], Type[enum.Enum]]:
    """
    Builds a class decorator that merges the members of an existing enumeration into the decorated one.

    The decorated class is replaced with a newly created enumeration type that carries the members of both
    types, and the new type is registered under the same name in the module of the decorated class.

    :param source: The existing enumeration type to be extended with new values.
    :returns: A new enumeration type with the extended set of values.
    """

    def wrap(extend: Type[enum.Enum]) -> Type[enum.Enum]:
        # members of the existing type come first; on a name clash, the value of the extending type wins
        members: Dict[str, Any] = {member.name: member.value for member in source}
        for member in extend:
            members[member.name] = member.value

        # mypy fails to determine that __name__ is always a string; hence the `ignore` directive.
        combined: Type[enum.Enum] = enum.Enum(extend.__name__, members)  # type: ignore[misc]

        # make the new type look like (and replace) the extending class in the module that defines it
        combined.__module__ = extend.__module__
        combined.__doc__ = extend.__doc__
        setattr(sys.modules[extend.__module__], extend.__name__, combined)

        return enum.unique(combined)

    return wrap
|
|
|
|
|
|
if sys.version_info >= (3, 10):
|
|
|
|
def _is_union_like(typ: object) -> bool:
|
|
"True if type is a union such as `Union[T1, T2, ...]` or a union type `T1 | T2`."
|
|
|
|
return typing.get_origin(typ) is Union or isinstance(typ, types.UnionType)
|
|
|
|
else:
|
|
|
|
def _is_union_like(typ: object) -> bool:
|
|
"True if type is a union such as `Union[T1, T2, ...]` or a union type `T1 | T2`."
|
|
|
|
return typing.get_origin(typ) is Union
|
|
|
|
|
|
def is_type_optional(typ: object, strict: bool = False) -> TypeGuard[Type[Optional[Any]]]:
    """
    Tells whether the type annotation is an optional type (e.g. `Optional[T]` or `Union[T1,T2,None]`).

    `Optional[T]` is represented as `Union[T, None]` in classic style, and is equivalent to `T | None` in new style.
    An enclosing `Annotated[...]` wrapper is ignored.

    :param typ: The type annotation to inspect.
    :param strict: True if only `Optional[T]` qualifies as an optional type but `Union[T1, T2, None]` does not.
    :returns: True if the annotation is a union that has `None` among its members.
    """

    unwrapped = unwrap_annotated_type(typ)
    if not _is_union_like(unwrapped):
        return False

    members = typing.get_args(unwrapped)
    if strict and len(members) != 2:
        return False

    return type(None) in members
|
|
|
|
|
|
def unwrap_optional_type(typ: Type[Optional[T]]) -> Type[T]:
    """
    Strips the `None` alternative from an optional type, keeping any `Annotated` metadata in place.

    :param typ: The optional type `Optional[T]`.
    :returns: The inner type `T`.
    """

    inner_type: Type[T] = rewrap_annotated_type(_unwrap_optional_type, typ)
    return inner_type
|
|
|
|
|
|
def _unwrap_optional_type(typ: Type[Optional[T]]) -> Type[T]:
    "Returns the type qualified as optional, i.e. the union without `None` (e.g. `T` for `Optional[T]`)."

    # Optional[T] is represented internally as Union[T, None]
    if not _is_union_like(typ):
        raise TypeError("optional type must have un-subscripted type of Union")

    remaining = tuple(member for member in typing.get_args(typ) if member is not type(None))

    # subscripting `Union` with a single member collapses to that member, i.e. Union[T] becomes T
    return Union[remaining]  # type: ignore[return-value]
|
|
|
|
|
|
def is_type_union(typ: object) -> bool:
    "Tells whether the type annotation is a union type (e.g. `Union[T1,T2,T3]`) other than a plain `Optional[T]`."

    unwrapped = unwrap_annotated_type(typ)
    if not _is_union_like(unwrapped):
        return False

    members = typing.get_args(unwrapped)
    # a union of at most two members that includes `None` does not count as a union here
    return len(members) > 2 or type(None) not in members
|
|
|
|
|
|
def unwrap_union_types(typ: object) -> Tuple[object, ...]:
    """
    Returns the member types of a union type, ignoring an enclosing `Annotated[...]` wrapper.

    :param typ: The union type `Union[T1, T2, ...]`.
    :returns: The inner types `T1`, `T2`, etc.
    """

    return _unwrap_union_types(unwrap_annotated_type(typ))
|
|
|
|
|
|
def _unwrap_union_types(typ: object) -> Tuple[object, ...]:
    "Returns the member types of a union (e.g. a tuple of types `T1` and `T2` for `Union[T1, T2]`)."

    if _is_union_like(typ):
        return typing.get_args(typ)

    raise TypeError("union type must have un-subscripted type of Union")
|
|
|
|
|
|
def is_type_literal(typ: object) -> bool:
    "Tells whether the type is a literal of one or more constant values, e.g. `Literal['string']` or `Literal[42]`."

    origin = typing.get_origin(unwrap_annotated_type(typ))
    return origin is Literal
|
|
|
|
|
|
def unwrap_literal_value(typ: object) -> Any:
    """
    Returns the one constant value captured by a single-valued literal type.

    :param typ: The literal type `Literal[value]`.
    :returns: The value captured by the literal type.
    :raises TypeError: The type does not capture exactly one value.
    """

    values = unwrap_literal_values(typ)
    if len(values) == 1:
        return values[0]

    raise TypeError("too many values in literal type")
|
|
|
|
|
|
def unwrap_literal_values(typ: object) -> Tuple[Any, ...]:
    """
    Returns the constant values captured by a literal type, ignoring an enclosing `Annotated[...]` wrapper.

    :param typ: The literal type `Literal[value, ...]`.
    :returns: A tuple of values captured by the literal type.
    """

    literal_type = unwrap_annotated_type(typ)
    return typing.get_args(literal_type)
|
|
|
|
|
|
def unwrap_literal_types(typ: object) -> Tuple[type, ...]:
    """
    Returns the type of each constant value captured by a literal type, in the order the values are listed.

    :param typ: The literal type `Literal[value, ...]`.
    :returns: A tuple of item types `T` such that `type(value) == T`.
    """

    return tuple(map(type, unwrap_literal_values(typ)))
|
|
|
|
|
|
def is_generic_list(typ: object) -> TypeGuard[Type[list]]:
    "Tells whether the type (optionally wrapped in `Annotated`) is a generic list, i.e. `List[T]`."

    origin = typing.get_origin(unwrap_annotated_type(typ))
    return origin is list
|
|
|
|
|
|
def unwrap_generic_list(typ: Type[List[T]]) -> Type[T]:
    """
    Returns the item type of a list type, keeping any `Annotated` metadata in place.

    :param typ: The list type `List[T]`.
    :returns: The item type `T`.
    """

    item_type: Type[T] = rewrap_annotated_type(_unwrap_generic_list, typ)
    return item_type
|
|
|
|
|
|
def _unwrap_generic_list(typ: Type[List[T]]) -> Type[T]:
    "Returns the item type of a list type (e.g. `T` for `List[T]`)."

    # a list type has exactly one type argument; unpacking fails for any other count
    [item_type] = typing.get_args(typ)
    return item_type  # type: ignore[no-any-return]
|
|
|
|
|
|
def is_generic_set(typ: object) -> TypeGuard[Type[set]]:
    "Tells whether the type (optionally wrapped in `Annotated`) is a generic set, i.e. `Set[T]`."

    origin = typing.get_origin(unwrap_annotated_type(typ))
    return origin is set
|
|
|
|
|
|
def unwrap_generic_set(typ: Type[Set[T]]) -> Type[T]:
    """
    Returns the item type of a set type, keeping any `Annotated` metadata in place.

    :param typ: The set type `Set[T]`.
    :returns: The item type `T`.
    """

    item_type: Type[T] = rewrap_annotated_type(_unwrap_generic_set, typ)
    return item_type
|
|
|
|
|
|
def _unwrap_generic_set(typ: Type[Set[T]]) -> Type[T]:
    "Returns the item type of a set type (e.g. `T` for `Set[T]`)."

    # a set type has exactly one type argument; unpacking fails for any other count
    [member_type] = typing.get_args(typ)
    return member_type  # type: ignore[no-any-return]
|
|
|
|
|
|
def is_generic_dict(typ: object) -> TypeGuard[Type[dict]]:
    "Tells whether the type (optionally wrapped in `Annotated`) is a generic dictionary, i.e. `Dict[KeyType, ValueType]`."

    origin = typing.get_origin(unwrap_annotated_type(typ))
    return origin is dict
|
|
|
|
|
|
def unwrap_generic_dict(typ: Type[Dict[K, V]]) -> Tuple[Type[K], Type[V]]:
    """
    Returns the key and value types of a dictionary type as a tuple.

    An enclosing `Annotated[...]` wrapper is discarded before the type arguments are extracted.

    :param typ: The dictionary type `Dict[K, V]`.
    :returns: The key and value types `K` and `V`.
    """

    dict_type = unwrap_annotated_type(typ)
    return _unwrap_generic_dict(dict_type)
|
|
|
|
|
|
def _unwrap_generic_dict(typ: Type[Dict[K, V]]) -> Tuple[Type[K], Type[V]]:
    "Returns the key and value types of a dict type (e.g. (`K`, `V`) for `Dict[K, V]`)."

    # a dictionary type has exactly two type arguments; unpacking fails for any other count
    (key_type, value_type) = typing.get_args(typ)
    return (key_type, value_type)
|
|
|
|
|
|
def is_type_annotated(typ: TypeLike) -> bool:
    "Tells whether the type annotation is an annotated type (i.e. `Annotated[T, ...]`)."

    # annotated types are recognized by the presence of their `__metadata__` attribute
    metadata = getattr(typ, "__metadata__", None)
    return metadata is not None
|
|
|
|
|
|
def get_annotation(data_type: TypeLike, annotation_type: Type[T]) -> Optional[T]:
    """
    Finds the first annotation attached to a data type that is an instance of the expected annotation class.

    :param data_type: The annotated type from which to extract the annotation.
    :param annotation_type: The annotation class to look for.
    :returns: The annotation class instance found (if any).
    """

    metadata = getattr(data_type, "__metadata__", None)
    if metadata is None:
        # not an annotated type, hence no annotations to search
        return None

    matches = (annotation for annotation in metadata if isinstance(annotation, annotation_type))
    return next(matches, None)
|
|
|
|
|
|
def unwrap_annotated_type(typ: T) -> T:
    "Returns the wrapped type of an annotated type (e.g. `T` for `Annotated[T, ...]`), or the type itself otherwise."

    if not is_type_annotated(typ):
        # type is a regular type
        return typ

    # type is Annotated[T, ...]; the wrapped type is the first type argument
    type_args = typing.get_args(typ)
    return type_args[0]  # type: ignore[no-any-return]
|
|
|
|
|
|
def rewrap_annotated_type(transform: Callable[[Type[S]], Type[T]], typ: Type[S]) -> Type[T]:
    """
    Applies a transformation to a type while preserving its `Annotated` metadata (if it has any).

    :param transform: A function that maps an un-annotated type to another type.
    :param typ: A type to un-box (if necessary), transform, and re-box (if necessary).
    :returns: The transformed type, wrapped in `Annotated` with the original metadata if `typ` was annotated.
    """

    metadata = getattr(typ, "__metadata__", None)
    if metadata is None:
        # type is a regular type: nothing to un-box or re-box
        return transform(typ)

    # type is Annotated[T, ...]: transform `T`, then attach the original metadata to the result
    inner_type = typing.get_args(typ)[0]
    transformed_type = transform(inner_type)
    return Annotated[(transformed_type, *metadata)]  # type: ignore[return-value]
|
|
|
|
|
|
def get_module_classes(module: types.ModuleType) -> List[type]:
|
|
"Returns all classes declared directly in a module."
|
|
|
|
def is_class_member(member: object) -> TypeGuard[type]:
|
|
return inspect.isclass(member) and member.__module__ == module.__name__
|
|
|
|
return [class_type for _, class_type in inspect.getmembers(module, is_class_member)]
|
|
|
|
|
|
def get_resolved_hints(typ: type) -> Dict[str, type]:
    "Returns the type hints of a class with forward references resolved, keeping `Annotated` wrappers where supported."

    if sys.version_info >= (3, 9):
        # has the `include_extras` parameter, which leaves `Annotated[...]` wrappers intact
        return typing.get_type_hints(typ, include_extras=True)
    else:
        return typing.get_type_hints(typ)
|
|
|
|
|
|
def get_class_properties(typ: type) -> Iterable[Tuple[str, type | str]]:
    """
    Returns the name and the type of each property of a class.

    For a data class, pairs are taken from its fields, with field types as declared (possibly strings).
    For any other class, pairs are taken from its resolved type hints.
    """

    if not is_dataclass_type(typ):
        return get_resolved_hints(typ).items()

    return ((member.name, member.type) for member in dataclasses.fields(typ))
|
|
|
|
|
|
def get_class_property(typ: type, name: str) -> Optional[type | str]:
    "Finds the annotated type of a property in a class by its property name, or returns `None` if there is none."

    matches = (
        property_type for property_name, property_type in get_class_properties(typ) if name == property_name
    )
    return next(matches, None)
|
|
|
|
|
|
# Field-less placeholder data class. `TypeCollector` uses it as the initial key of its dependency graph
# and as the `cls` argument of `run` calls that do not originate from the members of a class.
@dataclasses.dataclass
class _ROOT:
    pass
|
|
|
|
|
|
def get_referenced_types(typ: TypeLike, module: Optional[types.ModuleType] = None) -> Set[type]:
    """
    Collects the types that this type refers to, either directly or indirectly.

    For example, extract `T` from `List[T]`, `Optional[T]` or `Annotated[T, ...]`, `K` and `V` from `Dict[K,V]`,
    `A` and `B` from `Union[A,B]`.

    :param typ: A type or special form.
    :param module: The context in which types are evaluated.
    :returns: Types referenced by the given type or special form.
    """

    walker = TypeCollector()
    walker.run(typ, _ROOT, module)
    return walker.references
|
|
|
|
|
|
class TypeCollector:
    """
    Collects types directly or indirectly referenced by a type.

    Classes encountered during the traversal are recorded in a dependency graph. The placeholder class `_ROOT`
    owns the classes that are reached from a top-level call rather than from the members of a class.

    :param graph: The type dependency graph, linking types to types they depend on.
    """

    graph: Dict[type, Set[type]]

    @property
    def references(self) -> Set[type]:
        "Types collected by the type collector."

        dependencies: Set[type] = set()
        for edges in self.graph.values():
            dependencies.update(edges)
        return dependencies

    def __init__(self) -> None:
        self.graph = {_ROOT: set()}

    def traverse(self, typ: type) -> None:
        "Finds all dependent types of a type."

        self.run(typ, _ROOT, sys.modules[typ.__module__])

    def traverse_all(self, types: Iterable[type]) -> None:
        "Finds all dependent types of a list of types."

        for typ in types:
            self.traverse(typ)

    def run(
        self,
        typ: TypeLike,
        cls: Type[DataclassInstance],
        module: Optional[types.ModuleType],
    ) -> None:
        """
        Extracts types indirectly referenced by this type.

        For example, extract `T` from `List[T]`, `Optional[T]` or `Annotated[T, ...]`, `K` and `V` from `Dict[K,V]`,
        `A` and `B` from `Union[A,B]`.

        Results are not returned but recorded in the dependency graph of the collector.

        :param typ: A type or special form.
        :param cls: A dataclass type being expanded for dependent types.
        :param module: The context in which types are evaluated.
        :raises ValueError: A forward reference is encountered but no module context is available.
        :raises TypeError: The object is not recognized as a type or a supported special form.
        """

        if typ is type(None) or typ is Any:
            return

        if isinstance(typ, type):
            self.graph[cls].add(typ)

            # a class that already has a node has been expanded (or is being expanded); stop to avoid cycles
            if typ in self.graph:
                return

            self.graph[typ] = set()

        metadata = getattr(typ, "__metadata__", None)
        if metadata is not None:
            # type is Annotated[T, ...]
            arg = typing.get_args(typ)[0]
            return self.run(arg, cls, module)

        # type is a forward reference
        if isinstance(typ, (str, typing.ForwardRef)):
            if module is None:
                raise ValueError("missing context for evaluating types")

            evaluated_type = evaluate_type(typ, module)
            return self.run(evaluated_type, cls, module)

        # type is a special form
        origin = typing.get_origin(typ)
        if origin in [list, dict, frozenset, set, tuple, Union]:
            for arg in typing.get_args(typ):
                # the `...` of a variable-length tuple such as `Tuple[int, ...]` is a marker, not a type
                if arg is Ellipsis:
                    continue
                self.run(arg, cls, module)
            return
        elif origin is Literal:
            return

        # type is optional or a union type
        if is_type_optional(typ):
            return self.run(unwrap_optional_type(typ), cls, module)
        if is_type_union(typ):
            for union_type in unwrap_union_types(typ):
                self.run(union_type, cls, module)
            return

        # type is a regular type
        if is_dataclass_type(typ) or is_type_enum(typ) or isinstance(typ, type):
            # members of a class are evaluated in the module that defines the class
            context = sys.modules[typ.__module__]
            if is_dataclass_type(typ):
                for field in dataclass_fields(typ):
                    self.run(field.type, typ, context)
            else:
                for field_type in get_resolved_hints(typ).values():
                    self.run(field_type, typ, context)
            return

        raise TypeError(f"expected: type-like; got: {typ}")
|
|
|
|
|
|
if sys.version_info >= (3, 10):
|
|
|
|
def get_signature(fn: Callable[..., Any]) -> inspect.Signature:
|
|
"Extracts the signature of a function."
|
|
|
|
return inspect.signature(fn, eval_str=True)
|
|
|
|
else:
|
|
|
|
def get_signature(fn: Callable[..., Any]) -> inspect.Signature:
|
|
"Extracts the signature of a function."
|
|
|
|
return inspect.signature(fn)
|
|
|
|
|
|
def is_reserved_property(name: str) -> bool:
    "Tells whether the name stands for an internal property."

    # built-in and special properties are wrapped in double underscores
    is_dunder = re.match(r"^__.+__$", name) is not None

    # built-in special names that do not follow the double-underscore pattern
    is_special = name in ["_abc_impl"]

    return is_dunder or is_special
|
|
|
|
|
|
def create_module(name: str) -> types.ModuleType:
    """
    Creates an empty module at run-time and registers it in `sys.modules`.

    :param name: Fully qualified name of the new module (with dot notation).
    :returns: The newly created module.
    :raises KeyError: A module with the same name is already registered.
    """

    if name in sys.modules:
        raise KeyError(f"{name!r} already in sys.modules")

    # the spec is created without a loader
    module_spec = importlib.machinery.ModuleSpec(name, None)
    new_module = importlib.util.module_from_spec(module_spec)
    sys.modules[name] = new_module

    loader = module_spec.loader
    if loader is not None:
        loader.exec_module(new_module)

    return new_module
|
|
|
|
|
|
def create_data_type(class_name: str, fields: List[Tuple[str, type]]) -> type:
    """
    Creates a new data-class type with slots dynamically.

    :param class_name: The name of new data-class type.
    :param fields: A list of fields (and their type) that the new data-class type is expected to have.
    :returns: The newly created data-class type.
    """

    if sys.version_info >= (3, 10):
        # has the `slots` parameter
        return dataclasses.make_dataclass(class_name, fields, slots=True)
    else:
        # no `slots` parameter: create a regular data-class, then re-create it with `__slots__` in place
        plain_cls = dataclasses.make_dataclass(class_name, fields)

        namespace = dict(plain_cls.__dict__)
        slot_names = tuple(member.name for member in dataclasses.fields(plain_cls))
        namespace["__slots__"] = slot_names

        # class attributes named like a slot would conflict with the slot, and `__dict__` defeats slots
        for slot_name in slot_names:
            namespace.pop(slot_name, None)
        namespace.pop("__dict__", None)

        qualname = getattr(plain_cls, "__qualname__", None)
        slotted_cls = type(plain_cls)(plain_cls.__name__, (), namespace)
        if qualname is not None:
            slotted_cls.__qualname__ = qualname

        return slotted_cls
|
|
|
|
|
|
def create_object(typ: Type[T]) -> T:
    "Creates an instance of a type by allocating it with `__new__` only."

    if not issubclass(typ, Exception):
        return object.__new__(typ)

    # exception types need special treatment: allocate through the `__new__` of the type itself
    instance = typ.__new__(typ)
    return typing.cast(T, instance)
|
|
|
|
|
|
if sys.version_info < (3, 9):
    # `types.GenericAlias` does not exist before Python 3.9, so fall back to the widest possible type
    TypeOrGeneric = object

else:
    # a plain class or a parameterized built-in generic such as `list[int]`
    TypeOrGeneric = Union[type, types.GenericAlias]
|
|
|
|
|
|
def is_generic_instance(obj: Any, typ: TypeLike) -> bool:
    """
    Returns whether an object is an instance of a generic class, a standard class or of a subclass thereof.

    This function checks the following items recursively:
    * items of a list
    * keys and values of a dictionary
    * members of a set
    * items of a tuple
    * members of a union type

    Tuples are matched position by position and must have as many items as the tuple type has type arguments.
    A variable-length tuple type such as `Tuple[T, ...]` matches tuples of any length whose items are all `T`.
    A tuple type without type arguments matches any tuple.

    :param obj: The (possibly generic container) object to check recursively.
    :param typ: The expected type of the object.
    :returns: True if the object conforms to the expected type.
    :raises TypeError: The expected type is not a class or a supported special form.
    """

    if typ is Any:
        # every object conforms to `Any`, which cannot be passed to `isinstance`
        return True

    if isinstance(typ, typing.ForwardRef):
        fwd: typing.ForwardRef = typ
        identifier = fwd.__forward_arg__
        # NOTE(review): the reference is evaluated in the namespace of this module, not in that of the
        # module that declares it; `eval` must only ever see annotations from trusted code
        typ = eval(identifier)
        if isinstance(typ, type):
            return isinstance(obj, typ)
        else:
            return False

    # generic types (e.g. list, dict, set, etc.)
    origin_type = typing.get_origin(typ)
    if origin_type is list:
        if not isinstance(obj, list):
            return False
        (list_item_type,) = typing.get_args(typ)  # unpack single tuple element
        list_obj: list = obj
        return all(is_generic_instance(item, list_item_type) for item in list_obj)
    elif origin_type is dict:
        if not isinstance(obj, dict):
            return False
        key_type, value_type = typing.get_args(typ)
        dict_obj: dict = obj
        return all(
            is_generic_instance(key, key_type) and is_generic_instance(value, value_type)
            for key, value in dict_obj.items()
        )
    elif origin_type is set:
        if not isinstance(obj, set):
            return False
        (set_member_type,) = typing.get_args(typ)  # unpack single tuple element
        set_obj: set = obj
        return all(is_generic_instance(item, set_member_type) for item in set_obj)
    elif origin_type is tuple:
        if not isinstance(obj, tuple):
            return False
        tuple_item_types = typing.get_args(typ)
        if not tuple_item_types or tuple_item_types == ((),):
            # no item types to check against (`Tuple` or `Tuple[()]`)
            return True
        if len(tuple_item_types) == 2 and tuple_item_types[1] is Ellipsis:
            # variable-length tuple such as `Tuple[int, ...]`: the `...` is a marker, not an item type
            return all(is_generic_instance(item, tuple_item_types[0]) for item in obj)
        if len(obj) != len(tuple_item_types):
            # a fixed-length tuple type only matches tuples of the same length
            return False
        return all(
            is_generic_instance(item, tuple_item_type)
            for tuple_item_type, item in zip(tuple_item_types, obj, strict=True)
        )
    elif _is_union_like(typ):
        # covers both `Union[T1, T2]` and the union type `T1 | T2`
        return any(is_generic_instance(obj, member_type) for member_type in typing.get_args(typ))
    elif isinstance(typ, type):
        return isinstance(obj, typ)
    else:
        raise TypeError(f"expected `type` but got: {typ}")
|
|
|
|
|
|
class RecursiveChecker:
    "Verifies that a predicate holds for all nested member properties of an object, guided by type annotations."

    _pred: Optional[Callable[[type, Any], bool]]

    def __init__(self, pred: Callable[[type, Any], bool]) -> None:
        """
        Creates a checker to verify if a predicate applies to all nested member properties of an object recursively.

        :param pred: The predicate to test on member properties. Takes a property type and a property value.
        """

        self._pred = pred

    def pred(self, typ: type, obj: Any) -> bool:
        "Acts as a workaround for the type checker mypy."

        assert self._pred is not None
        return self._pred(typ, obj)

    def check(self, typ: TypeLike, obj: Any) -> bool:
        """
        Checks if a predicate applies to all nested member properties of an object recursively.

        The predicate is invoked on values of well-known simple types, enumerations and unions. Lists, sets,
        tuples, dictionary values, named tuples, data classes and other annotated classes are recursed into.
        Dictionary keys are not checked.

        :param typ: The type to recurse into.
        :param obj: The object to inspect recursively. Must be an instance of the given type.
        :returns: True if all member properties pass the filter predicate.
        :raises TypeError: The object does not match the given type, or the type is not supported.
        """

        # check for well-known types
        if (
            typ is type(None)
            or typ is bool
            or typ is int
            or typ is float
            or typ is str
            or typ is bytes
            or typ is datetime.datetime
            or typ is datetime.date
            or typ is datetime.time
            or typ is uuid.UUID
        ):
            return self.pred(typing.cast(type, typ), obj)

        # generic types (e.g. list, dict, set, etc.)
        origin_type = typing.get_origin(typ)
        if origin_type is list:
            if not isinstance(obj, list):
                raise TypeError(f"expected `list` but got: {obj}")
            (list_item_type,) = typing.get_args(typ)  # unpack single tuple element
            list_obj: list = obj
            return all(self.check(list_item_type, item) for item in list_obj)
        elif origin_type is dict:
            if not isinstance(obj, dict):
                raise TypeError(f"expected `dict` but got: {obj}")
            _key_type, value_type = typing.get_args(typ)
            dict_obj: dict = obj
            return all(self.check(value_type, item) for item in dict_obj.values())
        elif origin_type is set:
            if not isinstance(obj, set):
                raise TypeError(f"expected `set` but got: {obj}")
            (set_member_type,) = typing.get_args(typ)  # unpack single tuple element
            set_obj: set = obj
            return all(self.check(set_member_type, item) for item in set_obj)
        elif origin_type is tuple:
            if not isinstance(obj, tuple):
                raise TypeError(f"expected `tuple` but got: {obj}")
            tuple_item_types = typing.get_args(typ)
            if len(tuple_item_types) == 2 and tuple_item_types[1] is Ellipsis:
                # variable-length tuple such as `Tuple[int, ...]`: the `...` is a marker, not an item type
                return all(self.check(tuple_item_types[0], item) for item in obj)
            return all(
                self.check(tuple_item_type, item)
                for tuple_item_type, item in zip(tuple_item_types, obj, strict=False)
            )
        elif _is_union_like(typ):
            # covers both `Union[T1, T2]` and the union type `T1 | T2`; unions are not recursed into
            return self.pred(typ, obj)  # type: ignore[arg-type]

        if not inspect.isclass(typ):
            raise TypeError(f"expected `type` but got: {typ}")

        # enumeration type
        if issubclass(typ, enum.Enum):
            if not isinstance(obj, enum.Enum):
                raise TypeError(f"expected `{typ}` but got: {obj}")
            return self.pred(typ, obj)

        # class types with properties
        if is_named_tuple_type(typ):
            if not isinstance(obj, tuple):
                raise TypeError(f"expected `NamedTuple` but got: {obj}")
            return all(
                self.check(field_type, getattr(obj, field_name))
                for field_name, field_type in typing.get_type_hints(typ).items()
            )
        elif is_dataclass_type(typ):
            if not isinstance(obj, typ):
                raise TypeError(f"expected `{typ}` but got: {obj}")
            resolved_hints = get_resolved_hints(typ)
            return all(
                self.check(resolved_hints[field.name], getattr(obj, field.name)) for field in dataclasses.fields(typ)
            )
        else:
            if not isinstance(obj, typ):
                raise TypeError(f"expected `{typ}` but got: {obj}")
            return all(
                self.check(property_type, getattr(obj, property_name))
                for property_name, property_type in get_class_properties(typ)
            )
|
|
|
|
|
|
def check_recursive(
    obj: object,
    /,
    *,
    pred: Optional[Callable[[type, Any], bool]] = None,
    type_pred: Optional[Callable[[type], bool]] = None,
    value_pred: Optional[Callable[[Any], bool]] = None,
) -> bool:
    """
    Verifies that a condition holds for all nested member properties of an object recursively.

    The condition is given either as a single predicate `pred`, or as a value predicate `value_pred`,
    optionally paired with a type predicate `type_pred`. Without any predicate, every property passes.

    :param obj: The object to inspect recursively.
    :param pred: The predicate to test on member properties. Takes a property type and a property value.
    :param type_pred: Constrains the check to properties of an expected type. Properties of other types pass automatically.
    :param value_pred: Verifies a condition on member property values (of an expected type).
    :returns: True if all member properties pass the filter predicate(s).
    :raises TypeError: The combination of predicates passed is not permitted.
    """

    if type_pred is not None and value_pred is not None:
        if pred is not None:
            raise TypeError("filter predicate not permitted when type and value predicates are present")

        applies_to: Callable[[type], bool] = type_pred
        holds_for: Callable[[Any], bool] = value_pred

        def typed_value_filter(typ: type, value: Any) -> bool:
            # properties whose type is not selected pass automatically
            return not applies_to(typ) or holds_for(value)

        pred = typed_value_filter

    elif value_pred is not None:
        if pred is not None:
            raise TypeError("filter predicate not permitted when value predicate is present")

        holds_for_any: Callable[[Any], bool] = value_pred

        def value_filter(typ: type, value: Any) -> bool:
            return holds_for_any(value)

        pred = value_filter

    elif type_pred is not None:
        raise TypeError("value predicate required when type predicate is present")

    elif pred is None:

        def accept_all(typ: type, value: Any) -> bool:
            return True

        pred = accept_all

    return RecursiveChecker(pred).check(type(obj), obj)
|