# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

"""
Type-safe data interchange for Python data classes.

:see: https://github.com/hunyadi/strong_typing
"""

import dataclasses
import datetime
import decimal
import enum
import functools
import inspect
import json
import types
import typing
import uuid
from copy import deepcopy
from typing import (
    Any,
    Callable,
    ClassVar,
    Dict,
    List,
    Literal,
    Optional,
    Tuple,
    Type,
    TypeVar,
    Union,
    overload,
)

import jsonschema
from typing_extensions import Annotated

from . import docstring
from .auxiliary import (
    Alias,
    IntegerRange,
    MaxLength,
    MinLength,
    Precision,
    get_auxiliary_format,
)
from .core import JsonArray, JsonObject, JsonType, Schema, StrictJsonType
from .inspection import (
    TypeLike,
    enum_value_types,
    get_annotation,
    get_class_properties,
    is_type_enum,
    is_type_like,
    is_type_optional,
    unwrap_optional_type,
)
from .name import python_type_to_name
from .serialization import object_to_json

# determines the maximum number of distinct enum members up to which a Dict[EnumType, Any] is converted into a JSON
# schema with explicitly listed properties (rather than employing a pattern constraint on property names)
OBJECT_ENUM_EXPANSION_LIMIT = 4


T = TypeVar("T")


def get_class_docstrings(data_type: type) -> Tuple[Optional[str], Optional[str]]:
    """
    Extracts the short and long description from the documentation string of a type.

    :param data_type: The type whose documentation string to parse.
    :returns: A tuple of the short and the long description, or `(None, None)` if the type only has a default
        documentation string.
    """

    docstr = docstring.parse_type(data_type)

    # check if class has a doc-string other than the auto-generated string assigned by @dataclass
    if docstring.has_default_docstring(data_type):
        return None, None

    return docstr.short_description, docstr.long_description


def get_class_property_docstrings(
    data_type: type, transform_fun: Optional[Callable[[type, str, str], str]] = None
) -> Dict[str, str]:
    """
    Extracts the documentation strings associated with the properties of a composite type.

    :param data_type: The object whose properties to iterate over.
    :param transform_fun: An optional function that maps a property documentation string to a custom tailored string.
    :returns: A dictionary mapping property names to descriptions.
    """

    result = {}
    # walk the method resolution order so that a description found on a more derived class takes precedence
    for base in inspect.getmro(data_type):
        docstr = docstring.parse_type(base)
        for param in docstr.params.values():
            if param.name in result:
                continue

            if transform_fun:
                description = transform_fun(data_type, param.name, param.description)
            else:
                description = param.description

            result[param.name] = description
    return result


def docstring_to_schema(data_type: type) -> Schema:
    """
    Creates the descriptive part of a JSON schema (title and description) for a type.

    :param data_type: The type whose name and documentation string to use.
    :returns: A schema fragment with a `title`, and a `description` if the type has a non-default documentation string.
    """

    short_description, long_description = get_class_docstrings(data_type)
    schema: Schema = {
        "title": python_type_to_name(data_type),
    }

    # skip missing parts such that no stray line break is produced
    description = "\n".join(filter(None, [short_description, long_description]))
    if description:
        schema["description"] = description
    return schema


def id_from_ref(data_type: Union[typing.ForwardRef, str, type]) -> str:
    "Extracts the name of a possibly forward-referenced type."

    if isinstance(data_type, typing.ForwardRef):
        forward_type: typing.ForwardRef = data_type
        return forward_type.__forward_arg__
    elif isinstance(data_type, str):
        return data_type
    else:
        return data_type.__name__


def type_from_ref(data_type: Union[typing.ForwardRef, str, type]) -> Tuple[str, type]:
    """
    Creates a type from a forward reference.

    :param data_type: A forward reference, the name of a type, or a type.
    :returns: A tuple of the type name and the resolved type.
    :raises NameError: Raised by `eval` when the referenced name cannot be resolved in the scope of this module.
    """

    # NOTE(security): `eval` executes the referenced expression; forward references must originate from trusted
    # type annotations and never from external input.
    if isinstance(data_type, typing.ForwardRef):
        forward_type: typing.ForwardRef = data_type
        true_type = eval(forward_type.__forward_code__)
        return forward_type.__forward_arg__, true_type
    elif isinstance(data_type, str):
        true_type = eval(data_type)
        return data_type, true_type
    else:
        return data_type.__name__, data_type


