s3 set flush interval and batch size

This commit is contained in:
Ishaan Jaff 2024-10-11 16:48:23 +05:30
parent 6865e3ce73
commit 6ab88fec44
2 changed files with 19 additions and 2 deletions

View file

@@ -21,6 +21,7 @@ class CustomBatchLogger(CustomLogger):
         self,
         flush_lock: Optional[asyncio.Lock] = None,
         batch_size: Optional[int] = DEFAULT_BATCH_SIZE,
+        flush_interval: Optional[int] = DEFAULT_FLUSH_INTERVAL_SECONDS,
         **kwargs,
     ) -> None:
         """
@@ -28,7 +29,7 @@ class CustomBatchLogger(CustomLogger):
             flush_lock (Optional[asyncio.Lock], optional): Lock to use when flushing the queue. Defaults to None. Only used for custom loggers that do batching
         """
         self.log_queue: List = []
-        self.flush_interval = DEFAULT_FLUSH_INTERVAL_SECONDS  # 10 seconds
+        self.flush_interval = flush_interval or DEFAULT_FLUSH_INTERVAL_SECONDS
         self.batch_size: int = batch_size or DEFAULT_BATCH_SIZE
         self.last_flush_time = time.time()
         self.flush_lock = flush_lock

View file

@@ -40,6 +40,8 @@ class S3Logger(CustomBatchLogger, BaseAWSLLM):
         s3_aws_access_key_id: Optional[str] = None,
         s3_aws_secret_access_key: Optional[str] = None,
         s3_aws_session_token: Optional[str] = None,
+        s3_flush_interval: Optional[int] = None,
+        s3_batch_size: Optional[int] = None,
         s3_config=None,
         **kwargs,
     ):
@@ -79,6 +81,11 @@ class S3Logger(CustomBatchLogger, BaseAWSLLM):
             s3_path = litellm.s3_callback_params.get("s3_path")
             # done reading litellm.s3_callback_params
+            s3_flush_interval = litellm.s3_callback_params.get(
+                "s3_flush_interval", None
+            )
+            s3_batch_size = litellm.s3_callback_params.get("s3_batch_size", None)
         self.bucket_name = s3_bucket_name
         self.s3_path = s3_path
         verbose_logger.debug(f"s3 logger using endpoint url {s3_endpoint_url}")
@@ -96,8 +103,17 @@ class S3Logger(CustomBatchLogger, BaseAWSLLM):
             asyncio.create_task(self.periodic_flush())
             self.flush_lock = asyncio.Lock()
+        verbose_logger.debug(
+            f"s3 flush interval: {s3_flush_interval}, s3 batch size: {s3_batch_size}"
+        )
         # Call CustomLogger's __init__
-        CustomBatchLogger.__init__(self, flush_lock=self.flush_lock)
+        CustomBatchLogger.__init__(
+            self,
+            flush_lock=self.flush_lock,
+            flush_interval=s3_flush_interval,
+            batch_size=s3_batch_size,
+        )
         # Call BaseAWSLLM's __init__
         BaseAWSLLM.__init__(self)