diff --git a/litellm/integrations/custom_batch_logger.py b/litellm/integrations/custom_batch_logger.py index aa7f0bba2..7ef63d25c 100644 --- a/litellm/integrations/custom_batch_logger.py +++ b/litellm/integrations/custom_batch_logger.py @@ -21,6 +21,7 @@ class CustomBatchLogger(CustomLogger): self, flush_lock: Optional[asyncio.Lock] = None, batch_size: Optional[int] = DEFAULT_BATCH_SIZE, + flush_interval: Optional[int] = DEFAULT_FLUSH_INTERVAL_SECONDS, **kwargs, ) -> None: """ @@ -28,7 +29,7 @@ class CustomBatchLogger(CustomLogger): flush_lock (Optional[asyncio.Lock], optional): Lock to use when flushing the queue. Defaults to None. Only used for custom loggers that do batching """ self.log_queue: List = [] - self.flush_interval = DEFAULT_FLUSH_INTERVAL_SECONDS # 10 seconds + self.flush_interval = flush_interval or DEFAULT_FLUSH_INTERVAL_SECONDS self.batch_size: int = batch_size or DEFAULT_BATCH_SIZE self.last_flush_time = time.time() self.flush_lock = flush_lock diff --git a/litellm/integrations/s3.py b/litellm/integrations/s3.py index bf1fa2111..37dfe4d93 100644 --- a/litellm/integrations/s3.py +++ b/litellm/integrations/s3.py @@ -40,6 +40,8 @@ class S3Logger(CustomBatchLogger, BaseAWSLLM): s3_aws_access_key_id: Optional[str] = None, s3_aws_secret_access_key: Optional[str] = None, s3_aws_session_token: Optional[str] = None, + s3_flush_interval: Optional[int] = None, + s3_batch_size: Optional[int] = None, s3_config=None, **kwargs, ): @@ -79,6 +81,11 @@ class S3Logger(CustomBatchLogger, BaseAWSLLM): s3_path = litellm.s3_callback_params.get("s3_path") # done reading litellm.s3_callback_params + s3_flush_interval = litellm.s3_callback_params.get( + "s3_flush_interval", None + ) + s3_batch_size = litellm.s3_callback_params.get("s3_batch_size", None) + self.bucket_name = s3_bucket_name self.s3_path = s3_path verbose_logger.debug(f"s3 logger using endpoint url {s3_endpoint_url}") @@ -96,8 +103,17 @@ class S3Logger(CustomBatchLogger, BaseAWSLLM): asyncio.create_task(self.periodic_flush()) self.flush_lock = asyncio.Lock() + + verbose_logger.debug( + f"s3 flush interval: {s3_flush_interval}, s3 batch size: {s3_batch_size}" + ) # Call CustomLogger's __init__ - CustomBatchLogger.__init__(self, flush_lock=self.flush_lock) + CustomBatchLogger.__init__( + self, + flush_lock=self.flush_lock, + flush_interval=s3_flush_interval, + batch_size=s3_batch_size, + ) # Call BaseAWSLLM's __init__ BaseAWSLLM.__init__(self)