@dataclasses.dataclass
class TypeCatalogEntry:
    "An entry in the type catalog, which holds the schema, identifier and examples registered for a type."

    # explicitly registered schema; `None` if the schema is to be derived from the type
    schema: Optional[Schema]
    # name with which the type is referenced in schema definitions
    identifier: str
    # examples to attach to the generated schema (if any)
    examples: Optional[JsonType] = None


class TypeCatalog:
    "Maintains an association of well-known Python types to their JSON schema."

    _by_type: Dict[TypeLike, TypeCatalogEntry]
    _by_name: Dict[str, TypeCatalogEntry]

    def __init__(self) -> None:
        self._by_type = {}
        self._by_name = {}

    def __contains__(self, data_type: TypeLike) -> bool:
        "Checks if a type is registered; forward references are looked up by name."

        if isinstance(data_type, typing.ForwardRef):
            fwd: typing.ForwardRef = data_type
            name = fwd.__forward_arg__
            return name in self._by_name
        else:
            return data_type in self._by_type

    def add(
        self,
        data_type: TypeLike,
        schema: Optional[Schema],
        identifier: str,
        examples: Optional[List[JsonType]] = None,
    ) -> None:
        """
        Registers a type in the catalog.

        :param data_type: The type to register.
        :param schema: The schema to associate with the type, or `None`.
        :param identifier: The name under which the type is registered.
        :param examples: Examples to associate with the type.
        :raises TypeError: The type is a forward reference.
        :raises ValueError: The type has already been registered.
        """

        if isinstance(data_type, typing.ForwardRef):
            raise TypeError("forward references cannot be used to register a type")

        if data_type in self._by_type:
            raise ValueError(f"type {data_type} is already registered in the catalog")

        # the same entry object is shared by both indexes
        entry = TypeCatalogEntry(schema, identifier, examples)
        self._by_type[data_type] = entry
        self._by_name[identifier] = entry

    def get(self, data_type: TypeLike) -> TypeCatalogEntry:
        """
        Looks up the catalog entry of a type; forward references are looked up by name.

        :raises KeyError: The type is not registered in the catalog.
        """

        if isinstance(data_type, typing.ForwardRef):
            fwd: typing.ForwardRef = data_type
            name = fwd.__forward_arg__
            return self._by_name[name]
        else:
            return self._by_type[data_type]


@dataclasses.dataclass
class SchemaOptions:
    "Options that control how a JSON schema is generated."

    # prefix of the `$ref` emitted for types registered in the catalog
    definitions_path: str = "#/definitions/"
    # whether to emit `title` and `description` derived from documentation strings
    use_descriptions: bool = True
    # whether to emit `examples` registered with a type
    use_examples: bool = True
    # optional hook invoked with (type, property name, description) to tailor property descriptions
    property_description_fun: Optional[Callable[[type, str, str], str]] = None


