diff --git a/litellm/integrations/langsmith.py b/litellm/integrations/langsmith.py index 13d91d55b..58e785314 100644 --- a/litellm/integrations/langsmith.py +++ b/litellm/integrations/langsmith.py @@ -3,6 +3,7 @@ import asyncio import os import random +import time import traceback import types import uuid @@ -80,6 +81,9 @@ class LangsmithLogger(CustomLogger): ) self.batch_size = int(_batch_size) self.log_queue = [] + self.flush_interval = 10 # 5 seconds + self.last_flush_time = time.time() + asyncio.create_task(self.periodic_flush()) def _prepare_log_data(self, kwargs, response_obj, start_time, end_time): import datetime @@ -182,7 +186,7 @@ class LangsmithLogger(CustomLogger): if dotted_order: data["dotted_order"] = dotted_order - if data["id"] is None: + if "id" not in data or data["id"] is None: """ for /batch langsmith requires id, trace_id and dotted_order passed as params """ @@ -245,6 +249,9 @@ class LangsmithLogger(CustomLogger): ) data = self._prepare_log_data(kwargs, response_obj, start_time, end_time) self.log_queue.append(data) + verbose_logger.debug( + f"Langsmith, event added to queue. Will flush in {self.flush_interval}seconds..." + ) if len(self.log_queue) >= self.batch_size: self._send_batch() @@ -284,6 +291,7 @@ class LangsmithLogger(CustomLogger): if len(self.log_queue) >= self.batch_size: await self._async_send_batch() self.log_queue.clear() + self.last_flush_time = time.time() except: verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}") @@ -351,3 +359,14 @@ class LangsmithLogger(CustomLogger): st = datetime.now(timezone.utc) id_ = run_id return st.strftime("%Y%m%dT%H%M%S%fZ") + str(id_) + + async def periodic_flush(self): + while True: + await asyncio.sleep(self.flush_interval) + if self.log_queue and len(self.log_queue) > 0: + verbose_logger.debug( + f"Langsmith: Waited for {self.flush_interval} seconds. flushing in memory logs to langsmith" + ) + await self._async_send_batch() + self.log_queue.clear() + self.last_flush_time = time.time()