Litellm dev bedrock anthropic 3 7 v2 (#8843)

* feat(bedrock/converse/transformation.py): support claude-3-7-sonnet reasoning_content transformation

Closes https://github.com/BerriAI/litellm/issues/8777

* fix(bedrock/): support returning `reasoning_content` on streaming for claude-3-7

Resolves https://github.com/BerriAI/litellm/issues/8777

* feat(bedrock/): unify converse reasoning content blocks for consistency across anthropic and bedrock

* fix(anthropic/chat/transformation.py): handle deepseek-style 'reasoning_content' extraction within transformation.py

simpler logic

* feat(bedrock/): fix streaming to return blocks in a consistent format

* fix: fix linting error

* test: fix test

* feat(factory.py): fix bedrock thinking block translation on tool calling

allows passing the thinking blocks back to bedrock for tool calling

* fix(types/utils.py): don't exclude provider_specific_fields on model dump

ensures consistent responses

* fix: fix linting errors

* fix(convert_dict_to_response.py): pass reasoning_content on root

* fix: test

* fix(streaming_handler.py): add helper util for setting model id

* fix(streaming_handler.py): fix setting model id on model response stream chunk

* fix(streaming_handler.py): fix linting error

* fix(streaming_handler.py): fix linting error

* fix(types/utils.py): add provider_specific_fields to model stream response

* fix(streaming_handler.py): copy provider specific fields and add them to the root of the streaming response

* fix(streaming_handler.py): fix check

* fix: fix test

* fix(types/utils.py): ensure messages content is always openai compatible

* fix(types/utils.py): fix delta object to always be openai compatible

only introduce new params if the variable exists

* test: fix bedrock nova tests

* test: skip flaky test

* test: skip flaky test in ci/cd
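
A hedged usage sketch of the non-streaming behavior described in the notes above; the Bedrock model id, the `thinking` parameter shape, and the prompt are illustrative assumptions, not taken from this commit.

import litellm

response = litellm.completion(
    model="bedrock/us.anthropic.claude-3-7-sonnet-20250219-v1:0",  # hypothetical model id
    messages=[{"role": "user", "content": "What is 27 * 453? Think it through."}],
    thinking={"type": "enabled", "budget_tokens": 1024},  # assumed parameter shape
    max_tokens=2048,
)

message = response.choices[0].message
print(message.content)            # final answer text
print(message.reasoning_content)  # reasoning text, now surfaced on the message root

Per the factory.py change below, an assistant message that carries thinking blocks can also be sent back on a follow-up request (e.g. during tool calling), where they are translated into Bedrock reasoningContent blocks.
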
Krish Dholakia 2025-02-26 16:05:33 -08:00 committed by GitHub
parent f3ef6c92a3
commit 05a973bf19
20 changed files with 447 additions and 149 deletions

convert_dict_to_response.py

@@ -473,6 +473,7 @@ def convert_to_model_response_object( # noqa: PLR0915
tool_calls=tool_calls,
audio=choice["message"].get("audio", None),
provider_specific_fields=provider_specific_fields,
reasoning_content=reasoning_content,
)
finish_reason = choice.get("finish_reason", None)
if finish_reason is None:
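
A minimal sketch of the effect of the hunk above together with the types/utils.py notes: `reasoning_content` is carried on the message root, and `provider_specific_fields` is no longer excluded when the message is dumped. The import path and constructor kwargs follow the hunk; the exact signature is an assumption.

from litellm.types.utils import Message

msg = Message(
    content="The answer is 12,231.",
    reasoning_content="27 * 400 = 10,800; 27 * 53 = 1,431; total 12,231.",
    provider_specific_fields={"citations": None},  # illustrative value
)

dumped = msg.model_dump()
assert "reasoning_content" in dumped          # passed through on the root
assert "provider_specific_fields" in dumped   # no longer excluded on model dump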

factory.py

@@ -2151,6 +2151,10 @@ from email.message import Message
import httpx
from litellm.types.llms.bedrock import (
BedrockConverseReasoningContentBlock,
BedrockConverseReasoningTextBlock,
)
from litellm.types.llms.bedrock import ContentBlock as BedrockContentBlock
from litellm.types.llms.bedrock import DocumentBlock as BedrockDocumentBlock
from litellm.types.llms.bedrock import ImageBlock as BedrockImageBlock
@@ -2963,6 +2967,28 @@ class BedrockConverseMessagesProcessor:
return contents
@staticmethod
def translate_thinking_blocks_to_reasoning_content_blocks(
thinking_blocks: List[ChatCompletionThinkingBlock],
) -> List[BedrockContentBlock]:
reasoning_content_blocks: List[BedrockContentBlock] = []
for thinking_block in thinking_blocks:
reasoning_text = thinking_block.get("thinking")
reasoning_signature = thinking_block.get("signature_delta")
text_block = BedrockConverseReasoningTextBlock(
text=reasoning_text or "",
)
if reasoning_signature is not None:
text_block["signature"] = reasoning_signature
reasoning_content_block = BedrockConverseReasoningContentBlock(
reasoningText=text_block,
)
bedrock_content_block = BedrockContentBlock(
reasoningContent=reasoning_content_block
)
reasoning_content_blocks.append(bedrock_content_block)
return reasoning_content_blocks
def _bedrock_converse_messages_pt( # noqa: PLR0915
messages: List,
@@ -3109,11 +3135,23 @@ def _bedrock_converse_messages_pt( # noqa: PLR0915
assistant_content: List[BedrockContentBlock] = []
## MERGE CONSECUTIVE ASSISTANT CONTENT ##
while msg_i < len(messages) and messages[msg_i]["role"] == "assistant":
assistant_message_block = get_assistant_message_block_or_continue_message(
message=messages[msg_i],
assistant_continue_message=assistant_continue_message,
)
_assistant_content = assistant_message_block.get("content", None)
thinking_blocks = cast(
Optional[List[ChatCompletionThinkingBlock]],
assistant_message_block.get("thinking_blocks"),
)
if thinking_blocks is not None:
assistant_content.extend(
BedrockConverseMessagesProcessor.translate_thinking_blocks_to_reasoning_content_blocks(
thinking_blocks
)
)
if _assistant_content is not None and isinstance(_assistant_content, list):
assistants_parts: List[BedrockContentBlock] = []
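
For readability, a plain-dict sketch of the translation performed by translate_thinking_blocks_to_reasoning_content_blocks above; the TypedDict wrappers from litellm.types.llms.bedrock are replaced with ordinary dicts, and the input values are made up.

def translate_thinking_blocks(thinking_blocks):
    # Mirrors the loop in the hunk: each OpenAI-style thinking block becomes a
    # Bedrock Converse content block with a nested reasoningText entry.
    blocks = []
    for tb in thinking_blocks:
        text_block = {"text": tb.get("thinking") or ""}
        if tb.get("signature_delta") is not None:
            text_block["signature"] = tb["signature_delta"]
        blocks.append({"reasoningContent": {"reasoningText": text_block}})
    return blocks

print(translate_thinking_blocks(
    [{"thinking": "Let me reason step by step...", "signature_delta": "sig-abc"}]
))
# [{'reasoningContent': {'reasoningText': {'text': 'Let me reason step by step...',
#                                          'signature': 'sig-abc'}}}]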

streaming_handler.py
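
The streaming_handler.py changes below pin every chunk to the first chunk's id and copy provider_specific_fields onto the chunk root. A hedged sketch of consuming such a stream; the model id, the `thinking` parameter shape, and the delta field name for reasoning text are assumptions.

import litellm

stream = litellm.completion(
    model="bedrock/us.anthropic.claude-3-7-sonnet-20250219-v1:0",  # hypothetical model id
    messages=[{"role": "user", "content": "Think step by step: 19 * 23?"}],
    thinking={"type": "enabled", "budget_tokens": 1024},  # assumed parameter shape
    stream=True,
)

first_id = None
for chunk in stream:
    # set_model_id (added below) reuses the first chunk's id for all later chunks
    first_id = first_id or chunk.id
    assert chunk.id == first_id

    delta = chunk.choices[0].delta
    reasoning = getattr(delta, "reasoning_content", None)  # assumed delta field name
    if reasoning:
        print("[reasoning]", reasoning, end="")
    if delta.content:
        print(delta.content, end="")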

@@ -5,7 +5,7 @@ import threading
import time
import traceback
import uuid
from typing import Any, Callable, Dict, List, Optional, cast
from typing import Any, Callable, Dict, List, Optional, Union, cast
import httpx
from pydantic import BaseModel
@@ -14,6 +14,7 @@ import litellm
from litellm import verbose_logger
from litellm.litellm_core_utils.redact_messages import LiteLLMLoggingObject
from litellm.litellm_core_utils.thread_pool_executor import executor
from litellm.types.llms.openai import ChatCompletionChunk
from litellm.types.utils import Delta
from litellm.types.utils import GenericStreamingChunk as GChunk
from litellm.types.utils import (
@@ -110,7 +111,7 @@ class CustomStreamWrapper:
) # GUARANTEE OPENAI HEADERS IN RESPONSE
self._response_headers = _response_headers
self.response_id = None
self.response_id: Optional[str] = None
self.logging_loop = None
self.rules = Rules()
self.stream_options = stream_options or getattr(
@@ -721,6 +722,39 @@ class CustomStreamWrapper:
is_empty = False
return is_empty
def set_model_id(
self, id: str, model_response: ModelResponseStream
) -> ModelResponseStream:
"""
Set the model id and response id to the given id.
Ensure model id is always the same across all chunks.
If first chunk sent + id set, use that id for all chunks.
"""
if self.response_id is None:
self.response_id = id
if self.response_id is not None and isinstance(self.response_id, str):
model_response.id = self.response_id
return model_response
def copy_model_response_level_provider_specific_fields(
self,
original_chunk: Union[ModelResponseStream, ChatCompletionChunk],
model_response: ModelResponseStream,
) -> ModelResponseStream:
"""
Copy provider_specific_fields from original_chunk to model_response.
"""
provider_specific_fields = getattr(
original_chunk, "provider_specific_fields", None
)
if provider_specific_fields is not None:
model_response.provider_specific_fields = provider_specific_fields
for k, v in provider_specific_fields.items():
setattr(model_response, k, v)
return model_response
def return_processed_chunk_logic( # noqa
self,
completion_obj: Dict[str, Any],
@@ -747,6 +781,10 @@
and completion_obj["function_call"] is not None
)
or (model_response.choices[0].delta.provider_specific_fields is not None)
or (
"provider_specific_fields" in model_response
and model_response.choices[0].delta.provider_specific_fields is not None
)
or (
"provider_specific_fields" in response_obj
and response_obj["provider_specific_fields"] is not None
@@ -763,8 +801,6 @@
## check if openai/azure chunk
original_chunk = response_obj.get("original_chunk", None)
if original_chunk:
model_response.id = original_chunk.id
self.response_id = original_chunk.id
if len(original_chunk.choices) > 0:
choices = []
for choice in original_chunk.choices:
@@ -798,9 +834,10 @@
model_response.choices[0].delta, "role"
):
_initial_delta = model_response.choices[0].delta.model_dump()
_initial_delta.pop("role", None)
model_response.choices[0].delta = Delta(**_initial_delta)
print_verbose(
verbose_logger.debug(
f"model_response.choices[0].delta: {model_response.choices[0].delta}"
)
else:
@@ -870,7 +907,7 @@
self.chunks.append(model_response)
return
def chunk_creator(self, chunk): # type: ignore # noqa: PLR0915
def chunk_creator(self, chunk: Any): # type: ignore # noqa: PLR0915
model_response = self.model_response_creator()
response_obj: Dict[str, Any] = {}
@@ -886,16 +923,13 @@
) # check if chunk is a generic streaming chunk
) or (
self.custom_llm_provider
and (
self.custom_llm_provider == "anthropic"
or self.custom_llm_provider in litellm._custom_providers
)
and self.custom_llm_provider in litellm._custom_providers
):
if self.received_finish_reason is not None:
if "provider_specific_fields" not in chunk:
raise StopIteration
anthropic_response_obj: GChunk = chunk
anthropic_response_obj: GChunk = cast(GChunk, chunk)
completion_obj["content"] = anthropic_response_obj["text"]
if anthropic_response_obj["is_finished"]:
self.received_finish_reason = anthropic_response_obj[
@@ -927,7 +961,7 @@
].items():
setattr(model_response, key, value)
response_obj = anthropic_response_obj
response_obj = cast(Dict[str, Any], anthropic_response_obj)
elif self.model == "replicate" or self.custom_llm_provider == "replicate":
response_obj = self.handle_replicate_chunk(chunk)
completion_obj["content"] = response_obj["text"]
@@ -989,6 +1023,7 @@
try:
completion_obj["content"] = chunk.text
except Exception as e:
original_exception = e
if "Part has no text." in str(e):
## check for function calling
function_call = (
@@ -1030,7 +1065,7 @@
_model_response.choices = [_streaming_response]
response_obj = {"original_chunk": _model_response}
else:
raise e
raise original_exception
if (
hasattr(chunk.candidates[0], "finish_reason")
and chunk.candidates[0].finish_reason.name
@@ -1093,8 +1128,9 @@
total_tokens=response_obj["usage"].total_tokens,
)
elif self.custom_llm_provider == "text-completion-codestral":
response_obj = litellm.CodestralTextCompletionConfig()._chunk_parser(
chunk
response_obj = cast(
Dict[str, Any],
litellm.CodestralTextCompletionConfig()._chunk_parser(chunk),
)
completion_obj["content"] = response_obj["text"]
print_verbose(f"completion obj content: {completion_obj['content']}")
@@ -1156,8 +1192,9 @@
self.received_finish_reason = response_obj["finish_reason"]
if response_obj.get("original_chunk", None) is not None:
if hasattr(response_obj["original_chunk"], "id"):
model_response.id = response_obj["original_chunk"].id
self.response_id = model_response.id
model_response = self.set_model_id(
response_obj["original_chunk"].id, model_response
)
if hasattr(response_obj["original_chunk"], "system_fingerprint"):
model_response.system_fingerprint = response_obj[
"original_chunk"
@@ -1206,8 +1243,16 @@
): # function / tool calling branch - only set for openai/azure compatible endpoints
# enter this branch when no content has been passed in response
original_chunk = response_obj.get("original_chunk", None)
model_response.id = original_chunk.id
self.response_id = original_chunk.id
if hasattr(original_chunk, "id"):
model_response = self.set_model_id(
original_chunk.id, model_response
)
if hasattr(original_chunk, "provider_specific_fields"):
model_response = (
self.copy_model_response_level_provider_specific_fields(
original_chunk, model_response
)
)
if original_chunk.choices and len(original_chunk.choices) > 0:
delta = original_chunk.choices[0].delta
if delta is not None and (