class JsonSchemaGenerator:
    "Creates a JSON schema with user-defined type definitions."

    # catalog of registered types, shared by all generator instances
    type_catalog: ClassVar[TypeCatalog] = TypeCatalog()
    # types referenced with `$ref` while generating a schema, keyed by their identifier
    types_used: Dict[str, TypeLike]
    options: SchemaOptions

    def __init__(self, options: Optional[SchemaOptions] = None):
        if options is None:
            self.options = SchemaOptions()
        else:
            self.options = options
        self.types_used = {}

    @functools.singledispatchmethod
    def _metadata_to_schema(self, arg: object) -> Schema:
        "Maps an item of `Annotated[...]` metadata to schema constraints; dispatches on the type of the item."

        # unrecognized annotation
        return {}

    @_metadata_to_schema.register
    def _(self, arg: IntegerRange) -> Schema:
        return {"minimum": arg.minimum, "maximum": arg.maximum}

    @_metadata_to_schema.register
    def _(self, arg: Precision) -> Schema:
        return {
            "multipleOf": 10 ** (-arg.decimal_digits),
            "exclusiveMinimum": -(10**arg.integer_digits),
            "exclusiveMaximum": (10**arg.integer_digits),
        }

    @_metadata_to_schema.register
    def _(self, arg: MinLength) -> Schema:
        return {"minLength": arg.value}

    @_metadata_to_schema.register
    def _(self, arg: MaxLength) -> Schema:
        return {"maxLength": arg.value}

    def _with_metadata(self, type_schema: Schema, metadata: Optional[Tuple[Any, ...]]) -> Schema:
        """
        Adds the constraints derived from annotation metadata to a schema.

        :param type_schema: The schema to extend; updated in place.
        :param metadata: The metadata items of an `Annotated[...]` type (if any).
        :returns: The schema passed in `type_schema`.
        """

        if metadata:
            for m in metadata:
                type_schema.update(self._metadata_to_schema(m))
        return type_schema

    def _simple_type_to_schema(self, typ: TypeLike, json_schema_extra: Optional[dict] = None) -> Optional[Schema]:
        """
        Returns the JSON schema associated with a simple, unrestricted type.

        :param typ: The type to look up.
        :param json_schema_extra: Extra schema information; only `contentEncoding` is used, and only for `str`.
        :returns: The schema for a simple type, or `None`.
        """

        if typ is type(None):
            return {"type": "null"}
        elif typ is bool:
            return {"type": "boolean"}
        elif typ is int:
            return {"type": "integer"}
        elif typ is float:
            return {"type": "number"}
        elif typ is str:
            if json_schema_extra and "contentEncoding" in json_schema_extra:
                return {
                    "type": "string",
                    "contentEncoding": json_schema_extra["contentEncoding"],
                }
            return {"type": "string"}
        elif typ is bytes:
            return {"type": "string", "contentEncoding": "base64"}
        elif typ is datetime.datetime:
            # 2018-11-13T20:20:39+00:00
            return {
                "type": "string",
                "format": "date-time",
            }
        elif typ is datetime.date:
            # 2018-11-13
            return {"type": "string", "format": "date"}
        elif typ is datetime.time:
            # 20:20:39+00:00
            return {"type": "string", "format": "time"}
        elif typ is decimal.Decimal:
            return {"type": "number"}
        elif typ is uuid.UUID:
            # f81d4fae-7dec-11d0-a765-00a0c91e6bf6
            return {"type": "string", "format": "uuid"}
        elif typ is Any:
            return {
                "oneOf": [
                    {"type": "null"},
                    {"type": "boolean"},
                    {"type": "number"},
                    {"type": "string"},
                    {"type": "array"},
                    {"type": "object"},
                ]
            }
        elif typ is JsonObject:
            return {"type": "object"}
        elif typ is JsonArray:
            return {"type": "array"}
        else:
            # not a simple type
            return None

    def type_to_schema(
        self,
        data_type: TypeLike,
        force_expand: bool = False,
        json_schema_extra: Optional[dict] = None,
    ) -> Schema:
        """
        Returns the JSON schema associated with a type, including the `deprecated` marker (if any).

        :param data_type: The Python type whose JSON schema to return.
        :param force_expand: Forces a JSON schema to be returned even if the type is registered in the catalog of
            known types.
        :param json_schema_extra: Extra schema information associated with the type; `deprecated` is copied to the
            returned schema.
        :returns: The JSON schema associated with the type.
        """

        common_info = {}
        if json_schema_extra and "deprecated" in json_schema_extra:
            common_info["deprecated"] = json_schema_extra["deprecated"]
        return self._type_to_schema(data_type, force_expand, json_schema_extra) | common_info

    def _type_to_schema(
        self,
        data_type: TypeLike,
        force_expand: bool = False,
        json_schema_extra: Optional[dict] = None,
    ) -> Schema:
        """
        Returns the JSON schema associated with a type.

        :param data_type: The Python type whose JSON schema to return.
        :param force_expand: Forces a JSON schema to be returned even if the type is registered in the catalog of
            known types.
        :param json_schema_extra: Extra schema information passed on to the look-up of simple types.
        :returns: The JSON schema associated with the type.
        :raises ValueError: The type is an enumeration with inconsistent member value types, a `dict` with an
            unsupported key type, or a `Literal` with more than one value.
        """

        # short-circuit for common simple types
        schema = self._simple_type_to_schema(data_type, json_schema_extra)
        if schema is not None:
            return schema

        # types registered in the type catalog of well-known types
        type_catalog = JsonSchemaGenerator.type_catalog
        if not force_expand and data_type in type_catalog:
            # user-defined type
            identifier = type_catalog.get(data_type).identifier
            self.types_used.setdefault(identifier, data_type)
            return {"$ref": f"{self.options.definitions_path}{identifier}"}

        # unwrap annotated types
        metadata = getattr(data_type, "__metadata__", None)
        if metadata is not None:
            # type is Annotated[T, ...]
            typ = typing.get_args(data_type)[0]
            schema = self._simple_type_to_schema(typ)
            if schema is not None:
                # recognize well-known auxiliary types
                fmt = get_auxiliary_format(data_type)
                if fmt is not None:
                    schema.update({"format": fmt})
                    return schema
                else:
                    return self._with_metadata(schema, metadata)

        else:
            # type is a regular type
            typ = data_type

        if isinstance(typ, (typing.ForwardRef, str)):
            if force_expand:
                identifier, true_type = type_from_ref(typ)
                return self.type_to_schema(true_type, force_expand=True)
            else:
                try:
                    identifier, true_type = type_from_ref(typ)
                    self.types_used[identifier] = true_type
                except NameError:
                    # the referenced type cannot be resolved here; emit a reference by name only
                    identifier = id_from_ref(typ)

                return {"$ref": f"{self.options.definitions_path}{identifier}"}

        if is_type_enum(typ):
            enum_type: Type[enum.Enum] = typ
            value_types = enum_value_types(enum_type)
            if len(value_types) != 1:
                raise ValueError(
                    f"enumerations must have a consistent member value type but several types found: {value_types}"
                )
            enum_value_type = value_types.pop()

            # member value types for which the permitted values are listed explicitly
            primitive_schema_types = {
                bool: "boolean",
                int: "integer",
                float: "number",
                str: "string",
            }

            enum_schema: Schema
            enum_schema_type = primitive_schema_types.get(enum_value_type)
            if enum_schema_type is not None:
                enum_schema = {
                    "type": enum_schema_type,
                    "enum": [object_to_json(e.value) for e in enum_type],
                }
            else:
                enum_schema = self.type_to_schema(enum_value_type)

            if self.options.use_descriptions:
                enum_schema.update(docstring_to_schema(typ))
            return enum_schema

        origin_type = typing.get_origin(typ)
        if origin_type is list:
            (list_type,) = typing.get_args(typ)  # unpack single tuple element
            return {"type": "array", "items": self.type_to_schema(list_type)}
        elif origin_type is dict:
            key_type, value_type = typing.get_args(typ)
            if not (key_type is str or key_type is int or is_type_enum(key_type)):
                raise ValueError("`dict` with key type not coercible to `str` is not supported")

            dict_schema: Schema
            value_schema = self.type_to_schema(value_type)
            if is_type_enum(key_type):
                enum_values = [str(e.value) for e in key_type]
                if len(enum_values) > OBJECT_ENUM_EXPANSION_LIMIT:
                    dict_schema = {
                        "propertyNames": {"pattern": "^(" + "|".join(enum_values) + ")$"},
                        "additionalProperties": value_schema,
                    }
                else:
                    dict_schema = {
                        "properties": {value: value_schema for value in enum_values},
                        "additionalProperties": False,
                    }
            else:
                dict_schema = {"additionalProperties": value_schema}

            schema = {"type": "object"}
            schema.update(dict_schema)
            return schema
        elif origin_type is set:
            (set_type,) = typing.get_args(typ)  # unpack single tuple element
            return {
                "type": "array",
                "items": self.type_to_schema(set_type),
                "uniqueItems": True,
            }
        elif origin_type is tuple:
            args = typing.get_args(typ)
            return {
                "type": "array",
                "minItems": len(args),
                "maxItems": len(args),
                "prefixItems": [self.type_to_schema(member_type) for member_type in args],
            }
        elif origin_type in (Union, types.UnionType):
            discriminator = None
            if typing.get_origin(data_type) is Annotated:
                # the first metadata item may be an unrelated annotation without a discriminator
                discriminator = getattr(typing.get_args(data_type)[1], "discriminator", None)
            ret: Schema = {"oneOf": [self.type_to_schema(union_type) for union_type in typing.get_args(typ)]}
            if discriminator:
                # for each union type, we need to read the value of the discriminator
                mapping: dict[str, JsonType] = {}
                for union_type in typing.get_args(typ):
                    props = self.type_to_schema(union_type, force_expand=True)["properties"]
                    # mypy is confused here because JsonType allows multiple types, some of them
                    # not indexable (bool?) or not indexable by string (list?). The correctness of
                    # types depends on correct model definitions. Hence multiple ignore statements below.
                    discriminator_value = props[discriminator]["default"]  # type: ignore[index,call-overload]
                    mapping[discriminator_value] = self.type_to_schema(union_type)["$ref"]  # type: ignore[index]

                ret["discriminator"] = {
                    "propertyName": discriminator,
                    "mapping": mapping,
                }
            return ret
        elif origin_type is Literal:
            if len(typing.get_args(typ)) != 1:
                raise ValueError(f"Literal type {typ} has {len(typing.get_args(typ))} arguments")
            (literal_value,) = typing.get_args(typ)  # unpack value of literal type
            schema = self.type_to_schema(type(literal_value))
            schema["const"] = literal_value
            return schema
        elif origin_type is type:
            (concrete_type,) = typing.get_args(typ)  # unpack single tuple element
            return {"const": self.type_to_schema(concrete_type, force_expand=True)}

        # dictionary of class attributes
        members = dict(inspect.getmembers(typ, lambda a: not inspect.isroutine(a)))

        property_docstrings = get_class_property_docstrings(typ, self.options.property_description_fun)

        # per-field metadata (if the class exposes `model_fields`) is the same for all properties; extract it once
        model_fields = members.get("model_fields", {})
        defaults = {name: field_info.default for name, field_info in model_fields.items()}

        properties: Dict[str, Schema] = {}
        required: List[str] = []
        for property_name, property_type in get_class_properties(typ):
            # rename property if an alias name is specified
            alias = get_annotation(property_type, Alias)
            if alias:
                output_name = alias.name
            else:
                output_name = property_name

            # field metadata is keyed by the Python property name, which may differ from the (aliased) output name
            property_extra = None
            field_info = model_fields.get(property_name)
            if field_info is not None:
                # copy to avoid writing the `deprecated` marker into the metadata owned by the field
                property_extra = dict(field_info.json_schema_extra or {})
                if field_info.deprecated:
                    property_extra["deprecated"] = True

            if is_type_optional(property_type):
                optional_type: type = unwrap_optional_type(property_type)
                property_def = self.type_to_schema(optional_type, json_schema_extra=property_extra)
            else:
                property_def = self.type_to_schema(property_type, json_schema_extra=property_extra)
                required.append(output_name)

            # check if attribute has a default value initializer
            def_value = defaults.get(property_name)
            if def_value is not None:
                # check if value can be directly represented in JSON
                if isinstance(
                    def_value,
                    (
                        bool,
                        int,
                        float,
                        str,
                        enum.Enum,
                        datetime.datetime,
                        datetime.date,
                        datetime.time,
                    ),
                ):
                    property_def["default"] = object_to_json(def_value)

            # add property docstring if available
            property_doc = property_docstrings.get(property_name)
            if property_doc:
                # the property description takes the place of the title of the property type
                property_def.pop("title", None)
                property_def["description"] = property_doc

            properties[output_name] = property_def

        schema = {"type": "object"}
        if properties:
            schema["properties"] = typing.cast(JsonType, properties)
            schema["additionalProperties"] = False
        if required:
            schema["required"] = typing.cast(JsonType, required)
        if self.options.use_descriptions:
            schema.update(docstring_to_schema(typ))

        return schema

    def _type_to_schema_with_lookup(self, data_type: TypeLike) -> Schema:
        """
        Returns the JSON schema associated with a type that may be registered in the catalog of known types.

        :param data_type: The type whose JSON schema we seek.
        :returns: The JSON schema associated with the type.
        :raises KeyError: The type is not registered in the catalog.
        """

        entry = JsonSchemaGenerator.type_catalog.get(data_type)
        if entry.schema is None:
            type_schema = self.type_to_schema(data_type, force_expand=True)
        else:
            # copy such that the registered schema is not modified by the updates below
            type_schema = deepcopy(entry.schema)

        # add descriptive text (if present)
        if self.options.use_descriptions:
            if isinstance(data_type, type) and not isinstance(data_type, typing.ForwardRef):
                type_schema.update(docstring_to_schema(data_type))

        # add example (if present)
        if self.options.use_examples and entry.examples:
            type_schema["examples"] = entry.examples

        return type_schema

    def classdef_to_schema(self, data_type: TypeLike, force_expand: bool = False) -> Tuple[Schema, Dict[str, Schema]]:
        """
        Returns the JSON schema associated with a type and any nested types.

        :param data_type: The type whose JSON schema to return.
        :param force_expand: True if a full JSON schema is to be returned even for well-known types; false if a schema
            reference is to be used for well-known types.
        :returns: A tuple of the JSON schema, and a mapping between nested type names and their corresponding schema.
        :raises TypeError: The argument is not a type-like object.
        """

        if not is_type_like(data_type):
            raise TypeError(f"expected a type-like object but got: {data_type}")

        self.types_used = {}
        try:
            type_schema = self.type_to_schema(data_type, force_expand=force_expand)

            types_defined: Dict[str, Schema] = {}
            while len(self.types_used) > len(types_defined):
                # make a snapshot copy; original collection is going to be modified
                types_undefined = {
                    sub_name: sub_type
                    for sub_name, sub_type in self.types_used.items()
                    if sub_name not in types_defined
                }

                # expand undefined types, which may lead to additional types to be defined
                for sub_name, sub_type in types_undefined.items():
                    types_defined[sub_name] = self._type_to_schema_with_lookup(sub_type)

            type_definitions = dict(sorted(types_defined.items()))
        finally:
            # reset the collection of referenced types even if schema generation fails
            self.types_used = {}

        return type_schema, type_definitions


