From 2fa9709af080844cc89c106c6c4b2aae44030d93 Mon Sep 17 00:00:00 2001
From: Ishaan Jaff
Date: Wed, 11 Sep 2024 08:06:56 -0700
Subject: [PATCH 01/14] stash - langsmith use batching for logging

---
 litellm/integrations/langsmith.py | 128 +++++++++++++++++++-----------
 litellm/proxy/proxy_config.yaml   |   3 +
 2 files changed, 84 insertions(+), 47 deletions(-)

diff --git a/litellm/integrations/langsmith.py b/litellm/integrations/langsmith.py
index 3ee55b3e9..1111c440e 100644
--- a/litellm/integrations/langsmith.py
+++ b/litellm/integrations/langsmith.py
@@ -55,8 +55,7 @@ def is_serializable(value):
 
 
 class LangsmithLogger(CustomLogger):
-    # Class variables or attributes
-    def __init__(self):
+    def __init__(self, batch_size=1):
         self.langsmith_api_key = os.getenv("LANGSMITH_API_KEY")
         self.langsmith_project = os.getenv("LANGSMITH_PROJECT", "litellm-completion")
         self.langsmith_default_run_name = os.getenv(
@@ -68,6 +67,8 @@ class LangsmithLogger(CustomLogger):
         self.async_httpx_client = get_async_httpx_client(
             llm_provider=httpxSpecialProvider.LoggingCallback
         )
+        self.batch_size = batch_size
+        self.log_queue = []
 
     def _prepare_log_data(self, kwargs, response_obj, start_time, end_time):
         import datetime
@@ -102,7 +103,7 @@ class LangsmithLogger(CustomLogger):
 
         project_name = metadata.get("project_name", self.langsmith_project)
         run_name = metadata.get("run_name", self.langsmith_default_run_name)
-        run_id = metadata.get("id", None)
+        run_id = metadata.get("id", None) or str(random.randint(1000, 9999))
         parent_run_id = metadata.get("parent_run_id", None)
         trace_id = metadata.get("trace_id", None)
         session_id = metadata.get("session_id", None)
@@ -174,48 +175,31 @@ class LangsmithLogger(CustomLogger):
 
         return data
 
-    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
-        try:
-            sampling_rate = (
-                float(os.getenv("LANGSMITH_SAMPLING_RATE"))
-                if os.getenv("LANGSMITH_SAMPLING_RATE") is not None
-                and os.getenv("LANGSMITH_SAMPLING_RATE").strip().isdigit()
-                else 1.0
-            )
-            random_sample = random.random()
-            if random_sample > sampling_rate:
-                verbose_logger.info(
-                    "Skipping Langsmith logging. Sampling rate={}, random_sample={}".format(
-                        sampling_rate, random_sample
-                    )
-                )
-                return  # Skip logging
-            verbose_logger.debug(
-                "Langsmith Async Layer Logging - kwargs: %s, response_obj: %s",
-                kwargs,
-                response_obj,
-            )
-            data = self._prepare_log_data(kwargs, response_obj, start_time, end_time)
-            url = f"{self.langsmith_base_url}/runs"
-            verbose_logger.debug(f"Langsmith Logging - About to send data to {url} ...")
+    def _send_batch(self):
+        if not self.log_queue:
+            return
 
-            headers = {"x-api-key": self.langsmith_api_key}
-            response = await self.async_httpx_client.post(
-                url=url, json=data, headers=headers
+        url = f"{self.langsmith_base_url}/runs/batch"
+        headers = {"x-api-key": self.langsmith_api_key}
+
+        try:
+            response = requests.post(
+                url=url,
+                json=self.log_queue,
+                headers=headers,
             )
 
             if response.status_code >= 300:
                 verbose_logger.error(
-                    f"Langmsith Error: {response.status_code} - {response.text}"
+                    f"Langsmith Error: {response.status_code} - {response.text}"
                 )
             else:
                 verbose_logger.debug(
-                    "Run successfully created, response=%s", response.text
+                    f"Batch of {len(self.log_queue)} runs successfully created"
                 )
-            verbose_logger.debug(
-                f"Langsmith Layer Logging - final response object: {response_obj}. Response text from langsmith={response.text}"
-            )
-        except:
+
+            self.log_queue.clear()
+        except Exception as e:
             verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}")
 
     def log_success_event(self, kwargs, response_obj, start_time, end_time):
@@ -240,23 +224,73 @@ class LangsmithLogger(CustomLogger):
                 response_obj,
             )
             data = self._prepare_log_data(kwargs, response_obj, start_time, end_time)
-            url = f"{self.langsmith_base_url}/runs"
-            verbose_logger.debug(f"Langsmith Logging - About to send data to {url} ...")
+            self.log_queue.append(data)
 
-            response = requests.post(
+            if len(self.log_queue) >= self.batch_size:
+                self._send_batch()
+
+        except:
+            verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}")
+
+    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
+        try:
+            sampling_rate = (
+                float(os.getenv("LANGSMITH_SAMPLING_RATE"))
+                if os.getenv("LANGSMITH_SAMPLING_RATE") is not None
+                and os.getenv("LANGSMITH_SAMPLING_RATE").strip().isdigit()
+                else 1.0
+            )
+            random_sample = random.random()
+            if random_sample > sampling_rate:
+                verbose_logger.info(
+                    "Skipping Langsmith logging. Sampling rate={}, random_sample={}".format(
+                        sampling_rate, random_sample
+                    )
+                )
+                return  # Skip logging
+            verbose_logger.debug(
+                "Langsmith Async Layer Logging - kwargs: %s, response_obj: %s",
+                kwargs,
+                response_obj,
+            )
+            data = self._prepare_log_data(kwargs, response_obj, start_time, end_time)
+            self.log_queue.append(data)
+
+            if len(self.log_queue) >= self.batch_size:
+                await self._async_send_batch()
+
+        except:
+            verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}")
+
+    async def _async_send_batch(self):
+        import json
+
+        if not self.log_queue:
+            return
+
+        url = f"{self.langsmith_base_url}/runs/batch"
+        headers = {"x-api-key": self.langsmith_api_key}
+
+        try:
+            response = await self.async_httpx_client.post(
                 url=url,
-                json=data,
-                headers={"x-api-key": self.langsmith_api_key},
+                json={
+                    "post": self.log_queue,
+                },
+                headers=headers,
             )
 
             if response.status_code >= 300:
-                verbose_logger.error(f"Error: {response.status_code} - {response.text}")
+                verbose_logger.error(
+                    f"Langsmith Error: {response.status_code} - {response.text}"
+                )
             else:
-                verbose_logger.debug("Run successfully created")
-            verbose_logger.debug(
-                f"Langsmith Layer Logging - final response object: {response_obj}. Response text from langsmith={response.text}"
-            )
-        except:
+                verbose_logger.debug(
+                    f"Batch of {len(self.log_queue)} runs successfully created"
+                )
+
+            self.log_queue.clear()
+        except Exception as e:
             verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}")
 
     def get_run_by_id(self, run_id):
diff --git a/litellm/proxy/proxy_config.yaml b/litellm/proxy/proxy_config.yaml
index defde3772..98ca86db3 100644
--- a/litellm/proxy/proxy_config.yaml
+++ b/litellm/proxy/proxy_config.yaml
@@ -14,3 +14,6 @@ model_list:
 
 general_settings:
   master_key: sk-1234
+
+litellm_settings:
+  success_callback: ["langsmith"]
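For context, a minimal sketch of the batching behavior this patch introduces: runs accumulate in log_queue and are only POSTed to /runs/batch once batch_size is reached (assumes the post-patch LangsmithLogger from this commit; the API key value is a placeholder):

    import os

    os.environ["LANGSMITH_API_KEY"] = "lsv2-placeholder"  # placeholder, not a real key

    from litellm.integrations.langsmith import LangsmithLogger

    # batch_size=1 is the default at this point in the series, so every run
    # is sent immediately; a larger value buffers runs in memory.
    logger = LangsmithLogger(batch_size=3)

    # Each successful LLM call lands in logger.log_queue via
    # log_success_event(); the third queued run triggers _send_batch(),
    # which POSTs the whole queue and then clears it.
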
Response text from langsmith={response.text}" - ) - except: + + self.log_queue.clear() + except Exception as e: verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}") def log_success_event(self, kwargs, response_obj, start_time, end_time): @@ -240,23 +224,73 @@ class LangsmithLogger(CustomLogger): response_obj, ) data = self._prepare_log_data(kwargs, response_obj, start_time, end_time) - url = f"{self.langsmith_base_url}/runs" - verbose_logger.debug(f"Langsmith Logging - About to send data to {url} ...") + self.log_queue.append(data) - response = requests.post( + if len(self.log_queue) >= self.batch_size: + self._send_batch() + + except: + verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}") + + async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): + try: + sampling_rate = ( + float(os.getenv("LANGSMITH_SAMPLING_RATE")) + if os.getenv("LANGSMITH_SAMPLING_RATE") is not None + and os.getenv("LANGSMITH_SAMPLING_RATE").strip().isdigit() + else 1.0 + ) + random_sample = random.random() + if random_sample > sampling_rate: + verbose_logger.info( + "Skipping Langsmith logging. Sampling rate={}, random_sample={}".format( + sampling_rate, random_sample + ) + ) + return # Skip logging + verbose_logger.debug( + "Langsmith Async Layer Logging - kwargs: %s, response_obj: %s", + kwargs, + response_obj, + ) + data = self._prepare_log_data(kwargs, response_obj, start_time, end_time) + self.log_queue.append(data) + + if len(self.log_queue) >= self.batch_size: + await self._async_send_batch() + + except: + verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}") + + async def _async_send_batch(self): + import json + + if not self.log_queue: + return + + url = f"{self.langsmith_base_url}/runs/batch" + headers = {"x-api-key": self.langsmith_api_key} + + try: + response = await self.async_httpx_client.post( url=url, - json=data, - headers={"x-api-key": self.langsmith_api_key}, + json={ + "post": self.log_queue, + }, + headers=headers, ) if response.status_code >= 300: - verbose_logger.error(f"Error: {response.status_code} - {response.text}") + verbose_logger.error( + f"Langsmith Error: {response.status_code} - {response.text}" + ) else: - verbose_logger.debug("Run successfully created") - verbose_logger.debug( - f"Langsmith Layer Logging - final response object: {response_obj}. 
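A one-line sketch of where this new module-level setting is meant to be set by callers (the value is an example; the logger starts reading it in the next patch):

    import litellm

    litellm.langsmith_batch_size = 10  # example value, consumed by LangsmithLogger at init
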
Response text from langsmith={response.text}" - ) - except: + verbose_logger.debug( + f"Batch of {len(self.log_queue)} runs successfully created" + ) + + self.log_queue.clear() + except Exception as e: verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}") def get_run_by_id(self, run_id): diff --git a/litellm/proxy/proxy_config.yaml b/litellm/proxy/proxy_config.yaml index defde3772..98ca86db3 100644 --- a/litellm/proxy/proxy_config.yaml +++ b/litellm/proxy/proxy_config.yaml @@ -14,3 +14,6 @@ model_list: general_settings: master_key: sk-1234 + +litellm_settings: + success_callback: ["langsmith"] From 2f670a16fcd16939865b1c98844384f07b86c056 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 11 Sep 2024 11:25:33 -0700 Subject: [PATCH 02/14] fix langsmith_batch_size --- litellm/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/litellm/__init__.py b/litellm/__init__.py index 57b9f6a71..95c276edf 100644 --- a/litellm/__init__.py +++ b/litellm/__init__.py @@ -55,6 +55,7 @@ _known_custom_logger_compatible_callbacks: List = list( ) callbacks: List[Union[Callable, _custom_logger_compatible_callbacks_literal]] = [] langfuse_default_tags: Optional[List[str]] = None +langsmith_batch_size: Optional[int] = None _async_input_callback: List[Callable] = ( [] ) # internal variable - async custom callbacks are routed here. From 860516c843cfc5fb18056e7987c42becff6d093e Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 11 Sep 2024 11:28:27 -0700 Subject: [PATCH 03/14] langsmith use batching for logging --- litellm/integrations/langsmith.py | 39 +++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/litellm/integrations/langsmith.py b/litellm/integrations/langsmith.py index 1111c440e..4c98788cb 100644 --- a/litellm/integrations/langsmith.py +++ b/litellm/integrations/langsmith.py @@ -5,7 +5,8 @@ import os import random import traceback import types -from datetime import datetime +import uuid +from datetime import datetime, timezone from typing import Any, List, Optional, Union import dotenv # type: ignore @@ -55,7 +56,7 @@ def is_serializable(value): class LangsmithLogger(CustomLogger): - def __init__(self, batch_size=1): + def __init__(self): self.langsmith_api_key = os.getenv("LANGSMITH_API_KEY") self.langsmith_project = os.getenv("LANGSMITH_PROJECT", "litellm-completion") self.langsmith_default_run_name = os.getenv( @@ -67,7 +68,11 @@ class LangsmithLogger(CustomLogger): self.async_httpx_client = get_async_httpx_client( llm_provider=httpxSpecialProvider.LoggingCallback ) - self.batch_size = batch_size + + _batch_size = ( + os.getenv("LANGSMITH_BATCH_SIZE", 100) or litellm.langsmith_batch_size + ) + self.batch_size = int(_batch_size) self.log_queue = [] def _prepare_log_data(self, kwargs, response_obj, start_time, end_time): @@ -103,7 +108,7 @@ class LangsmithLogger(CustomLogger): project_name = metadata.get("project_name", self.langsmith_project) run_name = metadata.get("run_name", self.langsmith_default_run_name) - run_id = metadata.get("id", None) or str(random.randint(1000, 9999)) + run_id = metadata.get("id", None) parent_run_id = metadata.get("parent_run_id", None) trace_id = metadata.get("trace_id", None) session_id = metadata.get("session_id", None) @@ -171,6 +176,15 @@ class LangsmithLogger(CustomLogger): if dotted_order: data["dotted_order"] = dotted_order + if data["id"] is None: + """ + for /batch langsmith requires id, trace_id and dotted_order passed as params + """ + run_id = uuid.uuid4() + data["id"] = str(run_id) + 
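An illustration of the id/trace_id/dotted_order backfill added in this patch, written as equivalent standalone code using the same strftime format as make_dot_order (the uuid value is random, the timestamp is an example):

    import uuid
    from datetime import datetime, timezone

    run_id = uuid.uuid4()
    data = {"id": str(run_id), "trace_id": str(run_id)}
    # dotted_order is a UTC timestamp prefix concatenated with the run id,
    # e.g. "20240911T190256123456Z" + "3a7c...-uuid"
    data["dotted_order"] = (
        datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ") + str(run_id)
    )
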
data["trace_id"] = str(run_id) + data["dotted_order"] = self.make_dot_order(run_id=run_id) + verbose_logger.debug("Langsmith Logging data on langsmith: %s", data) return data @@ -255,7 +269,12 @@ class LangsmithLogger(CustomLogger): ) data = self._prepare_log_data(kwargs, response_obj, start_time, end_time) self.log_queue.append(data) - + verbose_logger.debug( + "Langsmith logging: queue length", + len(self.log_queue), + "batch size", + self.batch_size, + ) if len(self.log_queue) >= self.batch_size: await self._async_send_batch() @@ -279,6 +298,7 @@ class LangsmithLogger(CustomLogger): }, headers=headers, ) + response.raise_for_status() if response.status_code >= 300: verbose_logger.error( @@ -290,6 +310,10 @@ class LangsmithLogger(CustomLogger): ) self.log_queue.clear() + except httpx.HTTPStatusError as e: + verbose_logger.error( + f"Langsmith HTTP Error: {e.response.status_code} - {e.response.text}" + ) except Exception as e: verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}") @@ -302,3 +326,8 @@ class LangsmithLogger(CustomLogger): ) return response.json() + + def make_dot_order(self, run_id: str): + st = datetime.now(timezone.utc) + id_ = run_id + return st.strftime("%Y%m%dT%H%M%S%fZ") + str(id_) From 15277aff1cc84d8cc5989b56609b4932a60822bb Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 11 Sep 2024 11:56:24 -0700 Subject: [PATCH 04/14] fix langsmith clear logged queue on success --- litellm/integrations/langsmith.py | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/litellm/integrations/langsmith.py b/litellm/integrations/langsmith.py index 4c98788cb..424d18b6f 100644 --- a/litellm/integrations/langsmith.py +++ b/litellm/integrations/langsmith.py @@ -9,9 +9,12 @@ import uuid from datetime import datetime, timezone from typing import Any, List, Optional, Union +import backoff import dotenv # type: ignore import httpx import requests # type: ignore +from backoff import on_exception +from backoff._typing import Details from pydantic import BaseModel # type: ignore import litellm @@ -55,6 +58,12 @@ def is_serializable(value): return not isinstance(value, non_serializable_types) +def on_backoff(details: Details) -> None: + verbose_logger.warning( + f"Langsmith batch send failed. Retrying in {details['wait']} seconds. 
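For reference, a self-contained sketch of the backoff retry pattern this patch wraps around the batch send (the decorated coroutine here is a hypothetical stand-in, not litellm code):

    import backoff
    import httpx

    @backoff.on_exception(backoff.expo, (httpx.HTTPError, Exception), max_tries=3)
    async def send_batch_stub():
        # a transient failure raises; backoff re-invokes the coroutine with
        # exponentially increasing delays, giving up after 3 total attempts
        raise httpx.HTTPError("simulated transient failure")
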
Attempt {details['tries']}/3" + ) + + class LangsmithLogger(CustomLogger): def __init__(self): self.langsmith_api_key = os.getenv("LANGSMITH_API_KEY") @@ -277,11 +286,23 @@ class LangsmithLogger(CustomLogger): ) if len(self.log_queue) >= self.batch_size: await self._async_send_batch() - + self.log_queue.clear() except: verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}") + @on_exception( + backoff.expo, (httpx.HTTPError, Exception), max_tries=3, on_backoff=on_backoff + ) async def _async_send_batch(self): + """ + sends runs to /batch endpoint + + Sends runs from self.log_queue + + Returns: None + + Raises: Does not raise an exception, will only verbose_logger.exception() + """ import json if not self.log_queue: @@ -308,14 +329,14 @@ class LangsmithLogger(CustomLogger): verbose_logger.debug( f"Batch of {len(self.log_queue)} runs successfully created" ) - - self.log_queue.clear() except httpx.HTTPStatusError as e: - verbose_logger.error( + verbose_logger.exception( f"Langsmith HTTP Error: {e.response.status_code} - {e.response.text}" ) except Exception as e: - verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}") + verbose_logger.exception( + f"Langsmith Layer Error - {traceback.format_exc()}" + ) def get_run_by_id(self, run_id): From 0a6a437e642dd19eab4d2bd8ab649ba4e1da312a Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 11 Sep 2024 12:41:22 -0700 Subject: [PATCH 05/14] use tenacity for langsmith --- litellm/integrations/langsmith.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/litellm/integrations/langsmith.py b/litellm/integrations/langsmith.py index 424d18b6f..13d91d55b 100644 --- a/litellm/integrations/langsmith.py +++ b/litellm/integrations/langsmith.py @@ -9,13 +9,16 @@ import uuid from datetime import datetime, timezone from typing import Any, List, Optional, Union -import backoff import dotenv # type: ignore import httpx import requests # type: ignore -from backoff import on_exception -from backoff._typing import Details from pydantic import BaseModel # type: ignore +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential, +) import litellm from litellm._logging import verbose_logger @@ -58,12 +61,6 @@ def is_serializable(value): return not isinstance(value, non_serializable_types) -def on_backoff(details: Details) -> None: - verbose_logger.warning( - f"Langsmith batch send failed. Retrying in {details['wait']} seconds. 
Attempt {details['tries']}/3" - ) - - class LangsmithLogger(CustomLogger): def __init__(self): self.langsmith_api_key = os.getenv("LANGSMITH_API_KEY") @@ -290,8 +287,10 @@ class LangsmithLogger(CustomLogger): except: verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}") - @on_exception( - backoff.expo, (httpx.HTTPError, Exception), max_tries=3, on_backoff=on_backoff + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=4, max=10), + retry=retry_if_exception_type((httpx.HTTPStatusError, Exception)), ) async def _async_send_batch(self): """ From a66f03f860761807f30e4f8c45d11bfdd0550e17 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 11 Sep 2024 12:45:39 -0700 Subject: [PATCH 06/14] fix installing litellm --- .circleci/config.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index dccaa2b11..1871134a2 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -161,6 +161,7 @@ jobs: name: Install Dependencies command: | python -m pip install --upgrade pip + python -m pip install -r requirements.txt pip install python-dotenv pip install pytest pip install tiktoken From 7cd7675458defca0f66fed97f0deb1cb88e78a13 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 11 Sep 2024 13:02:34 -0700 Subject: [PATCH 07/14] add better debugging for flush interval --- litellm/integrations/langsmith.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/litellm/integrations/langsmith.py b/litellm/integrations/langsmith.py index 13d91d55b..58e785314 100644 --- a/litellm/integrations/langsmith.py +++ b/litellm/integrations/langsmith.py @@ -3,6 +3,7 @@ import asyncio import os import random +import time import traceback import types import uuid @@ -80,6 +81,9 @@ class LangsmithLogger(CustomLogger): ) self.batch_size = int(_batch_size) self.log_queue = [] + self.flush_interval = 10 # 5 seconds + self.last_flush_time = time.time() + asyncio.create_task(self.periodic_flush()) def _prepare_log_data(self, kwargs, response_obj, start_time, end_time): import datetime @@ -182,7 +186,7 @@ class LangsmithLogger(CustomLogger): if dotted_order: data["dotted_order"] = dotted_order - if data["id"] is None: + if "id" not in data or data["id"] is None: """ for /batch langsmith requires id, trace_id and dotted_order passed as params """ @@ -245,6 +249,9 @@ class LangsmithLogger(CustomLogger): ) data = self._prepare_log_data(kwargs, response_obj, start_time, end_time) self.log_queue.append(data) + verbose_logger.debug( + f"Langsmith, event added to queue. Will flush in {self.flush_interval}seconds..." + ) if len(self.log_queue) >= self.batch_size: self._send_batch() @@ -284,6 +291,7 @@ class LangsmithLogger(CustomLogger): if len(self.log_queue) >= self.batch_size: await self._async_send_batch() self.log_queue.clear() + self.last_flush_time = time.time() except: verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}") @@ -351,3 +359,14 @@ class LangsmithLogger(CustomLogger): st = datetime.now(timezone.utc) id_ = run_id return st.strftime("%Y%m%dT%H%M%S%fZ") + str(id_) + + async def periodic_flush(self): + while True: + await asyncio.sleep(self.flush_interval) + if self.log_queue and len(self.log_queue) > 0: + verbose_logger.debug( + f"Langsmith: Waited for {self.flush_interval} seconds. 
flushing in memory logs to langsmith" + ) + await self._async_send_batch() + self.log_queue.clear() + self.last_flush_time = time.time() From ede33230f27eac678969e61e9d3edd2b0e1b8933 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 11 Sep 2024 13:27:16 -0700 Subject: [PATCH 08/14] use lock to flush events to langsmith --- litellm/integrations/langsmith.py | 29 +++++++++------- litellm/tests/test_langsmith.py | 55 +++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 12 deletions(-) diff --git a/litellm/integrations/langsmith.py b/litellm/integrations/langsmith.py index 58e785314..7a29bbc2c 100644 --- a/litellm/integrations/langsmith.py +++ b/litellm/integrations/langsmith.py @@ -84,6 +84,7 @@ class LangsmithLogger(CustomLogger): self.flush_interval = 10 # 5 seconds self.last_flush_time = time.time() asyncio.create_task(self.periodic_flush()) + self.flush_lock = asyncio.Lock() def _prepare_log_data(self, kwargs, response_obj, start_time, end_time): import datetime @@ -250,7 +251,7 @@ class LangsmithLogger(CustomLogger): data = self._prepare_log_data(kwargs, response_obj, start_time, end_time) self.log_queue.append(data) verbose_logger.debug( - f"Langsmith, event added to queue. Will flush in {self.flush_interval}seconds..." + f"Langsmith, event added to queue. Will flush in {self.flush_interval} seconds..." ) if len(self.log_queue) >= self.batch_size: @@ -283,17 +284,24 @@ class LangsmithLogger(CustomLogger): data = self._prepare_log_data(kwargs, response_obj, start_time, end_time) self.log_queue.append(data) verbose_logger.debug( - "Langsmith logging: queue length", + "Langsmith logging: queue length %s, batch size %s", len(self.log_queue), - "batch size", self.batch_size, ) if len(self.log_queue) >= self.batch_size: + await self.flush_queue() + except: + verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}") + + async def flush_queue(self): + async with self.flush_lock: + if self.log_queue: + verbose_logger.debug( + "Langsmith: Flushing batch of %s events", self.batch_size + ) await self._async_send_batch() self.log_queue.clear() self.last_flush_time = time.time() - except: - verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}") @retry( stop=stop_after_attempt(3), @@ -363,10 +371,7 @@ class LangsmithLogger(CustomLogger): async def periodic_flush(self): while True: await asyncio.sleep(self.flush_interval) - if self.log_queue and len(self.log_queue) > 0: - verbose_logger.debug( - f"Langsmith: Waited for {self.flush_interval} seconds. 
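The periodic flush added here is a plain asyncio background loop; a minimal standalone sketch of the same pattern (the queue contents and interval are illustrative):

    import asyncio

    async def periodic_flush(queue: list, interval: float) -> None:
        while True:
            await asyncio.sleep(interval)
            if queue:
                batch = list(queue)  # snapshot, then drain the buffer
                queue.clear()
                print(f"flushing {len(batch)} buffered events")

    async def main() -> None:
        queue: list = []
        task = asyncio.create_task(periodic_flush(queue, interval=1.0))
        queue.extend(["event-1", "event-2"])
        await asyncio.sleep(1.5)  # let one flush cycle run
        task.cancel()

    asyncio.run(main())
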
flushing in memory logs to langsmith" - ) - await self._async_send_batch() - self.log_queue.clear() - self.last_flush_time = time.time() + verbose_logger.debug( + f"Langsmith periodic flush after {self.flush_interval} seconds" + ) + await self.flush_queue() diff --git a/litellm/tests/test_langsmith.py b/litellm/tests/test_langsmith.py index 9575c18da..347044592 100644 --- a/litellm/tests/test_langsmith.py +++ b/litellm/tests/test_langsmith.py @@ -22,6 +22,61 @@ litellm.set_verbose = True import time +@pytest.mark.asyncio +async def test_langsmith_queue_logging(): + try: + # Initialize LangsmithLogger + test_langsmith_logger = LangsmithLogger() + + litellm.callbacks = [test_langsmith_logger] + test_langsmith_logger.batch_size = 6 + litellm.set_verbose = True + + # Make multiple calls to ensure we don't hit the batch size + for _ in range(5): + response = await litellm.acompletion( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "Test message"}], + max_tokens=10, + temperature=0.2, + mock_response="This is a mock response", + ) + + await asyncio.sleep(3) + + # Check that logs are in the queue + assert len(test_langsmith_logger.log_queue) == 5 + + # Now make calls to exceed the batch size + for _ in range(3): + response = await litellm.acompletion( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "Test message"}], + max_tokens=10, + temperature=0.2, + mock_response="This is a mock response", + ) + + # Wait a short time for any asynchronous operations to complete + await asyncio.sleep(1) + + print( + "Length of langsmith log queue: {}".format( + len(test_langsmith_logger.log_queue) + ) + ) + # Check that the queue was flushed after exceeding batch size + assert len(test_langsmith_logger.log_queue) < 5 + + # Clean up + for cb in litellm.callbacks: + if isinstance(cb, LangsmithLogger): + await cb.async_httpx_client.client.aclose() + + except Exception as e: + pytest.fail(f"Error occurred: {e}") + + @pytest.mark.skip(reason="Flaky test. 
covered by unit tests on custom logger.") @pytest.mark.asyncio() async def test_async_langsmith_logging(): From 1415bdd6fa1b819369c611768a3884c96a559695 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 11 Sep 2024 13:30:42 -0700 Subject: [PATCH 09/14] fix testing + req.txt --- .circleci/config.yml | 1 - requirements.txt | 1 - 2 files changed, 2 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 1871134a2..ec6abc009 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -170,7 +170,6 @@ jobs: pip install "boto3==1.34.34" pip install jinja2 pip install tokenizers - pip install openai pip install jsonschema - run: name: Run tests diff --git a/requirements.txt b/requirements.txt index c69551147..ded2d040c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,7 +9,6 @@ gunicorn==22.0.0 # server dep boto3==1.34.34 # aws bedrock/sagemaker calls redis==5.0.0 # caching numpy==1.24.3 # semantic caching -pandas==2.1.1 # for viewing clickhouse spend analytics prisma==0.11.0 # for db mangum==0.17.0 # for aws lambda functions pynacl==1.5.0 # for encrypting keys From f339f9614a6c1c2939eba98e139d088883bda529 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 11 Sep 2024 13:35:37 -0700 Subject: [PATCH 10/14] fix requirements.txt --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index ec6abc009..4ac8fbd48 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -161,11 +161,11 @@ jobs: name: Install Dependencies command: | python -m pip install --upgrade pip - python -m pip install -r requirements.txt pip install python-dotenv pip install pytest pip install tiktoken pip install aiohttp + pip install openai pip install click pip install "boto3==1.34.34" pip install jinja2 From d84fa05161ee774db1f08ddb5fd8ec3d38d3ca43 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 11 Sep 2024 13:48:44 -0700 Subject: [PATCH 11/14] fix langsmith tenacity --- litellm/integrations/langsmith.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/litellm/integrations/langsmith.py b/litellm/integrations/langsmith.py index 7a29bbc2c..505f1ba78 100644 --- a/litellm/integrations/langsmith.py +++ b/litellm/integrations/langsmith.py @@ -14,12 +14,6 @@ import dotenv # type: ignore import httpx import requests # type: ignore from pydantic import BaseModel # type: ignore -from tenacity import ( - retry, - retry_if_exception_type, - stop_after_attempt, - wait_exponential, -) import litellm from litellm._logging import verbose_logger @@ -303,11 +297,6 @@ class LangsmithLogger(CustomLogger): self.log_queue.clear() self.last_flush_time = time.time() - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential(multiplier=1, min=4, max=10), - retry=retry_if_exception_type((httpx.HTTPStatusError, Exception)), - ) async def _async_send_batch(self): """ sends runs to /batch endpoint From 0070741529c6c48ee28d058938b52edb309c01d7 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 11 Sep 2024 14:17:03 -0700 Subject: [PATCH 12/14] fix vtx test --- litellm/tests/test_amazing_vertex_completion.py | 1 + 1 file changed, 1 insertion(+) diff --git a/litellm/tests/test_amazing_vertex_completion.py b/litellm/tests/test_amazing_vertex_completion.py index 4c065b8d3..de2729db7 100644 --- a/litellm/tests/test_amazing_vertex_completion.py +++ b/litellm/tests/test_amazing_vertex_completion.py @@ -52,6 +52,7 @@ VERTEX_MODELS_TO_NOT_TEST = [ "gemini-1.5-pro-preview-0215", "gemini-pro-experimental", 
"gemini-flash-experimental", + "gemini-1.5-flash-exp-0827", "gemini-pro-flash", ] From e681619381d19f1800565503d044a5fd1122d622 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 11 Sep 2024 14:40:58 -0700 Subject: [PATCH 13/14] use vars for batch size and flush interval seconds --- litellm/integrations/langsmith.py | 8 ++++++-- litellm/proxy/proxy_config.yaml | 4 +++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/litellm/integrations/langsmith.py b/litellm/integrations/langsmith.py index 505f1ba78..ec5632e27 100644 --- a/litellm/integrations/langsmith.py +++ b/litellm/integrations/langsmith.py @@ -24,6 +24,9 @@ from litellm.llms.custom_httpx.http_handler import ( httpxSpecialProvider, ) +DEFAULT_BATCH_SIZE = 512 +DEFAULT_FLUSH_INTERVAL_SECONDS = 5 + class LangsmithInputs(BaseModel): model: Optional[str] = None @@ -71,11 +74,12 @@ class LangsmithLogger(CustomLogger): ) _batch_size = ( - os.getenv("LANGSMITH_BATCH_SIZE", 100) or litellm.langsmith_batch_size + os.getenv("LANGSMITH_BATCH_SIZE", DEFAULT_BATCH_SIZE) + or litellm.langsmith_batch_size ) self.batch_size = int(_batch_size) self.log_queue = [] - self.flush_interval = 10 # 5 seconds + self.flush_interval = DEFAULT_FLUSH_INTERVAL_SECONDS # 10 seconds self.last_flush_time = time.time() asyncio.create_task(self.periodic_flush()) self.flush_lock = asyncio.Lock() diff --git a/litellm/proxy/proxy_config.yaml b/litellm/proxy/proxy_config.yaml index 98ca86db3..a0e22451b 100644 --- a/litellm/proxy/proxy_config.yaml +++ b/litellm/proxy/proxy_config.yaml @@ -16,4 +16,6 @@ general_settings: master_key: sk-1234 litellm_settings: - success_callback: ["langsmith"] + success_callback: ["langsmith", "prometheus"] + service_callback: ["prometheus_system"] + callbacks: ["otel"] From 368a5fd05295567ba307077373e37772745dbe9c Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 11 Sep 2024 16:19:24 -0700 Subject: [PATCH 14/14] fix move logic to custom_batch_logger --- litellm/integrations/custom_batch_logger.py | 53 +++++++++++++++++++++ litellm/integrations/langsmith.py | 40 ++++------------ 2 files changed, 61 insertions(+), 32 deletions(-) create mode 100644 litellm/integrations/custom_batch_logger.py diff --git a/litellm/integrations/custom_batch_logger.py b/litellm/integrations/custom_batch_logger.py new file mode 100644 index 000000000..5cbfac683 --- /dev/null +++ b/litellm/integrations/custom_batch_logger.py @@ -0,0 +1,53 @@ +""" +Custom Logger that handles batching logic + +Use this if you want your logs to be stored in memory and flushed periodically +""" + +import asyncio +import time +from typing import List, Literal, Optional + +from litellm._logging import verbose_logger +from litellm.integrations.custom_logger import CustomLogger + +DEFAULT_BATCH_SIZE = 512 +DEFAULT_FLUSH_INTERVAL_SECONDS = 5 + + +class CustomBatchLogger(CustomLogger): + + def __init__(self, flush_lock: Optional[asyncio.Lock] = None, **kwargs) -> None: + """ + Args: + flush_lock (Optional[asyncio.Lock], optional): Lock to use when flushing the queue. Defaults to None. 
From e681619381d19f1800565503d044a5fd1122d622 Mon Sep 17 00:00:00 2001
From: Ishaan Jaff
Date: Wed, 11 Sep 2024 14:40:58 -0700
Subject: [PATCH 13/14] use vars for batch size and flush interval seconds

---
 litellm/integrations/langsmith.py | 8 ++++++--
 litellm/proxy/proxy_config.yaml   | 4 +++-
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/litellm/integrations/langsmith.py b/litellm/integrations/langsmith.py
index 505f1ba78..ec5632e27 100644
--- a/litellm/integrations/langsmith.py
+++ b/litellm/integrations/langsmith.py
@@ -24,6 +24,9 @@ from litellm.llms.custom_httpx.http_handler import (
     httpxSpecialProvider,
 )
 
+DEFAULT_BATCH_SIZE = 512
+DEFAULT_FLUSH_INTERVAL_SECONDS = 5
+
 
 class LangsmithInputs(BaseModel):
     model: Optional[str] = None
@@ -71,12 +74,13 @@ class LangsmithLogger(CustomLogger):
         )
 
         _batch_size = (
-            os.getenv("LANGSMITH_BATCH_SIZE", 100) or litellm.langsmith_batch_size
+            os.getenv("LANGSMITH_BATCH_SIZE", DEFAULT_BATCH_SIZE)
+            or litellm.langsmith_batch_size
         )
         self.batch_size = int(_batch_size)
         self.log_queue = []
-        self.flush_interval = 10  # seconds
+        self.flush_interval = DEFAULT_FLUSH_INTERVAL_SECONDS
         self.last_flush_time = time.time()
         asyncio.create_task(self.periodic_flush())
         self.flush_lock = asyncio.Lock()
diff --git a/litellm/proxy/proxy_config.yaml b/litellm/proxy/proxy_config.yaml
index 98ca86db3..a0e22451b 100644
--- a/litellm/proxy/proxy_config.yaml
+++ b/litellm/proxy/proxy_config.yaml
@@ -16,4 +16,6 @@ general_settings:
   master_key: sk-1234
 
 litellm_settings:
-  success_callback: ["langsmith"]
+  success_callback: ["langsmith", "prometheus"]
+  service_callback: ["prometheus_system"]
+  callbacks: ["otel"]
{self.flush_interval} seconds" - ) - await self.flush_queue()
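A minimal sketch of how an integration would build on the new CustomBatchLogger after this series (the subclass and its destination are hypothetical, not litellm code; note that periodic_flush() needs a running event loop, which is why LangsmithLogger calls asyncio.create_task inside __init__):

    import asyncio

    from litellm.integrations.custom_batch_logger import CustomBatchLogger

    class MyBatchLogger(CustomBatchLogger):  # hypothetical example integration
        def __init__(self, **kwargs):
            # mirror LangsmithLogger: create the lock and background flusher,
            # then hand the lock to the base class
            self.flush_lock = asyncio.Lock()
            asyncio.create_task(self.periodic_flush())
            super().__init__(**kwargs, flush_lock=self.flush_lock)

        async def async_send_batch(self):
            # ship self.log_queue to your backend here; flush_queue() in the
            # base class clears the queue and resets last_flush_time afterwards
            print(f"sending {len(self.log_queue)} events")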