Merge pull request #23 from BerriAI/code-clean-up

Code clean up
Krish Dholakia 2023-07-31 18:35:16 -07:00 committed by GitHub
commit 557ba4b139
14 changed files with 339 additions and 283 deletions

.DS_Store (BIN, vendored): binary file not shown.

@@ -1 +1,2 @@
+__version__ = "1.0.0"
 from .main import * # Import all the symbols from main.py


@@ -1 +1,30 @@
+success_callback = []
+failure_callback = []
+set_verbose=False
+
+####### COMPLETION MODELS ###################
+open_ai_chat_completion_models = [
+  'gpt-3.5-turbo',
+  'gpt-4'
+]
+open_ai_text_completion_models = [
+  'text-davinci-003'
+]
+cohere_models = [
+  'command-nightly',
+]
+anthropic_models = [
+  "claude-2",
+  "claude-instant-1"
+]
+
+####### EMBEDDING MODELS ###################
+open_ai_embedding_models = [
+  'text-embedding-ada-002'
+]
+
+from .utils import client, logging # Import all the symbols from main.py
 from .main import * # Import all the symbols from main.py
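With callbacks, verbosity, and the provider model lists now living at package level, downstream code can read them straight off the litellm module. A minimal sketch of that usage (the attribute names come from the file above; the routing logic itself is illustrative, not part of this commit):

import litellm

litellm.set_verbose = True
litellm.success_callback = ["posthog"]

model = "claude-2"
if model in litellm.anthropic_models:
    print("route to Anthropic")  # "claude-2" lands here
elif model in litellm.open_ai_chat_completion_models:
    print("route to OpenAI chat")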

Binary file not shown.

Binary file not shown.


@@ -2,85 +2,16 @@ import os, openai, cohere, replicate, sys
 from typing import Any
 from func_timeout import func_set_timeout, FunctionTimedOut
 from anthropic import Anthropic, HUMAN_PROMPT, AI_PROMPT
-import json
 import traceback
-import threading
 import dotenv
 import traceback
-import subprocess
-import uuid
+import litellm
+from litellm import client, logging
+from litellm import success_callback, failure_callback
+import random

 ####### ENVIRONMENT VARIABLES ###################
 dotenv.load_dotenv() # Loading env variables using dotenv
-set_verbose = False
-sentry_sdk_instance = None
-capture_exception = None
-add_breadcrumb = None
-posthog = None
-slack_app = None
-alerts_channel = None
-success_callback = []
-failure_callback = []
-callback_list = []
-user_logger_fn = None
-additional_details = {}
-
-## Set verbose to true -> ```litellm.verbose = True```
-def print_verbose(print_statement):
-  if set_verbose:
-    print(f"LiteLLM: {print_statement}")
-    print("Get help - https://discord.com/invite/wuPM9dRgDw")
-
-####### COMPLETION MODELS ###################
-open_ai_chat_completion_models = [
-  'gpt-3.5-turbo',
-  'gpt-4'
-]
-open_ai_text_completion_models = [
-  'text-davinci-003'
-]
-cohere_models = [
-  'command-nightly',
-]
-anthropic_models = [
-  "claude-2",
-  "claude-instant-1"
-]
-
-####### EMBEDDING MODELS ###################
-open_ai_embedding_models = [
-  'text-embedding-ada-002'
-]
-
-####### CLIENT ################### make it easy to log completion/embedding runs
-def client(original_function):
-  def function_setup(): #just run once to check if user wants to send their data anywhere
-    try:
-      if len(success_callback) > 0 or len(failure_callback) > 0 and len(callback_list) == 0:
-        callback_list = list(set(success_callback + failure_callback))
-        set_callbacks(callback_list=callback_list)
-    except: # DO NOT BLOCK running the function because of this
-      print_verbose(f"[Non-Blocking] {traceback.format_exc()}")
-      pass
-
-  def wrapper(*args, **kwargs):
-    # Code to be executed before the embedding function
-    try:
-      function_setup()
-      ## EMBEDDING CALL
-      result = original_function(*args, **kwargs)
-      ## LOG SUCCESS
-      my_thread = threading.Thread(target=handle_success, args=(args, kwargs)) # don't interrupt execution of main thread
-      my_thread.start()
-      return result
-    except Exception as e:
-      traceback_exception = traceback.format_exc()
-      my_thread = threading.Thread(target=handle_failure, args=(e, traceback.format_exc(), args, kwargs)) # don't interrupt execution of main thread
-      my_thread.start()
-      raise e
-  return wrapper
-
 def get_optional_params(
@@ -159,7 +90,7 @@ def completion(
         messages = messages,
         **optional_params
       )
-    elif model in open_ai_chat_completion_models:
+    elif model in litellm.open_ai_chat_completion_models:
       openai.api_type = "openai"
       openai.api_base = "https://api.openai.com/v1"
       openai.api_version = None
@@ -173,7 +104,7 @@ def completion(
         messages = messages,
         **optional_params
       )
-    elif model in open_ai_text_completion_models:
+    elif model in litellm.open_ai_text_completion_models:
       openai.api_type = "openai"
       openai.api_base = "https://api.openai.com/v1"
       openai.api_version = None
@@ -219,7 +150,7 @@ def completion(
         ]
       }
       response = new_response
-    elif model in anthropic_models:
+    elif model in litellm.anthropic_models:
       #anthropic defaults to os.environ.get("ANTHROPIC_API_KEY")
       prompt = f"{HUMAN_PROMPT}"
       for message in messages:
@@ -259,7 +190,7 @@ def completion(
       }
       print_verbose(f"new response: {new_response}")
       response = new_response
-    elif model in cohere_models:
+    elif model in litellm.cohere_models:
       cohere_key = os.environ.get("COHERE_API_KEY")
       co = cohere.Client(cohere_key)
       prompt = " ".join([message["content"] for message in messages])
@@ -283,8 +214,36 @@ def completion(
         ],
       }
       response = new_response
+    elif model in litellm.open_ai_chat_completion_models:
+      openai.api_type = "openai"
+      openai.api_base = "https://api.openai.com/v1"
+      openai.api_version = None
+      openai.api_key = os.environ.get("OPENAI_API_KEY")
+      ## LOGGING
+      logging(model=model, input=messages, azure=azure, logger_fn=logger_fn)
+      ## COMPLETION CALL
+      response = openai.ChatCompletion.create(
+        model=model,
+        messages = messages
+      )
+    elif model in litellm.open_ai_text_completion_models:
+      openai.api_type = "openai"
+      openai.api_base = "https://api.openai.com/v1"
+      openai.api_version = None
+      openai.api_key = os.environ.get("OPENAI_API_KEY")
+      prompt = " ".join([message["content"] for message in messages])
+      ## LOGGING
+      logging(model=model, input=prompt, azure=azure, logger_fn=logger_fn)
+      ## COMPLETION CALL
+      response = openai.Completion.create(
+        model=model,
+        prompt = prompt
+      )
     else:
-      raise Exception(f"Model '{model}' not found. Please check your model name and try again.")
+      logging(model=model, input=messages, azure=azure, logger_fn=logger_fn)
+      args = locals()
+      raise ValueError(f"No valid completion model args passed in - {args}")
     return response
   except Exception as e:
     logging(model=model, input=messages, azure=azure, additional_args={"max_tokens": max_tokens}, logger_fn=logger_fn)
@@ -307,7 +266,7 @@ def embedding(model, input=[], azure=False, forceTimeout=60, logger_fn=None):
     ## EMBEDDING CALL
     response = openai.Embedding.create(input=input, engine=model)
     print_verbose(f"response_value: {str(response)[:50]}")
-  elif model in open_ai_embedding_models:
+  elif model in litellm.open_ai_embedding_models:
     openai.api_type = "openai"
     openai.api_base = "https://api.openai.com/v1"
     openai.api_version = None
@@ -324,180 +283,11 @@ def embedding(model, input=[], azure=False, forceTimeout=60, logger_fn=None):
   return response

 ####### HELPER FUNCTIONS ################
+## Set verbose to true -> ```litellm.set_verbose = True```
+def print_verbose(print_statement):
+  if litellm.set_verbose:
+    print(f"LiteLLM: {print_statement}")
+    if random.random() <= 0.3:
+      print("Get help - https://discord.com/invite/wuPM9dRgDw")
-def set_callbacks(callback_list):
-  global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel
-  for callback in callback_list:
-    if callback == "sentry":
-      try:
-        import sentry_sdk
-      except ImportError:
-        print_verbose("Package 'sentry_sdk' is missing. Installing it...")
-        subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sentry_sdk'])
-        import sentry_sdk
-      sentry_sdk_instance = sentry_sdk
-      sentry_sdk_instance.init(dsn=os.environ.get("SENTRY_API_URL"), traces_sample_rate=float(os.environ.get("SENTRY_API_TRACE_RATE")))
-      capture_exception = sentry_sdk_instance.capture_exception
-      add_breadcrumb = sentry_sdk_instance.add_breadcrumb
-    elif callback == "posthog":
-      try:
-        from posthog import Posthog
-      except ImportError:
-        print_verbose("Package 'posthog' is missing. Installing it...")
-        subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'posthog'])
-        from posthog import Posthog
-      posthog = Posthog(
-        project_api_key=os.environ.get("POSTHOG_API_KEY"),
-        host=os.environ.get("POSTHOG_API_URL"))
-    elif callback == "slack":
-      try:
-        from slack_bolt import App
-      except ImportError:
-        print_verbose("Package 'slack_bolt' is missing. Installing it...")
-        subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'slack_bolt'])
-        from slack_bolt import App
-      slack_app = App(
-        token=os.environ.get("SLACK_API_TOKEN"),
-        signing_secret=os.environ.get("SLACK_API_SECRET")
-      )
-      alerts_channel = os.environ["SLACK_API_CHANNEL"]
-      print_verbose(f"Initialized Slack App: {slack_app}")
-
-def handle_failure(exception, traceback_exception, args, kwargs):
-  print_verbose(f"handle_failure args: {args}")
-  print_verbose(f"handle_failure kwargs: {kwargs}")
-  success_handler = additional_details.pop("success_handler", None)
-  failure_handler = additional_details.pop("failure_handler", None)
-  additional_details["Event_Name"] = additional_details.pop("failed_event_name", "litellm.failed_query")
-  print_verbose(f"self.failure_callback: {failure_callback}")
-  print_verbose(f"additional_details: {additional_details}")
-  for callback in failure_callback:
-    try:
-      if callback == "slack":
-        slack_msg = ""
-        if len(kwargs) > 0:
-          for key in kwargs:
-            slack_msg += f"{key}: {kwargs[key]}\n"
-        if len(args) > 0:
-          for i, arg in enumerate(args):
-            slack_msg += f"LiteLLM_Args_{str(i)}: {arg}"
-        for detail in additional_details:
-          slack_msg += f"{detail}: {additional_details[detail]}\n"
-        slack_msg += f"Traceback: {traceback_exception}"
-        print_verbose(f"This is the slack message: {slack_msg}")
-        slack_app.client.chat_postMessage(channel=alerts_channel, text=slack_msg)
-      elif callback == "sentry":
-        capture_exception(exception)
-      elif callback == "posthog":
-        print_verbose(f"inside posthog, additional_details: {len(additional_details.keys())}")
-        ph_obj = {}
-        if len(kwargs) > 0:
-          ph_obj = kwargs
-        if len(args) > 0:
-          for i, arg in enumerate(args):
-            ph_obj["litellm_args_" + str(i)] = arg
-        print_verbose(f"ph_obj: {ph_obj}")
-        for detail in additional_details:
-          ph_obj[detail] = additional_details[detail]
-        event_name = additional_details["Event_Name"]
-        print_verbose(f"PostHog Event Name: {event_name}")
-        if "user_id" in additional_details:
-          posthog.capture(additional_details["user_id"], event_name, ph_obj)
-        else: # PostHog calls require a unique id to identify a user - https://posthog.com/docs/libraries/python
-          print(f"ph_obj: {ph_obj})")
-          unique_id = str(uuid.uuid4())
-          posthog.capture(unique_id, event_name)
-        print_verbose(f"successfully logged to PostHog!")
-    except:
-      print_verbose(f"Error Occurred while logging failure: {traceback.format_exc()}")
-      pass
-  if failure_handler and callable(failure_handler):
-    call_details = {
-      "exception": exception,
-      "additional_details": additional_details
-    }
-    failure_handler(call_details)
-  pass
-
-def handle_input(model_call_details={}):
-  if len(model_call_details.keys()) > 0:
-    model = model_call_details["model"] if "model" in model_call_details else None
-    if model:
-      for callback in callback_list:
-        if callback == "sentry": # add a sentry breadcrumb if user passed in sentry integration
-          add_breadcrumb(
-            category=f'{model}',
-            message='Trying request model {} input {}'.format(model, json.dumps(model_call_details)),
-            level='info',
-          )
-      if user_logger_fn and callable(user_logger_fn):
-        user_logger_fn(model_call_details)
-  pass
-
-def handle_success(*args, **kwargs):
-  success_handler = additional_details.pop("success_handler", None)
-  failure_handler = additional_details.pop("failure_handler", None)
-  additional_details["Event_Name"] = additional_details.pop("successful_event_name", "litellm.succes_query")
-  for callback in success_callback:
-    try:
-      if callback == "posthog":
-        ph_obj = {}
-        for detail in additional_details:
-          ph_obj[detail] = additional_details[detail]
-        event_name = additional_details["Event_Name"]
-        if "user_id" in additional_details:
-          posthog.capture(additional_details["user_id"], event_name, ph_obj)
-        else: # PostHog calls require a unique id to identify a user - https://posthog.com/docs/libraries/python
-          unique_id = str(uuid.uuid4())
-          posthog.capture(unique_id, event_name, ph_obj)
-        pass
-      elif callback == "slack":
-        slack_msg = ""
-        for detail in additional_details:
-          slack_msg += f"{detail}: {additional_details[detail]}\n"
-        slack_app.client.chat_postMessage(channel=alerts_channel, text=slack_msg)
-    except:
-      pass
-  if success_handler and callable(success_handler):
-    success_handler(args, kwargs)
-  pass
-
-#Logging function -> log the exact model details + what's being sent | Non-Blocking
-def logging(model, input, azure=False, additional_args={}, logger_fn=None):
-  try:
-    model_call_details = {}
-    model_call_details["model"] = model
-    model_call_details["input"] = input
-    model_call_details["azure"] = azure
-    # log additional call details -> api key, etc.
-    if azure == True or model in open_ai_chat_completion_models or model in open_ai_chat_completion_models or model in open_ai_embedding_models:
-      model_call_details["api_type"] = openai.api_type
-      model_call_details["api_base"] = openai.api_base
-      model_call_details["api_version"] = openai.api_version
-      model_call_details["api_key"] = openai.api_key
-    elif "replicate" in model:
-      model_call_details["api_key"] = os.environ.get("REPLICATE_API_TOKEN")
-    elif model in anthropic_models:
-      model_call_details["api_key"] = os.environ.get("ANTHROPIC_API_KEY")
-    elif model in cohere_models:
-      model_call_details["api_key"] = os.environ.get("COHERE_API_KEY")
-    model_call_details["additional_args"] = additional_args
-    ## Logging
-    print_verbose(f"Basic model call details: {model_call_details}")
-    if logger_fn and callable(logger_fn):
-      try:
-        logger_fn(model_call_details) # Expectation: any logger function passed in by the user should accept a dict object
-      except:
-        print_verbose(f"[Non-Blocking] Exception occurred while logging {traceback.format_exc()}")
-        pass
-  except:
-    pass
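After this clean-up, completion routes purely on the litellm.* model lists, and the fallthrough else branch logs the call before raising a ValueError that carries the local arguments. A minimal sketch of both paths (assumes OPENAI_API_KEY is exported; the model names are the ones registered in __init__):

from litellm import completion

messages = [{"content": "Hello, how are you?", "role": "user"}]

# a registered model is dispatched to its provider
response = completion(model="gpt-3.5-turbo", messages=messages)

# an unregistered model now raises ValueError instead of a bare Exception
try:
    completion(model="not-a-real-model", messages=messages)
except ValueError as err:
    print(err)  # "No valid completion model args passed in - {...}"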


@ -1,3 +1,7 @@
#### What this tests ####
# This tests chaos monkeys - if random parts of the system are broken / things aren't sent correctly - what happens.
# Expect to add more edge cases to this over time.
import sys, os import sys, os
import traceback import traceback
@ -11,11 +15,11 @@ parent_dir = os.path.join(current_dir, '..')
sys.path.append(parent_dir) sys.path.append(parent_dir)
import main import main
from main import embedding, completion from main import embedding, completion, set_verbose
main.success_callback = ["posthog"] main.success_callback = ["posthog"]
main.failure_callback = ["slack", "sentry", "posthog"] main.failure_callback = ["slack", "sentry", "posthog"]
main.set_verbose = True set_verbose(True)
user_message = "Hello, how are you?" user_message = "Hello, how are you?"
messages = [{ "content": user_message,"role": "user"}] messages = [{ "content": user_message,"role": "user"}]


@@ -1,12 +1,16 @@
+#### What this tests ####
+# This tests error logging (with custom user functions) for the `completion` + `embedding` endpoints w/ callbacks
+
 import sys, os
 import traceback
-sys.path.append('..') # Adds the parent directory to the system path
-import main
-from main import embedding, completion
-main.success_callback = ["posthog"]
-main.failure_callback = ["slack", "sentry", "posthog"]
-# main.set_verbose = True
+sys.path.insert(0, os.path.abspath('../..')) # Adds the parent directory to the system path
+import litellm
+from litellm import embedding, completion
+litellm.success_callback = ["posthog"]
+litellm.failure_callback = ["slack", "sentry", "posthog"]
+# litellm.set_verbose = True

 def logger_fn(model_call_object: dict):
   # print(f"model call details: {model_call_object}")


@@ -1,10 +1,10 @@
 import sys, os
 import traceback
-sys.path.append('..') # Adds the parent directory to the system path
-import main
-from main import completion
-main.set_verbose = True
+sys.path.insert(0, os.path.abspath('../..')) # Adds the parent directory to the system path
+import litellm
+from litellm import embedding, completion
+litellm.set_verbose = True

 user_message = "Hello, whats the weather in San Francisco??"
 messages = [{ "content": user_message,"role": "user"}]


@@ -1,10 +1,13 @@
+#### What this tests ####
+# This tests error logging (with custom user functions) for the raw `completion` + `embedding` endpoints
+
 import sys, os
 import traceback
-sys.path.append('..') # Adds the parent directory to the system path
-import main
-from main import completion, embedding
-main.verbose = True ## Replace to: ```litellm.verbose = True``` when using pypi package
+sys.path.insert(0, os.path.abspath('../..')) # Adds the parent directory to the system path
+import litellm
+from litellm import embedding, completion
+litellm.set_verbose = True

 def logger_fn(model_call_object: dict):
   print(f"model call details: {model_call_object}")


@@ -1,25 +1,26 @@
+#### What this tests ####
+# This tests error handling + logging (esp. for sentry breadcrumbs)
+
 import sys, os
 import traceback
-sys.path.append('..') # Adds the parent directory to the system path
-import main
-from main import embedding, completion
-main.success_callback = ["posthog"]
-main.failure_callback = ["slack", "sentry", "posthog"]
-main.set_verbose = True
-model_fallback_list = ["replicate/llama-2-70b-chat:2c1608e18606fad2812020dc541930f2d0495ce32eee50074220b87300bc16e1", "claude-instant-1", "gpt-3.5-turbo"]
+sys.path.insert(0, os.path.abspath('../..')) # Adds the parent directory to the system path
+import litellm
+from litellm import embedding, completion
+litellm.success_callback = ["posthog"]
+litellm.failure_callback = ["slack", "sentry", "posthog"]
+litellm.set_verbose = True
+model_fallback_list = ["replicate/llama-2-70b-chat:2c1608e18606fad2812020dc541930f2d0495ce32eee50074220b87300bc16e1", "replicate/llama-2-70b-chat:2c1608e18606fad2812020dc541930f2d0495ce32eee50074220b87300bc16e1", "chatgpt-test"]

 user_message = "Hello, how are you?"
 messages = [{ "content": user_message,"role": "user"}]

-# for _ in range(10):
 for model in model_fallback_list:
   try:
+    response = embedding(model="text-embedding-ada-002", input=[user_message])
     response = completion(model=model, messages=messages)
     print(response)
-    if response != None:
-      break
   except Exception as e:
     print(f"error occurred: {traceback.format_exc()}")
+    raise e
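The updated test re-raises on purpose so that deliberately broken models like "chatgpt-test" surface in Sentry/Slack. The fallback pattern it replaced worked as in the sketch below, stopping at the first provider that answers (assumes the relevant provider API keys are exported):

from litellm import completion

messages = [{"content": "Hello, how are you?", "role": "user"}]
response = None
for model in ["claude-instant-1", "gpt-3.5-turbo"]:
    try:
        response = completion(model=model, messages=messages)
        if response is not None:
            break  # first successful provider wins
    except Exception:
        continue  # fall through to the next model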

litellm/utils.py (new file, 224 lines)

@@ -0,0 +1,224 @@
import dotenv
import json
import traceback
import threading
import traceback
import subprocess
import uuid
import litellm
import os
import openai
import random

####### ENVIRONMENT VARIABLES ###################
dotenv.load_dotenv() # Loading env variables using dotenv
sentry_sdk_instance = None
capture_exception = None
add_breadcrumb = None
posthog = None
slack_app = None
alerts_channel = None
callback_list = []
user_logger_fn = None
additional_details = {}

def print_verbose(print_statement):
  if litellm.set_verbose:
    print(f"LiteLLM: {print_statement}")
    if random.random() <= 0.3:
      print("Get help - https://discord.com/invite/wuPM9dRgDw")

####### LOGGING ###################
#Logging function -> log the exact model details + what's being sent | Non-Blocking
def logging(model, input, azure=False, additional_args={}, logger_fn=None):
  try:
    model_call_details = {}
    model_call_details["model"] = model
    model_call_details["input"] = input
    model_call_details["azure"] = azure
    # log additional call details -> api key, etc.
    if azure == True or model in litellm.open_ai_chat_completion_models or model in litellm.open_ai_chat_completion_models or model in litellm.open_ai_embedding_models:
      model_call_details["api_type"] = openai.api_type
      model_call_details["api_base"] = openai.api_base
      model_call_details["api_version"] = openai.api_version
      model_call_details["api_key"] = openai.api_key
    elif "replicate" in model:
      model_call_details["api_key"] = os.environ.get("REPLICATE_API_TOKEN")
    elif model in litellm.anthropic_models:
      model_call_details["api_key"] = os.environ.get("ANTHROPIC_API_KEY")
    elif model in litellm.cohere_models:
      model_call_details["api_key"] = os.environ.get("COHERE_API_KEY")
    model_call_details["additional_args"] = additional_args
    ## User Logging -> if you pass in a custom logging function or want to use sentry breadcrumbs
    print_verbose(f"Basic model call details: {model_call_details}")
    if logger_fn and callable(logger_fn):
      try:
        logger_fn(model_call_details) # Expectation: any logger function passed in by the user should accept a dict object
      except:
        print_verbose(f"[Non-Blocking] Exception occurred while logging {traceback.format_exc()}")
  except:
    traceback.print_exc()
    pass

####### CLIENT ###################
# make it easy to log if completion/embedding runs succeeded or failed + see what happened | Non-Blocking
def client(original_function):
  def function_setup(*args, **kwargs): #just run once to check if user wants to send their data anywhere - PostHog/Sentry/Slack/etc.
    try:
      global callback_list, add_breadcrumb
      if (len(litellm.success_callback) > 0 or len(litellm.failure_callback) > 0) and len(callback_list) == 0:
        callback_list = list(set(litellm.success_callback + litellm.failure_callback))
        set_callbacks(callback_list=callback_list)
      if add_breadcrumb:
        add_breadcrumb(
          category="litellm.llm_call",
          message=f"Positional Args: {args}, Keyword Args: {kwargs}",
          level="info",
        )
    except: # DO NOT BLOCK running the function because of this
      print_verbose(f"[Non-Blocking] {traceback.format_exc()}")
      pass

  def wrapper(*args, **kwargs):
    try:
      function_setup(args, kwargs)
      ## MODEL CALL
      result = original_function(*args, **kwargs)
      ## LOG SUCCESS
      my_thread = threading.Thread(target=handle_success, args=(args, kwargs)) # don't interrupt execution of main thread
      my_thread.start()
      return result
    except Exception as e:
      traceback_exception = traceback.format_exc()
      my_thread = threading.Thread(target=handle_failure, args=(e, traceback_exception, args, kwargs)) # don't interrupt execution of main thread
      my_thread.start()
      raise e
  return wrapper

####### HELPER FUNCTIONS ################
def set_callbacks(callback_list):
  global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel
  for callback in callback_list:
    if callback == "sentry":
      try:
        import sentry_sdk
      except ImportError:
        print_verbose("Package 'sentry_sdk' is missing. Installing it...")
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sentry_sdk'])
        import sentry_sdk
      sentry_sdk_instance = sentry_sdk
      sentry_sdk_instance.init(dsn=os.environ.get("SENTRY_API_URL"), traces_sample_rate=float(os.environ.get("SENTRY_API_TRACE_RATE")))
      capture_exception = sentry_sdk_instance.capture_exception
      add_breadcrumb = sentry_sdk_instance.add_breadcrumb
    elif callback == "posthog":
      try:
        from posthog import Posthog
      except ImportError:
        print_verbose("Package 'posthog' is missing. Installing it...")
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'posthog'])
        from posthog import Posthog
      posthog = Posthog(
        project_api_key=os.environ.get("POSTHOG_API_KEY"),
        host=os.environ.get("POSTHOG_API_URL"))
    elif callback == "slack":
      try:
        from slack_bolt import App
      except ImportError:
        print_verbose("Package 'slack_bolt' is missing. Installing it...")
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'slack_bolt'])
        from slack_bolt import App
      slack_app = App(
        token=os.environ.get("SLACK_API_TOKEN"),
        signing_secret=os.environ.get("SLACK_API_SECRET")
      )
      alerts_channel = os.environ["SLACK_API_CHANNEL"]
      print_verbose(f"Initialized Slack App: {slack_app}")

def handle_failure(exception, traceback_exception, args, kwargs):
  global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel
  print_verbose(f"handle_failure args: {args}")
  print_verbose(f"handle_failure kwargs: {kwargs}")
  success_handler = additional_details.pop("success_handler", None)
  failure_handler = additional_details.pop("failure_handler", None)
  additional_details["Event_Name"] = additional_details.pop("failed_event_name", "litellm.failed_query")
  print_verbose(f"self.failure_callback: {litellm.failure_callback}")
  print_verbose(f"additional_details: {additional_details}")
  for callback in litellm.failure_callback:
    try:
      if callback == "slack":
        slack_msg = ""
        if len(kwargs) > 0:
          for key in kwargs:
            slack_msg += f"{key}: {kwargs[key]}\n"
        if len(args) > 0:
          for i, arg in enumerate(args):
            slack_msg += f"LiteLLM_Args_{str(i)}: {arg}"
        for detail in additional_details:
          slack_msg += f"{detail}: {additional_details[detail]}\n"
        slack_msg += f"Traceback: {traceback_exception}"
        slack_app.client.chat_postMessage(channel=alerts_channel, text=slack_msg)
      elif callback == "sentry":
        capture_exception(exception)
      elif callback == "posthog":
        print_verbose(f"inside posthog, additional_details: {len(additional_details.keys())}")
        ph_obj = {}
        if len(kwargs) > 0:
          ph_obj = kwargs
        if len(args) > 0:
          for i, arg in enumerate(args):
            ph_obj["litellm_args_" + str(i)] = arg
        for detail in additional_details:
          ph_obj[detail] = additional_details[detail]
        event_name = additional_details["Event_Name"]
        print_verbose(f"ph_obj: {ph_obj}")
        print_verbose(f"PostHog Event Name: {event_name}")
        if "user_id" in additional_details:
          posthog.capture(additional_details["user_id"], event_name, ph_obj)
        else: # PostHog calls require a unique id to identify a user - https://posthog.com/docs/libraries/python
          unique_id = str(uuid.uuid4())
          posthog.capture(unique_id, event_name)
        print_verbose(f"successfully logged to PostHog!")
    except:
      print_verbose(f"Error Occurred while logging failure: {traceback.format_exc()}")
      pass
  if failure_handler and callable(failure_handler):
    call_details = {
      "exception": exception,
      "additional_details": additional_details
    }
    failure_handler(call_details)
  pass

def handle_success(*args, **kwargs):
  success_handler = additional_details.pop("success_handler", None)
  failure_handler = additional_details.pop("failure_handler", None)
  additional_details["Event_Name"] = additional_details.pop("successful_event_name", "litellm.succes_query")
  for callback in litellm.success_callback:
    try:
      if callback == "posthog":
        ph_obj = {}
        for detail in additional_details:
          ph_obj[detail] = additional_details[detail]
        event_name = additional_details["Event_Name"]
        if "user_id" in additional_details:
          posthog.capture(additional_details["user_id"], event_name, ph_obj)
        else: # PostHog calls require a unique id to identify a user - https://posthog.com/docs/libraries/python
          unique_id = str(uuid.uuid4())
          posthog.capture(unique_id, event_name, ph_obj)
        pass
      elif callback == "slack":
        slack_msg = ""
        for detail in additional_details:
          slack_msg += f"{detail}: {additional_details[detail]}\n"
        slack_app.client.chat_postMessage(channel=alerts_channel, text=slack_msg)
    except:
      pass
  if success_handler and callable(success_handler):
    success_handler(args, kwargs)
  pass
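Everything in utils.py hangs off the @client decorator: wrapping a function gives it one-time callback setup plus success/failure logging on background threads, without blocking the call itself. A minimal sketch of wiring it up (assumes the env vars for whichever callbacks you enable, e.g. SLACK_API_TOKEN/SLACK_API_SECRET/SLACK_API_CHANNEL, are set; the wrapped function here is a stand-in, not part of the commit):

import litellm
from litellm import client

litellm.failure_callback = ["slack"]

@client
def my_llm_wrapper(messages):
    # stand-in for a provider call; raising triggers handle_failure on a background thread
    raise RuntimeError("simulated provider outage")

try:
    my_llm_wrapper([{"role": "user", "content": "hi"}])
except RuntimeError:
    pass  # the exception is re-raised after failure logging is dispatched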