class Validator(enum.Enum):
    "Defines constants for JSON schema standards."

    Draft7 = jsonschema.Draft7Validator
    Draft201909 = jsonschema.Draft201909Validator
    Draft202012 = jsonschema.Draft202012Validator
    # same value as `Draft202012`
    Latest = jsonschema.Draft202012Validator


def classdef_to_schema(
    data_type: TypeLike,
    options: Optional[SchemaOptions] = None,
    validator: Validator = Validator.Latest,
) -> Schema:
    """
    Returns the JSON schema corresponding to the given type.

    :param data_type: The Python type used to generate the JSON schema
    :param options: Options that control schema generation; default options are used if omitted.
    :param validator: The JSON schema standard whose meta-schema the generated schema is checked against.
    :returns: A JSON object that you can serialize to a JSON string with json.dump or json.dumps
    :raises TypeError: Indicates that the argument is not a type-like object, or that the generated JSON schema does
        not validate against the desired meta-schema.
    """

    # short-circuit with an error message when passing invalid data
    if not is_type_like(data_type):
        raise TypeError(f"expected a type-like object but got: {data_type}")

    generator = JsonSchemaGenerator(options)
    type_schema, type_definitions = generator.classdef_to_schema(data_type)

    class_schema: Schema = {}
    if type_definitions:
        class_schema["definitions"] = typing.cast(JsonType, type_definitions)
    class_schema.update(type_schema)

    validator_id = validator.value.META_SCHEMA["$id"]
    try:
        validator.value.check_schema(class_schema)
    except jsonschema.exceptions.SchemaError as exc:
        # chain the original error such that the offending part of the schema can be located
        raise TypeError(f"schema does not validate against meta-schema <{validator_id}>") from exc

    # place `$schema` first in the output
    schema = {"$schema": validator_id}
    schema.update(class_schema)
    return schema


