forked from phoenix/litellm-mirror

fix(utils.py): fix streaming logic

This commit is contained in:
parent 5b06627c09
commit 788e24bd83

3 changed files with 87 additions and 31 deletions

litellm/utils.py (112 changed lines)
```diff
@@ -276,15 +276,14 @@ class Delta(OpenAIObject):
         self.content = content
         self.role = role
         self.function_call = function_call
-        if tool_calls is not None:
-            if isinstance(tool_calls, dict):
-                self.tool_calls = []
-                for tool_call in tool_calls:
-                    if tool_call.get("index", None) is None:
-                        tool_call["index"] = 0
-                    self.tool_calls.append(ChatCompletionDeltaToolCall(**tool_call))
-            else:
-                self.tool_calls = tool_calls
+        if tool_calls is not None and isinstance(tool_calls, dict):
+            self.tool_calls = []
+            for tool_call in tool_calls:
+                if tool_call.get("index", None) is None:
+                    tool_call["index"] = 0
+                self.tool_calls.append(ChatCompletionDeltaToolCall(**tool_call))
+        else:
+            self.tool_calls = tool_calls

     def __contains__(self, key):
         # Define custom behavior for the 'in' operator
```
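For context, a minimal sketch of the behaviour the reworked constructor is aiming for: streamed tool-call fragments that arrive as raw dicts without an `index` get index 0 before being wrapped in a typed object, and anything already typed is passed through. `ToolCallChunk` and `normalize_tool_calls` below are illustrative stand-ins (not litellm APIs), and the helper restates the intent per element rather than copying the original `isinstance(tool_calls, dict)` check.

```python
from dataclasses import dataclass
from typing import List, Optional, Union


@dataclass
class ToolCallChunk:
    # Illustrative stand-in for ChatCompletionDeltaToolCall.
    index: int
    id: Optional[str] = None
    type: str = "function"
    function: Optional[dict] = None


def normalize_tool_calls(tool_calls: Optional[List[Union[dict, ToolCallChunk]]]):
    """Default a missing `index` to 0 and wrap raw dicts in typed objects."""
    if tool_calls is None:
        return None
    normalized = []
    for tool_call in tool_calls:
        if isinstance(tool_call, dict):
            tool_call.setdefault("index", 0)
            normalized.append(ToolCallChunk(**tool_call))
        else:
            normalized.append(tool_call)  # already typed, pass through unchanged
    return normalized


print(normalize_tool_calls([{"id": "call_1", "function": {"name": "get_weather"}}]))
# [ToolCallChunk(index=0, id='call_1', type='function', function={'name': 'get_weather'})]
```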
```diff
@@ -8722,6 +8721,9 @@ class CustomStreamWrapper:
                     ):
                         try:
                             delta = dict(original_chunk.choices[0].delta)
+                            model_response.system_fingerprint = (
+                                original_chunk.system_fingerprint
+                            )
                             ## AZURE - check if arguments is not None
                             if (
                                 original_chunk.choices[0].delta.function_call
```
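The three added lines copy `system_fingerprint` across while a valid provider chunk is in hand, i.e. inside the `try` that already guards access to `choices[0]`. A hedged sketch of the same idea as a standalone helper (`propagate_system_fingerprint` and the chunk/response names are hypothetical, not litellm functions):

```python
from types import SimpleNamespace
from typing import Any, Optional


def propagate_system_fingerprint(provider_chunk: Any, aggregate: Any) -> Optional[str]:
    """Copy the provider's system_fingerprint onto the aggregated response, if present.

    getattr with a default keeps providers that never send a fingerprint
    (or malformed chunks) from raising AttributeError.
    """
    fingerprint = getattr(provider_chunk, "system_fingerprint", None)
    if fingerprint is not None:
        aggregate.system_fingerprint = fingerprint
    return fingerprint


chunk = SimpleNamespace(system_fingerprint="fp_44709d6fcb")
response = SimpleNamespace(system_fingerprint=None)
propagate_system_fingerprint(chunk, response)
print(response.system_fingerprint)  # fp_44709d6fcb
```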
```diff
@@ -8762,32 +8764,64 @@ class CustomStreamWrapper:
                             delta = dict(original_chunk.choices[0].delta)
                             print_verbose(f"original delta: {delta}")
                             model_response.choices[0].delta = Delta(**delta)
+                            print_verbose(
+                                f"new delta: {model_response.choices[0].delta}"
+                            )
                         except Exception as e:
                             model_response.choices[0].delta = Delta()
                     else:
                         return
-                model_response.system_fingerprint = original_chunk.system_fingerprint
-                if self.sent_first_chunk == False:
-                    model_response.choices[0].delta["role"] = "assistant"
-                    self.sent_first_chunk = True
+            print_verbose(
+                f"model_response.choices[0].delta: {model_response.choices[0].delta}; completion_obj: {completion_obj}"
+            )
+            print_verbose(f"self.sent_first_chunk: {self.sent_first_chunk}")
             ## RETURN ARG
             if (
-                completion_obj["content"] is not None
-                or response_obj.get("original_chunk", None) is not None
-            ):
-                hold = False
-                if completion_obj["content"] is not None:
-                    hold, model_response_str = self.check_special_tokens(
-                        chunk=completion_obj["content"],
-                        finish_reason=model_response.choices[0].finish_reason,
-                    )  # filter out bos/eos tokens from openai-compatible hf endpoints
-                    print_verbose(
-                        f"hold - {hold}, model_response_str - {model_response_str}"
-                    )
+                "content" in completion_obj
+                and isinstance(completion_obj["content"], str)
+                and len(completion_obj["content"]) > 0
+            ):  # cannot set content of an OpenAI Object to be an empty string
+                hold, model_response_str = self.check_special_tokens(
+                    chunk=completion_obj["content"],
+                    finish_reason=model_response.choices[0].finish_reason,
+                )  # filter out bos/eos tokens from openai-compatible hf endpoints
+                print_verbose(
+                    f"hold - {hold}, model_response_str - {model_response_str}"
+                )
                 if hold is False:
                     ## check if openai/azure chunk
                     original_chunk = response_obj.get("original_chunk", None)
-                    if original_chunk is None:
+                    if original_chunk:
+                        model_response.id = original_chunk.id
+                        if len(original_chunk.choices) > 0:
+                            try:
+                                delta = dict(original_chunk.choices[0].delta)
+                                print_verbose(f"original delta: {delta}")
+                                model_response.choices[0].delta = Delta(**delta)
+                            except Exception as e:
+                                model_response.choices[0].delta = Delta()
+                        else:
+                            return
+                        model_response.system_fingerprint = (
+                            original_chunk.system_fingerprint
+                        )
+                        print_verbose(f"self.sent_first_chunk: {self.sent_first_chunk}")
+                        if self.sent_first_chunk == False:
+                            model_response.choices[0].delta["role"] = "assistant"
+                            self.sent_first_chunk = True
+                        elif self.sent_first_chunk == True and hasattr(
+                            model_response.choices[0].delta, "role"
+                        ):
+                            _initial_delta = model_response.choices[
+                                0
+                            ].delta.model_dump()
+                            _initial_delta.pop("role", None)
+                            model_response.choices[0].delta = Delta(**_initial_delta)
+                        print_verbose(
+                            f"model_response.choices[0].delta: {model_response.choices[0].delta}"
+                        )
+                    else:
+                        ## else
                         completion_obj["content"] = model_response_str
                         if self.sent_first_chunk == False:
                             completion_obj["role"] = "assistant"
```
```diff
@@ -8812,6 +8846,14 @@ class CustomStreamWrapper:
                         model_response.choices[0].finish_reason
                     )  # ensure consistent output to openai
                     return model_response
+                elif (
+                    model_response.choices[0].delta.tool_calls is not None
+                    or model_response.choices[0].delta.function_call is not None
+                ):
+                    if self.sent_first_chunk == False:
+                        model_response.choices[0].delta["role"] = "assistant"
+                        self.sent_first_chunk = True
+                    return model_response
                 else:
                     return
         except StopIteration:
```
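The added `elif` lets a chunk through when its delta carries `tool_calls` or `function_call` even though there is no text content, and tags the first chunk that goes out with the assistant role. Roughly, as a sketch over plain dicts rather than `ModelResponse` objects:

```python
def emit_tool_call_chunk(delta: dict, sent_first_chunk: bool):
    """Return (chunk_to_emit, sent_first_chunk) for a delta that only carries tool calls."""
    if delta.get("tool_calls") is None and delta.get("function_call") is None:
        return None, sent_first_chunk  # nothing here; other branches handle text content
    if not sent_first_chunk:
        delta = {**delta, "role": "assistant"}  # first emitted chunk names the role
        sent_first_chunk = True
    return delta, sent_first_chunk


chunk, sent = emit_tool_call_chunk({"tool_calls": [{"index": 0}]}, sent_first_chunk=False)
print(chunk, sent)  # {'tool_calls': [{'index': 0}], 'role': 'assistant'} True
```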
```diff
@@ -8860,7 +8902,14 @@ class CustomStreamWrapper:
                 print_verbose(f"PROCESSED CHUNK PRE CHUNK CREATOR: {chunk}")
                 response: Optional[ModelResponse] = self.chunk_creator(chunk=chunk)
                 print_verbose(f"PROCESSED CHUNK POST CHUNK CREATOR: {response}")
-                if response is None:
+                if response is None or (
+                    isinstance(response, ModelResponse)
+                    and isinstance(response.choices[0], StreamingChoices)
+                    and response.choices[0].delta.content is None
+                    and response.choices[0].delta.function_call is None
+                    and response.choices[0].delta.tool_calls is None
+                ):
                     continue
                 ## LOGGING
                 threading.Thread(
```
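The widened guard means the synchronous iteration path now skips not just a `None` response but any chunk whose delta carries no content, no function call and no tool calls. The same filter as a standalone generator, with `raw_chunks` standing in for whatever `chunk_creator` produces:

```python
from typing import Iterable, Iterator


def drop_empty_deltas(raw_chunks: Iterable[dict]) -> Iterator[dict]:
    """Yield only chunks whose delta actually carries something the caller can use."""
    for chunk in raw_chunks:
        if chunk is None:
            continue
        delta = chunk.get("delta", {})
        if (
            delta.get("content") is None
            and delta.get("function_call") is None
            and delta.get("tool_calls") is None
        ):
            continue  # metadata-only / heartbeat chunk, nothing to surface
        yield chunk


chunks = [None, {"delta": {}}, {"delta": {"content": "hi"}}]
print(list(drop_empty_deltas(chunks)))  # [{'delta': {'content': 'hi'}}]
```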
```diff
@@ -8904,7 +8953,11 @@ class CustomStreamWrapper:
                 print_verbose(f"value of async chunk: {chunk}")
                 if chunk == "None" or chunk is None:
                     raise Exception
-                elif self.custom_llm_provider == "gemini" and len(chunk.parts) == 0:
+                elif (
+                    self.custom_llm_provider == "gemini"
+                    and hasattr(chunk, "parts")
+                    and len(chunk.parts) == 0
+                ):
                     continue
                 # chunk_creator() does logging/stream chunk building. We need to let it know its being called in_async_func, so we don't double add chunks.
                 # __anext__ also calls async_success_handler, which does logging
```
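The reshaped `elif` only looks at `chunk.parts` after confirming the attribute exists, so non-Gemini chunks (or Gemini payloads without candidate parts) no longer raise `AttributeError` on the length check. The same defensive pattern in isolation, using `SimpleNamespace` objects as stand-ins for provider chunks:

```python
from types import SimpleNamespace


def is_empty_gemini_chunk(provider: str, chunk) -> bool:
    """True only for a Gemini chunk whose candidate parts list is empty."""
    return (
        provider == "gemini"
        and hasattr(chunk, "parts")  # guard: not every provider's chunk has .parts
        and len(chunk.parts) == 0
    )


print(is_empty_gemini_chunk("gemini", SimpleNamespace(parts=[])))   # True
print(is_empty_gemini_chunk("gemini", SimpleNamespace(text="hi")))  # False: no .parts attribute
print(is_empty_gemini_chunk("openai", SimpleNamespace(parts=[])))   # False: wrong provider
```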
```diff
@@ -8933,6 +8986,7 @@ class CustomStreamWrapper:
                     self.rules.post_call_rules(
                         input=self.response_uptil_now, model=self.model
                     )
+                    print_verbose(f"final returned processed chunk: {processed_chunk}")
                     return processed_chunk
                 raise StopAsyncIteration
             else:  # temporary patch for non-aiohttp async calls
```
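For reference, `post_call_rules` runs user-registered checks over the text accumulated so far (`self.response_uptil_now`) before the chunk is handed back, and the added `print_verbose` logs the chunk that survives. A rough sketch of that rule-checking step, with `rules` as a hypothetical list of predicate callables rather than litellm's Rules object:

```python
from typing import Callable, List


def run_post_call_rules(response_text: str, rules: List[Callable[[str], bool]]) -> None:
    """Raise if any user-supplied rule rejects the accumulated response text."""
    for rule in rules:
        if not rule(response_text):
            raise ValueError(f"post-call rule {rule.__name__} rejected the response")


def not_empty(text: str) -> bool:
    return len(text.strip()) > 0


run_post_call_rules("streamed answer so far", [not_empty])  # passes silently
```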