diff --git a/litellm/utils.py b/litellm/utils.py
index 7965664e45..5ba5c15eda 100644
--- a/litellm/utils.py
+++ b/litellm/utils.py
@@ -4286,437 +4286,4 @@ def get_valid_models():
                 valid_models.extend(models_for_provider)
         return valid_models
     except:
-        return []  # NON-Blocking
-
-
-############################# BATCH COMPLETION with Rate Limit Throttling #######################
-@dataclass
-class StatusTracker:
-    """Stores metadata about the script's progress. Only one instance is created."""
-
-    num_tasks_started: int = 0
-    num_tasks_in_progress: int = 0  # script ends when this reaches 0
-    num_tasks_succeeded: int = 0
-    num_tasks_failed: int = 0
-    num_rate_limit_errors: int = 0
-    num_api_errors: int = 0  # excluding rate limit errors, counted above
-    num_other_errors: int = 0
-    time_of_last_rate_limit_error: int = 0  # used to cool off after hitting rate limits
-
-
-@dataclass
-class APIRequest:
-    """Stores an API request's inputs, outputs, and other metadata. Contains a method to make an API call."""
-
-    task_id: int
-    request_json: dict
-    token_consumption: int
-    attempts_left: int
-    metadata: dict
-    result: list = field(default_factory=list)
-
-    async def call_api(
-        self,
-        request_header: dict,
-        retry_queue: asyncio.Queue,
-        status_tracker: StatusTracker,
-        save_filepath: str = "",
-    ):
-        """Calls the OpenAI API and saves results."""
-        logging.info(f"Making API Call for request #{self.task_id} {self.request_json}")
-        error = None
-        try:
-            response = await litellm.acompletion(**self.request_json)
-            logging.info(f"Completed request #{self.task_id}")
-            if save_filepath == "":  # return response
-                return response
-            # else this gets written to save_filepath
-        except Exception as e:
-            logging.warning(f"Request {self.task_id} failed with error {e}")
-            status_tracker.num_api_errors += 1
-            error = e
-            print(f"got exception {e}")
-            if "Rate limit" in str(e):
-                status_tracker.time_of_last_rate_limit_error = int(time.time())
-                status_tracker.num_rate_limit_errors += 1
-                status_tracker.num_api_errors -= 1  # rate limit errors are counted separately
-
-        if error:
-            self.result.append(error)
-            if self.attempts_left:
-                retry_queue.put_nowait(self)
-            else:
-                logging.error(
-                    f"Request {self.request_json} failed after all attempts. Saving errors: {self.result}"
-                )
-                data = (
-                    [self.request_json, [str(e) for e in self.result], self.metadata]
-                    if self.metadata
-                    else [self.request_json, [str(e) for e in self.result]]
-                )
-                self.append_to_jsonl(data, save_filepath)
-                status_tracker.num_tasks_in_progress -= 1
-                status_tracker.num_tasks_failed += 1
-        else:
-            data = (
-                [self.request_json, response, self.metadata]
-                if self.metadata
-                else [self.request_json, response]
-            )
-            self.append_to_jsonl(data, save_filepath)
-            status_tracker.num_tasks_in_progress -= 1
-            status_tracker.num_tasks_succeeded += 1
-            logging.debug(f"Request {self.task_id} saved to {save_filepath}")
-
-    def append_to_jsonl(self, data, filename: str) -> None:
-        """Append a json payload to the end of a jsonl file."""
-        json_string = json.dumps(data)
-        with open(filename, "a") as f:
-            f.write(json_string + "\n")
-
-
-class RateLimitManager():
-    import uuid
-
-    def __init__(self, max_tokens_per_minute, max_requests_per_minute):
-        self.max_tokens_per_minute = max_tokens_per_minute
-        self.max_requests_per_minute = max_requests_per_minute
-        # print("init rate limit handler")
-        self.status_tracker = StatusTracker()
-        self.last_update_time = time.time()
-        self.available_request_capacity = max_requests_per_minute
-        self.available_token_capacity = max_tokens_per_minute
-        self.queue_of_requests_to_retry = asyncio.Queue()  # type: ignore
-        self.task = 0  # for tracking ids for tasks
-        self.cooldown_time = 10  # time to cool down between retries, in seconds
-
-    async def acompletion(self, max_attempts=5, **kwargs):
-        # Initialize logging
-        logging.basicConfig(level=logging.INFO)
-
-        # Initialize request
-        logging.info(f"Initializing API request for request id:{self.task}")
-        request = APIRequest(
-            task_id=self.task,
-            request_json=kwargs,
-            token_consumption=self.num_tokens_consumed_from_request(
-                request_json=kwargs, token_encoding_name="cl100k_base"
-            ),
-            attempts_left=max_attempts,
-            metadata=kwargs.pop("metadata", None),
-        )
-        self.task += 1  # added a new task to execute
-
-        # Check and update current capacity for model
-        current_time = time.time()
-        seconds_since_update = current_time - self.last_update_time
-
-        self.available_request_capacity = min(
-            self.available_request_capacity + self.max_requests_per_minute * seconds_since_update / 60.0,
-            self.max_requests_per_minute,
-        )
-
-        self.available_token_capacity = min(
-            self.available_token_capacity + self.max_tokens_per_minute * seconds_since_update / 60.0,
-            self.max_tokens_per_minute,
-        )
-
-        self.last_update_time = current_time
-
-        request_tokens = request.token_consumption
-        logging.debug("Request tokens: " + str(request_tokens))
-
-        queue_of_requests_to_retry = asyncio.Queue()
-
-        if self.available_request_capacity >= 1 and self.available_token_capacity >= request_tokens:
-            # Update counters
-            self.available_request_capacity -= 1
-            self.available_token_capacity -= request_tokens
-            request.attempts_left -= 1
-
-            # Call API and log final status
-            logging.info(
-                f"Running Request {request.task_id}, using tokens: {request.token_consumption}. Remaining available tokens: {self.available_token_capacity}"
-            )
-
-            result = await request.call_api(
-                request_header={},
-                retry_queue=queue_of_requests_to_retry,
-                save_filepath="",
-                status_tracker=self.status_tracker,
-            )
-            return result
-        else:
-            logging.info(f"OVER CAPACITY for {request.task_id}. retrying {request.attempts_left} times")
-            while request.attempts_left >= 0:
-                # Sleep to allow capacity to replenish
-                logging.info(
-                    f"OVER CAPACITY for {request.task_id}. Cooling down for {self.cooldown_time} seconds, retrying {request.attempts_left} times"
-                )
-                await asyncio.sleep(self.cooldown_time)
-
-                # Check capacity
-                current_time = time.time()
-                seconds_since_update = current_time - self.last_update_time
-
-                self.available_request_capacity = min(
-                    self.available_request_capacity + self.max_requests_per_minute * seconds_since_update / 60.0,
-                    self.max_requests_per_minute,
-                )
-
-                self.available_token_capacity = min(
-                    self.available_token_capacity + self.max_tokens_per_minute * seconds_since_update / 60.0,
-                    self.max_tokens_per_minute,
-                )
-
-                self.last_update_time = current_time
-
-                request_tokens = request.token_consumption
-
-                if self.available_request_capacity >= 1 and self.available_token_capacity >= request_tokens:
-                    logging.info("Token capacity available.")
-
-                    # Update counters
-                    self.available_request_capacity -= 1
-                    self.available_token_capacity -= request_tokens
-                    request.attempts_left -= 1
-
-                    # Call API and log final status
-                    logging.info(
-                        f"Running Request {request.task_id}, using tokens: {request.token_consumption}. Remaining available tokens: {self.available_token_capacity}"
-                    )
-
-                    result = await request.call_api(
-                        request_header={},
-                        retry_queue=queue_of_requests_to_retry,
-                        save_filepath="",
-                        status_tracker=self.status_tracker,
-                    )
-                    return result
-
-                logging.warning(f"Request {request.task_id} is still over capacity. Number of retry attempts left: {request.attempts_left}")
-                request.attempts_left -= 1
-
-    async def batch_completion(
-        self,
-        requests_filepath: str = "",
-        jobs: list = [],
-        save_filepath: Optional[str] = None,
-        api_key: Optional[str] = os.getenv("OPENAI_API_KEY"),
-        max_requests_per_minute: float = 3_000 * 0.5,
-        max_tokens_per_minute: float = 250_000 * 0.5,
-        token_encoding_name: str = "cl100k_base",
-        max_attempts: int = 5,
-        logging_level: int = logging.INFO,
-    ):
-        if save_filepath is None:
-            save_filepath = "litellm_results.jsonl"
-        print("running batch completion")
-
-        # constants
-        seconds_to_pause_after_rate_limit_error = 15
-        seconds_to_sleep_each_loop = 0.001  # 1 ms limits max throughput to 1,000 requests per second
-
-        # initialize logging
-        logging.basicConfig(level=logging_level)
-        logging.debug(f"Logging initialized at level {logging_level}")
-
-        # infer API endpoint and construct request header
-        request_header = {"Authorization": f"Bearer {api_key}"}
-
-        # initialize trackers
-        queue_of_requests_to_retry = asyncio.Queue()  # type: ignore
-        task_id_generator = (
-            self.task_id_generator_function()
-        )  # generates integer IDs of 0, 1, 2, ...
-        status_tracker = (
-            StatusTracker()
-        )  # single instance to track a collection of variables
-        next_request = None  # variable to hold the next request to call
-
-        # initialize available capacity counts
-        available_request_capacity = max_requests_per_minute
-        available_token_capacity = max_tokens_per_minute
-        last_update_time = time.time()
-
-        # initialize flags
-        file_not_finished = True  # after file is empty, we'll skip reading it
-        logging.debug("Initialization complete.")
-
-        requests = iter(jobs)
-
-        while True:
-            # get next request (if one is not already waiting for capacity)
-            if next_request is None:
-                if not queue_of_requests_to_retry.empty():
-                    next_request = queue_of_requests_to_retry.get_nowait()
-                    logging.debug(f"Retrying request {next_request.task_id}: {next_request}")
-                elif file_not_finished:
-                    try:
-                        # get new request
-                        request_json = next(requests)
-                        if "api_key" not in request_json:
-                            request_json["api_key"] = api_key
-                        # print("CREATING API REQUEST")
-                        next_request = APIRequest(
-                            task_id=next(task_id_generator),
-                            request_json=request_json,
-                            token_consumption=self.num_tokens_consumed_from_request(
-                                request_json, token_encoding_name
-                            ),
-                            attempts_left=max_attempts,
-                            metadata=request_json.pop("metadata", None),
-                        )
-                        # print("AFTER INIT API REQUEST")
-                        status_tracker.num_tasks_started += 1
-                        status_tracker.num_tasks_in_progress += 1
-                        logging.debug(f"Reading request {next_request.task_id}: {next_request}")
-                    except:
-                        logging.debug("Jobs finished")
-                        file_not_finished = False
-
-            # update available capacity
-            current_time = time.time()
-            seconds_since_update = current_time - last_update_time
-            available_request_capacity = min(
-                available_request_capacity
-                + max_requests_per_minute * seconds_since_update / 60.0,
-                max_requests_per_minute,
-            )
-            available_token_capacity = min(
-                available_token_capacity
-                + max_tokens_per_minute * seconds_since_update / 60.0,
-                max_tokens_per_minute,
-            )
-            last_update_time = current_time
-
-            # if enough capacity available, call API
-            if next_request:
-                next_request_tokens = next_request.token_consumption
-                if (
-                    available_request_capacity >= 1
-                    and available_token_capacity >= next_request_tokens
-                ):
-                    # update counters
-                    available_request_capacity -= 1
-                    available_token_capacity -= next_request_tokens
-                    next_request.attempts_left -= 1
-
-                    # call API; after finishing, log final status
-                    logging.info(
-                        f"Running Request {next_request.task_id}, using tokens: {next_request.token_consumption}, remaining available tokens: {available_token_capacity}"
-                    )
-
-                    asyncio.create_task(
-                        next_request.call_api(
-                            request_header=request_header,
-                            retry_queue=queue_of_requests_to_retry,
-                            save_filepath=save_filepath,
-                            status_tracker=status_tracker,
-                        )
-                    )
-                    next_request = None  # reset next_request to empty
-
-            # if all tasks are finished, break
-            if status_tracker.num_tasks_in_progress == 0:
-                break
-
-            # main loop sleeps briefly so concurrent tasks can run
-            await asyncio.sleep(seconds_to_sleep_each_loop)
-
-            # if a rate limit error was hit recently, pause to cool down
-            seconds_since_rate_limit_error = (
-                time.time() - status_tracker.time_of_last_rate_limit_error
-            )
-            if (
-                seconds_since_rate_limit_error
-                < seconds_to_pause_after_rate_limit_error
-            ):
-                remaining_seconds_to_pause = (
-                    seconds_to_pause_after_rate_limit_error
-                    - seconds_since_rate_limit_error
-                )
-                await asyncio.sleep(remaining_seconds_to_pause)
-                # ^e.g., if pause is 15 seconds and final limit was hit 5 seconds ago
-                logging.warning(
-                    f"Pausing to cool down until {time.ctime(status_tracker.time_of_last_rate_limit_error + seconds_to_pause_after_rate_limit_error)}"
-                )
-
-        # after finishing, log final status
-        logging.info(f"Parallel processing complete. Results saved to {save_filepath}")
-        if status_tracker.num_tasks_failed > 0:
-            logging.warning(
-                f"{status_tracker.num_tasks_failed} / {status_tracker.num_tasks_started} requests failed. Errors logged to {save_filepath}."
-            )
-        if status_tracker.num_rate_limit_errors > 0:
-            logging.warning(
-                f"{status_tracker.num_rate_limit_errors} rate limit errors received. Consider running at a lower rate."
-            )
-
-    def num_tokens_consumed_from_request(
-        self,
-        request_json: dict,
-        token_encoding_name: str,
-    ):
-        """Count the number of tokens in the request. Only supports completion and embedding requests."""
-        encoding = tiktoken.get_encoding(token_encoding_name)
-        # if completions request, tokens = prompt + n * max_tokens
-        max_tokens = request_json.get("max_tokens", 15)
-        n = request_json.get("n", 1)
-        completion_tokens = n * max_tokens
-
-        num_tokens = 0
-        for message in request_json["messages"]:
-            num_tokens += 4  # every message follows {role/name}\n{content}\n
-            for key, value in message.items():
-                num_tokens += len(encoding.encode(value))
-                if key == "name":  # if there's a name, the role is omitted
-                    num_tokens -= 1  # role is always required and always 1 token
-        num_tokens += 2  # every reply is primed with assistant
-        return num_tokens + completion_tokens
-
-    def task_id_generator_function(self):
-        """Generate integers 0, 1, 2, and so on."""
-        task_id = 0
-        while True:
-            yield task_id
-            task_id += 1
-
-
-###### USAGE ################
-# jobs = [
-#     {"model": "gpt-4", "messages": [{"content": "Please provide a summary of the latest scientific discoveries."*500, "role": "user"}]},
-#     {"model": "gpt-4", "messages": [{"content": "Please provide a summary of the latest scientific discoveries."*800, "role": "user"}]},
-#     {"model": "gpt-4", "messages": [{"content": "Please provide a summary of the latest scientific discoveries."*900, "role": "user"}]},
-#     {"model": "gpt-4", "messages": [{"content": "Please provide a summary of the latest scientific discoveries."*900, "role": "user"}]},
-#     {"model": "gpt-4", "messages": [{"content": "Please provide a summary of the latest scientific discoveries."*900, "role": "user"}]}
-# ]
-
-# asyncio.run(
-#     batch_completion_rate_limits(
-#         jobs = jobs,
-#         api_key="",
-#         max_requests_per_minute=60,
-#         max_tokens_per_minute=40000
-#     )
-# )
\ No newline at end of file
+        return []  # NON-Blocking
\ No newline at end of file
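
Note: the commented-out usage at the bottom of the removed block invokes a `batch_completion_rate_limits(...)` helper that does not appear anywhere in this diff; the entry point actually being deleted is `RateLimitManager.batch_completion`. A minimal sketch of driving the removed class directly, assuming the pre-removal litellm version (the job contents and rate limits below are illustrative placeholders, not values from the source):

import asyncio

from litellm.utils import RateLimitManager  # only importable before this diff is applied

# illustrative jobs; any litellm.acompletion kwargs with a "messages" key work here
jobs = [
    {"model": "gpt-4", "messages": [{"content": "Summarize the latest scientific discoveries.", "role": "user"}]},
    {"model": "gpt-4", "messages": [{"content": "Summarize recent advances in battery research.", "role": "user"}]},
]

manager = RateLimitManager(max_tokens_per_minute=40_000, max_requests_per_minute=60)

# results are appended, one JSON array per line, to litellm_results.jsonl by default
asyncio.run(manager.batch_completion(jobs=jobs, api_key=""))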