def validate_object(data_type: TypeLike, json_dict: JsonType) -> None:
    """
    Validates if the JSON dictionary object conforms to the expected type.

    :param data_type: The type to match against.
    :param json_dict: A JSON object obtained with `json.load` or `json.loads`.
    :raises jsonschema.exceptions.ValidationError: Indicates that the JSON object cannot represent the type.
    """

    schema_dict = classdef_to_schema(data_type)
    jsonschema.validate(json_dict, schema_dict, format_checker=jsonschema.FormatChecker())


def print_schema(data_type: type) -> None:
    """Pretty-prints the JSON schema corresponding to the type."""

    s = classdef_to_schema(data_type)
    print(json.dumps(s, indent=4))


def get_schema_identifier(data_type: type) -> Optional[str]:
    """
    Returns the identifier with which a type is registered in the type catalog.

    :param data_type: The type to look up.
    :returns: The identifier of the type, or `None` if the type is not registered.
    """

    if data_type in JsonSchemaGenerator.type_catalog:
        return JsonSchemaGenerator.type_catalog.get(data_type).identifier
    else:
        return None


def register_schema(
    data_type: T,
    schema: Optional[Schema] = None,
    name: Optional[str] = None,
    examples: Optional[List[JsonType]] = None,
) -> T:
    """
    Associates a type with a JSON schema definition.

    :param data_type: The type to associate with a JSON schema.
    :param schema: The schema to associate the type with. Derived automatically if omitted.
    :param name: The name used for looking up the type. Determined automatically if omitted.
    :param examples: Examples to associate with the type.
    :returns: The input type.
    :raises TypeError: The type is a forward reference.
    :raises ValueError: The type has already been registered.
    """

    JsonSchemaGenerator.type_catalog.add(
        data_type,
        schema,
        name if name is not None else python_type_to_name(data_type),
        examples,
    )
    return data_type


