diff --git a/litellm/integrations/langsmith.py b/litellm/integrations/langsmith.py index 58e785314..7a29bbc2c 100644 --- a/litellm/integrations/langsmith.py +++ b/litellm/integrations/langsmith.py @@ -84,6 +84,7 @@ class LangsmithLogger(CustomLogger): self.flush_interval = 10 # 5 seconds self.last_flush_time = time.time() asyncio.create_task(self.periodic_flush()) + self.flush_lock = asyncio.Lock() def _prepare_log_data(self, kwargs, response_obj, start_time, end_time): import datetime @@ -250,7 +251,7 @@ class LangsmithLogger(CustomLogger): data = self._prepare_log_data(kwargs, response_obj, start_time, end_time) self.log_queue.append(data) verbose_logger.debug( - f"Langsmith, event added to queue. Will flush in {self.flush_interval}seconds..." + f"Langsmith, event added to queue. Will flush in {self.flush_interval} seconds..." ) if len(self.log_queue) >= self.batch_size: @@ -283,17 +284,24 @@ class LangsmithLogger(CustomLogger): data = self._prepare_log_data(kwargs, response_obj, start_time, end_time) self.log_queue.append(data) verbose_logger.debug( - "Langsmith logging: queue length", + "Langsmith logging: queue length %s, batch size %s", len(self.log_queue), - "batch size", self.batch_size, ) if len(self.log_queue) >= self.batch_size: + await self.flush_queue() + except: + verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}") + + async def flush_queue(self): + async with self.flush_lock: + if self.log_queue: + verbose_logger.debug( + "Langsmith: Flushing batch of %s events", self.batch_size + ) await self._async_send_batch() self.log_queue.clear() self.last_flush_time = time.time() - except: - verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}") @retry( stop=stop_after_attempt(3), @@ -363,10 +371,7 @@ class LangsmithLogger(CustomLogger): async def periodic_flush(self): while True: await asyncio.sleep(self.flush_interval) - if self.log_queue and len(self.log_queue) > 0: - verbose_logger.debug( - f"Langsmith: Waited for {self.flush_interval} seconds. flushing in memory logs to langsmith" - ) - await self._async_send_batch() - self.log_queue.clear() - self.last_flush_time = time.time() + verbose_logger.debug( + f"Langsmith periodic flush after {self.flush_interval} seconds" + ) + await self.flush_queue() diff --git a/litellm/tests/test_langsmith.py b/litellm/tests/test_langsmith.py index 9575c18da..347044592 100644 --- a/litellm/tests/test_langsmith.py +++ b/litellm/tests/test_langsmith.py @@ -22,6 +22,61 @@ litellm.set_verbose = True import time +@pytest.mark.asyncio +async def test_langsmith_queue_logging(): + try: + # Initialize LangsmithLogger + test_langsmith_logger = LangsmithLogger() + + litellm.callbacks = [test_langsmith_logger] + test_langsmith_logger.batch_size = 6 + litellm.set_verbose = True + + # Make multiple calls to ensure we don't hit the batch size + for _ in range(5): + response = await litellm.acompletion( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "Test message"}], + max_tokens=10, + temperature=0.2, + mock_response="This is a mock response", + ) + + await asyncio.sleep(3) + + # Check that logs are in the queue + assert len(test_langsmith_logger.log_queue) == 5 + + # Now make calls to exceed the batch size + for _ in range(3): + response = await litellm.acompletion( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "Test message"}], + max_tokens=10, + temperature=0.2, + mock_response="This is a mock response", + ) + + # Wait a short time for any asynchronous operations to complete + await asyncio.sleep(1) + + print( + "Length of langsmith log queue: {}".format( + len(test_langsmith_logger.log_queue) + ) + ) + # Check that the queue was flushed after exceeding batch size + assert len(test_langsmith_logger.log_queue) < 5 + + # Clean up + for cb in litellm.callbacks: + if isinstance(cb, LangsmithLogger): + await cb.async_httpx_client.client.aclose() + + except Exception as e: + pytest.fail(f"Error occurred: {e}") + + @pytest.mark.skip(reason="Flaky test. covered by unit tests on custom logger.") @pytest.mark.asyncio() async def test_async_langsmith_logging():