Mirror of https://github.com/BerriAI/litellm.git (synced 2025-04-26 11:14:04 +00:00)
Merge pull request #2203 from BerriAI/litellm_streaming_caching_fix
fix(utils.py): support returning caching streaming response for function calling streaming calls
Commit 95b5b7f1fc
5 changed files with 168 additions and 64 deletions
litellm/utils.py (175 lines changed; only this file's diff is shown below)
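For context before the diff: the flow this PR fixes looks roughly like the sketch below, where a function-calling request is streamed with caching enabled and the cached result must later be replayed as a stream of tool-call deltas. The model name, tool schema, and cache setup are illustrative, not taken from the diff.

    import litellm
    from litellm.caching import Cache

    litellm.cache = Cache()  # in-memory cache (illustrative setup)

    tools = [{
        "type": "function",
        "function": {
            "name": "get_current_weather",  # hypothetical tool
            "description": "Get the current weather for a location",
            "parameters": {
                "type": "object",
                "properties": {"location": {"type": "string"}},
                "required": ["location"],
            },
        },
    }]

    # First call hits the provider; the second should replay the cached
    # result as a stream of tool-call deltas.
    for _ in range(2):
        response = litellm.completion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Weather in SF?"}],
            tools=tools,
            stream=True,
            caching=True,
        )
        for chunk in response:
            print(chunk.choices[0].delta)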
@@ -213,6 +213,13 @@ class Function(OpenAIObject):
     name: str
 
 
+class ChatCompletionDeltaToolCall(OpenAIObject):
+    id: str
+    function: Function
+    type: str
+    index: int
+
+
 class ChatCompletionMessageToolCall(OpenAIObject):
     id: str
     function: Function
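The new ChatCompletionDeltaToolCall type mirrors the shape of an OpenAI streaming tool-call delta. A minimal construction sketch (values are illustrative; it assumes the pydantic-style OpenAIObject coerces the nested function dict, as the diff's own ChatCompletionDeltaToolCall(**tool_call) calls rely on, and that Function also exposes an arguments field per the OpenAI schema):

    from litellm.utils import ChatCompletionDeltaToolCall

    tool_call = ChatCompletionDeltaToolCall(
        id="call_abc123",  # hypothetical id
        function={"name": "get_current_weather", "arguments": ""},
        type="function",
        index=0,  # position of this tool call within the message
    )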
@@ -269,7 +276,14 @@ class Delta(OpenAIObject):
         self.content = content
         self.role = role
         self.function_call = function_call
-        self.tool_calls = tool_calls
+        if tool_calls is not None and isinstance(tool_calls, dict):
+            self.tool_calls = []
+            for tool_call in tool_calls:
+                if tool_call.get("index", None) is None:
+                    tool_call["index"] = 0
+                self.tool_calls.append(ChatCompletionDeltaToolCall(**tool_call))
+        else:
+            self.tool_calls = tool_calls
 
     def __contains__(self, key):
         # Define custom behavior for the 'in' operator
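The backfill in this branch defaults a missing "index" to 0 before validation, since some providers omit it on streamed tool calls. The same logic in isolation, on an illustrative dict:

    raw_tool_call = {
        "id": "call_abc123",  # hypothetical
        "type": "function",
        "function": {"name": "get_current_weather", "arguments": ""},
    }
    if raw_tool_call.get("index", None) is None:
        raw_tool_call["index"] = 0  # same default the new branch applies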
@@ -1182,7 +1196,8 @@ class Logging:
                                 start_time=start_time,
                                 end_time=end_time,
                             )
-                        except:
+                        except Exception as e:
+
                             complete_streaming_response = None
                     else:
                         self.sync_streaming_chunks.append(result)
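The narrowed handler only changes which errors are swallowed: a bare except: also traps BaseException subclasses such as KeyboardInterrupt and SystemExit, while except Exception does not. A minimal illustration (helper name hypothetical):

    try:
        complete_streaming_response = build_response(chunks)  # hypothetical helper
    except Exception:  # lets KeyboardInterrupt / SystemExit propagate
        complete_streaming_response = None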
@@ -5847,6 +5862,18 @@ async def convert_to_streaming_response_async(response_object: Optional[dict] =
     choice_list = []
 
     for idx, choice in enumerate(response_object["choices"]):
+        if (
+            choice["message"].get("tool_calls", None) is not None
+            and isinstance(choice["message"]["tool_calls"], list)
+            and len(choice["message"]["tool_calls"]) > 0
+            and isinstance(choice["message"]["tool_calls"][0], dict)
+        ):
+            pydantic_tool_calls = []
+            for index, t in enumerate(choice["message"]["tool_calls"]):
+                if "index" not in t:
+                    t["index"] = index
+                pydantic_tool_calls.append(ChatCompletionDeltaToolCall(**t))
+            choice["message"]["tool_calls"] = pydantic_tool_calls
         delta = Delta(
             content=choice["message"].get("content", None),
             role=choice["message"]["role"],
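This mirrors the Delta change for the async cache-replay path: dict tool calls in a cached response gain an "index" and are rebuilt as ChatCompletionDeltaToolCall objects before the Delta is constructed. A sketch of the transformation on an illustrative cached message:

    message = {
        "role": "assistant",
        "content": None,
        "tool_calls": [
            {"id": "call_1", "type": "function",
             "function": {"name": "get_current_weather", "arguments": "{}"}},
        ],
    }
    for index, t in enumerate(message["tool_calls"]):
        if "index" not in t:
            t["index"] = index  # same backfill as the new loop above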
@@ -8650,6 +8677,7 @@ class CustomStreamWrapper:
                         "text": chunk.choices[0].delta.content,
                         "is_finished": True,
                         "finish_reason": chunk.choices[0].finish_reason,
+                        "original_chunk": chunk,
                     }
 
                 completion_obj["content"] = response_obj["text"]
@@ -8681,13 +8709,82 @@
             model_response.model = self.model
             print_verbose(
                 f"model_response: {model_response}; completion_obj: {completion_obj}"
             )
             print_verbose(
-                f"model_response finish reason 3: {model_response.choices[0].finish_reason}"
+                f"model_response finish reason 3: {model_response.choices[0].finish_reason}; response_obj={response_obj}"
             )
             ## FUNCTION CALL PARSING
             if (
-                len(completion_obj["content"]) > 0
+                response_obj is not None
+                and response_obj.get("original_chunk", None) is not None
+            ):  # function / tool calling branch - only set for openai/azure compatible endpoints
+                # enter this branch when no content has been passed in response
+                original_chunk = response_obj.get("original_chunk", None)
+                model_response.id = original_chunk.id
+                if len(original_chunk.choices) > 0:
+                    if (
+                        original_chunk.choices[0].delta.function_call is not None
+                        or original_chunk.choices[0].delta.tool_calls is not None
+                    ):
+                        try:
+                            delta = dict(original_chunk.choices[0].delta)
+                            model_response.system_fingerprint = (
+                                original_chunk.system_fingerprint
+                            )
+                            ## AZURE - check if arguments is not None
+                            if (
+                                original_chunk.choices[0].delta.function_call
+                                is not None
+                            ):
+                                if (
+                                    getattr(
+                                        original_chunk.choices[0].delta.function_call,
+                                        "arguments",
+                                    )
+                                    is None
+                                ):
+                                    original_chunk.choices[
+                                        0
+                                    ].delta.function_call.arguments = ""
+                            elif original_chunk.choices[0].delta.tool_calls is not None:
+                                if isinstance(
+                                    original_chunk.choices[0].delta.tool_calls, list
+                                ):
+                                    for t in original_chunk.choices[0].delta.tool_calls:
+                                        if hasattr(t, "functions") and hasattr(
+                                            t.functions, "arguments"
+                                        ):
+                                            if (
+                                                getattr(
+                                                    t.function,
+                                                    "arguments",
+                                                )
+                                                is None
+                                            ):
+                                                t.function.arguments = ""
+                            model_response.choices[0].delta = Delta(**delta)
+                        except Exception as e:
+                            traceback.print_exc()
+                            model_response.choices[0].delta = Delta()
+                    else:
+                        try:
+                            delta = dict(original_chunk.choices[0].delta)
+                            print_verbose(f"original delta: {delta}")
+                            model_response.choices[0].delta = Delta(**delta)
+                            print_verbose(
+                                f"new delta: {model_response.choices[0].delta}"
+                            )
+                        except Exception as e:
+                            model_response.choices[0].delta = Delta()
+                else:
+                    return
+            print_verbose(
+                f"model_response.choices[0].delta: {model_response.choices[0].delta}; completion_obj: {completion_obj}"
+            )
+            print_verbose(f"self.sent_first_chunk: {self.sent_first_chunk}")
+            ## RETURN ARG
+            if (
+                "content" in completion_obj
+                and isinstance(completion_obj["content"], str)
+                and len(completion_obj["content"]) > 0
             ):  # cannot set content of an OpenAI Object to be an empty string
                 hold, model_response_str = self.check_special_tokens(
                     chunk=completion_obj["content"],
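With this branch in place, chunks whose content is empty can still carry tool-call deltas rebuilt from the stashed original_chunk. A consumer-side sketch (model and tool schema illustrative, not from the diff):

    import litellm

    tools = [{
        "type": "function",
        "function": {
            "name": "get_current_weather",  # hypothetical tool
            "parameters": {
                "type": "object",
                "properties": {"location": {"type": "string"}},
            },
        },
    }]
    response = litellm.completion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Weather in Boston?"}],
        tools=tools,
        stream=True,
    )
    arguments = ""
    for chunk in response:
        delta = chunk.choices[0].delta
        if delta.tool_calls:
            arguments += delta.tool_calls[0].function.arguments or ""
    print(arguments)  # accumulated JSON argument string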
@@ -8739,7 +8836,7 @@ class CustomStreamWrapper:
                     return model_response
                 else:
                     return
-            elif model_response.choices[0].finish_reason:
+            elif model_response.choices[0].finish_reason is not None:
                 # flush any remaining holding chunk
                 if len(self.holding_chunk) > 0:
                     if model_response.choices[0].delta.content is None:
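The comparison change matters because finish_reason can be falsy without being unset: the old truthiness test skipped the flush branch for values like the empty string, while is not None takes it. In isolation:

    finish_reason = ""
    assert not finish_reason            # old check: branch skipped
    assert finish_reason is not None    # new check: branch entered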
@@ -8749,61 +8846,15 @@ class CustomStreamWrapper:
                         self.holding_chunk + model_response.choices[0].delta.content
                     )
                     self.holding_chunk = ""
-                # get any function call arguments
                 model_response.choices[0].finish_reason = map_finish_reason(
                     model_response.choices[0].finish_reason
                 )  # ensure consistent output to openai
                 return model_response
             elif (
-                response_obj is not None
-                and response_obj.get("original_chunk", None) is not None
-            ):  # function / tool calling branch - only set for openai/azure compatible endpoints
-                # enter this branch when no content has been passed in response
-                original_chunk = response_obj.get("original_chunk", None)
-                model_response.id = original_chunk.id
-                if len(original_chunk.choices) > 0:
-                    if (
-                        original_chunk.choices[0].delta.function_call is not None
-                        or original_chunk.choices[0].delta.tool_calls is not None
-                    ):
-                        try:
-                            delta = dict(original_chunk.choices[0].delta)
-                            ## AZURE - check if arguments is not None
-                            if (
-                                original_chunk.choices[0].delta.function_call
-                                is not None
-                            ):
-                                if (
-                                    getattr(
-                                        original_chunk.choices[0].delta.function_call,
-                                        "arguments",
-                                    )
-                                    is None
-                                ):
-                                    original_chunk.choices[
-                                        0
-                                    ].delta.function_call.arguments = ""
-                            elif original_chunk.choices[0].delta.tool_calls is not None:
-                                if isinstance(
-                                    original_chunk.choices[0].delta.tool_calls, list
-                                ):
-                                    for t in original_chunk.choices[0].delta.tool_calls:
-                                        if (
-                                            getattr(
-                                                t.function,
-                                                "arguments",
-                                            )
-                                            is None
-                                        ):
-                                            t.function.arguments = ""
-                            model_response.choices[0].delta = Delta(**delta)
-                        except Exception as e:
-                            traceback.print_exc()
-                            model_response.choices[0].delta = Delta()
-                    else:
-                        return
-                else:
-                    return
-                model_response.system_fingerprint = original_chunk.system_fingerprint
+                model_response.choices[0].delta.tool_calls is not None
+                or model_response.choices[0].delta.function_call is not None
             ):
                 if self.sent_first_chunk == False:
                     model_response.choices[0].delta["role"] = "assistant"
                     self.sent_first_chunk = True
@@ -8856,6 +8907,7 @@ class CustomStreamWrapper:
                 print_verbose(f"PROCESSED CHUNK PRE CHUNK CREATOR: {chunk}")
                 response: Optional[ModelResponse] = self.chunk_creator(chunk=chunk)
                 print_verbose(f"PROCESSED CHUNK POST CHUNK CREATOR: {response}")
+
                 if response is None:
                     continue
                 ## LOGGING
@@ -8900,7 +8952,11 @@ class CustomStreamWrapper:
                     print_verbose(f"value of async chunk: {chunk}")
                     if chunk == "None" or chunk is None:
                         raise Exception
-                    elif self.custom_llm_provider == "gemini" and len(chunk.parts) == 0:
+                    elif (
+                        self.custom_llm_provider == "gemini"
+                        and hasattr(chunk, "parts")
+                        and len(chunk.parts) == 0
+                    ):
                         continue
                     # chunk_creator() does logging/stream chunk building. We need to let it know its being called in_async_func, so we don't double add chunks.
                     # __anext__ also calls async_success_handler, which does logging
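The added hasattr guard keeps the Gemini-specific skip from raising AttributeError on chunk shapes that lack a parts attribute. The guard in isolation (stand-in chunk class is illustrative):

    class FakeChunk:  # stand-in for a provider chunk
        pass

    chunk = FakeChunk()
    skip = hasattr(chunk, "parts") and len(chunk.parts) == 0
    assert skip is False  # no parts attribute -> no skip, and no crash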
@@ -8929,6 +8985,7 @@ class CustomStreamWrapper:
                         self.rules.post_call_rules(
                             input=self.response_uptil_now, model=self.model
                         )
+                        print_verbose(f"final returned processed chunk: {processed_chunk}")
                         return processed_chunk
                     raise StopAsyncIteration
                 else:  # temporary patch for non-aiohttp async calls