import sys
import dotenv, json, traceback, threading
import subprocess, os
import litellm, openai
import random, uuid, requests
import datetime, time
import tiktoken

encoding = tiktoken.get_encoding("cl100k_base")
import pkg_resources
from .integrations.helicone import HeliconeLogger
from .integrations.aispend import AISpendLogger
from .integrations.berrispend import BerriSpendLogger
from .integrations.supabase import Supabase
from openai.error import OpenAIError as OriginalError
from .exceptions import AuthenticationError, InvalidRequestError, RateLimitError, ServiceUnavailableError, OpenAIError
from typing import List, Dict, Union

####### ENVIRONMENT VARIABLES ###################
dotenv.load_dotenv()  # Loading env variables using dotenv
sentry_sdk_instance = None
capture_exception = None
add_breadcrumb = None
posthog = None
slack_app = None
alerts_channel = None
heliconeLogger = None
aispendLogger = None
berrispendLogger = None
supabaseClient = None
callback_list = []
user_logger_fn = None
additional_details = {}
local_cache = {}

######## Model Response #########################
# All liteLLM Model responses will be in this format, Follows the OpenAI Format
# https://docs.litellm.ai/docs/completion/output
# {
#   'choices': [
#     {
#       'finish_reason': 'stop',
#       'index': 0,
#       'message': {
#         'role': 'assistant',
#         'content': " I'm doing well, thank you for asking. I am Claude, an AI assistant created by Anthropic."
#       }
#     }
#   ],
#   'created': 1691429984.3852863,
#   'model': 'claude-instant-1',
#   'usage': {'prompt_tokens': 18, 'completion_tokens': 23, 'total_tokens': 41}
# }

class Message:
    def __init__(self):
        self.content: str = "default"
        self.role: str = "assistant"

    def __getitem__(self, key):
        return getattr(self, key)

    def __setitem__(self, key, value):
        setattr(self, key, value)

    def __iter__(self):
        return iter(vars(self))

    def __str__(self):
        result = f"{{\n 'role': '{self.role}',\n 'content': \"{self.content}\"\n}}"
        return result


class Choices:
    def __init__(self):
        self.finish_reason: str = "stop"
        self.index: int = 0
        self.message: Message = Message()

    def __getitem__(self, key):
        return getattr(self, key)

    def __setitem__(self, key, value):
        setattr(self, key, value)

    def __iter__(self):
        return iter(vars(self))

    def __str__(self):
        result = f"{{\n 'finish_reason': '{self.finish_reason}',\n 'index': {self.index},\n 'message': {self.message}\n}}"
        return result


class ModelResponse:
    def __init__(self):
        self.choices: List[Choices] = [Choices()]
        self.created: str = None
        self.model: str = None
        self.usage: Dict[str, Union[int, None]] = {
            "prompt_tokens": None,
            "completion_tokens": None,
            "total_tokens": None
        }

    def __getitem__(self, key):
        return getattr(self, key)

    def __setitem__(self, key, value):
        setattr(self, key, value)

    def __iter__(self):
        return iter(vars(self))

    def __str__(self):
        choices_str = ",\n".join(str(choice) for choice in self.choices)
        result = f"{{\n 'choices': [\n{choices_str}\n ],\n 'created': {self.created},\n 'model': '{self.model}',\n 'usage': {self.usage}\n}}"
        return result
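# Example (illustrative sketch, not executed): reading a populated ModelResponse.
# Field names follow the OpenAI-style format documented above; the values are made up.
#
#   response = ModelResponse()
#   response["model"] = "gpt-3.5-turbo"
#   response["choices"][0]["message"]["content"] = "Hello!"
#   print(response["choices"][0]["message"]["content"])  # -> "Hello!"
#   print(response["usage"])                             # -> {'prompt_tokens': None, ...}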
############################################################

def print_verbose(print_statement):
    if litellm.set_verbose:
        print(f"LiteLLM: {print_statement}")
        if random.random() <= 0.3:
            print("Get help - https://discord.com/invite/wuPM9dRgDw")


####### Package Import Handler ###################
import importlib
import subprocess


def install_and_import(package: str):
    if package in globals().keys():
        print_verbose(f"{package} has already been imported.")
        return
    try:
        # Import the module
        module = importlib.import_module(package)
    except ImportError:
        print_verbose(f"{package} is not installed. Installing...")
        subprocess.call([sys.executable, "-m", "pip", "install", package])
        globals()[package] = importlib.import_module(package)
    # except VersionConflict as vc:
    #     print_verbose(f"Detected version conflict for {package}. Upgrading...")
    #     subprocess.call([sys.executable, "-m", "pip", "install", "--upgrade", package])
    #     globals()[package] = importlib.import_module(package)
    finally:
        if package not in globals().keys():
            globals()[package] = importlib.import_module(package)
##################################
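# Example (illustrative sketch, not executed): lazily pulling in an optional dependency.
# "anthropic" is just a sample package name here.
#
#   install_and_import("anthropic")        # installs via pip on first use if missing
#   anthropic = globals()["anthropic"]     # the module ends up in this file's globals()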
####### LOGGING ###################
# Logging function -> log the exact model details + what's being sent | Non-Blocking
def logging(model=None, input=None, custom_llm_provider=None, azure=False, additional_args={}, logger_fn=None, exception=None):
    try:
        model_call_details = {}
        if model:
            model_call_details["model"] = model
        if azure:
            model_call_details["azure"] = azure
        if custom_llm_provider:
            model_call_details["custom_llm_provider"] = custom_llm_provider
        if exception:
            model_call_details["exception"] = exception
        if input:
            model_call_details["input"] = input
        if len(additional_args):
            model_call_details["additional_args"] = additional_args
        # log additional call details -> api key, etc.
        if model:
            if azure == True or model in litellm.open_ai_chat_completion_models or model in litellm.open_ai_embedding_models:
                model_call_details["api_type"] = openai.api_type
                model_call_details["api_base"] = openai.api_base
                model_call_details["api_version"] = openai.api_version
                model_call_details["api_key"] = openai.api_key
            elif "replicate" in model:
                model_call_details["api_key"] = os.environ.get("REPLICATE_API_TOKEN")
            elif model in litellm.anthropic_models:
                model_call_details["api_key"] = os.environ.get("ANTHROPIC_API_KEY")
            elif model in litellm.cohere_models:
                model_call_details["api_key"] = os.environ.get("COHERE_API_KEY")
        ## User Logging -> if you pass in a custom logging function or want to use sentry breadcrumbs
        print_verbose(f"Logging Details: logger_fn - {logger_fn} | callable(logger_fn) - {callable(logger_fn)}")
        if logger_fn and callable(logger_fn):
            try:
                logger_fn(model_call_details)  # Expectation: any logger function passed in by the user should accept a dict object
            except Exception as e:
                print(f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while logging {traceback.format_exc()}")
    except Exception as e:
        print(f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while logging {traceback.format_exc()}")
        pass

####### CLIENT ###################
# make it easy to log if completion/embedding runs succeeded or failed + see what happened | Non-Blocking
def client(original_function):
    def function_setup(*args, **kwargs):
        # just run once to check if user wants to send their data anywhere - PostHog/Sentry/Slack/etc.
        try:
            global callback_list, add_breadcrumb, user_logger_fn
            if (len(litellm.success_callback) > 0 or len(litellm.failure_callback) > 0) and len(callback_list) == 0:
                callback_list = list(set(litellm.success_callback + litellm.failure_callback))
                set_callbacks(callback_list=callback_list,)
            if add_breadcrumb:
                add_breadcrumb(
                    category="litellm.llm_call",
                    message=f"Positional Args: {args}, Keyword Args: {kwargs}",
                    level="info",
                )
            if "logger_fn" in kwargs:
                user_logger_fn = kwargs["logger_fn"]
        except:  # DO NOT BLOCK running the function because of this
            print_verbose(f"[Non-Blocking] {traceback.format_exc()}")
            pass

    def crash_reporting(*args, **kwargs):
        if litellm.telemetry:
            try:
                model = args[0] if len(args) > 0 else kwargs["model"]
                exception = kwargs["exception"] if "exception" in kwargs else None
                custom_llm_provider = kwargs["custom_llm_provider"] if "custom_llm_provider" in kwargs else None
                safe_crash_reporting(model=model, exception=exception, custom_llm_provider=custom_llm_provider)  # log usage-crash details. Do not log any user details. If you want to turn this off, set `litellm.telemetry=False`.
            except:
                # [Non-Blocking Error]
                pass

    def get_prompt(*args, **kwargs):
        # best-effort helper; it should not throw any exceptions
        if len(args) > 1:
            messages = args[1]
            prompt = " ".join(message["content"] for message in messages)
            return prompt
        if "messages" in kwargs:
            messages = kwargs["messages"]
            prompt = " ".join(message["content"] for message in messages)
            return prompt
        return None

    def check_cache(*args, **kwargs):
        try:  # never block execution
            prompt = get_prompt(*args, **kwargs)
            if prompt != None and prompt in local_cache:  # check if messages / prompt exists
                result = local_cache[prompt]
                return result
            else:
                return None
        except:
            return None

    def add_cache(result, *args, **kwargs):
        try:  # never block execution
            prompt = get_prompt(*args, **kwargs)
            local_cache[prompt] = result
        except:
            pass

    def wrapper(*args, **kwargs):
        start_time = None
        result = None
        try:
            function_setup(*args, **kwargs)
            ## MODEL CALL
            start_time = datetime.datetime.now()
            if litellm.caching and (cached_result := check_cache(*args, **kwargs)) is not None:
                result = cached_result
            else:
                result = original_function(*args, **kwargs)
            end_time = datetime.datetime.now()
            ## Add response to CACHE
            if litellm.caching:
                add_cache(result, *args, **kwargs)
            ## LOG SUCCESS
            crash_reporting(*args, **kwargs)
            my_thread = threading.Thread(target=handle_success, args=(args, kwargs, result, start_time, end_time))  # don't interrupt execution of main thread
            my_thread.start()
            return result
        except Exception as e:
            traceback_exception = traceback.format_exc()
            crash_reporting(*args, **kwargs, exception=traceback_exception)
            end_time = datetime.datetime.now()
            my_thread = threading.Thread(target=handle_failure, args=(e, traceback_exception, start_time, end_time, args, kwargs))  # don't interrupt execution of main thread
            my_thread.start()
            raise e

    return wrapper

####### USAGE CALCULATOR ################

def token_counter(model, text):
    # use tiktoken or anthropic's tokenizer depending on the model
    num_tokens = 0
    if "claude" in model:
        install_and_import('anthropic')
        from anthropic import Anthropic, HUMAN_PROMPT, AI_PROMPT
        anthropic = Anthropic()
        num_tokens = anthropic.count_tokens(text)
    else:
        num_tokens = len(encoding.encode(text))
    return num_tokens
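# Example (illustrative sketch, not executed): counting tokens for a prompt string.
# The model name is just a sample; anything containing "claude" routes to Anthropic's tokenizer,
# everything else uses the cl100k_base tiktoken encoding loaded at the top of this file.
#
#   n = token_counter(model="gpt-3.5-turbo", text="Hey, how's it going?")
#   print(n)  # number of tokens in the text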
def cost_per_token(model="gpt-3.5-turbo", prompt_tokens=0, completion_tokens=0):
    ## given a model + token counts, return the (prompt, completion) cost in USD
    prompt_tokens_cost_usd_dollar = 0
    completion_tokens_cost_usd_dollar = 0
    model_cost_ref = litellm.model_cost
    if model in model_cost_ref:
        prompt_tokens_cost_usd_dollar = model_cost_ref[model]["input_cost_per_token"] * prompt_tokens
        completion_tokens_cost_usd_dollar = model_cost_ref[model]["output_cost_per_token"] * completion_tokens
        return prompt_tokens_cost_usd_dollar, completion_tokens_cost_usd_dollar
    else:
        # calculate average input cost
        input_cost_sum = 0
        output_cost_sum = 0
        model_cost_ref = litellm.model_cost
        for model in model_cost_ref:
            input_cost_sum += model_cost_ref[model]["input_cost_per_token"]
            output_cost_sum += model_cost_ref[model]["output_cost_per_token"]
        avg_input_cost = input_cost_sum / len(model_cost_ref.keys())
        avg_output_cost = output_cost_sum / len(model_cost_ref.keys())
        prompt_tokens_cost_usd_dollar = avg_input_cost * prompt_tokens
        completion_tokens_cost_usd_dollar = avg_output_cost * completion_tokens
        return prompt_tokens_cost_usd_dollar, completion_tokens_cost_usd_dollar


def completion_cost(model="gpt-3.5-turbo", prompt="", completion=""):
    prompt_tokens = token_counter(model=model, text=prompt)
    completion_tokens = token_counter(model=model, text=completion)
    prompt_tokens_cost_usd_dollar, completion_tokens_cost_usd_dollar = cost_per_token(model=model, prompt_tokens=prompt_tokens, completion_tokens=completion_tokens)
    return prompt_tokens_cost_usd_dollar + completion_tokens_cost_usd_dollar
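# Example (illustrative sketch, not executed): estimating the USD cost of a call.
# Prices come from litellm.model_cost; unknown models fall back to the average price across that table.
#
#   usd = completion_cost(
#       model="gpt-3.5-turbo",
#       prompt="Hey, how's it going?",
#       completion="All good - how can I help?",
#   )
#   prompt_usd, completion_usd = cost_per_token(model="gpt-3.5-turbo", prompt_tokens=10, completion_tokens=20)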
####### HELPER FUNCTIONS ################

def get_litellm_params(
    return_async=False,
    api_key=None,
    force_timeout=600,
    azure=False,
    logger_fn=None,
    verbose=False,
    hugging_face=False,
    replicate=False,
    together_ai=False,
    custom_llm_provider=None,
    custom_api_base=None
):
    litellm_params = {
        "return_async": return_async,
        "api_key": api_key,
        "force_timeout": force_timeout,
        "logger_fn": logger_fn,
        "verbose": verbose,
        "custom_llm_provider": custom_llm_provider,
        "custom_api_base": custom_api_base
    }
    return litellm_params


def get_optional_params(
    # 12 optional params
    functions=[],
    function_call="",
    temperature=1,
    top_p=1,
    n=1,
    stream=False,
    stop=None,
    max_tokens=float('inf'),
    presence_penalty=0,
    frequency_penalty=0,
    logit_bias={},
    user="",
    deployment_id=None,
    model=None,
    custom_llm_provider="",
    top_k=40,
):
    optional_params = {}
    if model in litellm.anthropic_models:
        # handle anthropic params
        if stream:
            optional_params["stream"] = stream
        if stop != None:
            optional_params["stop_sequences"] = stop
        if temperature != 1:
            optional_params["temperature"] = temperature
        if top_p != 1:
            optional_params["top_p"] = top_p
        return optional_params
    elif model in litellm.cohere_models:
        # handle cohere params
        if stream:
            optional_params["stream"] = stream
        if temperature != 1:
            optional_params["temperature"] = temperature
        if max_tokens != float('inf'):
            optional_params["max_tokens"] = max_tokens
        return optional_params
    elif custom_llm_provider == "replicate":
        # any replicate models
        # TODO: handle translating remaining replicate params
        if stream:
            optional_params["stream"] = stream
        return optional_params
    elif custom_llm_provider == "together_ai":
        if stream:
            optional_params["stream_tokens"] = stream
        if temperature != 1:
            optional_params["temperature"] = temperature
        if top_p != 1:
            optional_params["top_p"] = top_p
        if max_tokens != float('inf'):
            optional_params["max_tokens"] = max_tokens
        if frequency_penalty != 0:
            optional_params["frequency_penalty"] = frequency_penalty
    elif model == "chat-bison":
        # chat-bison has diff args from chat-bison@001, ty Google
        if temperature != 1:
            optional_params["temperature"] = temperature
        if top_p != 1:
            optional_params["top_p"] = top_p
        if max_tokens != float('inf'):
            optional_params["max_output_tokens"] = max_tokens
    elif model in litellm.vertex_text_models:
        # required params for all text vertex calls
        # temperature=0.2, top_p=0.1, top_k=20
        # always set temperature, top_p, top_k else, text bison fails
        optional_params["temperature"] = temperature
        optional_params["top_p"] = top_p
        optional_params["top_k"] = top_k
    else:  # assume passing in params for openai/azure openai
        if functions != []:
            optional_params["functions"] = functions
        if function_call != "":
            optional_params["function_call"] = function_call
        if temperature != 1:
            optional_params["temperature"] = temperature
        if top_p != 1:
            optional_params["top_p"] = top_p
        if n != 1:
            optional_params["n"] = n
        if stream:
            optional_params["stream"] = stream
        if stop != None:
            optional_params["stop"] = stop
        if max_tokens != float('inf'):
            optional_params["max_tokens"] = max_tokens
        if presence_penalty != 0:
            optional_params["presence_penalty"] = presence_penalty
        if frequency_penalty != 0:
            optional_params["frequency_penalty"] = frequency_penalty
        if logit_bias != {}:
            optional_params["logit_bias"] = logit_bias
        if user != "":
            optional_params["user"] = user
        if deployment_id != None:
            optional_params["deployment_id"] = deployment_id
        return optional_params
    return optional_params
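# Example (illustrative sketch, not executed): translating OpenAI-style kwargs for an Anthropic model.
# Only non-default values are forwarded, and provider-specific names are used (e.g. "stop_sequences").
#
#   params = get_optional_params(model="claude-instant-1", temperature=0.2, stop=["\n\nHuman:"], stream=True)
#   # -> {"stream": True, "stop_sequences": ["\n\nHuman:"], "temperature": 0.2}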
def load_test_model(model: str, custom_llm_provider: str = None, custom_api_base: str = None, prompt: str = None, num_calls: int = None, force_timeout: int = None):
    test_prompt = "Hey, how's it going"
    test_calls = 100
    if prompt:
        test_prompt = prompt
    if num_calls:
        test_calls = num_calls
    messages = [[{"role": "user", "content": test_prompt}] for _ in range(test_calls)]
    start_time = time.time()
    try:
        litellm.batch_completion(model=model, messages=messages, custom_llm_provider=custom_llm_provider, custom_api_base=custom_api_base, force_timeout=force_timeout)
        end_time = time.time()
        response_time = end_time - start_time
        return {"total_response_time": response_time, "calls_made": test_calls, "status": "success", "exception": None}
    except Exception as e:
        end_time = time.time()
        response_time = end_time - start_time
        return {"total_response_time": response_time, "calls_made": test_calls, "status": "failed", "exception": e}


def set_callbacks(callback_list):
    global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, heliconeLogger, aispendLogger, berrispendLogger, supabaseClient
    try:
        for callback in callback_list:
            if callback == "sentry":
                try:
                    import sentry_sdk
                except ImportError:
                    print_verbose("Package 'sentry_sdk' is missing. Installing it...")
                    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sentry_sdk'])
                    import sentry_sdk
                sentry_sdk_instance = sentry_sdk
                sentry_trace_rate = os.environ.get("SENTRY_API_TRACE_RATE") if "SENTRY_API_TRACE_RATE" in os.environ else "1.0"
                sentry_sdk_instance.init(dsn=os.environ.get("SENTRY_API_URL"), traces_sample_rate=float(sentry_trace_rate))  # falls back to 1.0 when SENTRY_API_TRACE_RATE is unset
                capture_exception = sentry_sdk_instance.capture_exception
                add_breadcrumb = sentry_sdk_instance.add_breadcrumb
            elif callback == "posthog":
                try:
                    from posthog import Posthog
                except ImportError:
                    print_verbose("Package 'posthog' is missing. Installing it...")
                    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'posthog'])
                    from posthog import Posthog
                posthog = Posthog(
                    project_api_key=os.environ.get("POSTHOG_API_KEY"),
                    host=os.environ.get("POSTHOG_API_URL"))
            elif callback == "slack":
                try:
                    from slack_bolt import App
                except ImportError:
                    print_verbose("Package 'slack_bolt' is missing. Installing it...")
                    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'slack_bolt'])
                    from slack_bolt import App
                slack_app = App(
                    token=os.environ.get("SLACK_API_TOKEN"),
                    signing_secret=os.environ.get("SLACK_API_SECRET")
                )
                alerts_channel = os.environ["SLACK_API_CHANNEL"]
                print_verbose(f"Initialized Slack App: {slack_app}")
            elif callback == "helicone":
                heliconeLogger = HeliconeLogger()
            elif callback == "aispend":
                aispendLogger = AISpendLogger()
            elif callback == "berrispend":
                berrispendLogger = BerriSpendLogger()
            elif callback == "supabase":
                supabaseClient = Supabase()
    except Exception as e:
        raise e
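# Example (illustrative sketch, not executed): wiring up success/failure callbacks.
# The @client wrapper calls set_callbacks() once the first decorated call runs; the env vars
# below are only needed for the callbacks you enable.
#
#   litellm.success_callback = ["posthog", "helicone"]
#   litellm.failure_callback = ["slack", "sentry"]
#   # e.g. POSTHOG_API_KEY / POSTHOG_API_URL, SLACK_API_TOKEN / SLACK_API_SECRET / SLACK_API_CHANNEL,
#   # and SENTRY_API_URL must be set in the environment for those callbacks to initialize.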
def handle_failure(exception, traceback_exception, start_time, end_time, args, kwargs):
    global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, aispendLogger, berrispendLogger
    try:
        # print_verbose(f"handle_failure args: {args}")
        # print_verbose(f"handle_failure kwargs: {kwargs}")
        success_handler = additional_details.pop("success_handler", None)
        failure_handler = additional_details.pop("failure_handler", None)
        additional_details["Event_Name"] = additional_details.pop("failed_event_name", "litellm.failed_query")
        print_verbose(f"litellm.failure_callback: {litellm.failure_callback}")
        # print_verbose(f"additional_details: {additional_details}")
        for callback in litellm.failure_callback:
            try:
                if callback == "slack":
                    slack_msg = ""
                    if len(kwargs) > 0:
                        for key in kwargs:
                            slack_msg += f"{key}: {kwargs[key]}\n"
                    if len(args) > 0:
                        for i, arg in enumerate(args):
                            slack_msg += f"LiteLLM_Args_{str(i)}: {arg}"
                    for detail in additional_details:
                        slack_msg += f"{detail}: {additional_details[detail]}\n"
                    slack_msg += f"Traceback: {traceback_exception}"
                    slack_app.client.chat_postMessage(channel=alerts_channel, text=slack_msg)
                elif callback == "sentry":
                    capture_exception(exception)
                elif callback == "posthog":
                    print_verbose(f"inside posthog, additional_details: {len(additional_details.keys())}")
                    ph_obj = {}
                    if len(kwargs) > 0:
                        ph_obj = kwargs
                    if len(args) > 0:
                        for i, arg in enumerate(args):
                            ph_obj["litellm_args_" + str(i)] = arg
                    for detail in additional_details:
                        ph_obj[detail] = additional_details[detail]
                    event_name = additional_details["Event_Name"]
                    print_verbose(f"ph_obj: {ph_obj}")
                    print_verbose(f"PostHog Event Name: {event_name}")
                    if "user_id" in additional_details:
                        posthog.capture(additional_details["user_id"], event_name, ph_obj)
                    else:
                        # PostHog calls require a unique id to identify a user - https://posthog.com/docs/libraries/python
                        unique_id = str(uuid.uuid4())
                        posthog.capture(unique_id, event_name)
                    print_verbose(f"successfully logged to PostHog!")
                elif callback == "berrispend":
                    print_verbose("reaches berrispend for logging!")
                    model = args[0] if len(args) > 0 else kwargs["model"]
                    messages = args[1] if len(args) > 1 else kwargs["messages"]
                    result = {
                        "model": model,
                        "created": time.time(),
                        "error": traceback_exception,
                        "usage": {
                            "prompt_tokens": prompt_token_calculator(model, messages=messages),
                            "completion_tokens": 0
                        }
                    }
                    berrispendLogger.log_event(model=model, messages=messages, response_obj=result, start_time=start_time, end_time=end_time, print_verbose=print_verbose)
                elif callback == "aispend":
                    print_verbose("reaches aispend for logging!")
                    model = args[0] if len(args) > 0 else kwargs["model"]
                    messages = args[1] if len(args) > 1 else kwargs["messages"]
                    result = {
                        "model": model,
                        "created": time.time(),
                        "usage": {
                            "prompt_tokens": prompt_token_calculator(model, messages=messages),
                            "completion_tokens": 0
                        }
                    }
                    aispendLogger.log_event(model=model, response_obj=result, start_time=start_time, end_time=end_time, print_verbose=print_verbose)
                elif callback == "supabase":
                    print_verbose("reaches supabase for logging!")
                    model = args[0] if len(args) > 0 else kwargs["model"]
                    messages = args[1] if len(args) > 1 else kwargs["messages"]
                    result = {
                        "model": model,
                        "created": time.time(),
                        "error": traceback_exception,
                        "usage": {
                            "prompt_tokens": prompt_token_calculator(model, messages=messages),
                            "completion_tokens": 0
                        }
                    }
                    print(f"litellm._thread_context: {litellm._thread_context}")
                    supabaseClient.log_event(model=model, messages=messages, end_user=litellm._thread_context.user, response_obj=result, start_time=start_time, end_time=end_time, print_verbose=print_verbose)
            except:
                print_verbose(f"Error Occurred while logging failure: {traceback.format_exc()}")
                pass
        if failure_handler and callable(failure_handler):
            call_details = {
                "exception": exception,
                "additional_details": additional_details
            }
            failure_handler(call_details)
        pass
    except Exception as e:
        ## LOGGING
        logging(logger_fn=user_logger_fn, exception=e)
        pass


def handle_success(args, kwargs, result, start_time, end_time):
    global heliconeLogger, aispendLogger
    try:
        success_handler = additional_details.pop("success_handler", None)
        failure_handler = additional_details.pop("failure_handler", None)
        additional_details["Event_Name"] = additional_details.pop("successful_event_name", "litellm.success_query")
        for callback in litellm.success_callback:
            try:
                if callback == "posthog":
                    ph_obj = {}
                    for detail in additional_details:
                        ph_obj[detail] = additional_details[detail]
                    event_name = additional_details["Event_Name"]
                    if "user_id" in additional_details:
                        posthog.capture(additional_details["user_id"], event_name, ph_obj)
                    else:
                        # PostHog calls require a unique id to identify a user - https://posthog.com/docs/libraries/python
                        unique_id = str(uuid.uuid4())
                        posthog.capture(unique_id, event_name, ph_obj)
                    pass
                elif callback == "slack":
                    slack_msg = ""
                    for detail in additional_details:
                        slack_msg += f"{detail}: {additional_details[detail]}\n"
                    slack_app.client.chat_postMessage(channel=alerts_channel, text=slack_msg)
                elif callback == "helicone":
                    print_verbose("reaches helicone for logging!")
                    model = args[0] if len(args) > 0 else kwargs["model"]
                    messages = args[1] if len(args) > 1 else kwargs["messages"]
                    heliconeLogger.log_success(model=model, messages=messages, response_obj=result, start_time=start_time, end_time=end_time, print_verbose=print_verbose)
                elif callback == "aispend":
                    print_verbose("reaches aispend for logging!")
                    model = args[0] if len(args) > 0 else kwargs["model"]
                    aispendLogger.log_event(model=model, response_obj=result, start_time=start_time, end_time=end_time, print_verbose=print_verbose)
                elif callback == "berrispend":
                    print_verbose("reaches berrispend for logging!")
                    model = args[0] if len(args) > 0 else kwargs["model"]
                    messages = args[1] if len(args) > 1 else kwargs["messages"]
                    berrispendLogger.log_event(model=model, messages=messages, response_obj=result, start_time=start_time, end_time=end_time, print_verbose=print_verbose)
                elif callback == "supabase":
                    print_verbose("reaches supabase for logging!")
                    model = args[0] if len(args) > 0 else kwargs["model"]
                    messages = args[1] if len(args) > 1 else kwargs["messages"]
                    print(f"litellm._thread_context: {litellm._thread_context}")
                    supabaseClient.log_event(model=model, messages=messages, end_user=litellm._thread_context.user, response_obj=result, start_time=start_time, end_time=end_time, print_verbose=print_verbose)
            except Exception as e:
                ## LOGGING
                logging(logger_fn=user_logger_fn, exception=e)
                print_verbose(f"[Non-Blocking] Success Callback Error - {traceback.format_exc()}")
                pass
        if success_handler and callable(success_handler):
            success_handler(args, kwargs)
        pass
    except Exception as e:
        ## LOGGING
        logging(logger_fn=user_logger_fn, exception=e)
        print_verbose(f"[Non-Blocking] Success Callback Error - {traceback.format_exc()}")
        pass
def prompt_token_calculator(model, messages):
    # use tiktoken or anthropic's tokenizer depending on the model
    text = " ".join(message["content"] for message in messages)
    num_tokens = 0
    if "claude" in model:
        install_and_import('anthropic')
        from anthropic import Anthropic, HUMAN_PROMPT, AI_PROMPT
        anthropic = Anthropic()
        num_tokens = anthropic.count_tokens(text)
    else:
        num_tokens = len(encoding.encode(text))
    return num_tokens


# integration helper function
def modify_integration(integration_name, integration_params):
    global supabaseClient
    if integration_name == "supabase":
        if "table_name" in integration_params:
            Supabase.supabase_table_name = integration_params["table_name"]
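# Example (illustrative sketch, not executed): pointing the Supabase integration at a custom table.
# "litellm_logs" is just a sample table name.
#
#   modify_integration("supabase", {"table_name": "litellm_logs"})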
def exception_type(model, original_exception, custom_llm_provider):
    global user_logger_fn
    exception_mapping_worked = False
    try:
        if isinstance(original_exception, OriginalError):
            # Handle the OpenAIError
            exception_mapping_worked = True
            if custom_llm_provider == "azure":
                original_exception.llm_provider = "azure"
            else:
                original_exception.llm_provider = "openai"
            raise original_exception
        elif model:
            error_str = str(original_exception)
            if isinstance(original_exception, BaseException):
                exception_type = type(original_exception).__name__
            else:
                exception_type = ""
            logging(model=model, additional_args={"error_str": error_str, "exception_type": exception_type, "original_exception": original_exception}, logger_fn=user_logger_fn)
            if "claude" in model:  # one of the anthropics
                if hasattr(original_exception, "status_code"):
                    print_verbose(f"status_code: {original_exception.status_code}")
                    if original_exception.status_code == 401:
                        exception_mapping_worked = True
                        raise AuthenticationError(message=f"AnthropicException - {original_exception.message}", llm_provider="anthropic")
                    elif original_exception.status_code == 400:
                        exception_mapping_worked = True
                        raise InvalidRequestError(message=f"AnthropicException - {original_exception.message}", model=model, llm_provider="anthropic")
                    elif original_exception.status_code == 429:
                        exception_mapping_worked = True
                        raise RateLimitError(message=f"AnthropicException - {original_exception.message}", llm_provider="anthropic")
                elif "Could not resolve authentication method. Expected either api_key or auth_token to be set." in error_str:
                    exception_mapping_worked = True
                    raise AuthenticationError(message=f"AnthropicException - {original_exception.message}", llm_provider="anthropic")
            elif "replicate" in model:
                if "Incorrect authentication token" in error_str:
                    exception_mapping_worked = True
                    raise AuthenticationError(message=f"ReplicateException - {error_str}", llm_provider="replicate")
                elif exception_type == "ModelError":
                    exception_mapping_worked = True
                    raise InvalidRequestError(message=f"ReplicateException - {error_str}", model=model, llm_provider="replicate")
                elif "Request was throttled" in error_str:
                    exception_mapping_worked = True
                    raise RateLimitError(message=f"ReplicateException - {error_str}", llm_provider="replicate")
                elif exception_type == "ReplicateError":
                    ## ReplicateError implies an error on Replicate server side, not user side
                    exception_mapping_worked = True
                    raise ServiceUnavailableError(message=f"ReplicateException - {error_str}", llm_provider="replicate")
            elif model == "command-nightly":  # Cohere
                if "invalid api token" in error_str or "No API key provided." in error_str:
                    exception_mapping_worked = True
                    raise AuthenticationError(message=f"CohereException - {original_exception.message}", llm_provider="cohere")
                elif "too many tokens" in error_str:
                    exception_mapping_worked = True
                    raise InvalidRequestError(message=f"CohereException - {original_exception.message}", model=model, llm_provider="cohere")
                elif "CohereConnectionError" in exception_type:
                    # cohere seems to fire these errors when we load test it (1k+ messages / min)
                    exception_mapping_worked = True
                    raise RateLimitError(message=f"CohereException - {original_exception.message}", llm_provider="cohere")
            elif custom_llm_provider == "huggingface":
                if hasattr(original_exception, "status_code"):
                    if original_exception.status_code == 401:
                        exception_mapping_worked = True
                        raise AuthenticationError(message=f"HuggingfaceException - {original_exception.message}", llm_provider="huggingface")
                    elif original_exception.status_code == 400:
                        exception_mapping_worked = True
                        raise InvalidRequestError(message=f"HuggingfaceException - {original_exception.message}", model=model, llm_provider="huggingface")
                    elif original_exception.status_code == 429:
                        exception_mapping_worked = True
                        raise RateLimitError(message=f"HuggingfaceException - {original_exception.message}", llm_provider="huggingface")
            raise original_exception  # base case - return the original exception
        else:
            raise original_exception
    except Exception as e:
        ## LOGGING
        logging(logger_fn=user_logger_fn, additional_args={"exception_mapping_worked": exception_mapping_worked, "original_exception": original_exception}, exception=e)
        if exception_mapping_worked:
            raise e
        else:  # don't let an error with mapping interrupt the user from receiving an error from the llm api calls
            raise original_exception


def safe_crash_reporting(model=None, exception=None, custom_llm_provider=None):
    data = {
        "model": model,
        "exception": str(exception),
        "custom_llm_provider": custom_llm_provider
    }
    threading.Thread(target=litellm_telemetry, args=(data,)).start()
def litellm_telemetry(data):
    # Load or generate the UUID
    uuid_file = 'litellm_uuid.txt'
    try:
        # Try to open the file and load the UUID
        with open(uuid_file, 'r') as file:
            uuid_value = file.read()
            if uuid_value:
                uuid_value = uuid_value.strip()
            else:
                raise FileNotFoundError
    except FileNotFoundError:
        # Generate a new UUID if the file doesn't exist or is empty
        new_uuid = uuid.uuid4()
        uuid_value = str(new_uuid)
        with open(uuid_file, 'w') as file:
            file.write(uuid_value)
    except:
        # [Non-Blocking Error]
        return
    try:
        # Prepare the data to send to the litellm logging api
        payload = {
            'uuid': uuid_value,
            'data': data,
            'version': pkg_resources.get_distribution("litellm").version
        }
        # Make the POST request to the litellm logging api
        response = requests.post('https://litellm.berri.ai/logging', headers={"Content-Type": "application/json"}, json=payload)
        response.raise_for_status()  # Raise an exception for HTTP errors
    except:
        # [Non-Blocking Error]
        return

######### Secret Manager ############################
# checks if user has passed in a secret manager client
# if passed in then checks the secret there
def get_secret(secret_name):
    if litellm.secret_manager_client != None:
        # TODO: check which secret manager is being used
        # currently only supports Infisical
        secret = litellm.secret_manager_client.get_secret(secret_name).secret_value
        if secret != None:
            return secret  # if secret found in secret manager return it
    elif litellm.api_key != None:  # if users use litellm default key
        return litellm.api_key
    else:
        return os.environ.get(secret_name)

######## Streaming Class ############################
# wraps the completion stream to return the correct format for the model
# replicate/anthropic/cohere
class CustomStreamWrapper:
    def __init__(self, completion_stream, model, custom_llm_provider=None):
        self.model = model
        self.custom_llm_provider = custom_llm_provider
        if model in litellm.cohere_models:
            # cohere does not return an iterator, so we need to wrap it in one
            self.completion_stream = iter(completion_stream)
        elif model == "together_ai":
            self.completion_stream = iter(completion_stream)
        else:
            self.completion_stream = completion_stream

    def __iter__(self):
        return self

    def handle_anthropic_chunk(self, chunk):
        str_line = chunk.decode('utf-8')  # Convert bytes to string
        if str_line.startswith('data:'):
            data_json = json.loads(str_line[5:])
            return data_json.get("completion", "")
        return ""

    def handle_together_ai_chunk(self, chunk):
        chunk = chunk.decode("utf-8")
        text_index = chunk.find('"text":"')  # this checks if text: exists
        text_start = text_index + len('"text":"')
        text_end = chunk.find('"}', text_start)
        if text_index != -1 and text_end != -1:
            extracted_text = chunk[text_start:text_end]
            return extracted_text
        else:
            return ""

    def handle_huggingface_chunk(self, chunk):
        chunk = chunk.decode("utf-8")
        if chunk.startswith('data:'):
            data_json = json.loads(chunk[5:])
            if "token" in data_json and "text" in data_json["token"]:
                return data_json["token"]["text"]
            else:
                return ""
        return ""

    def __next__(self):
        completion_obj = {"role": "assistant", "content": ""}
        if self.model in litellm.anthropic_models:
            chunk = next(self.completion_stream)
            completion_obj["content"] = self.handle_anthropic_chunk(chunk)
        elif self.model == "replicate":
            chunk = next(self.completion_stream)
            completion_obj["content"] = chunk
        elif self.model == "together_ai":
            chunk = next(self.completion_stream)
            text_data = self.handle_together_ai_chunk(chunk)
            if text_data == "":
                return self.__next__()
            completion_obj["content"] = text_data
        elif self.model in litellm.cohere_models:
            chunk = next(self.completion_stream)
            completion_obj["content"] = chunk.text
        elif self.custom_llm_provider and self.custom_llm_provider == "huggingface":
            chunk = next(self.completion_stream)
            completion_obj["content"] = self.handle_huggingface_chunk(chunk)
        # return this for all models
        return {"choices": [{"delta": completion_obj}]}
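# Example (illustrative sketch, not executed): adapting a provider stream to OpenAI-style chunks.
# `raw_stream` stands in for whatever iterator/generator the provider SDK returned.
#
#   wrapped = CustomStreamWrapper(raw_stream, model="claude-instant-1")
#   for chunk in wrapped:
#       print(chunk["choices"][0]["delta"]["content"], end="")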
########## Reading Config File ############################
def read_config_args(config_path):
    try:
        import os
        current_path = os.getcwd()
        with open(config_path, "r") as config_file:
            config = json.load(config_file)
        # read keys/ values from config file and return them
        return config
    except Exception as e:
        print("An error occurred while reading config:", str(e))
        raise e

########## ollama implementation ############################
import aiohttp

async def get_ollama_response_stream(api_base="http://localhost:11434", model="llama2", prompt="Why is the sky blue?"):
    session = aiohttp.ClientSession()
    url = f'{api_base}/api/generate'
    data = {
        "model": model,
        "prompt": prompt,
    }
    try:
        async with session.post(url, json=data) as resp:
            async for line in resp.content.iter_any():
                if line:
                    try:
                        json_chunk = line.decode("utf-8")
                        chunks = json_chunk.split("\n")
                        for chunk in chunks:
                            if chunk.strip() != "":
                                j = json.loads(chunk)
                                if "response" in j:
                                    completion_obj = {"role": "assistant", "content": ""}
                                    completion_obj["content"] = j["response"]
                                    yield {"choices": [{"delta": completion_obj}]}
                                    # self.responses.append(j["response"])
                                    # yield "blank"
                    except Exception as e:
                        print(f"Error decoding JSON: {e}")
    finally:
        await session.close()


async def stream_to_string(generator):
    response = ""
    async for chunk in generator:
        response += chunk["choices"][0]["delta"]["content"]  # chunks follow the OpenAI-style delta format yielded above
    return response

########## Together AI streaming #############################
async def together_ai_completion_streaming(json_data, headers):
    session = aiohttp.ClientSession()
    url = 'https://api.together.xyz/inference'
    # headers = {
    #     'Authorization': f'Bearer {together_ai_token}',
    #     'Content-Type': 'application/json'
    # }
    # data = {
    #     "model": "togethercomputer/llama-2-70b-chat",
    #     "prompt": "write 1 page on the topic of the history of the united state",
    #     "max_tokens": 1000,
    #     "temperature": 0.7,
    #     "top_p": 0.7,
    #     "top_k": 50,
    #     "repetition_penalty": 1,
    #     "stream_tokens": True
    # }
    try:
        async with session.post(url, json=json_data, headers=headers) as resp:
            async for line in resp.content.iter_any():
                # print(line)
                if line:
                    try:
                        json_chunk = line.decode("utf-8")
                        json_string = json_chunk.split('data: ')[1]
                        # Convert the JSON string to a dictionary
                        data_dict = json.loads(json_string)
                        completion_response = data_dict['choices'][0]['text']
                        completion_obj = {"role": "assistant", "content": ""}
                        completion_obj["content"] = completion_response
                        yield {"choices": [{"delta": completion_obj}]}
                    except:
                        pass
    finally:
        await session.close()
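# Example (illustrative sketch, not executed): consuming the ollama stream with asyncio.
# Assumes a local ollama server is running at the default api_base.
#
#   import asyncio
#
#   async def main():
#       stream = get_ollama_response_stream(model="llama2", prompt="Why is the sky blue?")
#       text = await stream_to_string(stream)
#       print(text)
#
#   asyncio.run(main())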