(fixes) gcs bucket key based logging (#6044)

* fixes for gcs bucket logging

* fix StandardCallbackDynamicParams

* fix - gcs logging when payload is not serializable

* add test_add_callback_via_key_litellm_pre_call_utils_gcs_bucket

* working success callbacks

* linting fixes

* fix linting error

* add type hints to functions

* fixes for dynamic success and failure logging

* fix for test_async_chat_openai_stream
This commit is contained in:
Ishaan Jaff 2024-10-04 11:56:10 +05:30 committed by GitHub
parent 793593e735
commit 670ecda4e2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 446 additions and 39 deletions

View file

@ -192,16 +192,28 @@ class Logging:
def __init__(
self,
model,
model: str,
messages,
stream,
call_type,
start_time,
litellm_call_id,
function_id,
dynamic_success_callbacks=None,
dynamic_failure_callbacks=None,
dynamic_async_success_callbacks=None,
litellm_call_id: str,
function_id: str,
dynamic_input_callbacks: Optional[
List[Union[str, Callable, CustomLogger]]
] = None,
dynamic_success_callbacks: Optional[
List[Union[str, Callable, CustomLogger]]
] = None,
dynamic_async_success_callbacks: Optional[
List[Union[str, Callable, CustomLogger]]
] = None,
dynamic_failure_callbacks: Optional[
List[Union[str, Callable, CustomLogger]]
] = None,
dynamic_async_failure_callbacks: Optional[
List[Union[str, Callable, CustomLogger]]
] = None,
kwargs: Optional[Dict] = None,
):
if messages is not None:
@ -230,27 +242,117 @@ class Logging:
[]
) # for generating complete stream response
self.model_call_details: Dict[Any, Any] = {}
self.dynamic_input_callbacks: List[Any] = (
[]
) # [TODO] callbacks set for just that call
self.dynamic_failure_callbacks = dynamic_failure_callbacks
self.dynamic_success_callbacks = (
dynamic_success_callbacks # callbacks set for just that call
)
self.dynamic_async_success_callbacks = (
dynamic_async_success_callbacks # callbacks set for just that call
)
# Initialize dynamic callbacks
self.dynamic_input_callbacks: Optional[
List[Union[str, Callable, CustomLogger]]
] = dynamic_input_callbacks
self.dynamic_success_callbacks: Optional[
List[Union[str, Callable, CustomLogger]]
] = dynamic_success_callbacks
self.dynamic_async_success_callbacks: Optional[
List[Union[str, Callable, CustomLogger]]
] = dynamic_async_success_callbacks
self.dynamic_failure_callbacks: Optional[
List[Union[str, Callable, CustomLogger]]
] = dynamic_failure_callbacks
self.dynamic_async_failure_callbacks: Optional[
List[Union[str, Callable, CustomLogger]]
] = dynamic_async_failure_callbacks
# Process dynamic callbacks
self.process_dynamic_callbacks()
## DYNAMIC LANGFUSE / GCS / logging callback KEYS ##
self.standard_callback_dynamic_params: StandardCallbackDynamicParams = (
self.initialize_standard_callback_dynamic_params(kwargs)
)
## TIME TO FIRST TOKEN LOGGING ##
## TIME TO FIRST TOKEN LOGGING ##
self.completion_start_time: Optional[datetime.datetime] = None
def process_dynamic_callbacks(self):
"""
Initializes CustomLogger compatible callbacks in self.dynamic_* callbacks
If a callback is in litellm._known_custom_logger_compatible_callbacks, it needs to be intialized and added to the respective dynamic_* callback list.
"""
# Process input callbacks
self.dynamic_input_callbacks = self._process_dynamic_callback_list(
self.dynamic_input_callbacks, dynamic_callbacks_type="input"
)
# Process failure callbacks
self.dynamic_failure_callbacks = self._process_dynamic_callback_list(
self.dynamic_failure_callbacks, dynamic_callbacks_type="failure"
)
# Process async failure callbacks
self.dynamic_async_failure_callbacks = self._process_dynamic_callback_list(
self.dynamic_async_failure_callbacks, dynamic_callbacks_type="async_failure"
)
# Process success callbacks
self.dynamic_success_callbacks = self._process_dynamic_callback_list(
self.dynamic_success_callbacks, dynamic_callbacks_type="success"
)
# Process async success callbacks
self.dynamic_async_success_callbacks = self._process_dynamic_callback_list(
self.dynamic_async_success_callbacks, dynamic_callbacks_type="async_success"
)
def _process_dynamic_callback_list(
self,
callback_list: Optional[List[Union[str, Callable, CustomLogger]]],
dynamic_callbacks_type: Literal[
"input", "success", "failure", "async_success", "async_failure"
],
) -> Optional[List[Union[str, Callable, CustomLogger]]]:
"""
Helper function to initialize CustomLogger compatible callbacks in self.dynamic_* callbacks
- If a callback is in litellm._known_custom_logger_compatible_callbacks,
replace the string with the initialized callback class.
- If dynamic callback is a "success" callback that is a known_custom_logger_compatible_callbacks then add it to dynamic_async_success_callbacks
- If dynamic callback is a "failure" callback that is a known_custom_logger_compatible_callbacks then add it to dynamic_failure_callbacks
"""
if callback_list is None:
return None
processed_list: List[Union[str, Callable, CustomLogger]] = []
for callback in callback_list:
if (
isinstance(callback, str)
and callback in litellm._known_custom_logger_compatible_callbacks
):
callback_class = _init_custom_logger_compatible_class(
callback, internal_usage_cache=None, llm_router=None # type: ignore
)
if callback_class is not None:
processed_list.append(callback_class)
# If processing dynamic_success_callbacks, add to dynamic_async_success_callbacks
if dynamic_callbacks_type == "success":
if self.dynamic_async_success_callbacks is None:
self.dynamic_async_success_callbacks = []
self.dynamic_async_success_callbacks.append(callback_class)
elif dynamic_callbacks_type == "failure":
if self.dynamic_async_failure_callbacks is None:
self.dynamic_async_failure_callbacks = []
self.dynamic_async_failure_callbacks.append(callback_class)
else:
processed_list.append(callback)
return processed_list
def initialize_standard_callback_dynamic_params(
self, kwargs: Optional[Dict] = None
) -> StandardCallbackDynamicParams:
"""
Initialize the standard callback dynamic params from the kwargs
checks if langfuse_secret_key, gcs_bucket_name in kwargs and sets the corresponding attributes in StandardCallbackDynamicParams
"""
standard_callback_dynamic_params = StandardCallbackDynamicParams()
if kwargs:
_supported_callback_params = (
@ -413,7 +515,7 @@ class Logging:
self.model_call_details["api_call_start_time"] = datetime.datetime.now()
# Input Integration Logging -> If you want to log the fact that an attempt to call the model was made
callbacks = litellm.input_callback + self.dynamic_input_callbacks
callbacks = litellm.input_callback + (self.dynamic_input_callbacks or [])
for callback in callbacks:
try:
if callback == "supabase" and supabaseClient is not None:
@ -529,7 +631,7 @@ class Logging:
)
# Input Integration Logging -> If you want to log the fact that an attempt to call the model was made
callbacks = litellm.input_callback + self.dynamic_input_callbacks
callbacks = litellm.input_callback + (self.dynamic_input_callbacks or [])
for callback in callbacks:
try:
if callback == "sentry" and add_breadcrumb:
@ -2004,8 +2106,25 @@ class Logging:
start_time=start_time,
end_time=end_time,
)
callbacks = [] # init this to empty incase it's not created
if self.dynamic_async_failure_callbacks is not None and isinstance(
self.dynamic_async_failure_callbacks, list
):
callbacks = self.dynamic_async_failure_callbacks
## keep the internal functions ##
for callback in litellm._async_failure_callback:
if (
isinstance(callback, CustomLogger)
and "_PROXY_" in callback.__class__.__name__
):
callbacks.append(callback)
else:
callbacks = litellm._async_failure_callback
result = None # result sent to all loggers, init this to None incase it's not created
for callback in litellm._async_failure_callback:
for callback in callbacks:
try:
if isinstance(callback, CustomLogger): # custom logger class
await callback.async_log_failure_event(