@overload
def json_schema_type(cls: Type[T], /) -> Type[T]: ...


@overload
def json_schema_type(
    cls: None = None,
    *,
    schema: Optional[Schema] = None,
    examples: Optional[List[JsonType]] = None,
) -> Callable[[Type[T]], Type[T]]: ...
def json_schema_type(
    cls: Optional[Type[T]] = None,
    *,
    schema: Optional[Schema] = None,
    examples: Optional[List[JsonType]] = None,
) -> Union[Type[T], Callable[[Type[T]], Type[T]]]:
    """
    Decorator to add user-defined schema definition to a class.

    :param cls: The class to register when the decorator is applied without parentheses.
    :param schema: The schema to associate the class with.
    :param examples: Examples to associate with the class.
    :returns: The class passed in, or a decorator that registers the class it is applied to.
    """

    def decorate(target: Type[T]) -> Type[T]:
        return register_schema(target, schema, examples=examples)

    if cls is not None:
        # applied directly as @json_schema_type: register the class right away
        return decorate(cls)

    # applied as @json_schema_type(...): hand back the decorator to be applied to the class
    return decorate


# register the built-in JSON types under well-known names
register_schema(JsonObject, name="JsonObject")
register_schema(JsonArray, name="JsonArray")

_JSON_TYPE_EXAMPLE = {
    "property1": None,
    "property2": True,
    "property3": 64,
    "property4": "string",
    "property5": ["item"],
    "property6": {"key": "value"},
}
register_schema(JsonType, name="JsonType", examples=[_JSON_TYPE_EXAMPLE])

_STRICT_JSON_TYPE_EXAMPLE = {
    "property1": True,
    "property2": 64,
    "property3": "string",
    "property4": ["item"],
    "property5": {"key": "value"},
}
register_schema(StrictJsonType, name="StrictJsonType", examples=[_STRICT_JSON_TYPE_EXAMPLE])