From 4a5dae3941cd11af45b291a8d9b8413adab4074a Mon Sep 17 00:00:00 2001 From: Krrish Dholakia Date: Fri, 24 Nov 2023 11:27:04 -0800 Subject: [PATCH] fix(main.py): fix streaming_chunk_builder to return usage --- litellm/main.py | 13 +-- litellm/tests/test_router.py | 72 ---------------- litellm/tests/test_router_cooldowns.py | 99 ++++++++++++++++++++++ litellm/tests/test_router_fallbacks.py | 4 +- litellm/tests/test_stream_chunk_builder.py | 55 +++++------- litellm/utils.py | 7 +- 6 files changed, 133 insertions(+), 117 deletions(-) create mode 100644 litellm/tests/test_router_cooldowns.py diff --git a/litellm/main.py b/litellm/main.py index 1e8191e363..7ffabd0584 100644 --- a/litellm/main.py +++ b/litellm/main.py @@ -2066,14 +2066,14 @@ def config_completion(**kwargs): "No config path set, please set a config path using `litellm.config_path = 'path/to/config.json'`" ) -def stream_chunk_builder(chunks: list): +def stream_chunk_builder(chunks: list, messages: Optional[list]=None): id = chunks[0]["id"] object = chunks[0]["object"] created = chunks[0]["created"] model = chunks[0]["model"] role = chunks[0]["choices"][0]["delta"]["role"] finish_reason = chunks[-1]["choices"][0]["finish_reason"] - + # Initialize the response dictionary response = { "id": id, @@ -2105,7 +2105,7 @@ def stream_chunk_builder(chunks: list): argument_list = [] delta = chunks[0]["choices"][0]["delta"] function_call = delta.get("function_call", "") - function_call_name = function_call.get("name", "") + function_call_name = function_call.name message = response["choices"][0]["message"] message["function_call"] = {} @@ -2120,7 +2120,7 @@ def stream_chunk_builder(chunks: list): # Check if a function call is present if function_call: # Now, function_call is expected to be a dictionary - arguments = function_call.get("arguments", "") + arguments = function_call.arguments argument_list.append(arguments) combined_arguments = "".join(argument_list) @@ -2144,5 +2144,8 @@ def stream_chunk_builder(chunks: list): # # Update usage information if needed + if messages: + response["usage"]["prompt_tokens"] = litellm.utils.token_counter(model=model, messages=messages) response["usage"]["completion_tokens"] = litellm.utils.token_counter(model=model, text=combined_content) - return response + response["usage"]["total_tokens"] = response["usage"]["prompt_tokens"] + response["usage"]["completion_tokens"] + return litellm.utils.convert_to_model_response_object(response_object=response, model_response_object=litellm.ModelResponse()) diff --git a/litellm/tests/test_router.py b/litellm/tests/test_router.py index e1cf247d88..120fe53ced 100644 --- a/litellm/tests/test_router.py +++ b/litellm/tests/test_router.py @@ -13,78 +13,6 @@ from concurrent.futures import ThreadPoolExecutor from dotenv import load_dotenv load_dotenv() -def test_multiple_deployments(): - import concurrent, time - litellm.set_verbose=False - futures = {} - model_list = [{ # list of model deployments - "model_name": "gpt-3.5-turbo", # openai model name - "litellm_params": { # params for litellm completion/embedding call - "model": "azure/chatgpt-v-2", - "api_key": "bad-key", - "api_version": os.getenv("AZURE_API_VERSION"), - "api_base": os.getenv("AZURE_API_BASE") - }, - "tpm": 240000, - "rpm": 1800 - }, - { - "model_name": "gpt-3.5-turbo", # openai model name - "litellm_params": { # params for litellm completion/embedding call - "model": "gpt-3.5-turbo", - "api_key": os.getenv("OPENAI_API_KEY"), - }, - "tpm": 1000000, - "rpm": 9000 - } - ] - - router = 
Router(model_list=model_list, - redis_host=os.getenv("REDIS_HOST"), - redis_password=os.getenv("REDIS_PASSWORD"), - redis_port=int(os.getenv("REDIS_PORT")), - routing_strategy="simple-shuffle", - set_verbose=False, - num_retries=1) # type: ignore - kwargs = {"model": "gpt-3.5-turbo", "messages": [{"role": "user", "content": "Hey, how's it going?"}],} - - results = [] - - try: - for _ in range(3): - response = router.completion(**kwargs) - results.append(response) - router.flush_cache() - except Exception as e: - print(f"FAILED TEST!") - pytest.fail(f"An error occurred - {str(e)}") - - # start_time = time.time() - # for _ in range(1000): - # future = executor.submit(router.completion, **kwargs) - # futures[future] = future - - # # Retrieve the results from the futures - # while futures: - # done, not_done = concurrent.futures.wait(futures, timeout=10, return_when=concurrent.futures.FIRST_COMPLETED) - # for future in done: - # try: - # result = future.result() - # results.append(result) - # futures.pop(future) # Remove the done future - # except Exception as e: - # print(f"Exception: {e}; traceback: {traceback.format_exc()}") - # futures.pop(future) # Remove the done future with exception - - # print(f"Remaining futures: {len(futures)}") - - # end_time = time.time() - # print(f"ELAPSED TIME: {end_time-start_time}") - # Check results - - -# test_multiple_deployments() - def test_exception_raising(): # this tests if the router raises an exception when invalid params are set # in this test both deployments have bad keys - Keep this test. It validates if the router raises the most recent exception diff --git a/litellm/tests/test_router_cooldowns.py b/litellm/tests/test_router_cooldowns.py new file mode 100644 index 0000000000..24e47e7cf1 --- /dev/null +++ b/litellm/tests/test_router_cooldowns.py @@ -0,0 +1,99 @@ +#### What this tests #### +# This tests calling batch_completions by running 100 messages together + +import sys, os, time +import traceback, asyncio +import pytest +sys.path.insert( + 0, os.path.abspath("../..") +) # Adds the parent directory to the system path +import litellm +from litellm import Router +import concurrent +from dotenv import load_dotenv +load_dotenv() + +model_list = [{ # list of model deployments + "model_name": "gpt-3.5-turbo", # openai model name + "litellm_params": { # params for litellm completion/embedding call + "model": "azure/chatgpt-v-2", + "api_key": "bad-key", + "api_version": os.getenv("AZURE_API_VERSION"), + "api_base": os.getenv("AZURE_API_BASE") + }, + "tpm": 240000, + "rpm": 1800 + }, + { + "model_name": "gpt-3.5-turbo", # openai model name + "litellm_params": { # params for litellm completion/embedding call + "model": "gpt-3.5-turbo", + "api_key": os.getenv("OPENAI_API_KEY"), + }, + "tpm": 1000000, + "rpm": 9000 + } +] + +router = Router(model_list=model_list, + redis_host=os.getenv("REDIS_HOST"), + redis_password=os.getenv("REDIS_PASSWORD"), + redis_port=int(os.getenv("REDIS_PORT")), + routing_strategy="simple-shuffle", + set_verbose=True, + num_retries=1) # type: ignore +kwargs = {"model": "gpt-3.5-turbo", "messages": [{"role": "user", "content": "Hey, how's it going?"}],} + + +def test_multiple_deployments_sync(): + import concurrent, time + litellm.set_verbose=False + results = [] + + try: + router.flush_cache() + for _ in range(3): + response = router.completion(**kwargs) + results.append(response) + print(results) + router.flush_cache() + except Exception as e: + print(f"FAILED TEST!") + pytest.fail(f"An error occurred - {str(e)}") + +# 
test_multiple_deployments_sync() + + +def test_multiple_deployments_parallel(): + litellm.set_verbose = False # Corrected the syntax for setting verbose to False + results = [] + futures = {} + start_time = time.time() + router.flush_cache() + # Assuming you have an executor instance defined somewhere in your code + with concurrent.futures.ThreadPoolExecutor() as executor: + for _ in range(5): + future = executor.submit(router.completion, **kwargs) + futures[future] = future + + # Retrieve the results from the futures + while futures: + done, not_done = concurrent.futures.wait(futures.values(), timeout=10, return_when=concurrent.futures.FIRST_COMPLETED) + for future in done: + try: + result = future.result() + results.append(result) + del futures[future] # Remove the done future + except Exception as e: + print(f"Exception: {e}; traceback: {traceback.format_exc()}") + del futures[future] # Remove the done future with exception + + print(f"Remaining futures: {len(futures)}") + + end_time = time.time() + print(results) + print(f"ELAPSED TIME: {end_time - start_time}") + +# Assuming litellm, router, and executor are defined somewhere in your code + +test_multiple_deployments_parallel() \ No newline at end of file diff --git a/litellm/tests/test_router_fallbacks.py b/litellm/tests/test_router_fallbacks.py index f50f8f7330..4f21a0ac1c 100644 --- a/litellm/tests/test_router_fallbacks.py +++ b/litellm/tests/test_router_fallbacks.py @@ -93,11 +93,11 @@ def test_async_fallbacks(): response = await router.acompletion(**kwargs) # response = await response print(f"response: {response}") + router.flush_cache() except litellm.Timeout as e: pass except Exception as e: pytest.fail(f"An exception occurred: {e}") - router.flush_cache() asyncio.run(test_get_response()) @@ -110,8 +110,8 @@ def test_sync_context_window_fallbacks(): kwargs["messages"] = [{"role": "user", "content": sample_text}] response = router.completion(**kwargs) print(f"response: {response}") + router.flush_cache() except Exception as e: print(e) - router.flush_cache() # test_sync_context_window_fallbacks() \ No newline at end of file diff --git a/litellm/tests/test_stream_chunk_builder.py b/litellm/tests/test_stream_chunk_builder.py index 8a35868266..5f808a322a 100644 --- a/litellm/tests/test_stream_chunk_builder.py +++ b/litellm/tests/test_stream_chunk_builder.py @@ -1,3 +1,9 @@ +import sys, os, time +import traceback, asyncio +import pytest +sys.path.insert( + 0, os.path.abspath("../..") +) # Adds the parent directory to the system path from litellm import completion, stream_chunk_builder import litellm import os, dotenv @@ -24,40 +30,21 @@ function_schema = { }, } -@pytest.mark.skip def test_stream_chunk_builder(): - litellm.set_verbose = False - litellm.api_key = os.environ["OPENAI_API_KEY"] - response = completion( - model="gpt-3.5-turbo", - messages=messages, - functions=[function_schema], - stream=True, - ) + try: + litellm.set_verbose = False + litellm.api_key = os.environ["OPENAI_API_KEY"] + response = completion( + model="gpt-3.5-turbo", + messages=messages, + functions=[function_schema], + stream=True, + complete_response=True # runs stream_chunk_builder under-the-hood + ) - chunks = [] - - for chunk in response: - # print(chunk) - chunks.append(chunk) - - try: - print(f"chunks: {chunks}") - rebuilt_response = stream_chunk_builder(chunks) - - # exract the response from the rebuilt response - rebuilt_response["id"] - rebuilt_response["object"] - rebuilt_response["created"] - rebuilt_response["model"] - 
rebuilt_response["choices"] - rebuilt_response["choices"][0]["index"] - choices = rebuilt_response["choices"][0] - message = choices["message"] - role = message["role"] - content = message["content"] - finish_reason = choices["finish_reason"] - print(role, content, finish_reason) - except Exception as e: - raise Exception("stream_chunk_builder failed to rebuild response", e) + print(f"response: {response}") + print(f"response usage: {response['usage']}") + except Exception as e: + pytest.fail(f"An exception occurred - {str(e)}") +test_stream_chunk_builder() \ No newline at end of file diff --git a/litellm/utils.py b/litellm/utils.py index f1cf524351..b805ec2a90 100644 --- a/litellm/utils.py +++ b/litellm/utils.py @@ -710,7 +710,7 @@ class Logging: if self.stream: if result.choices[0].finish_reason is not None: # if it's the last chunk self.streaming_chunks.append(result) - complete_streaming_response = litellm.stream_chunk_builder(self.streaming_chunks) + complete_streaming_response = litellm.stream_chunk_builder(self.streaming_chunks, messages=self.model_call_details.get("messages", None)) else: self.streaming_chunks.append(result) elif isinstance(result, OpenAIObject): @@ -1250,7 +1250,7 @@ def client(original_function): chunks = [] for idx, chunk in enumerate(result): chunks.append(chunk) - return litellm.stream_chunk_builder(chunks) + return litellm.stream_chunk_builder(chunks, messages=kwargs.get("messages", None)) else: return result elif "acompletion" in kwargs and kwargs["acompletion"] == True: @@ -1360,7 +1360,7 @@ def client(original_function): chunks = [] for idx, chunk in enumerate(result): chunks.append(chunk) - return litellm.stream_chunk_builder(chunks) + return litellm.stream_chunk_builder(chunks, messages=kwargs.get("messages", None)) else: return result @@ -5012,7 +5012,6 @@ class CustomStreamWrapper: return completion_obj["content"] = response_obj["text"] print_verbose(f"completion obj content: {completion_obj['content']}") - print_verbose(f"len(completion_obj['content']: {len(completion_obj['content'])}") if response_obj["is_finished"]: model_response.choices[0].finish_reason = response_obj["finish_reason"]
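
Usage sketch (illustrative only, not applied by this patch): the behaviour the stream_chunk_builder change enables is rebuilding a streamed response and reading token usage off it. This assumes OPENAI_API_KEY is set in the environment; the model name and prompt are placeholders.

import os
import litellm
from litellm import completion, stream_chunk_builder

litellm.api_key = os.environ["OPENAI_API_KEY"]
messages = [{"role": "user", "content": "Hey, how's it going?"}]

# Stream a completion and collect the raw chunks.
response = completion(model="gpt-3.5-turbo", messages=messages, stream=True)
chunks = [chunk for chunk in response]

# Passing the original messages lets the builder count prompt tokens;
# completion tokens are counted from the concatenated streamed content.
rebuilt = stream_chunk_builder(chunks, messages=messages)

# usage is reconstructed as:
#   prompt_tokens     -> litellm.utils.token_counter(model=model, messages=messages)
#   completion_tokens -> litellm.utils.token_counter(model=model, text=combined_content)
#   total_tokens      -> prompt_tokens + completion_tokens
print(rebuilt["usage"])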
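
The same accounting now reaches callers who use complete_response=True, since the client() wrapper forwards kwargs["messages"] when it rebuilds the stream. A short continuation of the sketch above, under the same assumptions:

# complete_response=True makes the wrapper collect the chunks and run
# stream_chunk_builder itself; messages are now passed through, so usage
# is populated on the returned ModelResponse.
full_response = completion(
    model="gpt-3.5-turbo",
    messages=messages,
    stream=True,
    complete_response=True,
)
print(full_response["usage"])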
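
For the function-call branch (delta.function_call is now read via .name / .arguments rather than dict-style .get), a similarly hedged sketch; the schema below is a made-up placeholder, not the function_schema from the test file, and the exact shape of the rebuilt message may vary:

# Hypothetical schema purely for illustration.
weather_schema = {
    "name": "get_current_weather",
    "description": "Get the current weather for a city",
    "parameters": {
        "type": "object",
        "properties": {"city": {"type": "string"}},
        "required": ["city"],
    },
}

fn_messages = [{"role": "user", "content": "What's the weather in Boston?"}]
fn_chunks = list(completion(
    model="gpt-3.5-turbo",
    messages=fn_messages,
    functions=[weather_schema],
    stream=True,
))
rebuilt_fn = stream_chunk_builder(fn_chunks, messages=fn_messages)

# If the model streamed a function call, the rebuilt message should carry it:
# the name is taken from the first delta, the arguments are the concatenation
# of the streamed argument fragments.
print(rebuilt_fn["choices"][0]["message"])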