diff --git a/litellm/utils.py b/litellm/utils.py index b2f28cfcd8..a409eaba86 100644 --- a/litellm/utils.py +++ b/litellm/utils.py @@ -17,8 +17,14 @@ import datetime, time import tiktoken import uuid import aiohttp +import logging +import asyncio from tokenizers import Tokenizer import pkg_resources +from dataclasses import ( + dataclass, + field, +) # for storing API inputs, outputs, and metadata encoding = tiktoken.get_encoding("cl100k_base") import importlib.metadata from .integrations.traceloop import TraceloopLogger @@ -3716,269 +3722,6 @@ def get_valid_models(): ############################# BATCH COMPLETION with Rate Limit Throttling ####################### -""" -API REQUEST PARALLEL PROCESSOR - -Using the OpenAI API to process lots of text quickly takes some care. -If you trickle in a million API requests one by one, they'll take days to complete. -If you flood a million API requests in parallel, they'll exceed the rate limits and fail with errors. -To maximize throughput, parallel requests need to be throttled to stay under rate limits. - -This script parallelizes requests to the OpenAI API while throttling to stay under rate limits. - -Features: -- Streams requests from file, to avoid running out of memory for giant jobs -- Makes requests concurrently, to maximize throughput -- Throttles request and token usage, to stay under rate limits -- Retries failed requests up to {max_attempts} times, to avoid missing data -- Logs errors, to diagnose problems with requests - -``` - -Inputs: -- requests_filepath : str - - path to the file containing the requests to be processed - - file should be a jsonl file, where each line is a json object with API parameters and an optional metadata field - - e.g., {"model": "text-embedding-ada-002", "input": "embed me", "metadata": {"row_id": 1}} - - as with all jsonl files, take care that newlines in the content are properly escaped (json.dumps does this automatically) - - an example file is provided at examples/data/example_requests_to_parallel_process.jsonl - - the code to generate the example file is appended to the bottom of this script -- save_filepath : str, optional - - path to the file where the results will be saved - - file will be a jsonl file, where each line is an array with the original request plus the API response - - e.g., [{"model": "text-embedding-ada-002", "input": "embed me"}, {...}] - - if omitted, results will be saved to {requests_filename}_results.jsonl -- api_key : str, optional - - API key to use - - if omitted, the script will attempt to read it from an environment variable {os.getenv("OPENAI_API_KEY")} -- max_requests_per_minute : float, optional - - target number of requests to make per minute (will make less if limited by tokens) - - leave headroom by setting this to 50% or 75% of your limit - - if requests are limiting you, try batching multiple embeddings or completions into one request - - if omitted, will default to 1,500 -- max_tokens_per_minute : float, optional - - target number of tokens to use per minute (will use less if limited by requests) - - leave headroom by setting this to 50% or 75% of your limit - - if omitted, will default to 125,000 -- token_encoding_name : str, optional - - name of the token encoding used, as defined in the `tiktoken` package - - if omitted, will default to "cl100k_base" (used by `text-embedding-ada-002`) -- max_attempts : int, optional - - number of times to retry a failed request before giving up - - if omitted, will default to 5 -- logging_level : int, optional - - level of 
logging to use; higher numbers will log fewer messages - - 40 = ERROR; will log only when requests fail after all retries - - 30 = WARNING; will log when requests his rate limits or other errors - - 20 = INFO; will log when requests start and the status at finish - - 10 = DEBUG; will log various things as the loop runs to see when they occur - - if omitted, will default to 20 (INFO). - -The script is structured as follows: - - Imports - - Define main() - - Initialize things - - In main loop: - - Get next request if one is not already waiting for capacity - - Update available token & request capacity - - If enough capacity available, call API - - The loop pauses if a rate limit error is hit - - The loop breaks when no tasks remain - - Define dataclasses - - StatusTracker (stores script metadata counters; only one instance is created) - - APIRequest (stores API inputs, outputs, metadata; one method to call API) - - Define functions - - append_to_jsonl (writes to results file) - - num_tokens_consumed_from_request (bigger function to infer token usage from request) - - task_id_generator_function (yields 1, 2, 3, ...) - - Run main() -""" - - -# imports -import asyncio # for running API calls concurrently -import json # for saving results to a jsonl file -import logging # for logging rate limit warnings and other messages -import os # for reading API key -import re # for matching endpoint from request URL -import tiktoken # for counting tokens -import time # for sleeping after rate limit is hit -from dataclasses import ( - dataclass, - field, -) # for storing API inputs, outputs, and metadata - - -async def batch_completion_rate_limits( - requests_filepath: str = "", - jobs: list = [], - save_filepath: str = None, - api_key: str = os.getenv("OPENAI_API_KEY"), - max_requests_per_minute: float = 3_000 * 0.5, - max_tokens_per_minute: float = 250_000 * 0.5, - token_encoding_name: str = "cl100k_base", - max_attempts: int = 5, - logging_level: int = logging.INFO, - ): - - if save_filepath == None: - save_filepath = "litellm_results.jsonl" - - # constants - seconds_to_pause_after_rate_limit_error = 15 - seconds_to_sleep_each_loop = ( - 0.001 # 1 ms limits max throughput to 1,000 requests per second - ) - - # initialize logging - logging.basicConfig(level=logging_level) - logging.debug(f"Logging initialized at level {logging_level}") - - # infer API endpoint and construct request header - - request_header = {"Authorization": f"Bearer {api_key}"} - - # initialize trackers - queue_of_requests_to_retry = asyncio.Queue() - task_id_generator = ( - task_id_generator_function() - ) # generates integer IDs of 1, 2, 3, ... 
- status_tracker = ( - StatusTracker() - ) # single instance to track a collection of variables - next_request = None # variable to hold the next request to call - - # initialize available capacity counts - available_request_capacity = max_requests_per_minute - available_token_capacity = max_tokens_per_minute - last_update_time = time.time() - - # initialize flags - file_not_finished = True # after file is empty, we'll skip reading it - logging.debug(f"Initialization complete.") - - requests = iter(jobs) - - while True: - # get next request (if one is not already waiting for capacity) - if next_request is None: - if not queue_of_requests_to_retry.empty(): - next_request = queue_of_requests_to_retry.get_nowait() - logging.debug( - f"Retrying request {next_request.task_id}: {next_request}" - ) - elif file_not_finished: - try: - # get new request - request_json = next(requests) - if "api_key" not in request_json: - request_json["api_key"] = api_key - # print("CREATING API REQUEST") - next_request = APIRequest( - task_id=next(task_id_generator), - request_json=request_json, - token_consumption=num_tokens_consumed_from_request( - request_json, token_encoding_name - ), - attempts_left=max_attempts, - metadata=request_json.pop("metadata", None), - ) - # print("AFTER INIT API REQUEST") - status_tracker.num_tasks_started += 1 - status_tracker.num_tasks_in_progress += 1 - logging.debug( - f"Reading request {next_request.task_id}: {next_request}" - ) - except: - logging.debug("Jobs finished") - file_not_finished = False - - # update available capacity - current_time = time.time() - seconds_since_update = current_time - last_update_time - available_request_capacity = min( - available_request_capacity - + max_requests_per_minute * seconds_since_update / 60.0, - max_requests_per_minute, - ) - available_token_capacity = min( - available_token_capacity - + max_tokens_per_minute * seconds_since_update / 60.0, - max_tokens_per_minute, - ) - last_update_time = current_time - - # if enough capacity available, call API - if next_request: - next_request_tokens = next_request.token_consumption - if ( - available_request_capacity >= 1 - and available_token_capacity >= next_request_tokens - ): - # update counters - available_request_capacity -= 1 - available_token_capacity -= next_request_tokens - next_request.attempts_left -= 1 - - # call API - # after finishing, log final status - logging.info( - f"""Running Request {next_request.task_id}, using tokens: {next_request.token_consumption} remaining available tokens: {available_token_capacity}""" - ) - next_request.task_id - - asyncio.create_task( - next_request.call_api( - request_header=request_header, - retry_queue=queue_of_requests_to_retry, - save_filepath=save_filepath, - status_tracker=status_tracker, - ) - ) - next_request = None # reset next_request to empty - - # if all tasks are finished, break - if status_tracker.num_tasks_in_progress == 0: - break - - # main loop sleeps briefly so concurrent tasks can run - await asyncio.sleep(seconds_to_sleep_each_loop) - - # if a rate limit error was hit recently, pause to cool down - seconds_since_rate_limit_error = ( - time.time() - status_tracker.time_of_last_rate_limit_error - ) - if ( - seconds_since_rate_limit_error - < seconds_to_pause_after_rate_limit_error - ): - remaining_seconds_to_pause = ( - seconds_to_pause_after_rate_limit_error - - seconds_since_rate_limit_error - ) - await asyncio.sleep(remaining_seconds_to_pause) - # ^e.g., if pause is 15 seconds and final limit was hit 5 seconds ago - logging.warn( 
- f"Pausing to cool down until {time.ctime(status_tracker.time_of_last_rate_limit_error + seconds_to_pause_after_rate_limit_error)}" - ) - - # after finishing, log final status - logging.info( - f"""Parallel processing complete. Results saved to {save_filepath}""" - ) - if status_tracker.num_tasks_failed > 0: - logging.warning( - f"{status_tracker.num_tasks_failed} / {status_tracker.num_tasks_started} requests failed. Errors logged to {save_filepath}." - ) - if status_tracker.num_rate_limit_errors > 0: - logging.warning( - f"{status_tracker.num_rate_limit_errors} rate limit errors received. Consider running at a lower rate." - ) - - -# dataclasses - - @dataclass class StatusTracker: """Stores metadata about the script's progress. Only one instance is created.""" @@ -4012,13 +3755,13 @@ class APIRequest: status_tracker: StatusTracker, ): """Calls the OpenAI API and saves results.""" - logging.info(f"Starting request #{self.task_id}") + logging.info(f"Making API Call for request #{self.task_id}") error = None try: response = await litellm.acompletion( **self.request_json ) - # print("got response", response) + print(response) logging.info(f"Completed request #{self.task_id}") except Exception as e: logging.warning( @@ -4047,7 +3790,7 @@ class APIRequest: if self.metadata else [self.request_json, [str(e) for e in self.result]] ) - append_to_jsonl(data, save_filepath) + self.append_to_jsonl(data, save_filepath) status_tracker.num_tasks_in_progress -= 1 status_tracker.num_tasks_failed += 1 else: @@ -4056,48 +3799,231 @@ class APIRequest: if self.metadata else [self.request_json, response] ) - append_to_jsonl(data, save_filepath) + self.append_to_jsonl(data, save_filepath) status_tracker.num_tasks_in_progress -= 1 status_tracker.num_tasks_succeeded += 1 logging.debug(f"Request {self.task_id} saved to {save_filepath}") + + + def append_to_jsonl(self, data, filename: str) -> None: + """Append a json payload to the end of a jsonl file.""" + json_string = json.dumps(data) + with open(filename, "a") as f: + f.write(json_string + "\n") -def append_to_jsonl(data, filename: str) -> None: - """Append a json payload to the end of a jsonl file.""" - json_string = json.dumps(data) - with open(filename, "a") as f: - f.write(json_string + "\n") - - -def num_tokens_consumed_from_request( - request_json: dict, - token_encoding_name: str, -): - """Count the number of tokens in the request. 
Only supports completion and embedding requests.""" - encoding = tiktoken.get_encoding(token_encoding_name) - # if completions request, tokens = prompt + n * max_tokens +class RateLimitHandler(): + def __init__(self, max_tokens_per_minute, max_requests_per_minute): + self.max_tokens_per_minute = max_tokens_per_minute + self.max_requests_per_minute = max_requests_per_minute + print("init rate limit handler") - max_tokens = request_json.get("max_tokens", 15) - n = request_json.get("n", 1) - completion_tokens = n * max_tokens + + async def batch_completion( + self, + requests_filepath: str = "", + jobs: list = [], + save_filepath: str = None, + api_key: str = os.getenv("OPENAI_API_KEY"), + max_requests_per_minute: float = 3_000 * 0.5, + max_tokens_per_minute: float = 250_000 * 0.5, + token_encoding_name: str = "cl100k_base", + max_attempts: int = 5, + logging_level: int = logging.INFO, + ): + + if save_filepath == None: + save_filepath = "litellm_results.jsonl" + print("running batch completion") + + # constants + seconds_to_pause_after_rate_limit_error = 15 + seconds_to_sleep_each_loop = ( + 0.001 # 1 ms limits max throughput to 1,000 requests per second + ) + + # initialize logging + logging.basicConfig(level=logging_level) + logging.debug(f"Logging initialized at level {logging_level}") + + # infer API endpoint and construct request header + + request_header = {"Authorization": f"Bearer {api_key}"} + + # initialize trackers + queue_of_requests_to_retry = asyncio.Queue() + task_id_generator = ( + self.task_id_generator_function() + ) # generates integer IDs of 1, 2, 3, ... + status_tracker = ( + StatusTracker() + ) # single instance to track a collection of variables + next_request = None # variable to hold the next request to call + + # initialize available capacity counts + available_request_capacity = max_requests_per_minute + available_token_capacity = max_tokens_per_minute + last_update_time = time.time() + + # initialize flags + file_not_finished = True # after file is empty, we'll skip reading it + logging.debug(f"Initialization complete.") + + requests = iter(jobs) + + while True: + # get next request (if one is not already waiting for capacity) + if next_request is None: + if not queue_of_requests_to_retry.empty(): + next_request = queue_of_requests_to_retry.get_nowait() + logging.debug( + f"Retrying request {next_request.task_id}: {next_request}" + ) + elif file_not_finished: + try: + # get new request + request_json = next(requests) + if "api_key" not in request_json: + request_json["api_key"] = api_key + # print("CREATING API REQUEST") + next_request = APIRequest( + task_id=next(task_id_generator), + request_json=request_json, + token_consumption=self.num_tokens_consumed_from_request( + request_json, token_encoding_name + ), + attempts_left=max_attempts, + metadata=request_json.pop("metadata", None), + ) + # print("AFTER INIT API REQUEST") + status_tracker.num_tasks_started += 1 + status_tracker.num_tasks_in_progress += 1 + logging.debug( + f"Reading request {next_request.task_id}: {next_request}" + ) + except: + logging.debug("Jobs finished") + file_not_finished = False + + # update available capacity + current_time = time.time() + seconds_since_update = current_time - last_update_time + available_request_capacity = min( + available_request_capacity + + max_requests_per_minute * seconds_since_update / 60.0, + max_requests_per_minute, + ) + available_token_capacity = min( + available_token_capacity + + max_tokens_per_minute * seconds_since_update / 60.0, + max_tokens_per_minute, 
+ ) + last_update_time = current_time + + # if enough capacity available, call API + if next_request: + next_request_tokens = next_request.token_consumption + if ( + available_request_capacity >= 1 + and available_token_capacity >= next_request_tokens + ): + # update counters + available_request_capacity -= 1 + available_token_capacity -= next_request_tokens + next_request.attempts_left -= 1 + + # call API + # after finishing, log final status + logging.info( + f"""Running Request {next_request.task_id}, using tokens: {next_request.token_consumption} remaining available tokens: {available_token_capacity}""" + ) + next_request.task_id + + asyncio.create_task( + next_request.call_api( + request_header=request_header, + retry_queue=queue_of_requests_to_retry, + save_filepath=save_filepath, + status_tracker=status_tracker, + ) + ) + next_request = None # reset next_request to empty + + # if all tasks are finished, break + if status_tracker.num_tasks_in_progress == 0: + break + + # main loop sleeps briefly so concurrent tasks can run + await asyncio.sleep(seconds_to_sleep_each_loop) + + # if a rate limit error was hit recently, pause to cool down + seconds_since_rate_limit_error = ( + time.time() - status_tracker.time_of_last_rate_limit_error + ) + if ( + seconds_since_rate_limit_error + < seconds_to_pause_after_rate_limit_error + ): + remaining_seconds_to_pause = ( + seconds_to_pause_after_rate_limit_error + - seconds_since_rate_limit_error + ) + await asyncio.sleep(remaining_seconds_to_pause) + # ^e.g., if pause is 15 seconds and final limit was hit 5 seconds ago + logging.warn( + f"Pausing to cool down until {time.ctime(status_tracker.time_of_last_rate_limit_error + seconds_to_pause_after_rate_limit_error)}" + ) + + # after finishing, log final status + logging.info( + f"""Parallel processing complete. Results saved to {save_filepath}""" + ) + if status_tracker.num_tasks_failed > 0: + logging.warning( + f"{status_tracker.num_tasks_failed} / {status_tracker.num_tasks_started} requests failed. Errors logged to {save_filepath}." + ) + if status_tracker.num_rate_limit_errors > 0: + logging.warning( + f"{status_tracker.num_rate_limit_errors} rate limit errors received. Consider running at a lower rate." + ) - num_tokens = 0 - for message in request_json["messages"]: - num_tokens += 4 # every message follows {role/name}\n{content}\n - for key, value in message.items(): - num_tokens += len(encoding.encode(value)) - if key == "name": # if there's a name, the role is omitted - num_tokens -= 1 # role is always required and always 1 token - num_tokens += 2 # every reply is primed with assistant - return num_tokens + completion_tokens + # dataclasses -def task_id_generator_function(): - """Generate integers 0, 1, 2, and so on.""" - task_id = 0 - while True: - yield task_id - task_id += 1 + + + + + def num_tokens_consumed_from_request( + self, + request_json: dict, + token_encoding_name: str, + ): + """Count the number of tokens in the request. 
Only supports completion and embedding requests.""" + encoding = tiktoken.get_encoding(token_encoding_name) + # if completions request, tokens = prompt + n * max_tokens + + max_tokens = request_json.get("max_tokens", 15) + n = request_json.get("n", 1) + completion_tokens = n * max_tokens + + + num_tokens = 0 + for message in request_json["messages"]: + num_tokens += 4 # every message follows {role/name}\n{content}\n + for key, value in message.items(): + num_tokens += len(encoding.encode(value)) + if key == "name": # if there's a name, the role is omitted + num_tokens -= 1 # role is always required and always 1 token + num_tokens += 2 # every reply is primed with assistant + return num_tokens + completion_tokens + + def task_id_generator_function(self): + """Generate integers 0, 1, 2, and so on.""" + task_id = 0 + while True: + yield task_id + task_id += 1 ###### USAGE ################
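# --- Editor's note (not part of this diff) ---------------------------------
# A minimal usage sketch for the RateLimitHandler added above, written against
# the method signatures visible in the diff. The job dict shape (a list of
# litellm.acompletion kwargs plus an optional "metadata" key), the model name,
# and the rate-limit values below are illustrative assumptions, not something
# this diff prescribes. Requires OPENAI_API_KEY in the environment, since
# batch_completion() defaults api_key to os.getenv("OPENAI_API_KEY").
import asyncio

from litellm.utils import RateLimitHandler

# Each job mirrors the jsonl format described in the removed docstring:
# acompletion kwargs plus optional metadata (popped before the API call).
jobs = [
    {
        "model": "gpt-3.5-turbo",
        "messages": [{"role": "user", "content": f"Say hello #{i}"}],
        "metadata": {"row_id": i},
    }
    for i in range(10)
]

# Note: in this diff the constructor arguments are stored but the main loop
# reads the keyword arguments passed to batch_completion() instead, so the
# limits are repeated below. Leave headroom below your real account limits.
handler = RateLimitHandler(
    max_tokens_per_minute=125_000,
    max_requests_per_minute=1_500,
)

# Results are appended as jsonl lines ([request, response] on success,
# [request, errors] on failure) to save_filepath, which defaults to
# "litellm_results.jsonl" when omitted.
asyncio.run(
    handler.batch_completion(
        jobs=jobs,
        save_filepath="litellm_results.jsonl",
        max_requests_per_minute=1_500,
        max_tokens_per_minute=125_000,
    )
)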