Ollama `ssl_verify=False` + Spend Logs reliability fixes (#7931)

* fix(http_handler.py): support passing `ssl_verify` dynamically and use the correct httpx client based on the passed `ssl_verify` param

Fixes https://github.com/BerriAI/litellm/issues/6499

* feat(llm_http_handler.py): support passing `ssl_verify=False` dynamically in call args

Closes https://github.com/BerriAI/litellm/issues/6499
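
A minimal usage sketch of what this enables (model and api_base are placeholders; per the PR description, `ssl_verify` is accepted directly in the call args):

import litellm

# Per-call override: only this request skips TLS verification, e.g. for an
# Ollama host behind a self-signed certificate.
response = litellm.completion(
    model="ollama/llama3",
    messages=[{"role": "user", "content": "hi"}],
    api_base="https://my-ollama-host:11434",
    ssl_verify=False,
)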

* fix(proxy/utils.py): prevent bad logs from breaking all cost tracking + reset list regardless of success/failure

Prevents malformed logs from breaking all spend tracking; previously a single bad entry was retried indefinitely.
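
The shape of the fix, as a standalone sketch with invented names (`queue`, `write_logs`): the slice being flushed is removed from the in-memory queue whether or not the write succeeds, so a malformed entry fails once instead of blocking every later flush.

# Hypothetical sketch -- queue handling only, the writer is assumed.
async def flush(queue: list, write_logs) -> list:
    logs_to_process = queue[:1000]  # cap how many logs one interval flushes
    try:
        await write_logs(logs_to_process)
    finally:
        # reset regardless of success/failure: bad payloads are dropped,
        # not retried forever
        queue = queue[len(logs_to_process):]
    return queue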

* test(test_proxy_utils.py): add test to ensure bad log is dropped

* test(test_proxy_utils.py): ensure in-memory spend logs reset after bad log error

* test(test_user_api_key_auth.py): add unit test to ensure end user id as str works

* fix(auth_utils.py): ensure extracted end user id is always a str

Prevents DB cost-tracking errors caused by non-string end user ids.
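
A sketch of the coercion (the function name matches the test described below; the body is an assumption): request bodies may carry `user` as an int, which then breaks against the string `user_id` column.

# Sketch (body assumed): always hand the DB layer a string user id.
def get_end_user_id_from_request_body(request_body: dict):
    end_user_id = request_body.get("user")
    if end_user_id is None:
        return None
    return str(end_user_id)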

* test(test_auth_utils.py): ensure get end user id from request body always returns a string

* test: update tests

* test: skip bedrock test - behaviour now supported

* test: fix testing

* refactor(spend_tracking_utils.py): reduce size of get_logging_payload

* test: fix test

* bump: version 1.59.4 → 1.59.5

* Revert "bump: version 1.59.4 → 1.59.5"

This reverts commit 1182b46b2e.

* fix(utils.py): fix spend logs retry logic

* fix(spend_tracking_utils.py): fix get tags

* fix(spend_tracking_utils.py): fix end user id spend tracking on pass-through endpoints

Krish Dholakia 2025-01-23 23:05:41 -08:00, committed by GitHub
parent 851b0c4c4d
commit 1e011b66d3
17 changed files with 406 additions and 187 deletions
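
The diff below (litellm/proxy/utils.py) moves both spend-write loops into a ProxyUpdateSpend class. The retry shape they share, shown as a minimal standalone sketch:

import asyncio

# Bounded attempts with exponential backoff on connection errors;
# anything else fails immediately.
async def run_with_retries(op, n_retry_times: int):
    for i in range(n_retry_times + 1):
        try:
            return await op()
        except ConnectionError:
            if i >= n_retry_times:
                raise
            await asyncio.sleep(2 ** i)  # 1s, 2s, 4s, ...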


@@ -2432,6 +2432,126 @@ async def reset_budget(prisma_client: PrismaClient):
)
class ProxyUpdateSpend:
@staticmethod
async def update_end_user_spend(
n_retry_times: int, prisma_client: PrismaClient, proxy_logging_obj: ProxyLogging
):
for i in range(n_retry_times + 1):
start_time = time.time()
try:
async with prisma_client.db.tx(
timeout=timedelta(seconds=60)
) as transaction:
async with transaction.batch_() as batcher:
for (
end_user_id,
response_cost,
) in prisma_client.end_user_list_transactons.items():
if litellm.max_end_user_budget is not None:
pass
batcher.litellm_endusertable.upsert(
where={"user_id": end_user_id},
data={
"create": {
"user_id": end_user_id,
"spend": response_cost,
"blocked": False,
},
"update": {"spend": {"increment": response_cost}},
},
)
break
except DB_CONNECTION_ERROR_TYPES as e:
if i >= n_retry_times: # If we've reached the maximum number of retries
_raise_failed_update_spend_exception(
e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
)
# Optionally, sleep for a bit before retrying
await asyncio.sleep(2**i) # Exponential backoff
except Exception as e:
_raise_failed_update_spend_exception(
e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
)
finally:
prisma_client.end_user_list_transactons = (
{}
) # reset the end user list transactions - prevent bad data from causing issues
@staticmethod
async def update_spend_logs(
n_retry_times: int,
prisma_client: PrismaClient,
db_writer_client: Optional[HTTPHandler],
proxy_logging_obj: ProxyLogging,
):
BATCH_SIZE = 100 # Preferred size of each batch to write to the database
MAX_LOGS_PER_INTERVAL = (
1000 # Maximum number of logs to flush in a single interval
)
# Get initial logs to process
logs_to_process = prisma_client.spend_log_transactions[:MAX_LOGS_PER_INTERVAL]
start_time = time.time()
try:
for i in range(n_retry_times + 1):
try:
base_url = os.getenv("SPEND_LOGS_URL", None)
if (
len(logs_to_process) > 0
and base_url is not None
and db_writer_client is not None
):
if not base_url.endswith("/"):
base_url += "/"
verbose_proxy_logger.debug("base_url: {}".format(base_url))
response = await db_writer_client.post(
url=base_url + "spend/update",
data=json.dumps(logs_to_process),
headers={"Content-Type": "application/json"},
)
if response.status_code == 200:
prisma_client.spend_log_transactions = (
prisma_client.spend_log_transactions[
len(logs_to_process) :
]
)
else:
for j in range(0, len(logs_to_process), BATCH_SIZE):
batch = logs_to_process[j : j + BATCH_SIZE]
batch_with_dates = [
prisma_client.jsonify_object({**entry})
for entry in batch
]
await prisma_client.db.litellm_spendlogs.create_many(
data=batch_with_dates, skip_duplicates=True
)
verbose_proxy_logger.debug(
f"Flushed {len(batch)} logs to the DB."
)
prisma_client.spend_log_transactions = (
prisma_client.spend_log_transactions[len(logs_to_process) :]
)
verbose_proxy_logger.debug(
f"{len(logs_to_process)} logs processed. Remaining in queue: {len(prisma_client.spend_log_transactions)}"
)
break
except DB_CONNECTION_ERROR_TYPES:
if i is None:
i = 0
if i >= n_retry_times:
raise
await asyncio.sleep(2**i)
except Exception as e:
prisma_client.spend_log_transactions = prisma_client.spend_log_transactions[
len(logs_to_process) :
]
_raise_failed_update_spend_exception(
e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
)
async def update_spend( # noqa: PLR0915
prisma_client: PrismaClient,
db_writer_client: Optional[HTTPHandler],
@@ -2490,47 +2610,11 @@ async def update_spend( # noqa: PLR0915
)
)
if len(prisma_client.end_user_list_transactons.keys()) > 0:
for i in range(n_retry_times + 1):
start_time = time.time()
try:
async with prisma_client.db.tx(
timeout=timedelta(seconds=60)
) as transaction:
async with transaction.batch_() as batcher:
for (
end_user_id,
response_cost,
) in prisma_client.end_user_list_transactons.items():
if litellm.max_end_user_budget is not None:
pass
batcher.litellm_endusertable.upsert(
where={"user_id": end_user_id},
data={
"create": {
"user_id": end_user_id,
"spend": response_cost,
"blocked": False,
},
"update": {"spend": {"increment": response_cost}},
},
)
prisma_client.end_user_list_transactons = (
{}
) # Clear the remaining transactions after processing all batches in the loop.
break
except DB_CONNECTION_ERROR_TYPES as e:
if i >= n_retry_times: # If we've reached the maximum number of retries
_raise_failed_update_spend_exception(
e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
)
# Optionally, sleep for a bit before retrying
await asyncio.sleep(2**i) # Exponential backoff
except Exception as e:
_raise_failed_update_spend_exception(
e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
)
await ProxyUpdateSpend.update_end_user_spend(
n_retry_times=n_retry_times,
prisma_client=prisma_client,
proxy_logging_obj=proxy_logging_obj,
)
### UPDATE KEY TABLE ###
verbose_proxy_logger.debug(
"KEY Spend transactions: {}".format(
@@ -2687,80 +2771,13 @@ async def update_spend( # noqa: PLR0915
"Spend Logs transactions: {}".format(len(prisma_client.spend_log_transactions))
)
BATCH_SIZE = 100 # Preferred size of each batch to write to the database
MAX_LOGS_PER_INTERVAL = 1000 # Maximum number of logs to flush in a single interval
if len(prisma_client.spend_log_transactions) > 0:
for i in range(n_retry_times + 1):
start_time = time.time()
try:
base_url = os.getenv("SPEND_LOGS_URL", None)
## WRITE TO SEPARATE SERVER ##
if (
len(prisma_client.spend_log_transactions) > 0
and base_url is not None
and db_writer_client is not None
):
if not base_url.endswith("/"):
base_url += "/"
verbose_proxy_logger.debug("base_url: {}".format(base_url))
response = await db_writer_client.post(
url=base_url + "spend/update",
data=json.dumps(prisma_client.spend_log_transactions), # type: ignore
headers={"Content-Type": "application/json"},
)
if response.status_code == 200:
prisma_client.spend_log_transactions = []
else: ## (default) WRITE TO DB ##
logs_to_process = prisma_client.spend_log_transactions[
:MAX_LOGS_PER_INTERVAL
]
for j in range(0, len(logs_to_process), BATCH_SIZE):
# Create sublist for current batch, ensuring it doesn't exceed the BATCH_SIZE
batch = logs_to_process[j : j + BATCH_SIZE]
# Convert datetime strings to Date objects
batch_with_dates = [
prisma_client.jsonify_object(
{
**entry,
}
)
for entry in batch
]
await prisma_client.db.litellm_spendlogs.create_many(
data=batch_with_dates, skip_duplicates=True # type: ignore
)
verbose_proxy_logger.debug(
f"Flushed {len(batch)} logs to the DB."
)
# Remove the processed logs from spend_logs
prisma_client.spend_log_transactions = (
prisma_client.spend_log_transactions[len(logs_to_process) :]
)
verbose_proxy_logger.debug(
f"{len(logs_to_process)} logs processed. Remaining in queue: {len(prisma_client.spend_log_transactions)}"
)
break
except DB_CONNECTION_ERROR_TYPES as e:
if i is None:
i = 0
if (
i >= n_retry_times
): # If we've reached the maximum number of retries raise the exception
_raise_failed_update_spend_exception(
e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
)
# Optionally, sleep for a bit before retrying
await asyncio.sleep(2**i) # type: ignore
except Exception as e:
_raise_failed_update_spend_exception(
e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
)
await ProxyUpdateSpend.update_spend_logs(
n_retry_times=n_retry_times,
prisma_client=prisma_client,
proxy_logging_obj=proxy_logging_obj,
db_writer_client=db_writer_client,
)
def _raise_failed_update_spend_exception(