Merge pull request #4015 from BerriAI/litellm_stream_options_fix_2

feat(utils.py): Support `stream_options` param across all providers
This commit is contained in:
Krish Dholakia 2024-06-04 20:59:39 -07:00 committed by GitHub
commit d6f4233441
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 129 additions and 26 deletions

View file

@ -680,12 +680,6 @@ class ModelResponse(OpenAIObject):
usage = usage
elif stream is None or stream == False:
usage = Usage()
elif (
stream == True
and stream_options is not None
and stream_options.get("include_usage") == True
):
usage = Usage()
if hidden_params:
self._hidden_params = hidden_params
@ -1143,6 +1137,7 @@ class Logging:
global supabaseClient, liteDebuggerClient, promptLayerLogger, weightsBiasesLogger, langsmithLogger, logfireLogger, capture_exception, add_breadcrumb, lunaryLogger
custom_pricing: bool = False
stream_options = None
def __init__(
self,
@ -1211,6 +1206,7 @@ class Logging:
self.litellm_params = litellm_params
self.logger_fn = litellm_params.get("logger_fn", None)
print_verbose(f"self.optional_params: {self.optional_params}")
self.model_call_details = {
"model": self.model,
"messages": self.messages,
@ -1226,6 +1222,9 @@ class Logging:
**additional_params,
}
## check if stream options is set ## - used by CustomStreamWrapper for easy instrumentation
if "stream_options" in additional_params:
self.stream_options = additional_params["stream_options"]
## check if custom pricing set ##
if (
litellm_params.get("input_cost_per_token") is not None
@ -3044,6 +3043,7 @@ def function_setup(
user="",
optional_params={},
litellm_params=litellm_params,
stream_options=kwargs.get("stream_options", None),
)
return logging_obj, kwargs
except Exception as e:
@ -5354,7 +5354,7 @@ def get_optional_params(
unsupported_params = {}
for k in non_default_params.keys():
if k not in supported_params:
if k == "user":
if k == "user" or k == "stream_options":
continue
if k == "n" and n == 1: # langchain sends n=1 as a default value
continue # skip this param
@ -10283,7 +10283,14 @@ class CustomStreamWrapper:
self.response_id = None
self.logging_loop = None
self.rules = Rules()
self.stream_options = stream_options
self.stream_options = stream_options or getattr(
logging_obj, "stream_options", None
)
self.messages = getattr(logging_obj, "messages", None)
self.sent_stream_usage = False
self.chunks: List = (
[]
) # keep track of the returned chunks - used for calculating the input/output tokens for stream options
def __iter__(self):
return self
@ -11110,8 +11117,7 @@ class CustomStreamWrapper:
model_response.system_fingerprint = self.system_fingerprint
model_response._hidden_params["custom_llm_provider"] = self.custom_llm_provider
model_response._hidden_params["created_at"] = time.time()
model_response.choices = [StreamingChoices()]
model_response.choices[0].finish_reason = None
model_response.choices = [StreamingChoices(finish_reason=None)]
return model_response
def is_delta_empty(self, delta: Delta) -> bool:
@ -11397,8 +11403,14 @@ class CustomStreamWrapper:
if (
self.stream_options
and self.stream_options.get("include_usage", False) == True
and response_obj["usage"] is not None
):
model_response.usage = response_obj["usage"]
self.sent_stream_usage = True
model_response.usage = litellm.Usage(
prompt_tokens=response_obj["usage"].prompt_tokens,
completion_tokens=response_obj["usage"].completion_tokens,
total_tokens=response_obj["usage"].total_tokens,
)
elif self.custom_llm_provider == "databricks":
response_obj = litellm.DatabricksConfig()._chunk_parser(chunk)
completion_obj["content"] = response_obj["text"]
@ -11408,8 +11420,14 @@ class CustomStreamWrapper:
if (
self.stream_options
and self.stream_options.get("include_usage", False) == True
and response_obj["usage"] is not None
):
model_response.usage = response_obj["usage"]
self.sent_stream_usage = True
model_response.usage = litellm.Usage(
prompt_tokens=response_obj["usage"].prompt_tokens,
completion_tokens=response_obj["usage"].completion_tokens,
total_tokens=response_obj["usage"].total_tokens,
)
elif self.custom_llm_provider == "azure_text":
response_obj = self.handle_azure_text_completion_chunk(chunk)
completion_obj["content"] = response_obj["text"]
@ -11466,8 +11484,14 @@ class CustomStreamWrapper:
if (
self.stream_options is not None
and self.stream_options["include_usage"] == True
and response_obj["usage"] is not None
):
model_response.usage = response_obj["usage"]
self.sent_stream_usage = True
model_response.usage = litellm.Usage(
prompt_tokens=response_obj["usage"].prompt_tokens,
completion_tokens=response_obj["usage"].completion_tokens,
total_tokens=response_obj["usage"].total_tokens,
)
model_response.model = self.model
print_verbose(
@ -11744,7 +11768,6 @@ class CustomStreamWrapper:
model_response.choices[0].finish_reason = "stop"
return model_response
## needs to handle the empty string case (even starting chunk can be an empty string)
def __next__(self):
try:
while True:
@ -11776,9 +11799,27 @@ class CustomStreamWrapper:
input=self.response_uptil_now, model=self.model
)
# RETURN RESULT
self.chunks.append(response)
return response
except StopIteration:
if self.sent_last_chunk == True:
if (
self.sent_stream_usage == False
and self.stream_options is not None
and self.stream_options.get("include_usage", False) == True
):
# send the final chunk with stream options
complete_streaming_response = litellm.stream_chunk_builder(
chunks=self.chunks, messages=self.messages
)
response = self.model_response_creator()
response.usage = complete_streaming_response.usage # type: ignore
## LOGGING
threading.Thread(
target=self.logging_obj.success_handler, args=(response,)
).start() # log response
self.sent_stream_usage = True
return response
raise # Re-raise StopIteration
else:
self.sent_last_chunk = True
@ -11876,6 +11917,7 @@ class CustomStreamWrapper:
input=self.response_uptil_now, model=self.model
)
print_verbose(f"final returned processed chunk: {processed_chunk}")
self.chunks.append(processed_chunk)
return processed_chunk
raise StopAsyncIteration
else: # temporary patch for non-aiohttp async calls
@ -11915,9 +11957,32 @@ class CustomStreamWrapper:
input=self.response_uptil_now, model=self.model
)
# RETURN RESULT
self.chunks.append(processed_chunk)
return processed_chunk
except StopAsyncIteration:
if self.sent_last_chunk == True:
if (
self.sent_stream_usage == False
and self.stream_options is not None
and self.stream_options.get("include_usage", False) == True
):
# send the final chunk with stream options
complete_streaming_response = litellm.stream_chunk_builder(
chunks=self.chunks, messages=self.messages
)
response = self.model_response_creator()
response.usage = complete_streaming_response.usage
## LOGGING
threading.Thread(
target=self.logging_obj.success_handler, args=(response,)
).start() # log response
asyncio.create_task(
self.logging_obj.async_success_handler(
response,
)
)
self.sent_stream_usage = True
return response
raise # Re-raise StopIteration
else:
self.sent_last_chunk = True