forked from phoenix-oss/llama-stack-mirror
See https://swagger.io/docs/specification/v3_0/data-models/inheritance-and-polymorphism/#discriminator When specifying discriminators, mapping must be specified unless the value of the discriminator is the subtype itself (which in our case is not.) The changes in the YAML are self-explanatory.
774 lines
26 KiB
Python
774 lines
26 KiB
Python
# Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
# All rights reserved.
|
|
#
|
|
# This source code is licensed under the terms described in the LICENSE file in
|
|
# the root directory of this source tree.
|
|
|
|
"""
|
|
Type-safe data interchange for Python data classes.
|
|
|
|
:see: https://github.com/hunyadi/strong_typing
|
|
"""
|
|
|
|
import dataclasses
|
|
import datetime
|
|
import decimal
|
|
import enum
|
|
import functools
|
|
import inspect
|
|
import json
|
|
import typing
|
|
import uuid
|
|
from copy import deepcopy
|
|
from typing import (
|
|
Any,
|
|
Callable,
|
|
ClassVar,
|
|
Dict,
|
|
List,
|
|
Literal,
|
|
Optional,
|
|
overload,
|
|
Tuple,
|
|
Type,
|
|
TypeVar,
|
|
Union,
|
|
)
|
|
|
|
import jsonschema
|
|
from typing_extensions import Annotated
|
|
|
|
from . import docstring
|
|
from .auxiliary import (
|
|
Alias,
|
|
get_auxiliary_format,
|
|
IntegerRange,
|
|
MaxLength,
|
|
MinLength,
|
|
Precision,
|
|
)
|
|
from .core import JsonArray, JsonObject, JsonType, Schema, StrictJsonType
|
|
from .inspection import (
|
|
enum_value_types,
|
|
get_annotation,
|
|
get_class_properties,
|
|
is_type_enum,
|
|
is_type_like,
|
|
is_type_optional,
|
|
TypeLike,
|
|
unwrap_optional_type,
|
|
)
|
|
from .name import python_type_to_name
|
|
from .serialization import object_to_json
|
|
|
|
# determines the maximum number of distinct enum members up to which a Dict[EnumType, Any] is converted into a JSON
|
|
# schema with explicitly listed properties (rather than employing a pattern constraint on property names)
|
|
OBJECT_ENUM_EXPANSION_LIMIT = 4
|
|
|
|
|
|
T = TypeVar("T")
|
|
|
|
|
|
def get_class_docstrings(data_type: type) -> Tuple[Optional[str], Optional[str]]:
|
|
docstr = docstring.parse_type(data_type)
|
|
|
|
# check if class has a doc-string other than the auto-generated string assigned by @dataclass
|
|
if docstring.has_default_docstring(data_type):
|
|
return None, None
|
|
|
|
return docstr.short_description, docstr.long_description
|
|
|
|
|
|
def get_class_property_docstrings(
|
|
data_type: type, transform_fun: Optional[Callable[[type, str, str], str]] = None
|
|
) -> Dict[str, str]:
|
|
"""
|
|
Extracts the documentation strings associated with the properties of a composite type.
|
|
|
|
:param data_type: The object whose properties to iterate over.
|
|
:param transform_fun: An optional function that maps a property documentation string to a custom tailored string.
|
|
:returns: A dictionary mapping property names to descriptions.
|
|
"""
|
|
|
|
result = {}
|
|
for base in inspect.getmro(data_type):
|
|
docstr = docstring.parse_type(base)
|
|
for param in docstr.params.values():
|
|
if param.name in result:
|
|
continue
|
|
|
|
if transform_fun:
|
|
description = transform_fun(data_type, param.name, param.description)
|
|
else:
|
|
description = param.description
|
|
|
|
result[param.name] = description
|
|
return result
|
|
|
|
|
|
def docstring_to_schema(data_type: type) -> Schema:
|
|
short_description, long_description = get_class_docstrings(data_type)
|
|
schema: Schema = {}
|
|
if short_description:
|
|
schema["title"] = short_description
|
|
if long_description:
|
|
schema["description"] = long_description
|
|
return schema
|
|
|
|
|
|
def id_from_ref(data_type: Union[typing.ForwardRef, str, type]) -> str:
|
|
"Extracts the name of a possibly forward-referenced type."
|
|
|
|
if isinstance(data_type, typing.ForwardRef):
|
|
forward_type: typing.ForwardRef = data_type
|
|
return forward_type.__forward_arg__
|
|
elif isinstance(data_type, str):
|
|
return data_type
|
|
else:
|
|
return data_type.__name__
|
|
|
|
|
|
def type_from_ref(data_type: Union[typing.ForwardRef, str, type]) -> Tuple[str, type]:
|
|
"Creates a type from a forward reference."
|
|
|
|
if isinstance(data_type, typing.ForwardRef):
|
|
forward_type: typing.ForwardRef = data_type
|
|
true_type = eval(forward_type.__forward_code__)
|
|
return forward_type.__forward_arg__, true_type
|
|
elif isinstance(data_type, str):
|
|
true_type = eval(data_type)
|
|
return data_type, true_type
|
|
else:
|
|
return data_type.__name__, data_type
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class TypeCatalogEntry:
|
|
schema: Optional[Schema]
|
|
identifier: str
|
|
examples: Optional[JsonType] = None
|
|
|
|
|
|
class TypeCatalog:
|
|
"Maintains an association of well-known Python types to their JSON schema."
|
|
|
|
_by_type: Dict[TypeLike, TypeCatalogEntry]
|
|
_by_name: Dict[str, TypeCatalogEntry]
|
|
|
|
def __init__(self) -> None:
|
|
self._by_type = {}
|
|
self._by_name = {}
|
|
|
|
def __contains__(self, data_type: TypeLike) -> bool:
|
|
if isinstance(data_type, typing.ForwardRef):
|
|
fwd: typing.ForwardRef = data_type
|
|
name = fwd.__forward_arg__
|
|
return name in self._by_name
|
|
else:
|
|
return data_type in self._by_type
|
|
|
|
def add(
|
|
self,
|
|
data_type: TypeLike,
|
|
schema: Optional[Schema],
|
|
identifier: str,
|
|
examples: Optional[List[JsonType]] = None,
|
|
) -> None:
|
|
if isinstance(data_type, typing.ForwardRef):
|
|
raise TypeError("forward references cannot be used to register a type")
|
|
|
|
if data_type in self._by_type:
|
|
raise ValueError(f"type {data_type} is already registered in the catalog")
|
|
|
|
entry = TypeCatalogEntry(schema, identifier, examples)
|
|
self._by_type[data_type] = entry
|
|
self._by_name[identifier] = entry
|
|
|
|
def get(self, data_type: TypeLike) -> TypeCatalogEntry:
|
|
if isinstance(data_type, typing.ForwardRef):
|
|
fwd: typing.ForwardRef = data_type
|
|
name = fwd.__forward_arg__
|
|
return self._by_name[name]
|
|
else:
|
|
return self._by_type[data_type]
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class SchemaOptions:
|
|
definitions_path: str = "#/definitions/"
|
|
use_descriptions: bool = True
|
|
use_examples: bool = True
|
|
property_description_fun: Optional[Callable[[type, str, str], str]] = None
|
|
|
|
|
|
class JsonSchemaGenerator:
|
|
"Creates a JSON schema with user-defined type definitions."
|
|
|
|
type_catalog: ClassVar[TypeCatalog] = TypeCatalog()
|
|
types_used: Dict[str, TypeLike]
|
|
options: SchemaOptions
|
|
|
|
def __init__(self, options: Optional[SchemaOptions] = None):
|
|
if options is None:
|
|
self.options = SchemaOptions()
|
|
else:
|
|
self.options = options
|
|
self.types_used = {}
|
|
|
|
@functools.singledispatchmethod
|
|
def _metadata_to_schema(self, arg: object) -> Schema:
|
|
# unrecognized annotation
|
|
return {}
|
|
|
|
@_metadata_to_schema.register
|
|
def _(self, arg: IntegerRange) -> Schema:
|
|
return {"minimum": arg.minimum, "maximum": arg.maximum}
|
|
|
|
@_metadata_to_schema.register
|
|
def _(self, arg: Precision) -> Schema:
|
|
return {
|
|
"multipleOf": 10 ** (-arg.decimal_digits),
|
|
"exclusiveMinimum": -(10**arg.integer_digits),
|
|
"exclusiveMaximum": (10**arg.integer_digits),
|
|
}
|
|
|
|
@_metadata_to_schema.register
|
|
def _(self, arg: MinLength) -> Schema:
|
|
return {"minLength": arg.value}
|
|
|
|
@_metadata_to_schema.register
|
|
def _(self, arg: MaxLength) -> Schema:
|
|
return {"maxLength": arg.value}
|
|
|
|
def _with_metadata(
|
|
self, type_schema: Schema, metadata: Optional[Tuple[Any, ...]]
|
|
) -> Schema:
|
|
if metadata:
|
|
for m in metadata:
|
|
type_schema.update(self._metadata_to_schema(m))
|
|
return type_schema
|
|
|
|
def _simple_type_to_schema(self, typ: TypeLike) -> Optional[Schema]:
|
|
"""
|
|
Returns the JSON schema associated with a simple, unrestricted type.
|
|
|
|
:returns: The schema for a simple type, or `None`.
|
|
"""
|
|
|
|
if typ is type(None):
|
|
return {"type": "null"}
|
|
elif typ is bool:
|
|
return {"type": "boolean"}
|
|
elif typ is int:
|
|
return {"type": "integer"}
|
|
elif typ is float:
|
|
return {"type": "number"}
|
|
elif typ is str:
|
|
return {"type": "string"}
|
|
elif typ is bytes:
|
|
return {"type": "string", "contentEncoding": "base64"}
|
|
elif typ is datetime.datetime:
|
|
# 2018-11-13T20:20:39+00:00
|
|
return {
|
|
"type": "string",
|
|
"format": "date-time",
|
|
}
|
|
elif typ is datetime.date:
|
|
# 2018-11-13
|
|
return {"type": "string", "format": "date"}
|
|
elif typ is datetime.time:
|
|
# 20:20:39+00:00
|
|
return {"type": "string", "format": "time"}
|
|
elif typ is decimal.Decimal:
|
|
return {"type": "number"}
|
|
elif typ is uuid.UUID:
|
|
# f81d4fae-7dec-11d0-a765-00a0c91e6bf6
|
|
return {"type": "string", "format": "uuid"}
|
|
elif typ is Any:
|
|
return {
|
|
"oneOf": [
|
|
{"type": "null"},
|
|
{"type": "boolean"},
|
|
{"type": "number"},
|
|
{"type": "string"},
|
|
{"type": "array"},
|
|
{"type": "object"},
|
|
]
|
|
}
|
|
elif typ is JsonObject:
|
|
return {"type": "object"}
|
|
elif typ is JsonArray:
|
|
return {"type": "array"}
|
|
else:
|
|
# not a simple type
|
|
return None
|
|
|
|
def type_to_schema(self, data_type: TypeLike, force_expand: bool = False) -> Schema:
|
|
"""
|
|
Returns the JSON schema associated with a type.
|
|
|
|
:param data_type: The Python type whose JSON schema to return.
|
|
:param force_expand: Forces a JSON schema to be returned even if the type is registered in the catalog of known types.
|
|
:returns: The JSON schema associated with the type.
|
|
"""
|
|
|
|
# short-circuit for common simple types
|
|
schema = self._simple_type_to_schema(data_type)
|
|
if schema is not None:
|
|
return schema
|
|
|
|
# types registered in the type catalog of well-known types
|
|
type_catalog = JsonSchemaGenerator.type_catalog
|
|
if not force_expand and data_type in type_catalog:
|
|
# user-defined type
|
|
identifier = type_catalog.get(data_type).identifier
|
|
self.types_used.setdefault(identifier, data_type)
|
|
return {"$ref": f"{self.options.definitions_path}{identifier}"}
|
|
|
|
# unwrap annotated types
|
|
metadata = getattr(data_type, "__metadata__", None)
|
|
if metadata is not None:
|
|
# type is Annotated[T, ...]
|
|
typ = typing.get_args(data_type)[0]
|
|
schema = self._simple_type_to_schema(typ)
|
|
if schema is not None:
|
|
# recognize well-known auxiliary types
|
|
fmt = get_auxiliary_format(data_type)
|
|
if fmt is not None:
|
|
schema.update({"format": fmt})
|
|
return schema
|
|
else:
|
|
return self._with_metadata(schema, metadata)
|
|
|
|
else:
|
|
# type is a regular type
|
|
typ = data_type
|
|
|
|
if isinstance(typ, typing.ForwardRef) or isinstance(typ, str):
|
|
if force_expand:
|
|
identifier, true_type = type_from_ref(typ)
|
|
return self.type_to_schema(true_type, force_expand=True)
|
|
else:
|
|
try:
|
|
identifier, true_type = type_from_ref(typ)
|
|
self.types_used[identifier] = true_type
|
|
except NameError:
|
|
identifier = id_from_ref(typ)
|
|
|
|
return {"$ref": f"{self.options.definitions_path}{identifier}"}
|
|
|
|
if is_type_enum(typ):
|
|
enum_type: Type[enum.Enum] = typ
|
|
value_types = enum_value_types(enum_type)
|
|
if len(value_types) != 1:
|
|
raise ValueError(
|
|
f"enumerations must have a consistent member value type but several types found: {value_types}"
|
|
)
|
|
enum_value_type = value_types.pop()
|
|
|
|
enum_schema: Schema
|
|
if (
|
|
enum_value_type is bool
|
|
or enum_value_type is int
|
|
or enum_value_type is float
|
|
or enum_value_type is str
|
|
):
|
|
if enum_value_type is bool:
|
|
enum_schema_type = "boolean"
|
|
elif enum_value_type is int:
|
|
enum_schema_type = "integer"
|
|
elif enum_value_type is float:
|
|
enum_schema_type = "number"
|
|
elif enum_value_type is str:
|
|
enum_schema_type = "string"
|
|
|
|
enum_schema = {
|
|
"type": enum_schema_type,
|
|
"enum": [object_to_json(e.value) for e in enum_type],
|
|
}
|
|
if self.options.use_descriptions:
|
|
enum_schema.update(docstring_to_schema(typ))
|
|
return enum_schema
|
|
else:
|
|
enum_schema = self.type_to_schema(enum_value_type)
|
|
if self.options.use_descriptions:
|
|
enum_schema.update(docstring_to_schema(typ))
|
|
return enum_schema
|
|
|
|
origin_type = typing.get_origin(typ)
|
|
if origin_type is list:
|
|
(list_type,) = typing.get_args(typ) # unpack single tuple element
|
|
return {"type": "array", "items": self.type_to_schema(list_type)}
|
|
elif origin_type is dict:
|
|
key_type, value_type = typing.get_args(typ)
|
|
if not (key_type is str or key_type is int or is_type_enum(key_type)):
|
|
raise ValueError(
|
|
"`dict` with key type not coercible to `str` is not supported"
|
|
)
|
|
|
|
dict_schema: Schema
|
|
value_schema = self.type_to_schema(value_type)
|
|
if is_type_enum(key_type):
|
|
enum_values = [str(e.value) for e in key_type]
|
|
if len(enum_values) > OBJECT_ENUM_EXPANSION_LIMIT:
|
|
dict_schema = {
|
|
"propertyNames": {
|
|
"pattern": "^(" + "|".join(enum_values) + ")$"
|
|
},
|
|
"additionalProperties": value_schema,
|
|
}
|
|
else:
|
|
dict_schema = {
|
|
"properties": {value: value_schema for value in enum_values},
|
|
"additionalProperties": False,
|
|
}
|
|
else:
|
|
dict_schema = {"additionalProperties": value_schema}
|
|
|
|
schema = {"type": "object"}
|
|
schema.update(dict_schema)
|
|
return schema
|
|
elif origin_type is set:
|
|
(set_type,) = typing.get_args(typ) # unpack single tuple element
|
|
return {
|
|
"type": "array",
|
|
"items": self.type_to_schema(set_type),
|
|
"uniqueItems": True,
|
|
}
|
|
elif origin_type is tuple:
|
|
args = typing.get_args(typ)
|
|
return {
|
|
"type": "array",
|
|
"minItems": len(args),
|
|
"maxItems": len(args),
|
|
"prefixItems": [
|
|
self.type_to_schema(member_type) for member_type in args
|
|
],
|
|
}
|
|
elif origin_type is Union:
|
|
discriminator = None
|
|
if typing.get_origin(data_type) is Annotated:
|
|
discriminator = typing.get_args(data_type)[1].discriminator
|
|
ret = {
|
|
"oneOf": [
|
|
self.type_to_schema(union_type)
|
|
for union_type in typing.get_args(typ)
|
|
]
|
|
}
|
|
if discriminator:
|
|
# for each union type, we need to read the value of the discriminator
|
|
mapping = {}
|
|
for union_type in typing.get_args(typ):
|
|
props = self.type_to_schema(union_type, force_expand=True)[
|
|
"properties"
|
|
]
|
|
mapping[props[discriminator]["default"]] = self.type_to_schema(
|
|
union_type
|
|
)["$ref"]
|
|
|
|
ret["discriminator"] = {
|
|
"propertyName": discriminator,
|
|
"mapping": mapping,
|
|
}
|
|
return ret
|
|
elif origin_type is Literal:
|
|
(literal_value,) = typing.get_args(typ) # unpack value of literal type
|
|
schema = self.type_to_schema(type(literal_value))
|
|
schema["const"] = literal_value
|
|
return schema
|
|
elif origin_type is type:
|
|
(concrete_type,) = typing.get_args(typ) # unpack single tuple element
|
|
return {"const": self.type_to_schema(concrete_type, force_expand=True)}
|
|
|
|
# dictionary of class attributes
|
|
members = dict(inspect.getmembers(typ, lambda a: not inspect.isroutine(a)))
|
|
|
|
property_docstrings = get_class_property_docstrings(
|
|
typ, self.options.property_description_fun
|
|
)
|
|
|
|
properties: Dict[str, Schema] = {}
|
|
required: List[str] = []
|
|
for property_name, property_type in get_class_properties(typ):
|
|
defaults = {}
|
|
if "model_fields" in members:
|
|
f = members["model_fields"]
|
|
defaults = {k: finfo.default for k, finfo in f.items()}
|
|
|
|
# rename property if an alias name is specified
|
|
alias = get_annotation(property_type, Alias)
|
|
if alias:
|
|
output_name = alias.name
|
|
else:
|
|
output_name = property_name
|
|
|
|
if is_type_optional(property_type):
|
|
optional_type: type = unwrap_optional_type(property_type)
|
|
property_def = self.type_to_schema(optional_type)
|
|
else:
|
|
property_def = self.type_to_schema(property_type)
|
|
required.append(output_name)
|
|
|
|
# check if attribute has a default value initializer
|
|
if defaults.get(property_name) is not None:
|
|
def_value = defaults[property_name]
|
|
# check if value can be directly represented in JSON
|
|
if isinstance(
|
|
def_value,
|
|
(
|
|
bool,
|
|
int,
|
|
float,
|
|
str,
|
|
enum.Enum,
|
|
datetime.datetime,
|
|
datetime.date,
|
|
datetime.time,
|
|
),
|
|
):
|
|
property_def["default"] = object_to_json(def_value)
|
|
|
|
# add property docstring if available
|
|
property_doc = property_docstrings.get(property_name)
|
|
if property_doc:
|
|
property_def.pop("title", None)
|
|
property_def["description"] = property_doc
|
|
|
|
properties[output_name] = property_def
|
|
|
|
schema = {"type": "object"}
|
|
if len(properties) > 0:
|
|
schema["properties"] = typing.cast(JsonType, properties)
|
|
schema["additionalProperties"] = False
|
|
if len(required) > 0:
|
|
schema["required"] = typing.cast(JsonType, required)
|
|
if self.options.use_descriptions:
|
|
schema.update(docstring_to_schema(typ))
|
|
return schema
|
|
|
|
def _type_to_schema_with_lookup(self, data_type: TypeLike) -> Schema:
|
|
"""
|
|
Returns the JSON schema associated with a type that may be registered in the catalog of known types.
|
|
|
|
:param data_type: The type whose JSON schema we seek.
|
|
:returns: The JSON schema associated with the type.
|
|
"""
|
|
|
|
entry = JsonSchemaGenerator.type_catalog.get(data_type)
|
|
if entry.schema is None:
|
|
type_schema = self.type_to_schema(data_type, force_expand=True)
|
|
else:
|
|
type_schema = deepcopy(entry.schema)
|
|
|
|
# add descriptive text (if present)
|
|
if self.options.use_descriptions:
|
|
if isinstance(data_type, type) and not isinstance(
|
|
data_type, typing.ForwardRef
|
|
):
|
|
type_schema.update(docstring_to_schema(data_type))
|
|
|
|
# add example (if present)
|
|
if self.options.use_examples and entry.examples:
|
|
type_schema["examples"] = entry.examples
|
|
|
|
return type_schema
|
|
|
|
def classdef_to_schema(
|
|
self, data_type: TypeLike, force_expand: bool = False
|
|
) -> Tuple[Schema, Dict[str, Schema]]:
|
|
"""
|
|
Returns the JSON schema associated with a type and any nested types.
|
|
|
|
:param data_type: The type whose JSON schema to return.
|
|
:param force_expand: True if a full JSON schema is to be returned even for well-known types; false if a schema
|
|
reference is to be used for well-known types.
|
|
:returns: A tuple of the JSON schema, and a mapping between nested type names and their corresponding schema.
|
|
"""
|
|
|
|
if not is_type_like(data_type):
|
|
raise TypeError(f"expected a type-like object but got: {data_type}")
|
|
|
|
self.types_used = {}
|
|
try:
|
|
type_schema = self.type_to_schema(data_type, force_expand=force_expand)
|
|
|
|
types_defined: Dict[str, Schema] = {}
|
|
while len(self.types_used) > len(types_defined):
|
|
# make a snapshot copy; original collection is going to be modified
|
|
types_undefined = {
|
|
sub_name: sub_type
|
|
for sub_name, sub_type in self.types_used.items()
|
|
if sub_name not in types_defined
|
|
}
|
|
|
|
# expand undefined types, which may lead to additional types to be defined
|
|
for sub_name, sub_type in types_undefined.items():
|
|
types_defined[sub_name] = self._type_to_schema_with_lookup(sub_type)
|
|
|
|
type_definitions = dict(sorted(types_defined.items()))
|
|
finally:
|
|
self.types_used = {}
|
|
|
|
return type_schema, type_definitions
|
|
|
|
|
|
class Validator(enum.Enum):
|
|
"Defines constants for JSON schema standards."
|
|
|
|
Draft7 = jsonschema.Draft7Validator
|
|
Draft201909 = jsonschema.Draft201909Validator
|
|
Draft202012 = jsonschema.Draft202012Validator
|
|
Latest = jsonschema.Draft202012Validator
|
|
|
|
|
|
def classdef_to_schema(
|
|
data_type: TypeLike,
|
|
options: Optional[SchemaOptions] = None,
|
|
validator: Validator = Validator.Latest,
|
|
) -> Schema:
|
|
"""
|
|
Returns the JSON schema corresponding to the given type.
|
|
|
|
:param data_type: The Python type used to generate the JSON schema
|
|
:returns: A JSON object that you can serialize to a JSON string with json.dump or json.dumps
|
|
:raises TypeError: Indicates that the generated JSON schema does not validate against the desired meta-schema.
|
|
"""
|
|
|
|
# short-circuit with an error message when passing invalid data
|
|
if not is_type_like(data_type):
|
|
raise TypeError(f"expected a type-like object but got: {data_type}")
|
|
|
|
generator = JsonSchemaGenerator(options)
|
|
type_schema, type_definitions = generator.classdef_to_schema(data_type)
|
|
|
|
class_schema: Schema = {}
|
|
if type_definitions:
|
|
class_schema["definitions"] = typing.cast(JsonType, type_definitions)
|
|
class_schema.update(type_schema)
|
|
|
|
validator_id = validator.value.META_SCHEMA["$id"]
|
|
try:
|
|
validator.value.check_schema(class_schema)
|
|
except jsonschema.exceptions.SchemaError:
|
|
raise TypeError(
|
|
f"schema does not validate against meta-schema <{validator_id}>"
|
|
)
|
|
|
|
schema = {"$schema": validator_id}
|
|
schema.update(class_schema)
|
|
return schema
|
|
|
|
|
|
def validate_object(data_type: TypeLike, json_dict: JsonType) -> None:
|
|
"""
|
|
Validates if the JSON dictionary object conforms to the expected type.
|
|
|
|
:param data_type: The type to match against.
|
|
:param json_dict: A JSON object obtained with `json.load` or `json.loads`.
|
|
:raises jsonschema.exceptions.ValidationError: Indicates that the JSON object cannot represent the type.
|
|
"""
|
|
|
|
schema_dict = classdef_to_schema(data_type)
|
|
jsonschema.validate(
|
|
json_dict, schema_dict, format_checker=jsonschema.FormatChecker()
|
|
)
|
|
|
|
|
|
def print_schema(data_type: type) -> None:
|
|
"""Pretty-prints the JSON schema corresponding to the type."""
|
|
|
|
s = classdef_to_schema(data_type)
|
|
print(json.dumps(s, indent=4))
|
|
|
|
|
|
def get_schema_identifier(data_type: type) -> Optional[str]:
|
|
if data_type in JsonSchemaGenerator.type_catalog:
|
|
return JsonSchemaGenerator.type_catalog.get(data_type).identifier
|
|
else:
|
|
return None
|
|
|
|
|
|
def register_schema(
|
|
data_type: T,
|
|
schema: Optional[Schema] = None,
|
|
name: Optional[str] = None,
|
|
examples: Optional[List[JsonType]] = None,
|
|
) -> T:
|
|
"""
|
|
Associates a type with a JSON schema definition.
|
|
|
|
:param data_type: The type to associate with a JSON schema.
|
|
:param schema: The schema to associate the type with. Derived automatically if omitted.
|
|
:param name: The name used for looking uo the type. Determined automatically if omitted.
|
|
:returns: The input type.
|
|
"""
|
|
|
|
JsonSchemaGenerator.type_catalog.add(
|
|
data_type,
|
|
schema,
|
|
name if name is not None else python_type_to_name(data_type),
|
|
examples,
|
|
)
|
|
return data_type
|
|
|
|
|
|
@overload
|
|
def json_schema_type(cls: Type[T], /) -> Type[T]: ...
|
|
|
|
|
|
@overload
|
|
def json_schema_type(
|
|
cls: None, *, schema: Optional[Schema] = None
|
|
) -> Callable[[Type[T]], Type[T]]: ...
|
|
|
|
|
|
def json_schema_type(
|
|
cls: Optional[Type[T]] = None,
|
|
*,
|
|
schema: Optional[Schema] = None,
|
|
examples: Optional[List[JsonType]] = None,
|
|
) -> Union[Type[T], Callable[[Type[T]], Type[T]]]:
|
|
"""Decorator to add user-defined schema definition to a class."""
|
|
|
|
def wrap(cls: Type[T]) -> Type[T]:
|
|
return register_schema(cls, schema, examples=examples)
|
|
|
|
# see if decorator is used as @json_schema_type or @json_schema_type()
|
|
if cls is None:
|
|
# called with parentheses
|
|
return wrap
|
|
else:
|
|
# called as @json_schema_type without parentheses
|
|
return wrap(cls)
|
|
|
|
|
|
register_schema(JsonObject, name="JsonObject")
|
|
register_schema(JsonArray, name="JsonArray")
|
|
|
|
register_schema(
|
|
JsonType,
|
|
name="JsonType",
|
|
examples=[
|
|
{
|
|
"property1": None,
|
|
"property2": True,
|
|
"property3": 64,
|
|
"property4": "string",
|
|
"property5": ["item"],
|
|
"property6": {"key": "value"},
|
|
}
|
|
],
|
|
)
|
|
register_schema(
|
|
StrictJsonType,
|
|
name="StrictJsonType",
|
|
examples=[
|
|
{
|
|
"property1": True,
|
|
"property2": 64,
|
|
"property3": "string",
|
|
"property4": ["item"],
|
|
"property5": {"key": "value"},
|
|
}
|
|
],
|
|
)
|