(perf) move s3 logging to Batch logging + async [94% faster perf under 100 RPS on 1 litellm instance] (#6165)

* fix move s3 to use customLogger

* add basic s3 logging test

* add s3 to custom logger compatible

* use batch logger for s3

* s3 set flush interval and batch size

* fix s3 logging

* add notes on s3 logging

* fix s3 logging

* add basic s3 logging test

* fix s3 type errors

* add test for sync logging on s3
This commit is contained in:
Ishaan Jaff 2024-10-11 19:49:03 +05:30 committed by GitHub
parent 4e1c892dfc
commit 2a5624af47
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 407 additions and 149 deletions

View file

@ -116,7 +116,6 @@ lagoLogger = None
dataDogLogger = None
prometheusLogger = None
dynamoLogger = None
s3Logger = None
genericAPILogger = None
clickHouseLogger = None
greenscaleLogger = None
@ -1346,36 +1345,6 @@ class Logging:
user_id=kwargs.get("user", None),
print_verbose=print_verbose,
)
if callback == "s3":
global s3Logger
if s3Logger is None:
s3Logger = S3Logger()
if self.stream:
if "complete_streaming_response" in self.model_call_details:
print_verbose(
"S3Logger Logger: Got Stream Event - Completed Stream Response"
)
s3Logger.log_event(
kwargs=self.model_call_details,
response_obj=self.model_call_details[
"complete_streaming_response"
],
start_time=start_time,
end_time=end_time,
print_verbose=print_verbose,
)
else:
print_verbose(
"S3Logger Logger: Got Stream Event - No complete stream response as yet"
)
else:
s3Logger.log_event(
kwargs=self.model_call_details,
response_obj=result,
start_time=start_time,
end_time=end_time,
print_verbose=print_verbose,
)
if (
callback == "openmeter"
and self.model_call_details.get("litellm_params", {}).get(
@ -2245,7 +2214,7 @@ def set_callbacks(callback_list, function_id=None):
"""
Globally sets the callback client
"""
global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, traceloopLogger, athinaLogger, heliconeLogger, aispendLogger, berrispendLogger, supabaseClient, liteDebuggerClient, lunaryLogger, promptLayerLogger, langFuseLogger, customLogger, weightsBiasesLogger, logfireLogger, dynamoLogger, s3Logger, dataDogLogger, prometheusLogger, greenscaleLogger, openMeterLogger
global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, traceloopLogger, athinaLogger, heliconeLogger, aispendLogger, berrispendLogger, supabaseClient, liteDebuggerClient, lunaryLogger, promptLayerLogger, langFuseLogger, customLogger, weightsBiasesLogger, logfireLogger, dynamoLogger, dataDogLogger, prometheusLogger, greenscaleLogger, openMeterLogger
try:
for callback in callback_list:
@ -2319,8 +2288,6 @@ def set_callbacks(callback_list, function_id=None):
dataDogLogger = DataDogLogger()
elif callback == "dynamodb":
dynamoLogger = DyanmoDBLogger()
elif callback == "s3":
s3Logger = S3Logger()
elif callback == "wandb":
weightsBiasesLogger = WeightsBiasesLogger()
elif callback == "logfire":
@ -2423,6 +2390,14 @@ def _init_custom_logger_compatible_class(
_datadog_logger = DataDogLogger()
_in_memory_loggers.append(_datadog_logger)
return _datadog_logger # type: ignore
elif logging_integration == "s3":
for callback in _in_memory_loggers:
if isinstance(callback, S3Logger):
return callback # type: ignore
_s3_logger = S3Logger()
_in_memory_loggers.append(_s3_logger)
return _s3_logger # type: ignore
elif logging_integration == "gcs_bucket":
for callback in _in_memory_loggers:
if isinstance(callback, GCSBucketLogger):
@ -2589,6 +2564,10 @@ def get_custom_logger_compatible_class(
for callback in _in_memory_loggers:
if isinstance(callback, PrometheusLogger):
return callback
elif logging_integration == "s3":
for callback in _in_memory_loggers:
if isinstance(callback, S3Logger):
return callback
elif logging_integration == "datadog":
for callback in _in_memory_loggers:
if isinstance(callback, DataDogLogger):