mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-27 03:34:10 +00:00
test: initial test to enforce all functions in user_api_key_auth.py h… (#7797)
* test: initial test to enforce all functions in user_api_key_auth.py have direct testing * test(test_user_api_key_auth.py): add is_allowed_route unit test * test(test_user_api_key_auth.py): add more tests * test(test_user_api_key_auth.py): add complete testing coverage for all functions in `user_api_key_auth.py` * test(test_db_schema_changes.py): add a unit test to ensure all db schema changes are backwards compatible gives user an easy rollback path * test: fix schema compatibility test filepath * test: fix test
This commit is contained in:
parent
6473f9ad02
commit
fbdd88d79c
6 changed files with 417 additions and 5 deletions
115
tests/proxy_unit_tests/test_db_schema_changes.py
Normal file
115
tests/proxy_unit_tests/test_db_schema_changes.py
Normal file
|
@ -0,0 +1,115 @@
|
|||
import pytest
|
||||
import subprocess
|
||||
import re
|
||||
from typing import Dict, List, Set
|
||||
|
||||
|
||||
def get_schema_from_branch(branch: str = "main") -> str:
|
||||
"""Get schema from specified git branch"""
|
||||
result = subprocess.run(
|
||||
["git", "show", f"{branch}:schema.prisma"], capture_output=True, text=True
|
||||
)
|
||||
return result.stdout
|
||||
|
||||
|
||||
def parse_model_fields(schema: str) -> Dict[str, Dict[str, str]]:
|
||||
"""Parse Prisma schema into dict of models and their fields"""
|
||||
models = {}
|
||||
current_model = None
|
||||
|
||||
for line in schema.split("\n"):
|
||||
line = line.strip()
|
||||
|
||||
# Find model definition
|
||||
if line.startswith("model "):
|
||||
current_model = line.split(" ")[1]
|
||||
models[current_model] = {}
|
||||
continue
|
||||
|
||||
# Inside model definition
|
||||
if current_model and line and not line.startswith("}"):
|
||||
# Split field definition into name and type
|
||||
parts = line.split()
|
||||
if len(parts) >= 2:
|
||||
field_name = parts[0]
|
||||
field_type = " ".join(parts[1:])
|
||||
models[current_model][field_name] = field_type
|
||||
|
||||
# End of model definition
|
||||
if line.startswith("}"):
|
||||
current_model = None
|
||||
|
||||
return models
|
||||
|
||||
|
||||
def check_breaking_changes(
|
||||
old_schema: Dict[str, Dict[str, str]], new_schema: Dict[str, Dict[str, str]]
|
||||
) -> List[str]:
|
||||
"""Check for breaking changes between schemas"""
|
||||
breaking_changes = []
|
||||
|
||||
# Check each model in old schema
|
||||
for model_name, old_fields in old_schema.items():
|
||||
if model_name not in new_schema:
|
||||
breaking_changes.append(f"Breaking: Model {model_name} was removed")
|
||||
continue
|
||||
|
||||
new_fields = new_schema[model_name]
|
||||
|
||||
# Check each field in old model
|
||||
for field_name, old_type in old_fields.items():
|
||||
if field_name not in new_fields:
|
||||
breaking_changes.append(
|
||||
f"Breaking: Field {model_name}.{field_name} was removed"
|
||||
)
|
||||
continue
|
||||
|
||||
new_type = new_fields[field_name]
|
||||
|
||||
# Check for type changes
|
||||
if old_type != new_type:
|
||||
# Check specific type changes that are breaking
|
||||
if "?" in old_type and "?" not in new_type:
|
||||
breaking_changes.append(
|
||||
f"Breaking: Field {model_name}.{field_name} changed from optional to required"
|
||||
)
|
||||
if not old_type.startswith(new_type.split("?")[0]):
|
||||
breaking_changes.append(
|
||||
f"Breaking: Field {model_name}.{field_name} changed type from {old_type} to {new_type}"
|
||||
)
|
||||
|
||||
return breaking_changes
|
||||
|
||||
|
||||
def test_aaaaaschema_compatibility():
|
||||
"""Test if current schema has breaking changes compared to main"""
|
||||
import os
|
||||
|
||||
print("Current directory:", os.getcwd())
|
||||
|
||||
# Get schemas
|
||||
old_schema = get_schema_from_branch("main")
|
||||
with open("./schema.prisma", "r") as f:
|
||||
new_schema = f.read()
|
||||
|
||||
# Parse schemas
|
||||
old_models = parse_model_fields(old_schema)
|
||||
new_models = parse_model_fields(new_schema)
|
||||
|
||||
# Check for breaking changes
|
||||
breaking_changes = check_breaking_changes(old_models, new_models)
|
||||
|
||||
# Fail if breaking changes found
|
||||
if breaking_changes:
|
||||
pytest.fail("\n".join(breaking_changes))
|
||||
|
||||
# Print informational diff
|
||||
print("\nNon-breaking changes detected:")
|
||||
for model_name, new_fields in new_models.items():
|
||||
if model_name not in old_models:
|
||||
print(f"Added new model: {model_name}")
|
||||
continue
|
||||
|
||||
for field_name, new_type in new_fields.items():
|
||||
if field_name not in old_models[model_name]:
|
||||
print(f"Added new field: {model_name}.{field_name}")
|
Loading…
Add table
Add a link
Reference in a new issue