From f6af10b2caed33badd4a55a6fa1cf266ff0a0376 Mon Sep 17 00:00:00 2001
From: ishaan-jaff
Date: Wed, 4 Oct 2023 14:45:03 -0700
Subject: [PATCH] add batch_completion with rate limits to utils

---
 litellm/utils.py | 404 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 404 insertions(+)

diff --git a/litellm/utils.py b/litellm/utils.py
index 9c70575f8..7b05fd451 100644
--- a/litellm/utils.py
+++ b/litellm/utils.py
@@ -3713,3 +3713,407 @@ def get_valid_models():
         return valid_models
     except:
         return []  # NON-Blocking
+
+
+############################# BATCH COMPLETION with Rate Limit Throttling #######################
+"""
+API REQUEST PARALLEL PROCESSOR
+
+Using the OpenAI API to process lots of text quickly takes some care.
+If you trickle in a million API requests one by one, they'll take days to complete.
+If you flood a million API requests in parallel, they'll exceed the rate limits and fail with errors.
+To maximize throughput, parallel requests need to be throttled to stay under rate limits.
+
+This script parallelizes requests to the OpenAI API while throttling to stay under rate limits.
+
+Features:
+- Takes requests from a `jobs` list (or streams them from a jsonl file, to avoid running out of memory for giant jobs)
+- Makes requests concurrently, to maximize throughput
+- Throttles request and token usage, to stay under rate limits
+- Retries failed requests up to {max_attempts} times, to avoid missing data
+- Logs errors, to diagnose problems with requests
+
+Inputs:
+- requests_filepath : str
+    - path to the file containing the requests to be processed
+    - file should be a jsonl file, where each line is a json object with API parameters and an optional metadata field
+    - e.g., {"model": "text-embedding-ada-002", "input": "embed me", "metadata": {"row_id": 1}}
+    - as with all jsonl files, take care that newlines in the content are properly escaped (json.dumps does this automatically)
+    - an example file is provided at examples/data/example_requests_to_parallel_process.jsonl
+- jobs : list, optional
+    - list of request dicts to process directly instead of reading from a file (see the commented usage example at the bottom of this file)
+- save_filepath : str, optional
+    - path to the file where the results will be saved
+    - file will be a jsonl file, where each line is an array with the original request plus the API response
+    - e.g., [{"model": "text-embedding-ada-002", "input": "embed me"}, {...}]
+    - if omitted, results will be saved to litellm_results.jsonl
+- api_key : str, optional
+    - API key to use
+    - if omitted, will be read from the OPENAI_API_KEY environment variable
+- max_requests_per_minute : float, optional
+    - target number of requests to make per minute (will make fewer if limited by tokens)
+    - leave headroom by setting this to 50% or 75% of your limit
+    - if requests are limiting you, try batching multiple embeddings or completions into one request
+    - if omitted, will default to 1,500
+- max_tokens_per_minute : float, optional
+    - target number of tokens to use per minute (will use fewer if limited by requests)
+    - leave headroom by setting this to 50% or 75% of your limit
+    - if omitted, will default to 125,000
+- token_encoding_name : str, optional
+    - name of the token encoding used, as defined in the `tiktoken` package
+    - if omitted, will default to "cl100k_base" (used by `text-embedding-ada-002`)
+- max_attempts : int, optional
+    - number of times to retry a failed request before giving up
+    - if omitted, will default to 5
+- logging_level : int, optional
+    - level of logging to use; higher numbers will log fewer messages
+    - 40 = ERROR; will log only when requests fail after all retries
+    - 30 = WARNING; will log when requests hit rate limits or other errors
+    - 20 = INFO; will log when requests start and the status at finish
+    - 10 = DEBUG; will log various things as the loop runs to see when they occur
+    - if omitted, will default to 20 (INFO).
+
+The script is structured as follows:
+    - Imports
+    - Define batch_completion_rate_limits()
+        - Initialize things
+        - In main loop:
+            - Get next request if one is not already waiting for capacity
+            - Update available token & request capacity
+            - If enough capacity available, call API
+            - The loop pauses if a rate limit error is hit
+            - The loop breaks when no tasks remain
+    - Define dataclasses
+        - StatusTracker (stores script metadata counters; only one instance is created)
+        - APIRequest (stores API inputs, outputs, metadata; one method to call API)
+    - Define functions
+        - append_to_jsonl (writes to results file)
+        - num_tokens_consumed_from_request (bigger function to infer token usage from request)
+        - task_id_generator_function (yields 0, 1, 2, ...)
+    - Usage example (commented out at the bottom of this file)
+"""
+
+
+# imports
+import asyncio  # for running API calls concurrently
+import json  # for saving results to a jsonl file
+import logging  # for logging rate limit warnings and other messages
+import os  # for reading API key
+import re  # for matching endpoint from request URL
+import tiktoken  # for counting tokens
+import time  # for sleeping after rate limit is hit
+from dataclasses import (
+    dataclass,
+    field,
+)  # for storing API inputs, outputs, and metadata
+
+
+async def batch_completion_rate_limits(
+    requests_filepath: str = "",
+    jobs: list = [],
+    save_filepath: str = None,
+    api_key: str = os.getenv("OPENAI_API_KEY"),
+    max_requests_per_minute: float = 3_000 * 0.5,
+    max_tokens_per_minute: float = 250_000 * 0.5,
+    token_encoding_name: str = "cl100k_base",
+    max_attempts: int = 5,
+    logging_level: int = logging.INFO,
+):
+
+    if save_filepath is None:
+        save_filepath = "litellm_results.jsonl"
+
+    # constants
+    seconds_to_pause_after_rate_limit_error = 15
+    seconds_to_sleep_each_loop = (
+        0.001  # 1 ms limits max throughput to 1,000 requests per second
+    )
+
+    # initialize logging
+    logging.basicConfig(level=logging_level)
+    logging.debug(f"Logging initialized at level {logging_level}")
+
+    # construct request header
+    request_header = {"Authorization": f"Bearer {api_key}"}
+
+    # initialize trackers
+    queue_of_requests_to_retry = asyncio.Queue()
+    task_id_generator = (
+        task_id_generator_function()
+    )  # generates integer IDs of 0, 1, 2, ...
+    status_tracker = (
+        StatusTracker()
+    )  # single instance to track a collection of variables
+    next_request = None  # variable to hold the next request to call
+
+    # initialize available capacity counts
+    available_request_capacity = max_requests_per_minute
+    available_token_capacity = max_tokens_per_minute
+    last_update_time = time.time()
+
+    # initialize flags
+    file_not_finished = True  # set to False once the jobs iterator is exhausted
+    logging.debug("Initialization complete.")
+
+    requests = iter(jobs)
+
+    while True:
+        # get next request (if one is not already waiting for capacity)
+        if next_request is None:
+            if not queue_of_requests_to_retry.empty():
+                next_request = queue_of_requests_to_retry.get_nowait()
+                logging.debug(
+                    f"Retrying request {next_request.task_id}: {next_request}"
+                )
+            elif file_not_finished:
+                try:
+                    # get new request
+                    request_json = next(requests)
+                    if "api_key" not in request_json:
+                        request_json["api_key"] = api_key
+                    next_request = APIRequest(
+                        task_id=next(task_id_generator),
+                        request_json=request_json,
+                        token_consumption=num_tokens_consumed_from_request(
+                            request_json, token_encoding_name
+                        ),
+                        attempts_left=max_attempts,
+                        metadata=request_json.pop("metadata", None),
+                    )
+                    status_tracker.num_tasks_started += 1
+                    status_tracker.num_tasks_in_progress += 1
+                    logging.debug(
+                        f"Reading request {next_request.task_id}: {next_request}"
+                    )
+                except StopIteration:
+                    logging.debug("Jobs finished")
+                    file_not_finished = False
+
+        # update available capacity
+        current_time = time.time()
+        seconds_since_update = current_time - last_update_time
+        available_request_capacity = min(
+            available_request_capacity
+            + max_requests_per_minute * seconds_since_update / 60.0,
+            max_requests_per_minute,
+        )
+        available_token_capacity = min(
+            available_token_capacity
+            + max_tokens_per_minute * seconds_since_update / 60.0,
+            max_tokens_per_minute,
+        )
+        last_update_time = current_time
+
+        # if enough capacity available, call API
+        if next_request:
+            next_request_tokens = next_request.token_consumption
+            if (
+                available_request_capacity >= 1
+                and available_token_capacity >= next_request_tokens
+            ):
+                # update counters
+                available_request_capacity -= 1
+                available_token_capacity -= next_request_tokens
+                next_request.attempts_left -= 1
+
+                # call API
+                logging.info(
+                    f"""Running Request {next_request.task_id}, using tokens: {next_request.token_consumption}, remaining available tokens: {available_token_capacity}"""
+                )
+                asyncio.create_task(
+                    next_request.call_api(
+                        request_header=request_header,
+                        retry_queue=queue_of_requests_to_retry,
+                        save_filepath=save_filepath,
+                        status_tracker=status_tracker,
+                    )
+                )
+                next_request = None  # reset next_request to empty
+
+        # if all tasks are finished, break
+        if status_tracker.num_tasks_in_progress == 0:
+            break
+
+        # main loop sleeps briefly so concurrent tasks can run
+        await asyncio.sleep(seconds_to_sleep_each_loop)
+
+        # if a rate limit error was hit recently, pause to cool down
+        seconds_since_rate_limit_error = (
+            time.time() - status_tracker.time_of_last_rate_limit_error
+        )
+        if (
+            seconds_since_rate_limit_error
+            < seconds_to_pause_after_rate_limit_error
+        ):
+            remaining_seconds_to_pause = (
+                seconds_to_pause_after_rate_limit_error
+                - seconds_since_rate_limit_error
+            )
+            await asyncio.sleep(remaining_seconds_to_pause)
+            # ^e.g., if pause is 15 seconds and final limit was hit 5 seconds ago
+            logging.warning(
+                f"Pausing to cool down until {time.ctime(status_tracker.time_of_last_rate_limit_error + seconds_to_pause_after_rate_limit_error)}"
+            )
+
+    # after finishing, log final status
+    logging.info(
+        f"""Parallel processing complete. Results saved to {save_filepath}"""
+    )
+    if status_tracker.num_tasks_failed > 0:
+        logging.warning(
+            f"{status_tracker.num_tasks_failed} / {status_tracker.num_tasks_started} requests failed. Errors logged to {save_filepath}."
+        )
+    if status_tracker.num_rate_limit_errors > 0:
+        logging.warning(
+            f"{status_tracker.num_rate_limit_errors} rate limit errors received. Consider running at a lower rate."
+        )
+
+
+# dataclasses
+
+
+@dataclass
+class StatusTracker:
+    """Stores metadata about the script's progress. Only one instance is created."""
+
+    num_tasks_started: int = 0
+    num_tasks_in_progress: int = 0  # script ends when this reaches 0
+    num_tasks_succeeded: int = 0
+    num_tasks_failed: int = 0
+    num_rate_limit_errors: int = 0
+    num_api_errors: int = 0  # excluding rate limit errors, counted above
+    num_other_errors: int = 0
+    time_of_last_rate_limit_error: int = 0  # used to cool off after hitting rate limits
+
+
+@dataclass
+class APIRequest:
+    """Stores an API request's inputs, outputs, and other metadata. Contains a method to make an API call."""
+
+    task_id: int
+    request_json: dict
+    token_consumption: int
+    attempts_left: int
+    metadata: dict
+    result: list = field(default_factory=list)
+
+    async def call_api(
+        self,
+        request_header: dict,
+        retry_queue: asyncio.Queue,
+        save_filepath: str,
+        status_tracker: StatusTracker,
+    ):
+        """Calls the API via litellm.acompletion and saves results."""
+        logging.info(f"Starting request #{self.task_id}")
+        error = None
+        try:
+            response = await litellm.acompletion(**self.request_json)
+            logging.info(f"Completed request #{self.task_id}")
+        except Exception as e:
+            logging.warning(f"Request {self.task_id} failed with error {e}")
+            status_tracker.num_api_errors += 1
+            error = e
+            if "Rate limit" in str(e):
+                status_tracker.time_of_last_rate_limit_error = time.time()
+                status_tracker.num_rate_limit_errors += 1
+                status_tracker.num_api_errors -= (
+                    1  # rate limit errors are counted separately
+                )
+
+        if error:
+            self.result.append(error)
+            if self.attempts_left:
+                retry_queue.put_nowait(self)
+            else:
+                logging.error(
+                    f"Request {self.request_json} failed after all attempts. Saving errors: {self.result}"
+                )
+                data = (
+                    [self.request_json, [str(e) for e in self.result], self.metadata]
+                    if self.metadata
+                    else [self.request_json, [str(e) for e in self.result]]
+                )
+                append_to_jsonl(data, save_filepath)
+                status_tracker.num_tasks_in_progress -= 1
+                status_tracker.num_tasks_failed += 1
+        else:
+            data = (
+                [self.request_json, response, self.metadata]
+                if self.metadata
+                else [self.request_json, response]
+            )
+            append_to_jsonl(data, save_filepath)
+            status_tracker.num_tasks_in_progress -= 1
+            status_tracker.num_tasks_succeeded += 1
+            logging.debug(f"Request {self.task_id} saved to {save_filepath}")
+
+
+def append_to_jsonl(data, filename: str) -> None:
+    """Append a json payload to the end of a jsonl file."""
+    json_string = json.dumps(data)
+    with open(filename, "a") as f:
+        f.write(json_string + "\n")
+
+
+def num_tokens_consumed_from_request(
+    request_json: dict,
+    token_encoding_name: str,
+):
+    """Count the number of tokens in the request.
+    Only supports chat completion requests (a list of `messages`)."""
+    encoding = tiktoken.get_encoding(token_encoding_name)
+
+    # completion requests consume prompt tokens plus n * max_tokens completion tokens
+    max_tokens = request_json.get("max_tokens", 15)
+    n = request_json.get("n", 1)
+    completion_tokens = n * max_tokens
+
+    # count prompt tokens in the chat messages
+    num_tokens = 0
+    for message in request_json["messages"]:
+        num_tokens += 4  # every message follows <im_start>{role/name}\n{content}<im_end>\n
+        for key, value in message.items():
+            num_tokens += len(encoding.encode(value))
+            if key == "name":  # if there's a name, the role is omitted
+                num_tokens -= 1  # role is always required and always 1 token
+    num_tokens += 2  # every reply is primed with <im_start>assistant
+    return num_tokens + completion_tokens
+
+
+def task_id_generator_function():
+    """Generate integers 0, 1, 2, and so on."""
+    task_id = 0
+    while True:
+        yield task_id
+        task_id += 1
+
+
+###### USAGE ################
+# jobs = [
+#     {"model": "gpt-4", "messages": [{"content": "Please provide a summary of the latest scientific discoveries." * 500, "role": "user"}]},
+#     {"model": "gpt-4", "messages": [{"content": "Please provide a summary of the latest scientific discoveries." * 800, "role": "user"}]},
+#     {"model": "gpt-4", "messages": [{"content": "Please provide a summary of the latest scientific discoveries." * 900, "role": "user"}]},
+#     {"model": "gpt-4", "messages": [{"content": "Please provide a summary of the latest scientific discoveries." * 900, "role": "user"}]},
+#     {"model": "gpt-4", "messages": [{"content": "Please provide a summary of the latest scientific discoveries." * 900, "role": "user"}]},
+# ]
+
+# asyncio.run(
+#     batch_completion_rate_limits(
+#         jobs=jobs,
+#         api_key="",
+#         max_requests_per_minute=60,
+#         max_tokens_per_minute=40000,
+#     )
+# )
\ No newline at end of file
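
A minimal sketch of reading the saved results back, assuming the default save_filepath of "litellm_results.jsonl" and the per-line layout described in the docstring ([request, response] for successes, [request, [error strings], metadata] for failures):

    import json

    # each line of the results file is a json array: the original request followed by
    # either the API response or, for failed requests, the list of error strings
    with open("litellm_results.jsonl") as f:
        for line in f:
            entry = json.loads(line)
            request, outcome = entry[0], entry[1]
            print(request.get("model"), str(outcome)[:80])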