forked from phoenix/litellm-mirror
use lock to flush events to langsmith
This commit is contained in:
parent 7cd7675458
commit ede33230f2
2 changed files with 72 additions and 12 deletions
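At a glance: the commit guards every flush of the in-memory log queue with a shared asyncio.Lock, so the size-triggered flush and the periodic timer can no longer drain and clear self.log_queue at the same time. A minimal, self-contained sketch of that pattern follows (an illustration only, not the litellm code; the HTTP send is replaced by a print):

import asyncio
import time


class BatchLogger:
    def __init__(self, batch_size=512, flush_interval=10):
        self.log_queue = []
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.last_flush_time = time.time()
        self.flush_lock = asyncio.Lock()  # single lock shared by both flush paths

    async def log_event(self, data):
        self.log_queue.append(data)
        if len(self.log_queue) >= self.batch_size:
            await self.flush_queue()

    async def periodic_flush(self):
        while True:
            await asyncio.sleep(self.flush_interval)
            await self.flush_queue()

    async def flush_queue(self):
        async with self.flush_lock:  # only one flusher runs at a time
            if self.log_queue:  # a second caller finds an empty queue and exits
                print(f"sending batch of {len(self.log_queue)} events")  # stand-in for the HTTP send
                self.log_queue.clear()
                self.last_flush_time = time.time()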
@@ -84,6 +84,7 @@ class LangsmithLogger(CustomLogger):
         self.flush_interval = 10  # 10 seconds
         self.last_flush_time = time.time()
         asyncio.create_task(self.periodic_flush())
+        self.flush_lock = asyncio.Lock()
 
     def _prepare_log_data(self, kwargs, response_obj, start_time, end_time):
         import datetime
@@ -250,7 +251,7 @@ class LangsmithLogger(CustomLogger):
             data = self._prepare_log_data(kwargs, response_obj, start_time, end_time)
             self.log_queue.append(data)
             verbose_logger.debug(
-                f"Langsmith, event added to queue. Will flush in {self.flush_interval}seconds..."
+                f"Langsmith, event added to queue. Will flush in {self.flush_interval} seconds..."
             )
 
             if len(self.log_queue) >= self.batch_size:
@@ -283,17 +284,24 @@ class LangsmithLogger(CustomLogger):
             data = self._prepare_log_data(kwargs, response_obj, start_time, end_time)
             self.log_queue.append(data)
             verbose_logger.debug(
-                "Langsmith logging: queue length",
+                "Langsmith logging: queue length %s, batch size %s",
                 len(self.log_queue),
-                "batch size",
                 self.batch_size,
             )
             if len(self.log_queue) >= self.batch_size:
+                await self.flush_queue()
+        except:
+            verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}")
+
+    async def flush_queue(self):
+        async with self.flush_lock:
+            if self.log_queue:
+                verbose_logger.debug(
+                    "Langsmith: Flushing batch of %s events", self.batch_size
+                )
                 await self._async_send_batch()
                 self.log_queue.clear()
                 self.last_flush_time = time.time()
-        except:
-            verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}")
 
     @retry(
         stop=stop_after_attempt(3),
@@ -363,10 +371,7 @@ class LangsmithLogger(CustomLogger):
     async def periodic_flush(self):
         while True:
             await asyncio.sleep(self.flush_interval)
-            if self.log_queue and len(self.log_queue) > 0:
-                verbose_logger.debug(
-                    f"Langsmith: Waited for {self.flush_interval} seconds. flushing in memory logs to langsmith"
-                )
-                await self._async_send_batch()
-                self.log_queue.clear()
-                self.last_flush_time = time.time()
+            verbose_logger.debug(
+                f"Langsmith periodic flush after {self.flush_interval} seconds"
+            )
+            await self.flush_queue()
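With the lock in place, concurrent flush attempts serialize instead of double-sending: whichever coroutine acquires flush_lock first sends and clears the batch, and the second one finds an empty queue and returns without doing anything. A quick driver for the BatchLogger sketch above (again illustrative only) shows the behaviour:

async def main():
    logger = BatchLogger(batch_size=10, flush_interval=10)
    for i in range(5):
        await logger.log_event({"event": i})  # below batch_size, nothing is sent yet
    # Simulate the periodic timer and a size-triggered flush racing each other:
    # only one coroutine sends the 5 queued events, the other sees an empty queue.
    await asyncio.gather(logger.flush_queue(), logger.flush_queue())


asyncio.run(main())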
@@ -22,6 +22,61 @@ litellm.set_verbose = True
 import time
 
 
+@pytest.mark.asyncio
+async def test_langsmith_queue_logging():
+    try:
+        # Initialize LangsmithLogger
+        test_langsmith_logger = LangsmithLogger()
+
+        litellm.callbacks = [test_langsmith_logger]
+        test_langsmith_logger.batch_size = 6
+        litellm.set_verbose = True
+
+        # Make multiple calls to ensure we don't hit the batch size
+        for _ in range(5):
+            response = await litellm.acompletion(
+                model="gpt-3.5-turbo",
+                messages=[{"role": "user", "content": "Test message"}],
+                max_tokens=10,
+                temperature=0.2,
+                mock_response="This is a mock response",
+            )
+
+        await asyncio.sleep(3)
+
+        # Check that logs are in the queue
+        assert len(test_langsmith_logger.log_queue) == 5
+
+        # Now make calls to exceed the batch size
+        for _ in range(3):
+            response = await litellm.acompletion(
+                model="gpt-3.5-turbo",
+                messages=[{"role": "user", "content": "Test message"}],
+                max_tokens=10,
+                temperature=0.2,
+                mock_response="This is a mock response",
+            )
+
+        # Wait a short time for any asynchronous operations to complete
+        await asyncio.sleep(1)
+
+        print(
+            "Length of langsmith log queue: {}".format(
+                len(test_langsmith_logger.log_queue)
+            )
+        )
+        # Check that the queue was flushed after exceeding batch size
+        assert len(test_langsmith_logger.log_queue) < 5
+
+        # Clean up
+        for cb in litellm.callbacks:
+            if isinstance(cb, LangsmithLogger):
+                await cb.async_httpx_client.client.aclose()
+
+    except Exception as e:
+        pytest.fail(f"Error occurred: {e}")
+
+
 @pytest.mark.skip(reason="Flaky test. covered by unit tests on custom logger.")
 @pytest.mark.asyncio()
 async def test_async_langsmith_logging():
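A note on the new test: the first assertion, assert len(test_langsmith_logger.log_queue) == 5, relies on flush_interval still being 10 seconds: with only a 3-second sleep after the first five calls, the periodic flush has not fired yet, so all five events remain queued. The later assert len(test_langsmith_logger.log_queue) < 5 then passes because the sixth event trips the batch_size=6 flush. To run just this test locally, something along the following lines should work (the test module's path is not visible in this diff, so substitute the actual location of the Langsmith test file):

pytest -x -s path/to/test_langsmith.py -k test_langsmith_queue_logging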