litellm-mirror/litellm/integrations/custom_batch_logger.py
Krish Dholakia 9b7ebb6a7d
build(pyproject.toml): add new dev dependencies - for type checking (#9631)
* build(pyproject.toml): add new dev dependencies - for type checking

* build: reformat files to fit black

* ci: reformat to fit black

* ci(test-litellm.yml): make tests run clear

* build(pyproject.toml): add ruff

* fix: fix ruff checks

* build(mypy/): fix mypy linting errors

* fix(hashicorp_secret_manager.py): fix passing cert for tls auth

* build(mypy/): resolve all mypy errors

* test: update test

* fix: fix black formatting

* build(pre-commit-config.yaml): use poetry run black

* fix(proxy_server.py): fix linting error

* fix: fix ruff safe representation error
2025-03-29 11:02:13 -07:00

58 lines
1.8 KiB
Python

"""
Custom Logger that handles batching logic
Use this if you want your logs to be stored in memory and flushed periodically.
"""
import asyncio
import time
from typing import List, Optional
import litellm
from litellm._logging import verbose_logger
from litellm.integrations.custom_logger import CustomLogger
class CustomBatchLogger(CustomLogger):
def __init__(
self,
flush_lock: Optional[asyncio.Lock] = None,
batch_size: Optional[int] = None,
flush_interval: Optional[int] = None,
**kwargs,
) -> None:
"""
Args:
flush_lock (Optional[asyncio.Lock], optional): Lock to use when flushing the queue. Defaults to None. Only used for custom loggers that do batching
"""
self.log_queue: List = []
self.flush_interval = flush_interval or litellm.DEFAULT_FLUSH_INTERVAL_SECONDS
self.batch_size: int = batch_size or litellm.DEFAULT_BATCH_SIZE
self.last_flush_time = time.time()
self.flush_lock = flush_lock
super().__init__(**kwargs)
async def periodic_flush(self):
while True:
await asyncio.sleep(self.flush_interval)
verbose_logger.debug(
f"CustomLogger periodic flush after {self.flush_interval} seconds"
)
await self.flush_queue()
async def flush_queue(self):
if self.flush_lock is None:
return
async with self.flush_lock:
if self.log_queue:
verbose_logger.debug(
"CustomLogger: Flushing batch of %s events", len(self.log_queue)
)
await self.async_send_batch()
self.log_queue.clear()
self.last_flush_time = time.time()
async def async_send_batch(self, *args, **kwargs):
pass