fix move logic to custom_batch_logger

This commit is contained in:
Ishaan Jaff 2024-09-11 16:19:24 -07:00
parent e681619381
commit 368a5fd052
2 changed files with 61 additions and 32 deletions

View file

@ -0,0 +1,53 @@
"""
Custom Logger that handles batching logic
Use this if you want your logs to be stored in memory and flushed periodically
"""
import asyncio
import time
from typing import List, Literal, Optional
from litellm._logging import verbose_logger
from litellm.integrations.custom_logger import CustomLogger
DEFAULT_BATCH_SIZE = 512
DEFAULT_FLUSH_INTERVAL_SECONDS = 5
class CustomBatchLogger(CustomLogger):
def __init__(self, flush_lock: Optional[asyncio.Lock] = None, **kwargs) -> None:
"""
Args:
flush_lock (Optional[asyncio.Lock], optional): Lock to use when flushing the queue. Defaults to None. Only used for custom loggers that do batching
"""
self.log_queue: List = []
self.flush_interval = DEFAULT_FLUSH_INTERVAL_SECONDS # 10 seconds
self.batch_size = DEFAULT_BATCH_SIZE
self.last_flush_time = time.time()
self.flush_lock = flush_lock
super().__init__(**kwargs)
pass
async def periodic_flush(self):
while True:
await asyncio.sleep(self.flush_interval)
verbose_logger.debug(
f"CustomLogger periodic flush after {self.flush_interval} seconds"
)
await self.flush_queue()
async def flush_queue(self):
async with self.flush_lock:
if self.log_queue:
verbose_logger.debug(
"CustomLogger: Flushing batch of %s events", self.batch_size
)
await self.async_send_batch()
self.log_queue.clear()
self.last_flush_time = time.time()
async def async_send_batch(self):
pass

View file

@ -17,16 +17,13 @@ from pydantic import BaseModel # type: ignore
import litellm import litellm
from litellm._logging import verbose_logger from litellm._logging import verbose_logger
from litellm.integrations.custom_logger import CustomLogger from litellm.integrations.custom_batch_logger import CustomBatchLogger
from litellm.llms.custom_httpx.http_handler import ( from litellm.llms.custom_httpx.http_handler import (
AsyncHTTPHandler, AsyncHTTPHandler,
get_async_httpx_client, get_async_httpx_client,
httpxSpecialProvider, httpxSpecialProvider,
) )
DEFAULT_BATCH_SIZE = 512
DEFAULT_FLUSH_INTERVAL_SECONDS = 5
class LangsmithInputs(BaseModel): class LangsmithInputs(BaseModel):
model: Optional[str] = None model: Optional[str] = None
@ -59,8 +56,8 @@ def is_serializable(value):
return not isinstance(value, non_serializable_types) return not isinstance(value, non_serializable_types)
class LangsmithLogger(CustomLogger): class LangsmithLogger(CustomBatchLogger):
def __init__(self): def __init__(self, **kwargs):
self.langsmith_api_key = os.getenv("LANGSMITH_API_KEY") self.langsmith_api_key = os.getenv("LANGSMITH_API_KEY")
self.langsmith_project = os.getenv("LANGSMITH_PROJECT", "litellm-completion") self.langsmith_project = os.getenv("LANGSMITH_PROJECT", "litellm-completion")
self.langsmith_default_run_name = os.getenv( self.langsmith_default_run_name = os.getenv(
@ -72,17 +69,14 @@ class LangsmithLogger(CustomLogger):
self.async_httpx_client = get_async_httpx_client( self.async_httpx_client = get_async_httpx_client(
llm_provider=httpxSpecialProvider.LoggingCallback llm_provider=httpxSpecialProvider.LoggingCallback
) )
_batch_size = ( _batch_size = (
os.getenv("LANGSMITH_BATCH_SIZE", DEFAULT_BATCH_SIZE) os.getenv("LANGSMITH_BATCH_SIZE", None) or litellm.langsmith_batch_size
or litellm.langsmith_batch_size
) )
self.batch_size = int(_batch_size) if _batch_size:
self.log_queue = [] self.batch_size = int(_batch_size)
self.flush_interval = DEFAULT_FLUSH_INTERVAL_SECONDS # 10 seconds
self.last_flush_time = time.time()
asyncio.create_task(self.periodic_flush()) asyncio.create_task(self.periodic_flush())
self.flush_lock = asyncio.Lock() self.flush_lock = asyncio.Lock()
super().__init__(**kwargs, flush_lock=self.flush_lock)
def _prepare_log_data(self, kwargs, response_obj, start_time, end_time): def _prepare_log_data(self, kwargs, response_obj, start_time, end_time):
import datetime import datetime
@ -291,17 +285,7 @@ class LangsmithLogger(CustomLogger):
except: except:
verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}") verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}")
async def flush_queue(self): async def async_send_batch(self):
async with self.flush_lock:
if self.log_queue:
verbose_logger.debug(
"Langsmith: Flushing batch of %s events", self.batch_size
)
await self._async_send_batch()
self.log_queue.clear()
self.last_flush_time = time.time()
async def _async_send_batch(self):
""" """
sends runs to /batch endpoint sends runs to /batch endpoint
@ -360,11 +344,3 @@ class LangsmithLogger(CustomLogger):
st = datetime.now(timezone.utc) st = datetime.now(timezone.utc)
id_ = run_id id_ = run_id
return st.strftime("%Y%m%dT%H%M%S%fZ") + str(id_) return st.strftime("%Y%m%dT%H%M%S%fZ") + str(id_)
async def periodic_flush(self):
while True:
await asyncio.sleep(self.flush_interval)
verbose_logger.debug(
f"Langsmith periodic flush after {self.flush_interval} seconds"
)
await self.flush_queue()