# What this tests ? ## Tests /user endpoints. import pytest import asyncio import aiohttp import time from openai import AsyncOpenAI from test_team import list_teams from typing import Optional from test_keys import generate_key from fastapi import HTTPException async def new_user( session, i, user_id=None, budget=None, budget_duration=None, models=None ): url = "http://0.0.0.0:4000/user/new" headers = {"Authorization": "Bearer sk-1234", "Content-Type": "application/json"} data = { "models": models or ["azure-models"], "aliases": {"mistral-7b": "gpt-3.5-turbo"}, "duration": None, "max_budget": budget, "budget_duration": budget_duration, } if user_id is not None: data["user_id"] = user_id async with session.post(url, headers=headers, json=data) as response: status = response.status response_text = await response.text() print(f"Response {i} (Status code: {status}):") print(response_text) print() if status != 200: raise Exception(f"Request {i} did not return a 200 status code: {status}") return await response.json() async def generate_key( session, i, budget=None, budget_duration=None, models=["azure-models", "gpt-4", "dall-e-3"], max_parallel_requests: Optional[int] = None, user_id: Optional[str] = None, team_id: Optional[str] = None, metadata: Optional[dict] = None, calling_key="sk-1234", ): url = "http://0.0.0.0:4000/key/generate" headers = { "Authorization": f"Bearer {calling_key}", "Content-Type": "application/json", } data = { "models": models, "aliases": {"mistral-7b": "gpt-3.5-turbo"}, "duration": None, "max_budget": budget, "budget_duration": budget_duration, "max_parallel_requests": max_parallel_requests, "user_id": user_id, "team_id": team_id, "metadata": metadata, } print(f"data: {data}") async with session.post(url, headers=headers, json=data) as response: status = response.status response_text = await response.text() print(f"Response {i} (Status code: {status}):") print(response_text) print() if status != 200: raise Exception(f"Request {i} did not return a 200 status code: {status}") return await response.json() @pytest.mark.asyncio async def test_user_new(): """ Make 20 parallel calls to /user/new. Assert all worked. """ async with aiohttp.ClientSession() as session: tasks = [new_user(session, i) for i in range(1, 11)] await asyncio.gather(*tasks) async def get_user_info(session, get_user, call_user, view_all: Optional[bool] = None): """ Make sure only models user has access to are returned """ if view_all is True: url = "http://0.0.0.0:4000/user/info" else: url = f"http://0.0.0.0:4000/user/info?user_id={get_user}" headers = { "Authorization": f"Bearer {call_user}", "Content-Type": "application/json", } async with session.get(url, headers=headers) as response: status = response.status response_text = await response.text() print(response_text) print() if status != 200: if call_user != get_user: return status else: print(f"call_user: {call_user}; get_user: {get_user}") raise Exception(f"Request did not return a 200 status code: {status}") return await response.json() @pytest.mark.asyncio async def test_user_info(): """ Get user info - as admin - as user themself - as random """ get_user = f"krrish_{time.time()}@berri.ai" async with aiohttp.ClientSession() as session: key_gen = await new_user(session, 0, user_id=get_user) key = key_gen["key"] ## as admin ## resp = await get_user_info( session=session, get_user=get_user, call_user="sk-1234" ) assert isinstance(resp["user_info"], dict) assert len(resp["user_info"]) > 0 ## as user themself ## resp = await get_user_info(session=session, get_user=get_user, call_user=key) assert isinstance(resp["user_info"], dict) assert len(resp["user_info"]) > 0 # as random user # key_gen = await new_user(session=session, i=0) random_key = key_gen["key"] status = await get_user_info( session=session, get_user=get_user, call_user=random_key ) assert status == 403 @pytest.mark.asyncio async def test_user_update(): """ Create user Update user access to new model Make chat completion call """ pass @pytest.mark.skip(reason="Frequent check on ci/cd leads to read timeout issue.") @pytest.mark.asyncio async def test_users_budgets_reset(): """ - Create key with budget and 5s duration - Get 'reset_at' value - wait 5s - Check if value updated """ get_user = f"krrish_{time.time()}@berri.ai" async with aiohttp.ClientSession() as session: key_gen = await new_user( session, 0, user_id=get_user, budget=10, budget_duration="5s" ) key = key_gen["key"] user_info = await get_user_info( session=session, get_user=get_user, call_user=key ) reset_at_init_value = user_info["user_info"]["budget_reset_at"] i = 0 reset_at_new_value = None while i < 3: await asyncio.sleep(70) user_info = await get_user_info( session=session, get_user=get_user, call_user=key ) reset_at_new_value = user_info["user_info"]["budget_reset_at"] try: assert reset_at_init_value != reset_at_new_value break except Exception: i + 1 assert reset_at_init_value != reset_at_new_value async def chat_completion(session, key, model="gpt-4"): client = AsyncOpenAI(api_key=key, base_url="http://0.0.0.0:4000") messages = [ {"role": "system", "content": "You are a helpful assistant"}, {"role": "user", "content": f"Hello! {time.time()}"}, ] data = { "model": model, "messages": messages, } response = await client.chat.completions.create(**data) async def chat_completion_streaming(session, key, model="gpt-4"): client = AsyncOpenAI(api_key=key, base_url="http://0.0.0.0:4000") messages = [ {"role": "system", "content": "You are a helpful assistant"}, {"role": "user", "content": f"Hello! {time.time()}"}, ] data = {"model": model, "messages": messages, "stream": True} response = await client.chat.completions.create(**data) async for chunk in response: continue @pytest.mark.skip(reason="Global proxy now tracked via `/global/spend/logs`") @pytest.mark.asyncio async def test_global_proxy_budget_update(): """ - Get proxy current spend - Make chat completion call (normal) - Assert spend increased - Make chat completion call (streaming) - Assert spend increased """ get_user = f"litellm-proxy-budget" async with aiohttp.ClientSession() as session: user_info = await get_user_info( session=session, get_user=get_user, call_user="sk-1234" ) original_spend = user_info["user_info"]["spend"] await chat_completion(session=session, key="sk-1234") await asyncio.sleep(5) # let db update user_info = await get_user_info( session=session, get_user=get_user, call_user="sk-1234" ) new_spend = user_info["user_info"]["spend"] print(f"new_spend: {new_spend}; original_spend: {original_spend}") assert new_spend > original_spend await chat_completion_streaming(session=session, key="sk-1234") await asyncio.sleep(5) # let db update user_info = await get_user_info( session=session, get_user=get_user, call_user="sk-1234" ) new_new_spend = user_info["user_info"]["spend"] print(f"new_spend: {new_spend}; original_spend: {original_spend}") assert new_new_spend > new_spend @pytest.mark.asyncio async def test_user_model_access(): """ - Create user with model access - Create key with user - Call model that user has access to -> should work - Call wildcard model that user has access to -> should work - Call model that user does not have access to -> should fail - Call wildcard model that user does not have access to -> should fail """ import openai async with aiohttp.ClientSession() as session: get_user = f"krrish_{time.time()}@berri.ai" await new_user( session=session, i=0, user_id=get_user, models=["good-model", "anthropic/*"], ) result = await generate_key( session=session, i=0, user_id=get_user, models=[], # assign no models. Allow inheritance from user ) key = result["key"] await chat_completion( session=session, key=key, model="anthropic/claude-3-5-haiku-20241022", ) await chat_completion( session=session, key=key, model="good-model", ) with pytest.raises(openai.AuthenticationError): await chat_completion( session=session, key=key, model="bedrock/anthropic.claude-3-sonnet-20240229-v1:0", ) with pytest.raises(openai.AuthenticationError): await chat_completion( session=session, key=key, model="groq/claude-3-5-haiku-20241022", ) import json import uuid import pytest import aiohttp from typing import Dict, Tuple async def setup_test_users(session: aiohttp.ClientSession) -> Tuple[Dict, Dict]: """ Create two test users and an additional key for the first user. Returns tuple of (user1_data, user2_data) where each contains user info and keys. """ # Create two test users user1 = await new_user( session=session, i=0, budget=100, budget_duration="30d", models=["anthropic.claude-3-5-sonnet-20240620-v1:0"], ) user2 = await new_user( session=session, i=1, budget=100, budget_duration="30d", models=["anthropic.claude-3-5-sonnet-20240620-v1:0"], ) print("\nCreated two test users:") print(f"User 1 ID: {user1['user_id']}") print(f"User 2 ID: {user2['user_id']}") # Create an additional key for user1 headers = { "Content-Type": "application/json", "Authorization": f"Bearer {user1['key']}", } key_payload = { "user_id": user1["user_id"], "duration": "7d", "key_alias": f"test_key_{uuid.uuid4()}", "models": ["anthropic.claude-3-5-sonnet-20240620-v1:0"], } print("\nGenerating additional key for user1...") key_response = await session.post( f"http://0.0.0.0:4000/key/generate", headers=headers, json=key_payload ) assert key_response.status == 200, "Failed to generate additional key for user1" user1_additional_key = await key_response.json() print(f"\nGenerated key details:") print(json.dumps(user1_additional_key, indent=2)) # Return both users' data including the additional key return { "user_data": user1, "additional_key": user1_additional_key, "headers": headers, }, { "user_data": user2, "headers": { "Content-Type": "application/json", "Authorization": f"Bearer {user2['key']}", }, } async def print_response_details(response: aiohttp.ClientResponse) -> None: """Helper function to print response details""" print("\nResponse Details:") print(f"Status Code: {response.status}") print("\nResponse Content:") try: formatted_json = json.dumps(await response.json(), indent=2) print(formatted_json) except json.JSONDecodeError: print(await response.text()) @pytest.mark.asyncio async def test_key_update_user_isolation(): """Test that a user cannot update a key that belongs to another user""" async with aiohttp.ClientSession() as session: user1_data, user2_data = await setup_test_users(session) # Try to update the key to belong to user2 update_payload = { "key": user1_data["additional_key"]["key"], "user_id": user2_data["user_data"][ "user_id" ], # Attempting to change ownership "metadata": {"purpose": "testing_user_isolation", "environment": "test"}, } print("\nAttempting to update key ownership to user2...") update_response = await session.post( f"http://0.0.0.0:4000/key/update", headers=user1_data["headers"], # Using user1's headers json=update_payload, ) await print_response_details(update_response) # Verify update attempt was rejected assert ( update_response.status == 403 ), "Request should have been rejected with 403 status code" @pytest.mark.asyncio async def test_key_delete_user_isolation(): """Test that a user cannot delete a key that belongs to another user""" async with aiohttp.ClientSession() as session: user1_data, user2_data = await setup_test_users(session) # Try to delete user1's additional key using user2's credentials delete_payload = { "keys": [user1_data["additional_key"]["key"]], } print("\nAttempting to delete user1's key using user2's credentials...") delete_response = await session.post( f"http://0.0.0.0:4000/key/delete", headers=user2_data["headers"], json=delete_payload, ) await print_response_details(delete_response) # Verify delete attempt was rejected assert ( delete_response.status == 403 ), "Request should have been rejected with 403 status code"