fix(proxy_server.py): prisma client fixes for high traffic

This commit is contained in:
Krrish Dholakia 2024-02-06 17:30:36 -08:00
parent d1549cb2f3
commit b6adeec347
6 changed files with 224 additions and 114 deletions

View file

@ -3,7 +3,7 @@ import logging
set_verbose = False set_verbose = False
# Create a handler for the logger (you may need to adapt this based on your needs) # Create a handler for the logger (you may need to adapt this based on your needs)
handler = logging.StreamHandler() handler = logging.FileHandler("log_file.txt")
handler.setLevel(logging.DEBUG) handler.setLevel(logging.DEBUG)
# Create a formatter and set it for the handler # Create a formatter and set it for the handler

View file

@ -113,7 +113,7 @@ class LangFuseLogger:
elif response_obj is not None: elif response_obj is not None:
input = prompt input = prompt
output = response_obj["choices"][0]["message"].json() output = response_obj["choices"][0]["message"].json()
print(f"OUTPUT IN LANGFUSE: {output}; original: {response_obj}") print_verbose(f"OUTPUT IN LANGFUSE: {output}; original: {response_obj}")
if self._is_langfuse_v2(): if self._is_langfuse_v2():
self._log_langfuse_v2( self._log_langfuse_v2(
user_id, user_id,

View file

@ -391,6 +391,10 @@ class LiteLLM_SpendLogs(LiteLLMBase):
startTime: Union[str, datetime, None] startTime: Union[str, datetime, None]
endTime: Union[str, datetime, None] endTime: Union[str, datetime, None]
user: Optional[str] = "" user: Optional[str] = ""
metadata: Optional[Json] = {} metadata: Optional[dict] = {}
cache_hit: Optional[str] = "False" cache_hit: Optional[str] = "False"
cache_key: Optional[str] = None cache_key: Optional[str] = None
class LiteLLM_SpendLogs_ResponseObject(LiteLLMBase):
response: Optional[List[Union[LiteLLM_SpendLogs, Any]]] = None

View file

@ -432,15 +432,25 @@ async def user_api_key_auth(
# Check 2. If user_id for this token is in budget # Check 2. If user_id for this token is in budget
## Check 2.5 If global proxy is in budget ## Check 2.5 If global proxy is in budget
if valid_token.user_id is not None: if valid_token.user_id is not None:
if prisma_client is not None: user_id_information = user_api_key_cache.get_cache(
user_id_information = await prisma_client.get_data( key=valid_token.user_id
user_id_list=[valid_token.user_id, litellm_proxy_budget_name], )
table_name="user", if user_id_information is None:
query_type="find_all", if prisma_client is not None:
) user_id_information = await prisma_client.get_data(
if custom_db_client is not None: user_id_list=[
user_id_information = await custom_db_client.get_data( valid_token.user_id,
key=valid_token.user_id, table_name="user" litellm_proxy_budget_name,
],
table_name="user",
query_type="find_all",
)
if custom_db_client is not None:
user_id_information = await custom_db_client.get_data(
key=valid_token.user_id, table_name="user"
)
user_api_key_cache.set_cache(
key=valid_token.user_id, value=user_id_information, ttl=600
) )
verbose_proxy_logger.debug( verbose_proxy_logger.debug(
@ -544,7 +554,7 @@ async def user_api_key_auth(
api_key = valid_token.token api_key = valid_token.token
# Add hashed token to cache # Add hashed token to cache
user_api_key_cache.set_cache(key=api_key, value=valid_token, ttl=60) user_api_key_cache.set_cache(key=api_key, value=valid_token, ttl=600)
valid_token_dict = _get_pydantic_json_dict(valid_token) valid_token_dict = _get_pydantic_json_dict(valid_token)
valid_token_dict.pop("token", None) valid_token_dict.pop("token", None)
""" """
@ -837,114 +847,127 @@ async def update_database(
- Update that user's row - Update that user's row
- Update litellm-proxy-budget row (global proxy spend) - Update litellm-proxy-budget row (global proxy spend)
""" """
user_ids = [user_id, litellm_proxy_budget_name] try:
data_list = [] user_ids = [user_id, litellm_proxy_budget_name]
for id in user_ids: data_list = []
if id is None: for id in user_ids:
continue if id is None:
continue
if prisma_client is not None:
existing_spend_obj = await prisma_client.get_data(user_id=id)
elif (
custom_db_client is not None and id != litellm_proxy_budget_name
):
existing_spend_obj = await custom_db_client.get_data(
key=id, table_name="user"
)
verbose_proxy_logger.debug(
f"Updating existing_spend_obj: {existing_spend_obj}"
)
if existing_spend_obj is None:
existing_spend = 0
existing_spend_obj = LiteLLM_UserTable(
user_id=id, spend=0, max_budget=None, user_email=None
)
else:
existing_spend = existing_spend_obj.spend
# Calculate the new cost by adding the existing cost and response_cost
existing_spend_obj.spend = existing_spend + response_cost
verbose_proxy_logger.debug(f"new cost: {existing_spend_obj.spend}")
data_list.append(existing_spend_obj)
# Update the cost column for the given user id
if prisma_client is not None: if prisma_client is not None:
existing_spend_obj = await prisma_client.get_data(user_id=id) await prisma_client.update_data(
elif custom_db_client is not None and id != litellm_proxy_budget_name: data_list=data_list, query_type="update_many", table_name="user"
existing_spend_obj = await custom_db_client.get_data(
key=id, table_name="user"
) )
verbose_proxy_logger.debug( elif custom_db_client is not None and user_id is not None:
f"Updating existing_spend_obj: {existing_spend_obj}" new_spend = data_list[0].spend
) await custom_db_client.update_data(
if existing_spend_obj is None: key=user_id, value={"spend": new_spend}, table_name="user"
existing_spend = 0
existing_spend_obj = LiteLLM_UserTable(
user_id=id, spend=0, max_budget=None, user_email=None
) )
else: except Exception as e:
existing_spend = existing_spend_obj.spend verbose_proxy_logger.info(f"Update User DB call failed to execute")
# Calculate the new cost by adding the existing cost and response_cost
existing_spend_obj.spend = existing_spend + response_cost
verbose_proxy_logger.debug(f"new cost: {existing_spend_obj.spend}")
data_list.append(existing_spend_obj)
# Update the cost column for the given user id
if prisma_client is not None:
await prisma_client.update_data(
data_list=data_list, query_type="update_many", table_name="user"
)
elif custom_db_client is not None and user_id is not None:
new_spend = data_list[0].spend
await custom_db_client.update_data(
key=user_id, value={"spend": new_spend}, table_name="user"
)
### UPDATE KEY SPEND ### ### UPDATE KEY SPEND ###
async def _update_key_db(): async def _update_key_db():
verbose_proxy_logger.debug( try:
f"adding spend to key db. Response cost: {response_cost}. Token: {token}."
)
if prisma_client is not None:
# Fetch the existing cost for the given token
existing_spend_obj = await prisma_client.get_data(token=token)
verbose_proxy_logger.debug( verbose_proxy_logger.debug(
f"_update_key_db: existing spend: {existing_spend_obj}" f"adding spend to key db. Response cost: {response_cost}. Token: {token}."
) )
if existing_spend_obj is None: if prisma_client is not None:
existing_spend = 0 # Fetch the existing cost for the given token
else: existing_spend_obj = await prisma_client.get_data(token=token)
existing_spend = existing_spend_obj.spend verbose_proxy_logger.debug(
# Calculate the new cost by adding the existing cost and response_cost f"_update_key_db: existing spend: {existing_spend_obj}"
new_spend = existing_spend + response_cost )
if existing_spend_obj is None:
existing_spend = 0
else:
existing_spend = existing_spend_obj.spend
# Calculate the new cost by adding the existing cost and response_cost
new_spend = existing_spend + response_cost
verbose_proxy_logger.debug(f"new cost: {new_spend}") verbose_proxy_logger.debug(f"new cost: {new_spend}")
# Update the cost column for the given token # Update the cost column for the given token
await prisma_client.update_data(token=token, data={"spend": new_spend}) await prisma_client.update_data(
token=token, data={"spend": new_spend}
)
valid_token = user_api_key_cache.get_cache(key=token) valid_token = user_api_key_cache.get_cache(key=token)
if valid_token is not None: if valid_token is not None:
valid_token.spend = new_spend valid_token.spend = new_spend
user_api_key_cache.set_cache(key=token, value=valid_token) user_api_key_cache.set_cache(key=token, value=valid_token)
elif custom_db_client is not None: elif custom_db_client is not None:
# Fetch the existing cost for the given token # Fetch the existing cost for the given token
existing_spend_obj = await custom_db_client.get_data( existing_spend_obj = await custom_db_client.get_data(
key=token, table_name="key" key=token, table_name="key"
) )
verbose_proxy_logger.debug( verbose_proxy_logger.debug(
f"_update_key_db existing spend: {existing_spend_obj}" f"_update_key_db existing spend: {existing_spend_obj}"
) )
if existing_spend_obj is None: if existing_spend_obj is None:
existing_spend = 0 existing_spend = 0
else: else:
existing_spend = existing_spend_obj.spend existing_spend = existing_spend_obj.spend
# Calculate the new cost by adding the existing cost and response_cost # Calculate the new cost by adding the existing cost and response_cost
new_spend = existing_spend + response_cost new_spend = existing_spend + response_cost
verbose_proxy_logger.debug(f"new cost: {new_spend}") verbose_proxy_logger.debug(f"new cost: {new_spend}")
# Update the cost column for the given token # Update the cost column for the given token
await custom_db_client.update_data( await custom_db_client.update_data(
key=token, value={"spend": new_spend}, table_name="key" key=token, value={"spend": new_spend}, table_name="key"
) )
valid_token = user_api_key_cache.get_cache(key=token) valid_token = user_api_key_cache.get_cache(key=token)
if valid_token is not None: if valid_token is not None:
valid_token.spend = new_spend valid_token.spend = new_spend
user_api_key_cache.set_cache(key=token, value=valid_token) user_api_key_cache.set_cache(key=token, value=valid_token)
except Exception as e:
verbose_proxy_logger.info(f"Update Key DB Call failed to execute")
### UPDATE SPEND LOGS ### ### UPDATE SPEND LOGS ###
async def _insert_spend_log_to_db(): async def _insert_spend_log_to_db():
# Helper to generate payload to log try:
verbose_proxy_logger.debug("inserting spend log to db") # Helper to generate payload to log
payload = get_logging_payload( verbose_proxy_logger.debug("inserting spend log to db")
kwargs=kwargs, payload = get_logging_payload(
response_obj=completion_response, kwargs=kwargs,
start_time=start_time, response_obj=completion_response,
end_time=end_time, start_time=start_time,
) end_time=end_time,
)
payload["spend"] = response_cost payload["spend"] = response_cost
if prisma_client is not None: if prisma_client is not None:
await prisma_client.insert_data(data=payload, table_name="spend") await prisma_client.insert_data(data=payload, table_name="spend")
elif custom_db_client is not None: elif custom_db_client is not None:
await custom_db_client.insert_data(payload, table_name="spend") await custom_db_client.insert_data(payload, table_name="spend")
except Exception as e:
verbose_proxy_logger.info(f"Update Spend Logs DB failed to execute")
asyncio.create_task(_update_user_db()) asyncio.create_task(_update_user_db())
asyncio.create_task(_update_key_db()) asyncio.create_task(_update_key_db())
@ -1534,7 +1557,7 @@ async def generate_key_helper_fn(
user_api_key_cache.set_cache( user_api_key_cache.set_cache(
key=hashed_token, key=hashed_token,
value=LiteLLM_VerificationToken(**saved_token), # type: ignore value=LiteLLM_VerificationToken(**saved_token), # type: ignore
ttl=60, ttl=600,
) )
if prisma_client is not None: if prisma_client is not None:
## CREATE USER (If necessary) ## CREATE USER (If necessary)
@ -2979,6 +3002,9 @@ async def spend_user_fn(
"/spend/logs", "/spend/logs",
tags=["budget & spend Tracking"], tags=["budget & spend Tracking"],
dependencies=[Depends(user_api_key_auth)], dependencies=[Depends(user_api_key_auth)],
responses={
200: {"model": List[LiteLLM_SpendLogs]},
},
) )
async def view_spend_logs( async def view_spend_logs(
api_key: Optional[str] = fastapi.Query( api_key: Optional[str] = fastapi.Query(
@ -3049,7 +3075,8 @@ async def view_spend_logs(
query_type="find_unique", query_type="find_unique",
key_val={"key": "request_id", "value": request_id}, key_val={"key": "request_id", "value": request_id},
) )
return [spend_log] response = LiteLLM_SpendLogs_ResponseObject(response=[spend_log])
return response
elif user_id is not None: elif user_id is not None:
spend_log = await prisma_client.get_data( spend_log = await prisma_client.get_data(
table_name="spend", table_name="spend",
@ -3065,7 +3092,7 @@ async def view_spend_logs(
table_name="spend", query_type="find_all" table_name="spend", query_type="find_all"
) )
return spend_logs return spend_log
return None return None

View file

@ -490,7 +490,7 @@ async def test_key_crossing_budget():
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_key_zinfo_spend_values_sagemaker(): async def test_key_info_spend_values_sagemaker():
""" """
Tests the sync streaming loop to ensure spend is correctly calculated. Tests the sync streaming loop to ensure spend is correctly calculated.
- create key - create key

View file

@ -1,7 +1,7 @@
# What this tests? # What this tests?
## Tests /spend endpoints. ## Tests /spend endpoints.
import pytest import pytest, time, uuid
import asyncio import asyncio
import aiohttp import aiohttp
@ -26,17 +26,17 @@ async def generate_key(session, models=[]):
return await response.json() return await response.json()
async def chat_completion(session, key): async def chat_completion(session, key, model="gpt-3.5-turbo"):
url = "http://0.0.0.0:4000/chat/completions" url = "http://0.0.0.0:4000/chat/completions"
headers = { headers = {
"Authorization": f"Bearer {key}", "Authorization": f"Bearer {key}",
"Content-Type": "application/json", "Content-Type": "application/json",
} }
data = { data = {
"model": "gpt-3.5-turbo", "model": model,
"messages": [ "messages": [
{"role": "system", "content": "You are a helpful assistant."}, {"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello!"}, {"role": "user", "content": f"Hello! {uuid.uuid4()}"},
], ],
} }
@ -53,8 +53,37 @@ async def chat_completion(session, key):
return await response.json() return await response.json()
async def get_spend_logs(session, request_id): async def chat_completion_high_traffic(session, key, model="gpt-3.5-turbo"):
url = f"http://0.0.0.0:4000/spend/logs?request_id={request_id}" url = "http://0.0.0.0:4000/chat/completions"
headers = {
"Authorization": f"Bearer {key}",
"Content-Type": "application/json",
}
data = {
"model": model,
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": f"Hello! {uuid.uuid4()}"},
],
}
try:
async with session.post(url, headers=headers, json=data) as response:
status = response.status
response_text = await response.text()
if status != 200:
raise Exception(f"Request did not return a 200 status code: {status}")
return await response.json()
except Exception as e:
return None
async def get_spend_logs(session, request_id=None, api_key=None):
if api_key is not None:
url = f"http://0.0.0.0:4000/spend/logs?api_key={api_key}"
else:
url = f"http://0.0.0.0:4000/spend/logs?request_id={request_id}"
headers = {"Authorization": "Bearer sk-1234", "Content-Type": "application/json"} headers = {"Authorization": "Bearer sk-1234", "Content-Type": "application/json"}
async with session.get(url, headers=headers) as response: async with session.get(url, headers=headers) as response:
@ -82,3 +111,53 @@ async def test_spend_logs():
response = await chat_completion(session=session, key=key) response = await chat_completion(session=session, key=key)
await asyncio.sleep(5) await asyncio.sleep(5)
await get_spend_logs(session=session, request_id=response["id"]) await get_spend_logs(session=session, request_id=response["id"])
@pytest.mark.asyncio
async def test_spend_logs_high_traffic():
"""
- Create key
- Make 30 concurrent calls
- Get all logs for that key
- Wait 10s
- Assert it's 30
"""
async def retry_request(func, *args, _max_attempts=5, **kwargs):
for attempt in range(_max_attempts):
try:
return await func(*args, **kwargs)
except (
aiohttp.client_exceptions.ClientOSError,
aiohttp.client_exceptions.ServerDisconnectedError,
) as e:
if attempt + 1 == _max_attempts:
raise # re-raise the last ClientOSError if all attempts failed
print(f"Attempt {attempt+1} failed, retrying...")
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=600)
) as session:
start = time.time()
key_gen = await generate_key(session=session)
key = key_gen["key"]
n = 1000
tasks = [
retry_request(
chat_completion_high_traffic,
session=session,
key=key,
model="azure-gpt-3.5",
)
for _ in range(n)
]
chat_completions = await asyncio.gather(*tasks)
successful_completions = [c for c in chat_completions if c is not None]
print(f"Num successful completions: {len(successful_completions)}")
await asyncio.sleep(10)
response = await get_spend_logs(session=session, api_key=key)
print(f"response: {response}")
print(f"len responses: {len(response)}")
assert len(response) == n
print(n, time.time() - start, len(response))
raise Exception("it worked!")