mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-26 11:14:04 +00:00
use lock to flush events to langsmith
This commit is contained in:
parent
c5f64ef99e
commit
d0ae85a7bb
2 changed files with 72 additions and 12 deletions
|
@ -84,6 +84,7 @@ class LangsmithLogger(CustomLogger):
|
|||
self.flush_interval = 10 # 5 seconds
|
||||
self.last_flush_time = time.time()
|
||||
asyncio.create_task(self.periodic_flush())
|
||||
self.flush_lock = asyncio.Lock()
|
||||
|
||||
def _prepare_log_data(self, kwargs, response_obj, start_time, end_time):
|
||||
import datetime
|
||||
|
@ -250,7 +251,7 @@ class LangsmithLogger(CustomLogger):
|
|||
data = self._prepare_log_data(kwargs, response_obj, start_time, end_time)
|
||||
self.log_queue.append(data)
|
||||
verbose_logger.debug(
|
||||
f"Langsmith, event added to queue. Will flush in {self.flush_interval}seconds..."
|
||||
f"Langsmith, event added to queue. Will flush in {self.flush_interval} seconds..."
|
||||
)
|
||||
|
||||
if len(self.log_queue) >= self.batch_size:
|
||||
|
@ -283,17 +284,24 @@ class LangsmithLogger(CustomLogger):
|
|||
data = self._prepare_log_data(kwargs, response_obj, start_time, end_time)
|
||||
self.log_queue.append(data)
|
||||
verbose_logger.debug(
|
||||
"Langsmith logging: queue length",
|
||||
"Langsmith logging: queue length %s, batch size %s",
|
||||
len(self.log_queue),
|
||||
"batch size",
|
||||
self.batch_size,
|
||||
)
|
||||
if len(self.log_queue) >= self.batch_size:
|
||||
await self.flush_queue()
|
||||
except:
|
||||
verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}")
|
||||
|
||||
async def flush_queue(self):
|
||||
async with self.flush_lock:
|
||||
if self.log_queue:
|
||||
verbose_logger.debug(
|
||||
"Langsmith: Flushing batch of %s events", self.batch_size
|
||||
)
|
||||
await self._async_send_batch()
|
||||
self.log_queue.clear()
|
||||
self.last_flush_time = time.time()
|
||||
except:
|
||||
verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}")
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(3),
|
||||
|
@ -363,10 +371,7 @@ class LangsmithLogger(CustomLogger):
|
|||
async def periodic_flush(self):
|
||||
while True:
|
||||
await asyncio.sleep(self.flush_interval)
|
||||
if self.log_queue and len(self.log_queue) > 0:
|
||||
verbose_logger.debug(
|
||||
f"Langsmith: Waited for {self.flush_interval} seconds. flushing in memory logs to langsmith"
|
||||
)
|
||||
await self._async_send_batch()
|
||||
self.log_queue.clear()
|
||||
self.last_flush_time = time.time()
|
||||
verbose_logger.debug(
|
||||
f"Langsmith periodic flush after {self.flush_interval} seconds"
|
||||
)
|
||||
await self.flush_queue()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue