litellm-mirror/litellm/integrations/custom_batch_logger.py
Krish Dholakia 1c8438d475
All checks were successful
Read Version from pyproject.toml / read-version (push) Successful in 11s
Litellm dev 11 30 2024 (#6974)
* feat(cohere/chat.py): return citations in model response

Closes https://github.com/BerriAI/litellm/issues/6814

* fix(cohere/chat.py): fix linting errors

* fix(langsmith.py): support 'run_id' for langsmith

Fixes https://github.com/BerriAI/litellm/issues/6862

* fix(langsmith.py): fix langsmith quickstart

Fixes https://github.com/BerriAI/litellm/issues/6861

* fix: suppress linting error

* LiteLLM Minor Fixes & Improvements (11/29/2024)  (#6965)

* fix(factory.py): ensure tool call converts image url

Fixes https://github.com/BerriAI/litellm/issues/6953

* fix(transformation.py): support mp4 + pdf url's for vertex ai

Fixes https://github.com/BerriAI/litellm/issues/6936

* fix(http_handler.py): mask gemini api key in error logs

Fixes https://github.com/BerriAI/litellm/issues/6963

* docs(prometheus.md): update prometheus FAQs

* feat(auth_checks.py): ensure specific model access > wildcard model access

if wildcard model is in access group, but specific model is not - deny access

* fix(auth_checks.py): handle auth checks for team based model access groups

handles scenario where model access group used for wildcard models

* fix(internal_user_endpoints.py): support adding guardrails on `/user/update`

Fixes https://github.com/BerriAI/litellm/issues/6942

* fix(key_management_endpoints.py): fix prepare_metadata_fields helper

* fix: fix tests

* build(requirements.txt): bump openai dep version

fixes proxies argument

* test: fix tests

* fix(http_handler.py): fix error message masking

* fix(bedrock_guardrails.py): pass in prepped data

* test: fix test

* test: fix nvidia nim test

* fix(http_handler.py): return original response headers

* fix: revert maskedhttpstatuserror

* test: update tests

* test: cleanup test

* fix(key_management_endpoints.py): fix metadata field update logic

* fix(key_management_endpoints.py): maintain initial order of guardrails in key update

* fix(key_management_endpoints.py): handle prepare metadata

* fix: fix linting errors

* fix: fix linting errors

* fix: fix linting errors

* fix: fix key management errors

* fix(key_management_endpoints.py): update metadata

* test: update test

* refactor: add more debug statements

* test: skip flaky test

* test: fix test

* fix: fix test

* fix: fix update metadata logic

* fix: fix test

* ci(config.yml): change db url for e2e ui testing

* test: add more debug logs to langsmith

* fix: test change

* build(config.yml): fix db url

'
2024-12-02 21:03:33 -08:00

59 lines
1.8 KiB
Python

"""
Custom Logger that handles batching logic
Use this if you want your logs to be stored in memory and flushed periodically
"""
import asyncio
import time
from typing import List, Literal, Optional
import litellm
from litellm._logging import verbose_logger
from litellm.integrations.custom_logger import CustomLogger
class CustomBatchLogger(CustomLogger):
def __init__(
self,
flush_lock: Optional[asyncio.Lock] = None,
batch_size: Optional[int] = None,
flush_interval: Optional[int] = None,
**kwargs,
) -> None:
"""
Args:
flush_lock (Optional[asyncio.Lock], optional): Lock to use when flushing the queue. Defaults to None. Only used for custom loggers that do batching
"""
self.log_queue: List = []
self.flush_interval = flush_interval or litellm.DEFAULT_FLUSH_INTERVAL_SECONDS
self.batch_size: int = batch_size or litellm.DEFAULT_BATCH_SIZE
self.last_flush_time = time.time()
self.flush_lock = flush_lock
super().__init__(**kwargs)
async def periodic_flush(self):
while True:
await asyncio.sleep(self.flush_interval)
verbose_logger.debug(
f"CustomLogger periodic flush after {self.flush_interval} seconds"
)
await self.flush_queue()
async def flush_queue(self):
if self.flush_lock is None:
return
async with self.flush_lock:
if self.log_queue:
verbose_logger.debug(
"CustomLogger: Flushing batch of %s events", len(self.log_queue)
)
await self.async_send_batch()
self.log_queue.clear()
self.last_flush_time = time.time()
async def async_send_batch(self, *args, **kwargs):
pass