fix(main.py): fix streaming_chunk_builder to return usage

Krrish Dholakia 2023-11-24 11:27:04 -08:00
parent 8edfcd8e5d
commit 4a5dae3941
6 changed files with 133 additions and 117 deletions


@@ -2066,14 +2066,14 @@ def config_completion(**kwargs):
             "No config path set, please set a config path using `litellm.config_path = 'path/to/config.json'`"
         )
 
-def stream_chunk_builder(chunks: list):
+def stream_chunk_builder(chunks: list, messages: Optional[list]=None):
     id = chunks[0]["id"]
     object = chunks[0]["object"]
     created = chunks[0]["created"]
     model = chunks[0]["model"]
     role = chunks[0]["choices"][0]["delta"]["role"]
     finish_reason = chunks[-1]["choices"][0]["finish_reason"]
     # Initialize the response dictionary
     response = {
         "id": id,
@@ -2105,7 +2105,7 @@ def stream_chunk_builder(chunks: list):
         argument_list = []
         delta = chunks[0]["choices"][0]["delta"]
         function_call = delta.get("function_call", "")
-        function_call_name = function_call.get("name", "")
+        function_call_name = function_call.name
 
         message = response["choices"][0]["message"]
         message["function_call"] = {}
@@ -2120,7 +2120,7 @@ def stream_chunk_builder(chunks: list):
             # Check if a function call is present
             if function_call:
                 # Now, function_call is expected to be a dictionary
-                arguments = function_call.get("arguments", "")
+                arguments = function_call.arguments
                 argument_list.append(arguments)
 
         combined_arguments = "".join(argument_list)
@@ -2144,5 +2144,8 @@
     # # Update usage information if needed
+    if messages:
+        response["usage"]["prompt_tokens"] = litellm.utils.token_counter(model=model, messages=messages)
     response["usage"]["completion_tokens"] = litellm.utils.token_counter(model=model, text=combined_content)
-    return response
+    response["usage"]["total_tokens"] = response["usage"]["prompt_tokens"] + response["usage"]["completion_tokens"]
+    return litellm.utils.convert_to_model_response_object(response_object=response, model_response_object=litellm.ModelResponse())
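
For reference, a minimal sketch of the new call path (not part of the diff), assuming OPENAI_API_KEY is set and that the chunks come from a litellm streaming completion; variable names are illustrative. Passing the original messages is what lets the rebuilt response carry token usage.

from litellm import completion, stream_chunk_builder

messages = [{"role": "user", "content": "Hey, how's it going?"}]
response = completion(model="gpt-3.5-turbo", messages=messages, stream=True)

# collect the streamed chunks, then rebuild them into a single ModelResponse;
# the messages list is what token_counter uses to fill in prompt_tokens
chunks = [chunk for chunk in response]
rebuilt = stream_chunk_builder(chunks, messages=messages)
print(rebuilt["usage"])  # prompt_tokens, completion_tokens, total_tokens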


@@ -13,78 +13,6 @@ from concurrent.futures import ThreadPoolExecutor
 from dotenv import load_dotenv
 load_dotenv()
 
-def test_multiple_deployments():
-    import concurrent, time
-    litellm.set_verbose=False
-    futures = {}
-
-    model_list = [{ # list of model deployments
-        "model_name": "gpt-3.5-turbo", # openai model name
-        "litellm_params": { # params for litellm completion/embedding call
-            "model": "azure/chatgpt-v-2",
-            "api_key": "bad-key",
-            "api_version": os.getenv("AZURE_API_VERSION"),
-            "api_base": os.getenv("AZURE_API_BASE")
-        },
-        "tpm": 240000,
-        "rpm": 1800
-    },
-    {
-        "model_name": "gpt-3.5-turbo", # openai model name
-        "litellm_params": { # params for litellm completion/embedding call
-            "model": "gpt-3.5-turbo",
-            "api_key": os.getenv("OPENAI_API_KEY"),
-        },
-        "tpm": 1000000,
-        "rpm": 9000
-    }
-    ]
-
-    router = Router(model_list=model_list,
-                    redis_host=os.getenv("REDIS_HOST"),
-                    redis_password=os.getenv("REDIS_PASSWORD"),
-                    redis_port=int(os.getenv("REDIS_PORT")),
-                    routing_strategy="simple-shuffle",
-                    set_verbose=False,
-                    num_retries=1) # type: ignore
-
-    kwargs = {"model": "gpt-3.5-turbo", "messages": [{"role": "user", "content": "Hey, how's it going?"}],}
-
-    results = []
-
-    try:
-        for _ in range(3):
-            response = router.completion(**kwargs)
-            results.append(response)
-        router.flush_cache()
-    except Exception as e:
-        print(f"FAILED TEST!")
-        pytest.fail(f"An error occurred - {str(e)}")
-
-    # start_time = time.time()
-    # for _ in range(1000):
-    #     future = executor.submit(router.completion, **kwargs)
-    #     futures[future] = future
-
-    # # Retrieve the results from the futures
-    # while futures:
-    #     done, not_done = concurrent.futures.wait(futures, timeout=10, return_when=concurrent.futures.FIRST_COMPLETED)
-    #     for future in done:
-    #         try:
-    #             result = future.result()
-    #             results.append(result)
-    #             futures.pop(future) # Remove the done future
-    #         except Exception as e:
-    #             print(f"Exception: {e}; traceback: {traceback.format_exc()}")
-    #             futures.pop(future) # Remove the done future with exception
-    #     print(f"Remaining futures: {len(futures)}")
-    # end_time = time.time()
-
-    # print(f"ELAPSED TIME: {end_time-start_time}")
-    # Check results
-
-# test_multiple_deployments()
-
 def test_exception_raising():
     # this tests if the router raises an exception when invalid params are set
     # in this test both deployments have bad keys - Keep this test. It validates if the router raises the most recent exception


@@ -0,0 +1,99 @@
+#### What this tests ####
+# This tests calling batch_completions by running 100 messages together
+
+import sys, os, time
+import traceback, asyncio
+import pytest
+sys.path.insert(
+    0, os.path.abspath("../..")
+)  # Adds the parent directory to the system path
+
+import litellm
+from litellm import Router
+import concurrent
+from dotenv import load_dotenv
+load_dotenv()
+
+model_list = [{ # list of model deployments
+    "model_name": "gpt-3.5-turbo", # openai model name
+    "litellm_params": { # params for litellm completion/embedding call
+        "model": "azure/chatgpt-v-2",
+        "api_key": "bad-key",
+        "api_version": os.getenv("AZURE_API_VERSION"),
+        "api_base": os.getenv("AZURE_API_BASE")
+    },
+    "tpm": 240000,
+    "rpm": 1800
+},
+{
+    "model_name": "gpt-3.5-turbo", # openai model name
+    "litellm_params": { # params for litellm completion/embedding call
+        "model": "gpt-3.5-turbo",
+        "api_key": os.getenv("OPENAI_API_KEY"),
+    },
+    "tpm": 1000000,
+    "rpm": 9000
+}
+]
+
+router = Router(model_list=model_list,
+                redis_host=os.getenv("REDIS_HOST"),
+                redis_password=os.getenv("REDIS_PASSWORD"),
+                redis_port=int(os.getenv("REDIS_PORT")),
+                routing_strategy="simple-shuffle",
+                set_verbose=True,
+                num_retries=1) # type: ignore
+
+kwargs = {"model": "gpt-3.5-turbo", "messages": [{"role": "user", "content": "Hey, how's it going?"}],}
+
+def test_multiple_deployments_sync():
+    import concurrent, time
+    litellm.set_verbose=False
+    results = []
+
+    try:
+        router.flush_cache()
+        for _ in range(3):
+            response = router.completion(**kwargs)
+            results.append(response)
+        print(results)
+        router.flush_cache()
+    except Exception as e:
+        print(f"FAILED TEST!")
+        pytest.fail(f"An error occurred - {str(e)}")
+
+# test_multiple_deployments_sync()
+
+def test_multiple_deployments_parallel():
+    litellm.set_verbose = False # Corrected the syntax for setting verbose to False
+    results = []
+    futures = {}
+    start_time = time.time()
+    router.flush_cache()
+    # Assuming you have an executor instance defined somewhere in your code
+    with concurrent.futures.ThreadPoolExecutor() as executor:
+        for _ in range(5):
+            future = executor.submit(router.completion, **kwargs)
+            futures[future] = future
+
+        # Retrieve the results from the futures
+        while futures:
+            done, not_done = concurrent.futures.wait(futures.values(), timeout=10, return_when=concurrent.futures.FIRST_COMPLETED)
+            for future in done:
+                try:
+                    result = future.result()
+                    results.append(result)
+                    del futures[future] # Remove the done future
+                except Exception as e:
+                    print(f"Exception: {e}; traceback: {traceback.format_exc()}")
+                    del futures[future] # Remove the done future with exception
+            print(f"Remaining futures: {len(futures)}")
+
+    end_time = time.time()
+    print(results)
+    print(f"ELAPSED TIME: {end_time - start_time}")
+
+# Assuming litellm, router, and executor are defined somewhere in your code
+test_multiple_deployments_parallel()


@@ -93,11 +93,11 @@ def test_async_fallbacks():
             response = await router.acompletion(**kwargs)
             # response = await response
             print(f"response: {response}")
-            router.flush_cache()
         except litellm.Timeout as e:
             pass
         except Exception as e:
             pytest.fail(f"An exception occurred: {e}")
+        router.flush_cache()
 
     asyncio.run(test_get_response())
@@ -110,8 +110,8 @@ def test_sync_context_window_fallbacks():
         kwargs["messages"] = [{"role": "user", "content": sample_text}]
         response = router.completion(**kwargs)
         print(f"response: {response}")
-        router.flush_cache()
     except Exception as e:
         print(e)
+    router.flush_cache()
 
 # test_sync_context_window_fallbacks()


@@ -1,3 +1,9 @@
+import sys, os, time
+import traceback, asyncio
+import pytest
+sys.path.insert(
+    0, os.path.abspath("../..")
+)  # Adds the parent directory to the system path
 from litellm import completion, stream_chunk_builder
 import litellm
 import os, dotenv
@@ -24,40 +30,21 @@ function_schema = {
     },
 }
 
-@pytest.mark.skip
 def test_stream_chunk_builder():
-    litellm.set_verbose = False
-    litellm.api_key = os.environ["OPENAI_API_KEY"]
-    response = completion(
-        model="gpt-3.5-turbo",
-        messages=messages,
-        functions=[function_schema],
-        stream=True,
-    )
-
-    chunks = []
-
-    for chunk in response:
-        # print(chunk)
-        chunks.append(chunk)
-
-    try:
-        print(f"chunks: {chunks}")
-        rebuilt_response = stream_chunk_builder(chunks)
-
-        # exract the response from the rebuilt response
-        rebuilt_response["id"]
-        rebuilt_response["object"]
-        rebuilt_response["created"]
-        rebuilt_response["model"]
-        rebuilt_response["choices"]
-        rebuilt_response["choices"][0]["index"]
-        choices = rebuilt_response["choices"][0]
-        message = choices["message"]
-        role = message["role"]
-        content = message["content"]
-        finish_reason = choices["finish_reason"]
-        print(role, content, finish_reason)
-    except Exception as e:
-        raise Exception("stream_chunk_builder failed to rebuild response", e)
+    try:
+        litellm.set_verbose = False
+        litellm.api_key = os.environ["OPENAI_API_KEY"]
+        response = completion(
+            model="gpt-3.5-turbo",
+            messages=messages,
+            functions=[function_schema],
+            stream=True,
+            complete_response=True # runs stream_chunk_builder under-the-hood
+        )
+
+        print(f"response: {response}")
+        print(f"response usage: {response['usage']}")
+    except Exception as e:
+        pytest.fail(f"An exception occurred - {str(e)}")
+
+test_stream_chunk_builder()
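
As the rewritten test above suggests, callers do not have to rebuild chunks by hand: passing complete_response=True to a streaming completion makes litellm collect the chunks and run stream_chunk_builder (with the original messages) under the hood. A minimal sketch, assuming OPENAI_API_KEY is set:

from litellm import completion

response = completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hey, how's it going?"}],
    stream=True,
    complete_response=True,  # chunks are rebuilt into one response internally
)
print(response["usage"])  # now populated for streamed calls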


@@ -710,7 +710,7 @@ class Logging:
                 if self.stream:
                     if result.choices[0].finish_reason is not None: # if it's the last chunk
                         self.streaming_chunks.append(result)
-                        complete_streaming_response = litellm.stream_chunk_builder(self.streaming_chunks)
+                        complete_streaming_response = litellm.stream_chunk_builder(self.streaming_chunks, messages=self.model_call_details.get("messages", None))
                     else:
                         self.streaming_chunks.append(result)
                 elif isinstance(result, OpenAIObject):
@@ -1250,7 +1250,7 @@ def client(original_function):
                 chunks = []
                 for idx, chunk in enumerate(result):
                     chunks.append(chunk)
-                return litellm.stream_chunk_builder(chunks)
+                return litellm.stream_chunk_builder(chunks, messages=kwargs.get("messages", None))
             else:
                 return result
         elif "acompletion" in kwargs and kwargs["acompletion"] == True:
@@ -1360,7 +1360,7 @@ def client(original_function):
                 chunks = []
                 for idx, chunk in enumerate(result):
                     chunks.append(chunk)
-                return litellm.stream_chunk_builder(chunks)
+                return litellm.stream_chunk_builder(chunks, messages=kwargs.get("messages", None))
             else:
                 return result
@@ -5012,7 +5012,6 @@ class CustomStreamWrapper:
                     return
                 completion_obj["content"] = response_obj["text"]
                 print_verbose(f"completion obj content: {completion_obj['content']}")
-                print_verbose(f"len(completion_obj['content']: {len(completion_obj['content'])}")
                 if response_obj["is_finished"]:
                     model_response.choices[0].finish_reason = response_obj["finish_reason"]