diff --git a/docs/my-website/docs/providers/anthropic.md b/docs/my-website/docs/providers/anthropic.md
index 0c7b2a442..290e094d0 100644
--- a/docs/my-website/docs/providers/anthropic.md
+++ b/docs/my-website/docs/providers/anthropic.md
@@ -864,3 +864,96 @@ Human: How do I boil water?
 Assistant:
 ```
+
+## Usage - PDF
+
+Pass base64-encoded PDF files to Anthropic models using the `image_url` field.
+
+
+
+
+### **using base64**
+```python
+from litellm import completion, supports_pdf_input
+import base64
+import requests
+
+# URL of the file
+url = "https://storage.googleapis.com/cloud-samples-data/generative-ai/pdf/2403.05530.pdf"
+
+# Download the file
+response = requests.get(url)
+file_data = response.content
+
+encoded_file = base64.b64encode(file_data).decode("utf-8")
+
+## check if model supports pdf input - (2024/11/11) only claude-3-5-haiku-20241022 supports it
+supports_pdf_input("anthropic/claude-3-5-haiku-20241022") # True
+
+response = completion(
+    model="anthropic/claude-3-5-haiku-20241022",
+    messages=[
+        {
+            "role": "user",
+            "content": [
+                {"type": "text", "text": "You are a very professional document summarization specialist. Please summarize the given document."},
+                {
+                    "type": "image_url",
+                    "image_url": f"data:application/pdf;base64,{encoded_file}", # 👈 PDF
+                },
+            ],
+        }
+    ],
+    max_tokens=300,
+)
+
+print(response.choices[0])
+```
+
+
+1. Add model to config
+
+```yaml
+- model_name: claude-3-5-haiku-20241022
+  litellm_params:
+    model: anthropic/claude-3-5-haiku-20241022
+    api_key: os.environ/ANTHROPIC_API_KEY
+```
+
+2. Start Proxy
+
+```bash
+litellm --config /path/to/config.yaml
+```
+
+3. Test it!
+
+```bash
+curl http://0.0.0.0:4000/v1/chat/completions \
+  -H "Content-Type: application/json" \
+  -H "Authorization: Bearer sk-1234" \
+  -d '{
+    "model": "claude-3-5-haiku-20241022",
+    "messages": [
+      {
+        "role": "user",
+        "content": [
+          {
+            "type": "text",
+            "text": "You are a very professional document summarization specialist. Please summarize the given document"
+          },
+          {
+            "type": "image_url",
+            "image_url": "data:application/pdf;base64,{encoded_file}"
+          }
+        ]
+      }
+    ],
+    "max_tokens": 300
+  }'
+
+```
+
+
 
diff --git a/litellm/__init__.py b/litellm/__init__.py
index b739afb93..9812de1d8 100644
--- a/litellm/__init__.py
+++ b/litellm/__init__.py
@@ -375,6 +375,7 @@ open_ai_text_completion_models: List = []
 cohere_models: List = []
 cohere_chat_models: List = []
 mistral_chat_models: List = []
+text_completion_codestral_models: List = []
 anthropic_models: List = []
 empower_models: List = []
 openrouter_models: List = []
@@ -401,6 +402,19 @@ deepinfra_models: List = []
 perplexity_models: List = []
 watsonx_models: List = []
 gemini_models: List = []
+xai_models: List = []
+deepseek_models: List = []
+azure_ai_models: List = []
+voyage_models: List = []
+databricks_models: List = []
+cloudflare_models: List = []
+codestral_models: List = []
+friendliai_models: List = []
+palm_models: List = []
+groq_models: List = []
+azure_models: List = []
+anyscale_models: List = []
+cerebras_models: List = []
 
 
 def add_known_models():
@@ -477,6 +491,34 @@ def add_known_models():
             # ignore the 'up-to', '-to-' model names -> not real models. just for cost tracking based on model params.
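For the proxy walkthrough in the docs above, a minimal client-side sketch using the OpenAI SDK instead of curl; the base URL, model alias, and `sk-1234` key simply mirror the example config and are placeholders, not required values.

```python
import base64

import requests
from openai import OpenAI

# Download and base64-encode the sample PDF, as in the SDK example above.
pdf_bytes = requests.get(
    "https://storage.googleapis.com/cloud-samples-data/generative-ai/pdf/2403.05530.pdf"
).content
encoded_file = base64.b64encode(pdf_bytes).decode("utf-8")

# Point the OpenAI client at the LiteLLM proxy started in step 2.
client = OpenAI(base_url="http://0.0.0.0:4000", api_key="sk-1234")  # placeholder key

response = client.chat.completions.create(
    model="claude-3-5-haiku-20241022",
    messages=[
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "Please summarize the given document."},
                {
                    "type": "image_url",
                    "image_url": f"data:application/pdf;base64,{encoded_file}",  # 👈 PDF
                },
            ],
        }
    ],
    max_tokens=300,
)
print(response.choices[0].message.content)
```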
if "-to-" not in key: fireworks_ai_embedding_models.append(key) + elif value.get("litellm_provider") == "text-completion-codestral": + text_completion_codestral_models.append(key) + elif value.get("litellm_provider") == "xai": + xai_models.append(key) + elif value.get("litellm_provider") == "deepseek": + deepseek_models.append(key) + elif value.get("litellm_provider") == "azure_ai": + azure_ai_models.append(key) + elif value.get("litellm_provider") == "voyage": + voyage_models.append(key) + elif value.get("litellm_provider") == "databricks": + databricks_models.append(key) + elif value.get("litellm_provider") == "cloudflare": + cloudflare_models.append(key) + elif value.get("litellm_provider") == "codestral": + codestral_models.append(key) + elif value.get("litellm_provider") == "friendliai": + friendliai_models.append(key) + elif value.get("litellm_provider") == "palm": + palm_models.append(key) + elif value.get("litellm_provider") == "groq": + groq_models.append(key) + elif value.get("litellm_provider") == "azure": + azure_models.append(key) + elif value.get("litellm_provider") == "anyscale": + anyscale_models.append(key) + elif value.get("litellm_provider") == "cerebras": + cerebras_models.append(key) add_known_models() @@ -722,6 +764,20 @@ model_list = ( + vertex_language_models + watsonx_models + gemini_models + + text_completion_codestral_models + + xai_models + + deepseek_models + + azure_ai_models + + voyage_models + + databricks_models + + cloudflare_models + + codestral_models + + friendliai_models + + palm_models + + groq_models + + azure_models + + anyscale_models + + cerebras_models ) @@ -778,6 +834,7 @@ class LlmProviders(str, Enum): FIREWORKS_AI = "fireworks_ai" FRIENDLIAI = "friendliai" WATSONX = "watsonx" + WATSONX_TEXT = "watsonx_text" TRITON = "triton" PREDIBASE = "predibase" DATABRICKS = "databricks" @@ -794,6 +851,7 @@ provider_list: List[Union[LlmProviders, str]] = list(LlmProviders) models_by_provider: dict = { "openai": open_ai_chat_completion_models + open_ai_text_completion_models, + "text-completion-openai": open_ai_text_completion_models, "cohere": cohere_models + cohere_chat_models, "cohere_chat": cohere_chat_models, "anthropic": anthropic_models, @@ -817,6 +875,23 @@ models_by_provider: dict = { "watsonx": watsonx_models, "gemini": gemini_models, "fireworks_ai": fireworks_ai_models + fireworks_ai_embedding_models, + "aleph_alpha": aleph_alpha_models, + "text-completion-codestral": text_completion_codestral_models, + "xai": xai_models, + "deepseek": deepseek_models, + "mistral": mistral_chat_models, + "azure_ai": azure_ai_models, + "voyage": voyage_models, + "databricks": databricks_models, + "cloudflare": cloudflare_models, + "codestral": codestral_models, + "nlp_cloud": nlp_cloud_models, + "friendliai": friendliai_models, + "palm": palm_models, + "groq": groq_models, + "azure": azure_models, + "anyscale": anyscale_models, + "cerebras": cerebras_models, } # mapping for those models which have larger equivalents @@ -889,7 +964,6 @@ from .utils import ( supports_system_messages, get_litellm_params, acreate, - get_model_list, get_max_tokens, get_model_info, register_prompt_template, diff --git a/litellm/integrations/opentelemetry.py b/litellm/integrations/opentelemetry.py index 8102f2c60..30a280e57 100644 --- a/litellm/integrations/opentelemetry.py +++ b/litellm/integrations/opentelemetry.py @@ -2,14 +2,16 @@ import os from dataclasses import dataclass from datetime import datetime from functools import wraps -from typing import TYPE_CHECKING, Any, Dict, 
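A small sketch of how the provider buckets registered via `add_known_models()` and `models_by_provider` in this hunk can be inspected after import; the attribute names come from the diff itself, while the concrete model IDs depend on the bundled pricing map.

```python
import litellm

# Models whose "litellm_provider" is "xai" in the cost map land in this list.
print(litellm.xai_models)

# The same list is reachable through the provider -> models mapping ...
print(litellm.models_by_provider.get("xai", []))

# ... and is folded into the aggregate model_list used elsewhere in litellm.
assert all(m in litellm.model_list for m in litellm.xai_models)
```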
Optional, Union +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union import litellm from litellm._logging import verbose_logger from litellm.integrations.custom_logger import CustomLogger from litellm.types.services import ServiceLoggerPayload from litellm.types.utils import ( + ChatCompletionMessageToolCall, EmbeddingResponse, + Function, ImageResponse, ModelResponse, StandardLoggingPayload, @@ -403,6 +405,28 @@ class OpenTelemetry(CustomLogger): except Exception: return "" + @staticmethod + def _tool_calls_kv_pair( + tool_calls: List[ChatCompletionMessageToolCall], + ) -> Dict[str, Any]: + from litellm.proxy._types import SpanAttributes + + kv_pairs: Dict[str, Any] = {} + for idx, tool_call in enumerate(tool_calls): + _function = tool_call.get("function") + if not _function: + continue + + keys = Function.__annotations__.keys() + for key in keys: + _value = _function.get(key) + if _value: + kv_pairs[ + f"{SpanAttributes.LLM_COMPLETIONS}.{idx}.function_call.{key}" + ] = _value + + return kv_pairs + def set_attributes( # noqa: PLR0915 self, span: Span, kwargs, response_obj: Optional[Any] ): @@ -597,18 +621,13 @@ class OpenTelemetry(CustomLogger): message = choice.get("message") tool_calls = message.get("tool_calls") if tool_calls: - self.safe_set_attribute( - span=span, - key=f"{SpanAttributes.LLM_COMPLETIONS}.{idx}.function_call.name", - value=tool_calls[0].get("function").get("name"), - ) - self.safe_set_attribute( - span=span, - key=f"{SpanAttributes.LLM_COMPLETIONS}.{idx}.function_call.arguments", - value=tool_calls[0] - .get("function") - .get("arguments"), - ) + kv_pairs = OpenTelemetry._tool_calls_kv_pair(tool_calls) # type: ignore + for key, value in kv_pairs.items(): + self.safe_set_attribute( + span=span, + key=key, + value=value, + ) except Exception as e: verbose_logger.exception( diff --git a/litellm/llms/anthropic/chat/handler.py b/litellm/llms/anthropic/chat/handler.py index da95ac075..2d119a28f 100644 --- a/litellm/llms/anthropic/chat/handler.py +++ b/litellm/llms/anthropic/chat/handler.py @@ -71,11 +71,12 @@ def validate_environment( prompt_caching_set = AnthropicConfig().is_cache_control_set(messages=messages) computer_tool_used = AnthropicConfig().is_computer_tool_used(tools=tools) - + pdf_used = AnthropicConfig().is_pdf_used(messages=messages) headers = AnthropicConfig().get_anthropic_headers( anthropic_version=anthropic_version, computer_tool_used=computer_tool_used, prompt_caching_set=prompt_caching_set, + pdf_used=pdf_used, api_key=api_key, ) diff --git a/litellm/llms/anthropic/chat/transformation.py b/litellm/llms/anthropic/chat/transformation.py index ec3285473..18c53b696 100644 --- a/litellm/llms/anthropic/chat/transformation.py +++ b/litellm/llms/anthropic/chat/transformation.py @@ -104,6 +104,7 @@ class AnthropicConfig: anthropic_version: Optional[str] = None, computer_tool_used: bool = False, prompt_caching_set: bool = False, + pdf_used: bool = False, ) -> dict: import json @@ -112,6 +113,8 @@ class AnthropicConfig: betas.append("prompt-caching-2024-07-31") if computer_tool_used: betas.append("computer-use-2024-10-22") + if pdf_used: + betas.append("pdfs-2024-09-25") headers = { "anthropic-version": anthropic_version or "2023-06-01", "x-api-key": api_key, @@ -365,6 +368,21 @@ class AnthropicConfig: return True return False + def is_pdf_used(self, messages: List[AllMessageValues]) -> bool: + """ + Set to true if media passed into messages. 
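Illustration of the flattening `_tool_calls_kv_pair()` performs: one `function_call.name` / `function_call.arguments` attribute per tool call, indexed by the tool call's position. The tool-call payload below is invented, and the `gen_ai.completion` prefix assumes that is what `SpanAttributes.LLM_COMPLETIONS` resolves to.

```python
# Hypothetical input: two tool calls on a single completion choice.
tool_calls = [
    {"function": {"name": "get_weather", "arguments": '{"city": "Paris"}'}},
    {"function": {"name": "get_time", "arguments": '{"tz": "UTC"}'}},
]

# Roughly the span attributes _tool_calls_kv_pair() would produce for them:
expected = {
    "gen_ai.completion.0.function_call.name": "get_weather",
    "gen_ai.completion.0.function_call.arguments": '{"city": "Paris"}',
    "gen_ai.completion.1.function_call.name": "get_time",
    "gen_ai.completion.1.function_call.arguments": '{"tz": "UTC"}',
}
```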
+ """ + for message in messages: + if ( + "content" in message + and message["content"] is not None + and isinstance(message["content"], list) + ): + for content in message["content"]: + if "type" in content: + return True + return False + def translate_system_message( self, messages: List[AllMessageValues] ) -> List[AnthropicSystemMessageContent]: diff --git a/litellm/llms/prompt_templates/factory.py b/litellm/llms/prompt_templates/factory.py index aee304760..80ad2ca35 100644 --- a/litellm/llms/prompt_templates/factory.py +++ b/litellm/llms/prompt_templates/factory.py @@ -1330,7 +1330,10 @@ def convert_to_anthropic_tool_invoke( def add_cache_control_to_content( anthropic_content_element: Union[ - dict, AnthropicMessagesImageParam, AnthropicMessagesTextParam + dict, + AnthropicMessagesImageParam, + AnthropicMessagesTextParam, + AnthropicMessagesDocumentParam, ], orignal_content_element: Union[dict, AllMessageValues], ): @@ -1343,6 +1346,32 @@ def add_cache_control_to_content( return anthropic_content_element +def _anthropic_content_element_factory( + image_chunk: GenericImageParsingChunk, +) -> Union[AnthropicMessagesImageParam, AnthropicMessagesDocumentParam]: + if image_chunk["media_type"] == "application/pdf": + _anthropic_content_element: Union[ + AnthropicMessagesDocumentParam, AnthropicMessagesImageParam + ] = AnthropicMessagesDocumentParam( + type="document", + source=AnthropicContentParamSource( + type="base64", + media_type=image_chunk["media_type"], + data=image_chunk["data"], + ), + ) + else: + _anthropic_content_element = AnthropicMessagesImageParam( + type="image", + source=AnthropicContentParamSource( + type="base64", + media_type=image_chunk["media_type"], + data=image_chunk["data"], + ), + ) + return _anthropic_content_element + + def anthropic_messages_pt( # noqa: PLR0915 messages: List[AllMessageValues], model: str, @@ -1400,15 +1429,9 @@ def anthropic_messages_pt( # noqa: PLR0915 openai_image_url=m["image_url"]["url"] ) - _anthropic_content_element = AnthropicMessagesImageParam( - type="image", - source=AnthropicImageParamSource( - type="base64", - media_type=image_chunk["media_type"], - data=image_chunk["data"], - ), + _anthropic_content_element = ( + _anthropic_content_element_factory(image_chunk) ) - _content_element = add_cache_control_to_content( anthropic_content_element=_anthropic_content_element, orignal_content_element=dict(m), diff --git a/litellm/model_prices_and_context_window_backup.json b/litellm/model_prices_and_context_window_backup.json index e8aeac2cb..48b25523e 100644 --- a/litellm/model_prices_and_context_window_backup.json +++ b/litellm/model_prices_and_context_window_backup.json @@ -1898,7 +1898,8 @@ "supports_function_calling": true, "tool_use_system_prompt_tokens": 264, "supports_assistant_prefill": true, - "supports_prompt_caching": true + "supports_prompt_caching": true, + "supports_pdf_input": true }, "claude-3-opus-20240229": { "max_tokens": 4096, diff --git a/litellm/proxy/_new_secret_config.yaml b/litellm/proxy/_new_secret_config.yaml index c44a46a67..cd723275b 100644 --- a/litellm/proxy/_new_secret_config.yaml +++ b/litellm/proxy/_new_secret_config.yaml @@ -1,63 +1,7 @@ model_list: - - model_name: claude-3-5-sonnet-20240620 + - model_name: "*" litellm_params: - model: claude-3-5-sonnet-20240620 - api_key: os.environ/ANTHROPIC_API_KEY - - model_name: claude-3-5-sonnet-aihubmix - litellm_params: - model: openai/claude-3-5-sonnet-20240620 - input_cost_per_token: 0.000003 # 3$/M - output_cost_per_token: 0.000015 # 15$/M - api_base: 
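A quick sketch of what `_anthropic_content_element_factory()` from the factory.py hunk above is expected to return for a PDF chunk versus an image chunk; the chunk dicts mirror `GenericImageParsingChunk` (type / media_type / data) and the base64 payloads are placeholders.

```python
from litellm.llms.prompt_templates.factory import _anthropic_content_element_factory

# application/pdf -> Anthropic "document" content block
pdf_element = _anthropic_content_element_factory(
    {"type": "base64", "media_type": "application/pdf", "data": "<base64-pdf>"}
)
# {"type": "document", "source": {"type": "base64",
#  "media_type": "application/pdf", "data": "<base64-pdf>"}}

# any other media type -> Anthropic "image" content block
image_element = _anthropic_content_element_factory(
    {"type": "base64", "media_type": "image/jpeg", "data": "<base64-jpeg>"}
)
# {"type": "image", "source": {"type": "base64",
#  "media_type": "image/jpeg", "data": "<base64-jpeg>"}}
```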
"https://exampleopenaiendpoint-production.up.railway.app" - api_key: my-fake-key - - model_name: fake-openai-endpoint-2 - litellm_params: - model: openai/my-fake-model - api_key: my-fake-key - api_base: https://exampleopenaiendpoint-production.up.railway.app/ - stream_timeout: 0.001 - timeout: 1 - rpm: 1 - - model_name: fake-openai-endpoint - litellm_params: - model: openai/my-fake-model - api_key: my-fake-key - api_base: https://exampleopenaiendpoint-production.up.railway.app/ - ## bedrock chat completions - - model_name: "*anthropic.claude*" - litellm_params: - model: bedrock/*anthropic.claude* - aws_access_key_id: os.environ/BEDROCK_AWS_ACCESS_KEY_ID - aws_secret_access_key: os.environ/BEDROCK_AWS_SECRET_ACCESS_KEY - aws_region_name: os.environ/AWS_REGION_NAME - guardrailConfig: - "guardrailIdentifier": "h4dsqwhp6j66" - "guardrailVersion": "2" - "trace": "enabled" - -## bedrock embeddings - - model_name: "*amazon.titan-embed-*" - litellm_params: - model: bedrock/amazon.titan-embed-* - aws_access_key_id: os.environ/BEDROCK_AWS_ACCESS_KEY_ID - aws_secret_access_key: os.environ/BEDROCK_AWS_SECRET_ACCESS_KEY - aws_region_name: os.environ/AWS_REGION_NAME - - model_name: "*cohere.embed-*" - litellm_params: - model: bedrock/cohere.embed-* - aws_access_key_id: os.environ/BEDROCK_AWS_ACCESS_KEY_ID - aws_secret_access_key: os.environ/BEDROCK_AWS_SECRET_ACCESS_KEY - aws_region_name: os.environ/AWS_REGION_NAME - - - model_name: gpt-4 - litellm_params: - model: azure/chatgpt-v-2 - api_base: https://openai-gpt-4-test-v-1.openai.azure.com/ - api_version: "2023-05-15" - api_key: os.environ/AZURE_API_KEY # The `os.environ/` prefix tells litellm to read this from the env. See https://docs.litellm.ai/docs/simple_proxy#load-api-keys-from-vault - rpm: 480 - timeout: 300 - stream_timeout: 60 + model: "*" litellm_settings: fallbacks: [{ "claude-3-5-sonnet-20240620": ["claude-3-5-sonnet-aihubmix"] }] diff --git a/litellm/proxy/auth/user_api_key_auth.py b/litellm/proxy/auth/user_api_key_auth.py index ff1acc3c9..6032a72af 100644 --- a/litellm/proxy/auth/user_api_key_auth.py +++ b/litellm/proxy/auth/user_api_key_auth.py @@ -1236,7 +1236,6 @@ def _return_user_api_key_auth_obj( start_time: datetime, user_role: Optional[LitellmUserRoles] = None, ) -> UserAPIKeyAuth: - traceback.print_stack() end_time = datetime.now() user_api_key_service_logger_obj.service_success_hook( service=ServiceTypes.AUTH, diff --git a/litellm/types/llms/anthropic.py b/litellm/types/llms/anthropic.py index bb65a372d..b0a3780b8 100644 --- a/litellm/types/llms/anthropic.py +++ b/litellm/types/llms/anthropic.py @@ -74,7 +74,7 @@ class AnthopicMessagesAssistantMessageParam(TypedDict, total=False): """ -class AnthropicImageParamSource(TypedDict): +class AnthropicContentParamSource(TypedDict): type: Literal["base64"] media_type: str data: str @@ -82,7 +82,13 @@ class AnthropicImageParamSource(TypedDict): class AnthropicMessagesImageParam(TypedDict, total=False): type: Required[Literal["image"]] - source: Required[AnthropicImageParamSource] + source: Required[AnthropicContentParamSource] + cache_control: Optional[Union[dict, ChatCompletionCachedContent]] + + +class AnthropicMessagesDocumentParam(TypedDict, total=False): + type: Required[Literal["document"]] + source: Required[AnthropicContentParamSource] cache_control: Optional[Union[dict, ChatCompletionCachedContent]] @@ -108,6 +114,7 @@ AnthropicMessagesUserMessageValues = Union[ AnthropicMessagesTextParam, AnthropicMessagesImageParam, AnthropicMessagesToolResultParam, + 
AnthropicMessagesDocumentParam, ] diff --git a/litellm/types/utils.py b/litellm/types/utils.py index c0a9764e8..a2b62f9cc 100644 --- a/litellm/types/utils.py +++ b/litellm/types/utils.py @@ -1322,11 +1322,6 @@ class TranscriptionResponse(OpenAIObject): class GenericImageParsingChunk(TypedDict): - # { - # "type": "base64", - # "media_type": f"image/{image_format}", - # "data": base64_data, - # } type: str media_type: str data: str diff --git a/litellm/utils.py b/litellm/utils.py index d07d86f7d..b10c94859 100644 --- a/litellm/utils.py +++ b/litellm/utils.py @@ -1835,6 +1835,13 @@ def supports_audio_input(model: str, custom_llm_provider: Optional[str] = None) ) +def supports_pdf_input(model: str, custom_llm_provider: Optional[str] = None) -> bool: + """Check if a given model supports pdf input in a chat completion call""" + return _supports_factory( + model=model, custom_llm_provider=custom_llm_provider, key="supports_pdf_input" + ) + + def supports_audio_output( model: str, custom_llm_provider: Optional[str] = None ) -> bool: @@ -5420,2121 +5427,6 @@ def register_prompt_template( return litellm.custom_prompt_dict -####### DEPRECATED ################ - - -def get_all_keys(llm_provider=None): - try: - global last_fetched_at_keys - # if user is using hosted product -> instantiate their env with their hosted api keys - refresh every 5 minutes - print_verbose(f"Reaches get all keys, llm_provider: {llm_provider}") - user_email = ( - os.getenv("LITELLM_EMAIL") - or litellm.email - or litellm.token - or os.getenv("LITELLM_TOKEN") - ) - if user_email: - time_delta = 0 - if last_fetched_at_keys is not None: - current_time = time.time() - time_delta = current_time - last_fetched_at_keys - if ( - time_delta > 300 or last_fetched_at_keys is None or llm_provider - ): # if the llm provider is passed in , assume this happening due to an AuthError for that provider - # make the api call - last_fetched_at = time.time() - print_verbose(f"last_fetched_at: {last_fetched_at}") - response = requests.post( - url="http://api.litellm.ai/get_all_keys", - headers={"content-type": "application/json"}, - data=json.dumps({"user_email": user_email}), - ) - print_verbose(f"get model key response: {response.text}") - data = response.json() - # update model list - for key, value in data[ - "model_keys" - ].items(): # follows the LITELLM API KEY format - _API_KEY - e.g. HUGGINGFACE_API_KEY - os.environ[key] = value - # set model alias map - for model_alias, value in data["model_alias_map"].items(): - litellm.model_alias_map[model_alias] = value - return "it worked!" 
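A small guard pattern around the new `supports_pdf_input()` helper added in utils.py above: only attach the PDF block when the capability flag is set in the model map. The prompt text and placeholder base64 string are illustrative only.

```python
import litellm

model = "anthropic/claude-3-5-haiku-20241022"

content = [{"type": "text", "text": "Summarize this document."}]
if litellm.supports_pdf_input(model):
    # Only models with "supports_pdf_input": true in the cost map get the PDF block.
    content.append(
        {"type": "image_url", "image_url": "data:application/pdf;base64,<base64-pdf>"}
    )

response = litellm.completion(
    model=model,
    messages=[{"role": "user", "content": content}],
    max_tokens=300,
)
```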
- return None - return None - except Exception: - print_verbose( - f"[Non-Blocking Error] get_all_keys error - {traceback.format_exc()}" - ) - pass - - -def get_model_list(): - global last_fetched_at, print_verbose - try: - # if user is using hosted product -> get their updated model list - user_email = ( - os.getenv("LITELLM_EMAIL") - or litellm.email - or litellm.token - or os.getenv("LITELLM_TOKEN") - ) - if user_email: - # make the api call - last_fetched_at = time.time() - print_verbose(f"last_fetched_at: {last_fetched_at}") - response = requests.post( - url="http://api.litellm.ai/get_model_list", - headers={"content-type": "application/json"}, - data=json.dumps({"user_email": user_email}), - ) - print_verbose(f"get_model_list response: {response.text}") - data = response.json() - # update model list - model_list = data["model_list"] - # # check if all model providers are in environment - # model_providers = data["model_providers"] - # missing_llm_provider = None - # for item in model_providers: - # if f"{item.upper()}_API_KEY" not in os.environ: - # missing_llm_provider = item - # break - # # update environment - if required - # threading.Thread(target=get_all_keys, args=(missing_llm_provider)).start() - return model_list - return [] # return empty list by default - except Exception: - print_verbose( - f"[Non-Blocking Error] get_model_list error - {traceback.format_exc()}" - ) - - -######## Streaming Class ############################ -# wraps the completion stream to return the correct format for the model -# replicate/anthropic/cohere - -# class CustomStreamWrapper: -# def __init__( -# self, -# completion_stream, -# model, -# logging_obj: Any, -# custom_llm_provider: Optional[str] = None, -# stream_options=None, -# make_call: Optional[Callable] = None, -# _response_headers: Optional[dict] = None, -# ): -# self.model = model -# self.make_call = make_call -# self.custom_llm_provider = custom_llm_provider -# self.logging_obj: LiteLLMLoggingObject = logging_obj -# self.completion_stream = completion_stream -# self.sent_first_chunk = False -# self.sent_last_chunk = False -# self.system_fingerprint: Optional[str] = None -# self.received_finish_reason: Optional[str] = None -# self.special_tokens = [ -# "<|assistant|>", -# "<|system|>", -# "<|user|>", -# "", -# "", -# "<|im_end|>", -# "<|im_start|>", -# ] -# self.holding_chunk = "" -# self.complete_response = "" -# self.response_uptil_now = "" -# _model_info = ( -# self.logging_obj.model_call_details.get("litellm_params", {}).get( -# "model_info", {} -# ) -# or {} -# ) -# self._hidden_params = { -# "model_id": (_model_info.get("id", None)), -# } # returned as x-litellm-model-id response header in proxy - -# self._hidden_params["additional_headers"] = process_response_headers( -# _response_headers or {} -# ) # GUARANTEE OPENAI HEADERS IN RESPONSE - -# self._response_headers = _response_headers -# self.response_id = None -# self.logging_loop = None -# self.rules = Rules() -# self.stream_options = stream_options or getattr( -# logging_obj, "stream_options", None -# ) -# self.messages = getattr(logging_obj, "messages", None) -# self.sent_stream_usage = False -# self.send_stream_usage = ( -# True if self.check_send_stream_usage(self.stream_options) else False -# ) -# self.tool_call = False -# self.chunks: List = ( -# [] -# ) # keep track of the returned chunks - used for calculating the input/output tokens for stream options -# self.is_function_call = self.check_is_function_call(logging_obj=logging_obj) - -# def __iter__(self): -# return self - 
-# def __aiter__(self): -# return self - -# def check_send_stream_usage(self, stream_options: Optional[dict]): -# return ( -# stream_options is not None -# and stream_options.get("include_usage", False) is True -# ) - -# def check_is_function_call(self, logging_obj) -> bool: -# if hasattr(logging_obj, "optional_params") and isinstance( -# logging_obj.optional_params, dict -# ): -# if ( -# "litellm_param_is_function_call" in logging_obj.optional_params -# and logging_obj.optional_params["litellm_param_is_function_call"] -# is True -# ): -# return True - -# return False - -# def process_chunk(self, chunk: str): -# """ -# NLP Cloud streaming returns the entire response, for each chunk. Process this, to only return the delta. -# """ -# try: -# chunk = chunk.strip() -# self.complete_response = self.complete_response.strip() - -# if chunk.startswith(self.complete_response): -# # Remove last_sent_chunk only if it appears at the start of the new chunk -# chunk = chunk[len(self.complete_response) :] - -# self.complete_response += chunk -# return chunk -# except Exception as e: -# raise e - -# def safety_checker(self) -> None: -# """ -# Fixes - https://github.com/BerriAI/litellm/issues/5158 - -# if the model enters a loop and starts repeating the same chunk again, break out of loop and raise an internalservererror - allows for retries. - -# Raises - InternalServerError, if LLM enters infinite loop while streaming -# """ -# if len(self.chunks) >= litellm.REPEATED_STREAMING_CHUNK_LIMIT: -# # Get the last n chunks -# last_chunks = self.chunks[-litellm.REPEATED_STREAMING_CHUNK_LIMIT :] - -# # Extract the relevant content from the chunks -# last_contents = [chunk.choices[0].delta.content for chunk in last_chunks] - -# # Check if all extracted contents are identical -# if all(content == last_contents[0] for content in last_contents): -# if ( -# last_contents[0] is not None -# and isinstance(last_contents[0], str) -# and len(last_contents[0]) > 2 -# ): # ignore empty content - https://github.com/BerriAI/litellm/issues/5158#issuecomment-2287156946 -# # All last n chunks are identical -# raise litellm.InternalServerError( -# message="The model is repeating the same chunk = {}.".format( -# last_contents[0] -# ), -# model="", -# llm_provider="", -# ) - -# def check_special_tokens(self, chunk: str, finish_reason: Optional[str]): -# """ -# Output parse / special tokens for sagemaker + hf streaming. -# """ -# hold = False -# if ( -# self.custom_llm_provider != "huggingface" -# and self.custom_llm_provider != "sagemaker" -# ): -# return hold, chunk - -# if finish_reason: -# for token in self.special_tokens: -# if token in chunk: -# chunk = chunk.replace(token, "") -# return hold, chunk - -# if self.sent_first_chunk is True: -# return hold, chunk - -# curr_chunk = self.holding_chunk + chunk -# curr_chunk = curr_chunk.strip() - -# for token in self.special_tokens: -# if len(curr_chunk) < len(token) and curr_chunk in token: -# hold = True -# self.holding_chunk = curr_chunk -# elif len(curr_chunk) >= len(token): -# if token in curr_chunk: -# self.holding_chunk = curr_chunk.replace(token, "") -# hold = True -# else: -# pass - -# if hold is False: # reset -# self.holding_chunk = "" -# return hold, curr_chunk - -# def handle_anthropic_text_chunk(self, chunk): -# """ -# For old anthropic models - claude-1, claude-2. 
- -# Claude-3 is handled from within Anthropic.py VIA ModelResponseIterator() -# """ -# str_line = chunk -# if isinstance(chunk, bytes): # Handle binary data -# str_line = chunk.decode("utf-8") # Convert bytes to string -# text = "" -# is_finished = False -# finish_reason = None -# if str_line.startswith("data:"): -# data_json = json.loads(str_line[5:]) -# type_chunk = data_json.get("type", None) -# if type_chunk == "completion": -# text = data_json.get("completion") -# finish_reason = data_json.get("stop_reason") -# if finish_reason is not None: -# is_finished = True -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } -# elif "error" in str_line: -# raise ValueError(f"Unable to parse response. Original response: {str_line}") -# else: -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } - -# def handle_vertexai_anthropic_chunk(self, chunk): -# """ -# - MessageStartEvent(message=Message(id='msg_01LeRRgvX4gwkX3ryBVgtuYZ', content=[], model='claude-3-sonnet-20240229', role='assistant', stop_reason=None, stop_sequence=None, type='message', usage=Usage(input_tokens=8, output_tokens=1)), type='message_start'); custom_llm_provider: vertex_ai -# - ContentBlockStartEvent(content_block=ContentBlock(text='', type='text'), index=0, type='content_block_start'); custom_llm_provider: vertex_ai -# - ContentBlockDeltaEvent(delta=TextDelta(text='Hello', type='text_delta'), index=0, type='content_block_delta'); custom_llm_provider: vertex_ai -# """ -# text = "" -# prompt_tokens = None -# completion_tokens = None -# is_finished = False -# finish_reason = None -# type_chunk = getattr(chunk, "type", None) -# if type_chunk == "message_start": -# message = getattr(chunk, "message", None) -# text = "" # lets us return a chunk with usage to user -# _usage = getattr(message, "usage", None) -# if _usage is not None: -# prompt_tokens = getattr(_usage, "input_tokens", None) -# completion_tokens = getattr(_usage, "output_tokens", None) -# elif type_chunk == "content_block_delta": -# """ -# Anthropic content chunk -# chunk = {'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': 'Hello'}} -# """ -# delta = getattr(chunk, "delta", None) -# if delta is not None: -# text = getattr(delta, "text", "") -# else: -# text = "" -# elif type_chunk == "message_delta": -# """ -# Anthropic -# chunk = {'type': 'message_delta', 'delta': {'stop_reason': 'max_tokens', 'stop_sequence': None}, 'usage': {'output_tokens': 10}} -# """ -# # TODO - get usage from this chunk, set in response -# delta = getattr(chunk, "delta", None) -# if delta is not None: -# finish_reason = getattr(delta, "stop_reason", "stop") -# is_finished = True -# _usage = getattr(chunk, "usage", None) -# if _usage is not None: -# prompt_tokens = getattr(_usage, "input_tokens", None) -# completion_tokens = getattr(_usage, "output_tokens", None) - -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# "prompt_tokens": prompt_tokens, -# "completion_tokens": completion_tokens, -# } - -# def handle_predibase_chunk(self, chunk): -# try: -# if not isinstance(chunk, str): -# chunk = chunk.decode( -# "utf-8" -# ) # DO NOT REMOVE this: This is required for HF inference API + Streaming -# text = "" -# is_finished = False -# finish_reason = "" -# print_verbose(f"chunk: {chunk}") -# if chunk.startswith("data:"): -# data_json = json.loads(chunk[5:]) -# print_verbose(f"data json: {data_json}") -# if "token" in data_json and 
"text" in data_json["token"]: -# text = data_json["token"]["text"] -# if data_json.get("details", False) and data_json["details"].get( -# "finish_reason", False -# ): -# is_finished = True -# finish_reason = data_json["details"]["finish_reason"] -# elif data_json.get( -# "generated_text", False -# ): # if full generated text exists, then stream is complete -# text = "" # don't return the final bos token -# is_finished = True -# finish_reason = "stop" -# elif data_json.get("error", False): -# raise Exception(data_json.get("error")) -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } -# elif "error" in chunk: -# raise ValueError(chunk) -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } -# except Exception as e: -# raise e - -# def handle_huggingface_chunk(self, chunk): -# try: -# if not isinstance(chunk, str): -# chunk = chunk.decode( -# "utf-8" -# ) # DO NOT REMOVE this: This is required for HF inference API + Streaming -# text = "" -# is_finished = False -# finish_reason = "" -# print_verbose(f"chunk: {chunk}") -# if chunk.startswith("data:"): -# data_json = json.loads(chunk[5:]) -# print_verbose(f"data json: {data_json}") -# if "token" in data_json and "text" in data_json["token"]: -# text = data_json["token"]["text"] -# if data_json.get("details", False) and data_json["details"].get( -# "finish_reason", False -# ): -# is_finished = True -# finish_reason = data_json["details"]["finish_reason"] -# elif data_json.get( -# "generated_text", False -# ): # if full generated text exists, then stream is complete -# text = "" # don't return the final bos token -# is_finished = True -# finish_reason = "stop" -# elif data_json.get("error", False): -# raise Exception(data_json.get("error")) -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } -# elif "error" in chunk: -# raise ValueError(chunk) -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } -# except Exception as e: -# raise e - -# def handle_ai21_chunk(self, chunk): # fake streaming -# chunk = chunk.decode("utf-8") -# data_json = json.loads(chunk) -# try: -# text = data_json["completions"][0]["data"]["text"] -# is_finished = True -# finish_reason = "stop" -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } -# except Exception: -# raise ValueError(f"Unable to parse response. Original response: {chunk}") - -# def handle_maritalk_chunk(self, chunk): # fake streaming -# chunk = chunk.decode("utf-8") -# data_json = json.loads(chunk) -# try: -# text = data_json["answer"] -# is_finished = True -# finish_reason = "stop" -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } -# except Exception: -# raise ValueError(f"Unable to parse response. Original response: {chunk}") - -# def handle_nlp_cloud_chunk(self, chunk): -# text = "" -# is_finished = False -# finish_reason = "" -# try: -# if "dolphin" in self.model: -# chunk = self.process_chunk(chunk=chunk) -# else: -# data_json = json.loads(chunk) -# chunk = data_json["generated_text"] -# text = chunk -# if "[DONE]" in text: -# text = text.replace("[DONE]", "") -# is_finished = True -# finish_reason = "stop" -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } -# except Exception: -# raise ValueError(f"Unable to parse response. 
Original response: {chunk}") - -# def handle_aleph_alpha_chunk(self, chunk): -# chunk = chunk.decode("utf-8") -# data_json = json.loads(chunk) -# try: -# text = data_json["completions"][0]["completion"] -# is_finished = True -# finish_reason = "stop" -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } -# except Exception: -# raise ValueError(f"Unable to parse response. Original response: {chunk}") - -# def handle_cohere_chunk(self, chunk): -# chunk = chunk.decode("utf-8") -# data_json = json.loads(chunk) -# try: -# text = "" -# is_finished = False -# finish_reason = "" -# index: Optional[int] = None -# if "index" in data_json: -# index = data_json.get("index") -# if "text" in data_json: -# text = data_json["text"] -# elif "is_finished" in data_json: -# is_finished = data_json["is_finished"] -# finish_reason = data_json["finish_reason"] -# else: -# raise Exception(data_json) -# return { -# "index": index, -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } -# except Exception: -# raise ValueError(f"Unable to parse response. Original response: {chunk}") - -# def handle_cohere_chat_chunk(self, chunk): -# chunk = chunk.decode("utf-8") -# data_json = json.loads(chunk) -# print_verbose(f"chunk: {chunk}") -# try: -# text = "" -# is_finished = False -# finish_reason = "" -# if "text" in data_json: -# text = data_json["text"] -# elif "is_finished" in data_json and data_json["is_finished"] is True: -# is_finished = data_json["is_finished"] -# finish_reason = data_json["finish_reason"] -# else: -# return -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } -# except Exception: -# raise ValueError(f"Unable to parse response. Original response: {chunk}") - -# def handle_azure_chunk(self, chunk): -# is_finished = False -# finish_reason = "" -# text = "" -# print_verbose(f"chunk: {chunk}") -# if "data: [DONE]" in chunk: -# text = "" -# is_finished = True -# finish_reason = "stop" -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } -# elif chunk.startswith("data:"): -# data_json = json.loads(chunk[5:]) # chunk.startswith("data:"): -# try: -# if len(data_json["choices"]) > 0: -# delta = data_json["choices"][0]["delta"] -# text = "" if delta is None else delta.get("content", "") -# if data_json["choices"][0].get("finish_reason", None): -# is_finished = True -# finish_reason = data_json["choices"][0]["finish_reason"] -# print_verbose( -# f"text: {text}; is_finished: {is_finished}; finish_reason: {finish_reason}" -# ) -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } -# except Exception: -# raise ValueError( -# f"Unable to parse response. Original response: {chunk}" -# ) -# elif "error" in chunk: -# raise ValueError(f"Unable to parse response. Original response: {chunk}") -# else: -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } - -# def handle_replicate_chunk(self, chunk): -# try: -# text = "" -# is_finished = False -# finish_reason = "" -# if "output" in chunk: -# text = chunk["output"] -# if "status" in chunk: -# if chunk["status"] == "succeeded": -# is_finished = True -# finish_reason = "stop" -# elif chunk.get("error", None): -# raise Exception(chunk["error"]) -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } -# except Exception: -# raise ValueError(f"Unable to parse response. 
Original response: {chunk}") - -# def handle_openai_chat_completion_chunk(self, chunk): -# try: -# print_verbose(f"\nRaw OpenAI Chunk\n{chunk}\n") -# str_line = chunk -# text = "" -# is_finished = False -# finish_reason = None -# logprobs = None -# usage = None -# if str_line and str_line.choices and len(str_line.choices) > 0: -# if ( -# str_line.choices[0].delta is not None -# and str_line.choices[0].delta.content is not None -# ): -# text = str_line.choices[0].delta.content -# else: # function/tool calling chunk - when content is None. in this case we just return the original chunk from openai -# pass -# if str_line.choices[0].finish_reason: -# is_finished = True -# finish_reason = str_line.choices[0].finish_reason - -# # checking for logprobs -# if ( -# hasattr(str_line.choices[0], "logprobs") -# and str_line.choices[0].logprobs is not None -# ): -# logprobs = str_line.choices[0].logprobs -# else: -# logprobs = None - -# usage = getattr(str_line, "usage", None) - -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# "logprobs": logprobs, -# "original_chunk": str_line, -# "usage": usage, -# } -# except Exception as e: -# raise e - -# def handle_azure_text_completion_chunk(self, chunk): -# try: -# print_verbose(f"\nRaw OpenAI Chunk\n{chunk}\n") -# text = "" -# is_finished = False -# finish_reason = None -# choices = getattr(chunk, "choices", []) -# if len(choices) > 0: -# text = choices[0].text -# if choices[0].finish_reason is not None: -# is_finished = True -# finish_reason = choices[0].finish_reason -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } - -# except Exception as e: -# raise e - -# def handle_openai_text_completion_chunk(self, chunk): -# try: -# print_verbose(f"\nRaw OpenAI Chunk\n{chunk}\n") -# text = "" -# is_finished = False -# finish_reason = None -# usage = None -# choices = getattr(chunk, "choices", []) -# if len(choices) > 0: -# text = choices[0].text -# if choices[0].finish_reason is not None: -# is_finished = True -# finish_reason = choices[0].finish_reason -# usage = getattr(chunk, "usage", None) -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# "usage": usage, -# } - -# except Exception as e: -# raise e - -# def handle_baseten_chunk(self, chunk): -# try: -# chunk = chunk.decode("utf-8") -# if len(chunk) > 0: -# if chunk.startswith("data:"): -# data_json = json.loads(chunk[5:]) -# if "token" in data_json and "text" in data_json["token"]: -# return data_json["token"]["text"] -# else: -# return "" -# data_json = json.loads(chunk) -# if "model_output" in data_json: -# if ( -# isinstance(data_json["model_output"], dict) -# and "data" in data_json["model_output"] -# and isinstance(data_json["model_output"]["data"], list) -# ): -# return data_json["model_output"]["data"][0] -# elif isinstance(data_json["model_output"], str): -# return data_json["model_output"] -# elif "completion" in data_json and isinstance( -# data_json["completion"], str -# ): -# return data_json["completion"] -# else: -# raise ValueError( -# f"Unable to parse response. 
Original response: {chunk}" -# ) -# else: -# return "" -# else: -# return "" -# except Exception as e: -# verbose_logger.exception( -# "litellm.CustomStreamWrapper.handle_baseten_chunk(): Exception occured - {}".format( -# str(e) -# ) -# ) -# return "" - -# def handle_cloudlfare_stream(self, chunk): -# try: -# print_verbose(f"\nRaw OpenAI Chunk\n{chunk}\n") -# chunk = chunk.decode("utf-8") -# str_line = chunk -# text = "" -# is_finished = False -# finish_reason = None - -# if "[DONE]" in chunk: -# return {"text": text, "is_finished": True, "finish_reason": "stop"} -# elif str_line.startswith("data:"): -# data_json = json.loads(str_line[5:]) -# print_verbose(f"delta content: {data_json}") -# text = data_json["response"] -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } -# else: -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } - -# except Exception as e: -# raise e - -# def handle_ollama_stream(self, chunk): -# try: -# if isinstance(chunk, dict): -# json_chunk = chunk -# else: -# json_chunk = json.loads(chunk) -# if "error" in json_chunk: -# raise Exception(f"Ollama Error - {json_chunk}") - -# text = "" -# is_finished = False -# finish_reason = None -# if json_chunk["done"] is True: -# text = "" -# is_finished = True -# finish_reason = "stop" -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } -# elif json_chunk["response"]: -# print_verbose(f"delta content: {json_chunk}") -# text = json_chunk["response"] -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } -# else: -# raise Exception(f"Ollama Error - {json_chunk}") -# except Exception as e: -# raise e - -# def handle_ollama_chat_stream(self, chunk): -# # for ollama_chat/ provider -# try: -# if isinstance(chunk, dict): -# json_chunk = chunk -# else: -# json_chunk = json.loads(chunk) -# if "error" in json_chunk: -# raise Exception(f"Ollama Error - {json_chunk}") - -# text = "" -# is_finished = False -# finish_reason = None -# if json_chunk["done"] is True: -# text = "" -# is_finished = True -# finish_reason = "stop" -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } -# elif "message" in json_chunk: -# print_verbose(f"delta content: {json_chunk}") -# text = json_chunk["message"]["content"] -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# } -# else: -# raise Exception(f"Ollama Error - {json_chunk}") -# except Exception as e: -# raise e - -# def handle_watsonx_stream(self, chunk): -# try: -# if isinstance(chunk, dict): -# parsed_response = chunk -# elif isinstance(chunk, (str, bytes)): -# if isinstance(chunk, bytes): -# chunk = chunk.decode("utf-8") -# if "generated_text" in chunk: -# response = chunk.replace("data: ", "").strip() -# parsed_response = json.loads(response) -# else: -# return { -# "text": "", -# "is_finished": False, -# "prompt_tokens": 0, -# "completion_tokens": 0, -# } -# else: -# print_verbose(f"chunk: {chunk} (Type: {type(chunk)})") -# raise ValueError( -# f"Unable to parse response. 
Original response: {chunk}" -# ) -# results = parsed_response.get("results", []) -# if len(results) > 0: -# text = results[0].get("generated_text", "") -# finish_reason = results[0].get("stop_reason") -# is_finished = finish_reason != "not_finished" -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# "prompt_tokens": results[0].get("input_token_count", 0), -# "completion_tokens": results[0].get("generated_token_count", 0), -# } -# return {"text": "", "is_finished": False} -# except Exception as e: -# raise e - -# def handle_triton_stream(self, chunk): -# try: -# if isinstance(chunk, dict): -# parsed_response = chunk -# elif isinstance(chunk, (str, bytes)): -# if isinstance(chunk, bytes): -# chunk = chunk.decode("utf-8") -# if "text_output" in chunk: -# response = chunk.replace("data: ", "").strip() -# parsed_response = json.loads(response) -# else: -# return { -# "text": "", -# "is_finished": False, -# "prompt_tokens": 0, -# "completion_tokens": 0, -# } -# else: -# print_verbose(f"chunk: {chunk} (Type: {type(chunk)})") -# raise ValueError( -# f"Unable to parse response. Original response: {chunk}" -# ) -# text = parsed_response.get("text_output", "") -# finish_reason = parsed_response.get("stop_reason") -# is_finished = parsed_response.get("is_finished", False) -# return { -# "text": text, -# "is_finished": is_finished, -# "finish_reason": finish_reason, -# "prompt_tokens": parsed_response.get("input_token_count", 0), -# "completion_tokens": parsed_response.get("generated_token_count", 0), -# } -# return {"text": "", "is_finished": False} -# except Exception as e: -# raise e - -# def handle_clarifai_completion_chunk(self, chunk): -# try: -# if isinstance(chunk, dict): -# parsed_response = chunk -# elif isinstance(chunk, (str, bytes)): -# if isinstance(chunk, bytes): -# parsed_response = chunk.decode("utf-8") -# else: -# parsed_response = chunk -# else: -# raise ValueError("Unable to parse streaming chunk") -# if isinstance(parsed_response, dict): -# data_json = parsed_response -# else: -# data_json = json.loads(parsed_response) -# text = ( -# data_json.get("outputs", "")[0] -# .get("data", "") -# .get("text", "") -# .get("raw", "") -# ) -# len( -# encoding.encode( -# data_json.get("outputs", "")[0] -# .get("input", "") -# .get("data", "") -# .get("text", "") -# .get("raw", "") -# ) -# ) -# len(encoding.encode(text)) -# return { -# "text": text, -# "is_finished": True, -# } -# except Exception as e: -# verbose_logger.exception( -# "litellm.CustomStreamWrapper.handle_clarifai_chunk(): Exception occured - {}".format( -# str(e) -# ) -# ) -# return "" - -# def model_response_creator( -# self, chunk: Optional[dict] = None, hidden_params: Optional[dict] = None -# ): -# _model = self.model -# _received_llm_provider = self.custom_llm_provider -# _logging_obj_llm_provider = self.logging_obj.model_call_details.get("custom_llm_provider", None) # type: ignore -# if ( -# _received_llm_provider == "openai" -# and _received_llm_provider != _logging_obj_llm_provider -# ): -# _model = "{}/{}".format(_logging_obj_llm_provider, _model) -# if chunk is None: -# chunk = {} -# else: -# # pop model keyword -# chunk.pop("model", None) - -# model_response = ModelResponse( -# stream=True, model=_model, stream_options=self.stream_options, **chunk -# ) -# if self.response_id is not None: -# model_response.id = self.response_id -# else: -# self.response_id = model_response.id # type: ignore -# if self.system_fingerprint is not None: -# model_response.system_fingerprint = 
self.system_fingerprint -# if hidden_params is not None: -# model_response._hidden_params = hidden_params -# model_response._hidden_params["custom_llm_provider"] = _logging_obj_llm_provider -# model_response._hidden_params["created_at"] = time.time() -# model_response._hidden_params = { -# **model_response._hidden_params, -# **self._hidden_params, -# } - -# if ( -# len(model_response.choices) > 0 -# and getattr(model_response.choices[0], "delta") is not None -# ): -# # do nothing, if object instantiated -# pass -# else: -# model_response.choices = [StreamingChoices(finish_reason=None)] -# return model_response - -# def is_delta_empty(self, delta: Delta) -> bool: -# is_empty = True -# if delta.content is not None: -# is_empty = False -# elif delta.tool_calls is not None: -# is_empty = False -# elif delta.function_call is not None: -# is_empty = False -# return is_empty - -# def return_processed_chunk_logic( # noqa -# self, -# completion_obj: dict, -# model_response: ModelResponseStream, -# response_obj: dict, -# ): - -# print_verbose( -# f"completion_obj: {completion_obj}, model_response.choices[0]: {model_response.choices[0]}, response_obj: {response_obj}" -# ) -# if ( -# "content" in completion_obj -# and ( -# isinstance(completion_obj["content"], str) -# and len(completion_obj["content"]) > 0 -# ) -# or ( -# "tool_calls" in completion_obj -# and completion_obj["tool_calls"] is not None -# and len(completion_obj["tool_calls"]) > 0 -# ) -# or ( -# "function_call" in completion_obj -# and completion_obj["function_call"] is not None -# ) -# ): # cannot set content of an OpenAI Object to be an empty string -# self.safety_checker() -# hold, model_response_str = self.check_special_tokens( -# chunk=completion_obj["content"], -# finish_reason=model_response.choices[0].finish_reason, -# ) # filter out bos/eos tokens from openai-compatible hf endpoints -# print_verbose(f"hold - {hold}, model_response_str - {model_response_str}") -# if hold is False: -# ## check if openai/azure chunk -# original_chunk = response_obj.get("original_chunk", None) -# if original_chunk: -# model_response.id = original_chunk.id -# self.response_id = original_chunk.id -# if len(original_chunk.choices) > 0: -# choices = [] -# for choice in original_chunk.choices: -# try: -# if isinstance(choice, BaseModel): -# choice_json = choice.model_dump() -# choice_json.pop( -# "finish_reason", None -# ) # for mistral etc. which return a value in their last chunk (not-openai compatible). 
-# print_verbose(f"choice_json: {choice_json}") -# choices.append(StreamingChoices(**choice_json)) -# except Exception: -# choices.append(StreamingChoices()) -# print_verbose(f"choices in streaming: {choices}") -# setattr(model_response, "choices", choices) -# else: -# return -# model_response.system_fingerprint = ( -# original_chunk.system_fingerprint -# ) -# setattr( -# model_response, -# "citations", -# getattr(original_chunk, "citations", None), -# ) -# print_verbose(f"self.sent_first_chunk: {self.sent_first_chunk}") -# if self.sent_first_chunk is False: -# model_response.choices[0].delta["role"] = "assistant" -# self.sent_first_chunk = True -# elif self.sent_first_chunk is True and hasattr( -# model_response.choices[0].delta, "role" -# ): -# _initial_delta = model_response.choices[0].delta.model_dump() -# _initial_delta.pop("role", None) -# model_response.choices[0].delta = Delta(**_initial_delta) -# print_verbose( -# f"model_response.choices[0].delta: {model_response.choices[0].delta}" -# ) -# else: -# ## else -# completion_obj["content"] = model_response_str -# if self.sent_first_chunk is False: -# completion_obj["role"] = "assistant" -# self.sent_first_chunk = True - -# model_response.choices[0].delta = Delta(**completion_obj) -# _index: Optional[int] = completion_obj.get("index") -# if _index is not None: -# model_response.choices[0].index = _index -# print_verbose(f"returning model_response: {model_response}") -# return model_response -# else: -# return -# elif self.received_finish_reason is not None: -# if self.sent_last_chunk is True: -# # Bedrock returns the guardrail trace in the last chunk - we want to return this here -# if self.custom_llm_provider == "bedrock" and "trace" in model_response: -# return model_response - -# # Default - return StopIteration -# raise StopIteration -# # flush any remaining holding chunk -# if len(self.holding_chunk) > 0: -# if model_response.choices[0].delta.content is None: -# model_response.choices[0].delta.content = self.holding_chunk -# else: -# model_response.choices[0].delta.content = ( -# self.holding_chunk + model_response.choices[0].delta.content -# ) -# self.holding_chunk = "" -# # if delta is None -# _is_delta_empty = self.is_delta_empty(delta=model_response.choices[0].delta) - -# if _is_delta_empty: -# # get any function call arguments -# model_response.choices[0].finish_reason = map_finish_reason( -# finish_reason=self.received_finish_reason -# ) # ensure consistent output to openai - -# self.sent_last_chunk = True - -# return model_response -# elif ( -# model_response.choices[0].delta.tool_calls is not None -# or model_response.choices[0].delta.function_call is not None -# ): -# if self.sent_first_chunk is False: -# model_response.choices[0].delta["role"] = "assistant" -# self.sent_first_chunk = True -# return model_response -# elif ( -# len(model_response.choices) > 0 -# and hasattr(model_response.choices[0].delta, "audio") -# and model_response.choices[0].delta.audio is not None -# ): -# return model_response -# else: -# if hasattr(model_response, "usage"): -# self.chunks.append(model_response) -# return - -# def chunk_creator(self, chunk): # type: ignore # noqa: PLR0915 -# model_response = self.model_response_creator() -# response_obj: dict = {} -# try: -# # return this for all models -# completion_obj = {"content": ""} -# from litellm.litellm_core_utils.streaming_utils import ( -# generic_chunk_has_all_required_fields, -# ) -# from litellm.types.utils import GenericStreamingChunk as GChunk - -# if ( -# isinstance(chunk, dict) -# 
and generic_chunk_has_all_required_fields( -# chunk=chunk -# ) # check if chunk is a generic streaming chunk -# ) or ( -# self.custom_llm_provider -# and ( -# self.custom_llm_provider == "anthropic" -# or self.custom_llm_provider in litellm._custom_providers -# ) -# ): - -# if self.received_finish_reason is not None: -# if "provider_specific_fields" not in chunk: -# raise StopIteration -# anthropic_response_obj: GChunk = chunk -# completion_obj["content"] = anthropic_response_obj["text"] -# if anthropic_response_obj["is_finished"]: -# self.received_finish_reason = anthropic_response_obj[ -# "finish_reason" -# ] - -# if anthropic_response_obj["usage"] is not None: -# model_response.usage = litellm.Usage( -# **anthropic_response_obj["usage"] -# ) - -# if ( -# "tool_use" in anthropic_response_obj -# and anthropic_response_obj["tool_use"] is not None -# ): -# completion_obj["tool_calls"] = [anthropic_response_obj["tool_use"]] - -# if ( -# "provider_specific_fields" in anthropic_response_obj -# and anthropic_response_obj["provider_specific_fields"] is not None -# ): -# for key, value in anthropic_response_obj[ -# "provider_specific_fields" -# ].items(): -# setattr(model_response, key, value) - -# response_obj = anthropic_response_obj -# elif ( -# self.custom_llm_provider -# and self.custom_llm_provider == "anthropic_text" -# ): -# response_obj = self.handle_anthropic_text_chunk(chunk) -# completion_obj["content"] = response_obj["text"] -# if response_obj["is_finished"]: -# self.received_finish_reason = response_obj["finish_reason"] -# elif self.custom_llm_provider and self.custom_llm_provider == "clarifai": -# response_obj = self.handle_clarifai_completion_chunk(chunk) -# completion_obj["content"] = response_obj["text"] -# if response_obj["is_finished"]: -# self.received_finish_reason = response_obj["finish_reason"] -# elif self.model == "replicate" or self.custom_llm_provider == "replicate": -# response_obj = self.handle_replicate_chunk(chunk) -# completion_obj["content"] = response_obj["text"] -# if response_obj["is_finished"]: -# self.received_finish_reason = response_obj["finish_reason"] -# elif self.custom_llm_provider and self.custom_llm_provider == "huggingface": -# response_obj = self.handle_huggingface_chunk(chunk) -# completion_obj["content"] = response_obj["text"] -# if response_obj["is_finished"]: -# self.received_finish_reason = response_obj["finish_reason"] -# elif self.custom_llm_provider and self.custom_llm_provider == "predibase": -# response_obj = self.handle_predibase_chunk(chunk) -# completion_obj["content"] = response_obj["text"] -# if response_obj["is_finished"]: -# self.received_finish_reason = response_obj["finish_reason"] -# elif ( -# self.custom_llm_provider and self.custom_llm_provider == "baseten" -# ): # baseten doesn't provide streaming -# completion_obj["content"] = self.handle_baseten_chunk(chunk) -# elif ( -# self.custom_llm_provider and self.custom_llm_provider == "ai21" -# ): # ai21 doesn't provide streaming -# response_obj = self.handle_ai21_chunk(chunk) -# completion_obj["content"] = response_obj["text"] -# if response_obj["is_finished"]: -# self.received_finish_reason = response_obj["finish_reason"] -# elif self.custom_llm_provider and self.custom_llm_provider == "maritalk": -# response_obj = self.handle_maritalk_chunk(chunk) -# completion_obj["content"] = response_obj["text"] -# if response_obj["is_finished"]: -# self.received_finish_reason = response_obj["finish_reason"] -# elif self.custom_llm_provider and self.custom_llm_provider == "vllm": -# 
completion_obj["content"] = chunk[0].outputs[0].text -# elif ( -# self.custom_llm_provider and self.custom_llm_provider == "aleph_alpha" -# ): # aleph alpha doesn't provide streaming -# response_obj = self.handle_aleph_alpha_chunk(chunk) -# completion_obj["content"] = response_obj["text"] -# if response_obj["is_finished"]: -# self.received_finish_reason = response_obj["finish_reason"] -# elif self.custom_llm_provider == "nlp_cloud": -# try: -# response_obj = self.handle_nlp_cloud_chunk(chunk) -# completion_obj["content"] = response_obj["text"] -# if response_obj["is_finished"]: -# self.received_finish_reason = response_obj["finish_reason"] -# except Exception as e: -# if self.received_finish_reason: -# raise e -# else: -# if self.sent_first_chunk is False: -# raise Exception("An unknown error occurred with the stream") -# self.received_finish_reason = "stop" -# elif self.custom_llm_provider == "vertex_ai": -# import proto # type: ignore - -# if self.model.startswith("claude-3"): -# response_obj = self.handle_vertexai_anthropic_chunk(chunk=chunk) -# if response_obj is None: -# return -# completion_obj["content"] = response_obj["text"] -# setattr(model_response, "usage", Usage()) -# if response_obj.get("prompt_tokens", None) is not None: -# model_response.usage.prompt_tokens = response_obj[ -# "prompt_tokens" -# ] -# if response_obj.get("completion_tokens", None) is not None: -# model_response.usage.completion_tokens = response_obj[ -# "completion_tokens" -# ] -# if hasattr(model_response.usage, "prompt_tokens"): -# model_response.usage.total_tokens = ( -# getattr(model_response.usage, "total_tokens", 0) -# + model_response.usage.prompt_tokens -# ) -# if hasattr(model_response.usage, "completion_tokens"): -# model_response.usage.total_tokens = ( -# getattr(model_response.usage, "total_tokens", 0) -# + model_response.usage.completion_tokens -# ) - -# if response_obj["is_finished"]: -# self.received_finish_reason = response_obj["finish_reason"] -# elif hasattr(chunk, "candidates") is True: -# try: -# try: -# completion_obj["content"] = chunk.text -# except Exception as e: -# if "Part has no text." in str(e): -# ## check for function calling -# function_call = ( -# chunk.candidates[0].content.parts[0].function_call -# ) - -# args_dict = {} - -# # Check if it's a RepeatedComposite instance -# for key, val in function_call.args.items(): -# if isinstance( -# val, -# proto.marshal.collections.repeated.RepeatedComposite, -# ): -# # If so, convert to list -# args_dict[key] = [v for v in val] -# else: -# args_dict[key] = val - -# try: -# args_str = json.dumps(args_dict) -# except Exception as e: -# raise e -# _delta_obj = litellm.utils.Delta( -# content=None, -# tool_calls=[ -# { -# "id": f"call_{str(uuid.uuid4())}", -# "function": { -# "arguments": args_str, -# "name": function_call.name, -# }, -# "type": "function", -# } -# ], -# ) -# _streaming_response = StreamingChoices(delta=_delta_obj) -# _model_response = ModelResponse(stream=True) -# _model_response.choices = [_streaming_response] -# response_obj = {"original_chunk": _model_response} -# else: -# raise e -# if ( -# hasattr(chunk.candidates[0], "finish_reason") -# and chunk.candidates[0].finish_reason.name -# != "FINISH_REASON_UNSPECIFIED" -# ): # every non-final chunk in vertex ai has this -# self.received_finish_reason = chunk.candidates[ -# 0 -# ].finish_reason.name -# except Exception: -# if chunk.candidates[0].finish_reason.name == "SAFETY": -# raise Exception( -# f"The response was blocked by VertexAI. 
{str(chunk)}" -# ) -# else: -# completion_obj["content"] = str(chunk) -# elif self.custom_llm_provider == "cohere": -# response_obj = self.handle_cohere_chunk(chunk) -# completion_obj["content"] = response_obj["text"] -# if response_obj["is_finished"]: -# self.received_finish_reason = response_obj["finish_reason"] -# elif self.custom_llm_provider == "cohere_chat": -# response_obj = self.handle_cohere_chat_chunk(chunk) -# if response_obj is None: -# return -# completion_obj["content"] = response_obj["text"] -# if response_obj["is_finished"]: -# self.received_finish_reason = response_obj["finish_reason"] - -# elif self.custom_llm_provider == "petals": -# if len(self.completion_stream) == 0: -# if self.received_finish_reason is not None: -# raise StopIteration -# else: -# self.received_finish_reason = "stop" -# chunk_size = 30 -# new_chunk = self.completion_stream[:chunk_size] -# completion_obj["content"] = new_chunk -# self.completion_stream = self.completion_stream[chunk_size:] -# elif self.custom_llm_provider == "palm": -# # fake streaming -# response_obj = {} -# if len(self.completion_stream) == 0: -# if self.received_finish_reason is not None: -# raise StopIteration -# else: -# self.received_finish_reason = "stop" -# chunk_size = 30 -# new_chunk = self.completion_stream[:chunk_size] -# completion_obj["content"] = new_chunk -# self.completion_stream = self.completion_stream[chunk_size:] -# elif self.custom_llm_provider == "ollama": -# response_obj = self.handle_ollama_stream(chunk) -# completion_obj["content"] = response_obj["text"] -# print_verbose(f"completion obj content: {completion_obj['content']}") -# if response_obj["is_finished"]: -# self.received_finish_reason = response_obj["finish_reason"] -# elif self.custom_llm_provider == "ollama_chat": -# response_obj = self.handle_ollama_chat_stream(chunk) -# completion_obj["content"] = response_obj["text"] -# print_verbose(f"completion obj content: {completion_obj['content']}") -# if response_obj["is_finished"]: -# self.received_finish_reason = response_obj["finish_reason"] -# elif self.custom_llm_provider == "cloudflare": -# response_obj = self.handle_cloudlfare_stream(chunk) -# completion_obj["content"] = response_obj["text"] -# print_verbose(f"completion obj content: {completion_obj['content']}") -# if response_obj["is_finished"]: -# self.received_finish_reason = response_obj["finish_reason"] -# elif self.custom_llm_provider == "watsonx": -# response_obj = self.handle_watsonx_stream(chunk) -# completion_obj["content"] = response_obj["text"] -# if response_obj["is_finished"]: -# self.received_finish_reason = response_obj["finish_reason"] -# elif self.custom_llm_provider == "triton": -# response_obj = self.handle_triton_stream(chunk) -# completion_obj["content"] = response_obj["text"] -# print_verbose(f"completion obj content: {completion_obj['content']}") -# if response_obj["is_finished"]: -# self.received_finish_reason = response_obj["finish_reason"] -# elif self.custom_llm_provider == "text-completion-openai": -# response_obj = self.handle_openai_text_completion_chunk(chunk) -# completion_obj["content"] = response_obj["text"] -# print_verbose(f"completion obj content: {completion_obj['content']}") -# if response_obj["is_finished"]: -# self.received_finish_reason = response_obj["finish_reason"] -# if response_obj["usage"] is not None: -# model_response.usage = litellm.Usage( -# prompt_tokens=response_obj["usage"].prompt_tokens, -# completion_tokens=response_obj["usage"].completion_tokens, -# 
total_tokens=response_obj["usage"].total_tokens, -# ) -# elif self.custom_llm_provider == "text-completion-codestral": -# response_obj = litellm.MistralTextCompletionConfig()._chunk_parser( -# chunk -# ) -# completion_obj["content"] = response_obj["text"] -# print_verbose(f"completion obj content: {completion_obj['content']}") -# if response_obj["is_finished"]: -# self.received_finish_reason = response_obj["finish_reason"] -# if "usage" in response_obj is not None: -# model_response.usage = litellm.Usage( -# prompt_tokens=response_obj["usage"].prompt_tokens, -# completion_tokens=response_obj["usage"].completion_tokens, -# total_tokens=response_obj["usage"].total_tokens, -# ) -# elif self.custom_llm_provider == "azure_text": -# response_obj = self.handle_azure_text_completion_chunk(chunk) -# completion_obj["content"] = response_obj["text"] -# print_verbose(f"completion obj content: {completion_obj['content']}") -# if response_obj["is_finished"]: -# self.received_finish_reason = response_obj["finish_reason"] -# elif self.custom_llm_provider == "cached_response": -# response_obj = { -# "text": chunk.choices[0].delta.content, -# "is_finished": True, -# "finish_reason": chunk.choices[0].finish_reason, -# "original_chunk": chunk, -# "tool_calls": ( -# chunk.choices[0].delta.tool_calls -# if hasattr(chunk.choices[0].delta, "tool_calls") -# else None -# ), -# } - -# completion_obj["content"] = response_obj["text"] -# if response_obj["tool_calls"] is not None: -# completion_obj["tool_calls"] = response_obj["tool_calls"] -# print_verbose(f"completion obj content: {completion_obj['content']}") -# if hasattr(chunk, "id"): -# model_response.id = chunk.id -# self.response_id = chunk.id -# if hasattr(chunk, "system_fingerprint"): -# self.system_fingerprint = chunk.system_fingerprint -# if response_obj["is_finished"]: -# self.received_finish_reason = response_obj["finish_reason"] -# else: # openai / azure chat model -# if self.custom_llm_provider == "azure": -# if hasattr(chunk, "model"): -# # for azure, we need to pass the model from the orignal chunk -# self.model = chunk.model -# response_obj = self.handle_openai_chat_completion_chunk(chunk) -# if response_obj is None: -# return -# completion_obj["content"] = response_obj["text"] -# print_verbose(f"completion obj content: {completion_obj['content']}") -# if response_obj["is_finished"]: -# if response_obj["finish_reason"] == "error": -# raise Exception( -# "{} raised a streaming error - finish_reason: error, no content string given. 
Received Chunk={}".format( -# self.custom_llm_provider, response_obj -# ) -# ) -# self.received_finish_reason = response_obj["finish_reason"] -# if response_obj.get("original_chunk", None) is not None: -# if hasattr(response_obj["original_chunk"], "id"): -# model_response.id = response_obj["original_chunk"].id -# self.response_id = model_response.id -# if hasattr(response_obj["original_chunk"], "system_fingerprint"): -# model_response.system_fingerprint = response_obj[ -# "original_chunk" -# ].system_fingerprint -# self.system_fingerprint = response_obj[ -# "original_chunk" -# ].system_fingerprint -# if response_obj["logprobs"] is not None: -# model_response.choices[0].logprobs = response_obj["logprobs"] - -# if response_obj["usage"] is not None: -# if isinstance(response_obj["usage"], dict): -# model_response.usage = litellm.Usage( -# prompt_tokens=response_obj["usage"].get( -# "prompt_tokens", None -# ) -# or None, -# completion_tokens=response_obj["usage"].get( -# "completion_tokens", None -# ) -# or None, -# total_tokens=response_obj["usage"].get("total_tokens", None) -# or None, -# ) -# elif isinstance(response_obj["usage"], BaseModel): -# model_response.usage = litellm.Usage( -# **response_obj["usage"].model_dump() -# ) - -# model_response.model = self.model -# print_verbose( -# f"model_response finish reason 3: {self.received_finish_reason}; response_obj={response_obj}" -# ) -# ## FUNCTION CALL PARSING -# if ( -# response_obj is not None -# and response_obj.get("original_chunk", None) is not None -# ): # function / tool calling branch - only set for openai/azure compatible endpoints -# # enter this branch when no content has been passed in response -# original_chunk = response_obj.get("original_chunk", None) -# model_response.id = original_chunk.id -# self.response_id = original_chunk.id -# if original_chunk.choices and len(original_chunk.choices) > 0: -# delta = original_chunk.choices[0].delta -# if delta is not None and ( -# delta.function_call is not None or delta.tool_calls is not None -# ): -# try: -# model_response.system_fingerprint = ( -# original_chunk.system_fingerprint -# ) -# ## AZURE - check if arguments is not None -# if ( -# original_chunk.choices[0].delta.function_call -# is not None -# ): -# if ( -# getattr( -# original_chunk.choices[0].delta.function_call, -# "arguments", -# ) -# is None -# ): -# original_chunk.choices[ -# 0 -# ].delta.function_call.arguments = "" -# elif original_chunk.choices[0].delta.tool_calls is not None: -# if isinstance( -# original_chunk.choices[0].delta.tool_calls, list -# ): -# for t in original_chunk.choices[0].delta.tool_calls: -# if hasattr(t, "functions") and hasattr( -# t.functions, "arguments" -# ): -# if ( -# getattr( -# t.function, -# "arguments", -# ) -# is None -# ): -# t.function.arguments = "" -# _json_delta = delta.model_dump() -# print_verbose(f"_json_delta: {_json_delta}") -# if "role" not in _json_delta or _json_delta["role"] is None: -# _json_delta["role"] = ( -# "assistant" # mistral's api returns role as None -# ) -# if "tool_calls" in _json_delta and isinstance( -# _json_delta["tool_calls"], list -# ): -# for tool in _json_delta["tool_calls"]: -# if ( -# isinstance(tool, dict) -# and "function" in tool -# and isinstance(tool["function"], dict) -# and ("type" not in tool or tool["type"] is None) -# ): -# # if function returned but type set to None - mistral's api returns type: None -# tool["type"] = "function" -# model_response.choices[0].delta = Delta(**_json_delta) -# except Exception as e: -# 
verbose_logger.exception( -# "litellm.CustomStreamWrapper.chunk_creator(): Exception occured - {}".format( -# str(e) -# ) -# ) -# model_response.choices[0].delta = Delta() -# elif ( -# delta is not None and getattr(delta, "audio", None) is not None -# ): -# model_response.choices[0].delta.audio = delta.audio -# else: -# try: -# delta = ( -# dict() -# if original_chunk.choices[0].delta is None -# else dict(original_chunk.choices[0].delta) -# ) -# print_verbose(f"original delta: {delta}") -# model_response.choices[0].delta = Delta(**delta) -# print_verbose( -# f"new delta: {model_response.choices[0].delta}" -# ) -# except Exception: -# model_response.choices[0].delta = Delta() -# else: -# if ( -# self.stream_options is not None -# and self.stream_options["include_usage"] is True -# ): -# return model_response -# return -# print_verbose( -# f"model_response.choices[0].delta: {model_response.choices[0].delta}; completion_obj: {completion_obj}" -# ) -# print_verbose(f"self.sent_first_chunk: {self.sent_first_chunk}") - -# ## CHECK FOR TOOL USE -# if "tool_calls" in completion_obj and len(completion_obj["tool_calls"]) > 0: -# if self.is_function_call is True: # user passed in 'functions' param -# completion_obj["function_call"] = completion_obj["tool_calls"][0][ -# "function" -# ] -# completion_obj["tool_calls"] = None - -# self.tool_call = True - -# ## RETURN ARG -# return self.return_processed_chunk_logic( -# completion_obj=completion_obj, -# model_response=model_response, # type: ignore -# response_obj=response_obj, -# ) - -# except StopIteration: -# raise StopIteration -# except Exception as e: -# traceback.format_exc() -# e.message = str(e) -# raise exception_type( -# model=self.model, -# custom_llm_provider=self.custom_llm_provider, -# original_exception=e, -# ) - -# def set_logging_event_loop(self, loop): -# """ -# import litellm, asyncio - -# loop = asyncio.get_event_loop() # 👈 gets the current event loop - -# response = litellm.completion(.., stream=True) - -# response.set_logging_event_loop(loop=loop) # 👈 enables async_success callbacks for sync logging - -# for chunk in response: -# ... -# """ -# self.logging_loop = loop - -# def run_success_logging_and_cache_storage(self, processed_chunk, cache_hit: bool): -# """ -# Runs success logging in a thread and adds the response to the cache -# """ -# if litellm.disable_streaming_logging is True: -# """ -# [NOT RECOMMENDED] -# Set this via `litellm.disable_streaming_logging = True`. - -# Disables streaming logging. 
-# """ -# return -# ## ASYNC LOGGING -# # Create an event loop for the new thread -# if self.logging_loop is not None: -# future = asyncio.run_coroutine_threadsafe( -# self.logging_obj.async_success_handler( -# processed_chunk, None, None, cache_hit -# ), -# loop=self.logging_loop, -# ) -# future.result() -# else: -# asyncio.run( -# self.logging_obj.async_success_handler( -# processed_chunk, None, None, cache_hit -# ) -# ) -# ## SYNC LOGGING -# self.logging_obj.success_handler(processed_chunk, None, None, cache_hit) - -# ## Sync store in cache -# if self.logging_obj._llm_caching_handler is not None: -# self.logging_obj._llm_caching_handler._sync_add_streaming_response_to_cache( -# processed_chunk -# ) - -# def finish_reason_handler(self): -# model_response = self.model_response_creator() -# complete_streaming_response = litellm.stream_chunk_builder( -# chunks=self.chunks -# ) -# _finish_reason = complete_streaming_response.choices[0].finish_reason - -# print(f"_finish_reason: {_finish_reason}") -# if _finish_reason is not None: -# model_response.choices[0].finish_reason = _finish_reason -# else: -# model_response.choices[0].finish_reason = "stop" - -# ## if tool use -# if ( -# model_response.choices[0].finish_reason == "stop" and self.tool_call -# ): # don't overwrite for other - potential error finish reasons -# model_response.choices[0].finish_reason = "tool_calls" -# return model_response - -# def __next__(self): # noqa: PLR0915 -# cache_hit = False -# if ( -# self.custom_llm_provider is not None -# and self.custom_llm_provider == "cached_response" -# ): -# cache_hit = True -# try: -# if self.completion_stream is None: -# self.fetch_sync_stream() -# while True: -# if ( -# isinstance(self.completion_stream, str) -# or isinstance(self.completion_stream, bytes) -# or isinstance(self.completion_stream, ModelResponse) -# ): -# chunk = self.completion_stream -# else: -# chunk = next(self.completion_stream) -# if chunk is not None and chunk != b"": -# print_verbose( -# f"PROCESSED CHUNK PRE CHUNK CREATOR: {chunk}; custom_llm_provider: {self.custom_llm_provider}" -# ) -# response: Optional[ModelResponse] = self.chunk_creator(chunk=chunk) -# print_verbose(f"PROCESSED CHUNK POST CHUNK CREATOR: {response}") - -# if response is None: -# continue -# ## LOGGING -# threading.Thread( -# target=self.run_success_logging_and_cache_storage, -# args=(response, cache_hit), -# ).start() # log response -# choice = response.choices[0] -# if isinstance(choice, StreamingChoices): -# self.response_uptil_now += choice.delta.get("content", "") or "" -# else: -# self.response_uptil_now += "" -# self.rules.post_call_rules( -# input=self.response_uptil_now, model=self.model -# ) -# # HANDLE STREAM OPTIONS -# self.chunks.append(response) -# if hasattr( -# response, "usage" -# ): # remove usage from chunk, only send on final chunk -# # Convert the object to a dictionary -# obj_dict = response.dict() - -# # Remove an attribute (e.g., 'attr2') -# if "usage" in obj_dict: -# del obj_dict["usage"] - -# # Create a new object without the removed attribute -# response = self.model_response_creator( -# chunk=obj_dict, hidden_params=response._hidden_params -# ) -# # add usage as hidden param -# if self.sent_last_chunk is True and self.stream_options is None: -# usage = calculate_total_usage(chunks=self.chunks) -# response._hidden_params["usage"] = usage -# # RETURN RESULT -# return response - -# except StopIteration: -# if self.sent_last_chunk is True: -# complete_streaming_response = litellm.stream_chunk_builder( -# 
chunks=self.chunks, messages=self.messages -# ) -# response = self.model_response_creator() -# if complete_streaming_response is not None: -# setattr( -# response, -# "usage", -# getattr(complete_streaming_response, "usage"), -# ) - -# ## LOGGING -# threading.Thread( -# target=self.logging_obj.success_handler, -# args=(response, None, None, cache_hit), -# ).start() # log response - -# if self.sent_stream_usage is False and self.send_stream_usage is True: -# self.sent_stream_usage = True -# return response -# raise # Re-raise StopIteration -# else: -# self.sent_last_chunk = True -# processed_chunk = self.finish_reason_handler() -# if self.stream_options is None: # add usage as hidden param -# usage = calculate_total_usage(chunks=self.chunks) -# processed_chunk._hidden_params["usage"] = usage -# ## LOGGING -# threading.Thread( -# target=self.run_success_logging_and_cache_storage, -# args=(processed_chunk, cache_hit), -# ).start() # log response -# return processed_chunk -# except Exception as e: -# traceback_exception = traceback.format_exc() -# # LOG FAILURE - handle streaming failure logging in the _next_ object, remove `handle_failure` once it's deprecated -# threading.Thread( -# target=self.logging_obj.failure_handler, args=(e, traceback_exception) -# ).start() -# if isinstance(e, OpenAIError): -# raise e -# else: -# raise exception_type( -# model=self.model, -# original_exception=e, -# custom_llm_provider=self.custom_llm_provider, -# ) - -# def fetch_sync_stream(self): -# if self.completion_stream is None and self.make_call is not None: -# # Call make_call to get the completion stream -# self.completion_stream = self.make_call(client=litellm.module_level_client) -# self._stream_iter = self.completion_stream.__iter__() - -# return self.completion_stream - -# async def fetch_stream(self): -# if self.completion_stream is None and self.make_call is not None: -# # Call make_call to get the completion stream -# self.completion_stream = await self.make_call( -# client=litellm.module_level_aclient -# ) -# self._stream_iter = self.completion_stream.__aiter__() - -# return self.completion_stream - -# async def __anext__(self): # noqa: PLR0915 -# cache_hit = False -# if ( -# self.custom_llm_provider is not None -# and self.custom_llm_provider == "cached_response" -# ): -# cache_hit = True -# try: -# if self.completion_stream is None: -# await self.fetch_stream() - -# if ( -# self.custom_llm_provider == "openai" -# or self.custom_llm_provider == "azure" -# or self.custom_llm_provider == "custom_openai" -# or self.custom_llm_provider == "text-completion-openai" -# or self.custom_llm_provider == "text-completion-codestral" -# or self.custom_llm_provider == "azure_text" -# or self.custom_llm_provider == "anthropic" -# or self.custom_llm_provider == "anthropic_text" -# or self.custom_llm_provider == "huggingface" -# or self.custom_llm_provider == "ollama" -# or self.custom_llm_provider == "ollama_chat" -# or self.custom_llm_provider == "vertex_ai" -# or self.custom_llm_provider == "vertex_ai_beta" -# or self.custom_llm_provider == "sagemaker" -# or self.custom_llm_provider == "sagemaker_chat" -# or self.custom_llm_provider == "gemini" -# or self.custom_llm_provider == "replicate" -# or self.custom_llm_provider == "cached_response" -# or self.custom_llm_provider == "predibase" -# or self.custom_llm_provider == "databricks" -# or self.custom_llm_provider == "bedrock" -# or self.custom_llm_provider == "triton" -# or self.custom_llm_provider == "watsonx" -# or self.custom_llm_provider in 
litellm.openai_compatible_endpoints -# or self.custom_llm_provider in litellm._custom_providers -# ): -# async for chunk in self.completion_stream: -# if chunk == "None" or chunk is None: -# raise Exception -# elif ( -# self.custom_llm_provider == "gemini" -# and hasattr(chunk, "parts") -# and len(chunk.parts) == 0 -# ): -# continue -# # chunk_creator() does logging/stream chunk building. We need to let it know its being called in_async_func, so we don't double add chunks. -# # __anext__ also calls async_success_handler, which does logging -# print_verbose(f"PROCESSED ASYNC CHUNK PRE CHUNK CREATOR: {chunk}") - -# processed_chunk: Optional[ModelResponse] = self.chunk_creator( -# chunk=chunk -# ) -# print_verbose( -# f"PROCESSED ASYNC CHUNK POST CHUNK CREATOR: {processed_chunk}" -# ) -# if processed_chunk is None: -# continue -# ## LOGGING -# ## LOGGING -# executor.submit( -# self.logging_obj.success_handler, -# result=processed_chunk, -# start_time=None, -# end_time=None, -# cache_hit=cache_hit, -# ) - -# asyncio.create_task( -# self.logging_obj.async_success_handler( -# processed_chunk, cache_hit=cache_hit -# ) -# ) - -# if self.logging_obj._llm_caching_handler is not None: -# asyncio.create_task( -# self.logging_obj._llm_caching_handler._add_streaming_response_to_cache( -# processed_chunk=processed_chunk, -# ) -# ) - -# choice = processed_chunk.choices[0] -# if isinstance(choice, StreamingChoices): -# self.response_uptil_now += choice.delta.get("content", "") or "" -# else: -# self.response_uptil_now += "" -# self.rules.post_call_rules( -# input=self.response_uptil_now, model=self.model -# ) -# self.chunks.append(processed_chunk) -# if hasattr( -# processed_chunk, "usage" -# ): # remove usage from chunk, only send on final chunk -# # Convert the object to a dictionary -# obj_dict = processed_chunk.dict() - -# # Remove an attribute (e.g., 'attr2') -# if "usage" in obj_dict: -# del obj_dict["usage"] - -# # Create a new object without the removed attribute -# processed_chunk = self.model_response_creator(chunk=obj_dict) -# print_verbose(f"final returned processed chunk: {processed_chunk}") -# return processed_chunk -# raise StopAsyncIteration -# else: # temporary patch for non-aiohttp async calls -# # example - boto3 bedrock llms -# while True: -# if isinstance(self.completion_stream, str) or isinstance( -# self.completion_stream, bytes -# ): -# chunk = self.completion_stream -# else: -# chunk = next(self.completion_stream) -# if chunk is not None and chunk != b"": -# print_verbose(f"PROCESSED CHUNK PRE CHUNK CREATOR: {chunk}") -# processed_chunk: Optional[ModelResponse] = self.chunk_creator( -# chunk=chunk -# ) -# print_verbose( -# f"PROCESSED CHUNK POST CHUNK CREATOR: {processed_chunk}" -# ) -# if processed_chunk is None: -# continue -# ## LOGGING -# threading.Thread( -# target=self.logging_obj.success_handler, -# args=(processed_chunk, None, None, cache_hit), -# ).start() # log processed_chunk -# asyncio.create_task( -# self.logging_obj.async_success_handler( -# processed_chunk, cache_hit=cache_hit -# ) -# ) - -# choice = processed_chunk.choices[0] -# if isinstance(choice, StreamingChoices): -# self.response_uptil_now += ( -# choice.delta.get("content", "") or "" -# ) -# else: -# self.response_uptil_now += "" -# self.rules.post_call_rules( -# input=self.response_uptil_now, model=self.model -# ) -# # RETURN RESULT -# self.chunks.append(processed_chunk) -# return processed_chunk -# except (StopAsyncIteration, StopIteration): -# if self.sent_last_chunk is True: -# # log the final chunk with 
accurate streaming values -# complete_streaming_response = litellm.stream_chunk_builder( -# chunks=self.chunks, messages=self.messages -# ) -# response = self.model_response_creator() -# if complete_streaming_response is not None: -# setattr( -# response, -# "usage", -# getattr(complete_streaming_response, "usage"), -# ) -# ## LOGGING -# threading.Thread( -# target=self.logging_obj.success_handler, -# args=(response, None, None, cache_hit), -# ).start() # log response -# asyncio.create_task( -# self.logging_obj.async_success_handler( -# response, cache_hit=cache_hit -# ) -# ) -# if self.sent_stream_usage is False and self.send_stream_usage is True: -# self.sent_stream_usage = True -# return response -# raise StopAsyncIteration # Re-raise StopIteration -# else: -# self.sent_last_chunk = True -# processed_chunk = self.finish_reason_handler() -# ## LOGGING -# threading.Thread( -# target=self.logging_obj.success_handler, -# args=(processed_chunk, None, None, cache_hit), -# ).start() # log response -# asyncio.create_task( -# self.logging_obj.async_success_handler( -# processed_chunk, cache_hit=cache_hit -# ) -# ) -# return processed_chunk -# except httpx.TimeoutException as e: # if httpx read timeout error occues -# traceback_exception = traceback.format_exc() -# ## ADD DEBUG INFORMATION - E.G. LITELLM REQUEST TIMEOUT -# traceback_exception += "\nLiteLLM Default Request Timeout - {}".format( -# litellm.request_timeout -# ) -# if self.logging_obj is not None: -# ## LOGGING -# threading.Thread( -# target=self.logging_obj.failure_handler, -# args=(e, traceback_exception), -# ).start() # log response -# # Handle any exceptions that might occur during streaming -# asyncio.create_task( -# self.logging_obj.async_failure_handler(e, traceback_exception) -# ) -# raise e -# except Exception as e: -# traceback_exception = traceback.format_exc() -# if self.logging_obj is not None: -# ## LOGGING -# threading.Thread( -# target=self.logging_obj.failure_handler, -# args=(e, traceback_exception), -# ).start() # log response -# # Handle any exceptions that might occur during streaming -# asyncio.create_task( -# self.logging_obj.async_failure_handler(e, traceback_exception) # type: ignore -# ) -# ## Map to OpenAI Exception -# raise exception_type( -# model=self.model, -# custom_llm_provider=self.custom_llm_provider, -# original_exception=e, -# completion_kwargs={}, -# extra_kwargs={}, -# ) - - class TextCompletionStreamWrapper: def __init__( self, @@ -7977,7 +5869,6 @@ def get_valid_models() -> List[str]: if expected_provider_key in environ_keys: # key is set valid_providers.append(provider) - for provider in valid_providers: if provider == "azure": valid_models.append("Azure-LLM") @@ -8253,10 +6144,13 @@ def validate_chat_completion_user_messages(messages: List[AllMessageValues]): if isinstance(item, dict): if item.get("type") not in ValidUserMessageContentTypes: raise Exception("invalid content type") - except Exception: - raise Exception( - f"Invalid user message={m} at index {idx}. Please ensure all user messages are valid OpenAI chat completion messages." - ) + except Exception as e: + if "invalid content type" in str(e): + raise Exception( + f"Invalid user message={m} at index {idx}. Please ensure all user messages are valid OpenAI chat completion messages." 
+                )
+            else:
+                raise e
     return messages
 
 
diff --git a/model_prices_and_context_window.json b/model_prices_and_context_window.json
index e8aeac2cb..48b25523e 100644
--- a/model_prices_and_context_window.json
+++ b/model_prices_and_context_window.json
@@ -1898,7 +1898,8 @@
         "supports_function_calling": true,
         "tool_use_system_prompt_tokens": 264,
         "supports_assistant_prefill": true,
-        "supports_prompt_caching": true
+        "supports_prompt_caching": true,
+        "supports_pdf_input": true
     },
     "claude-3-opus-20240229": {
         "max_tokens": 4096,
diff --git a/tests/llm_translation/base_llm_unit_tests.py b/tests/llm_translation/base_llm_unit_tests.py
index 4f9cd9c25..96004eb4e 100644
--- a/tests/llm_translation/base_llm_unit_tests.py
+++ b/tests/llm_translation/base_llm_unit_tests.py
@@ -44,3 +44,30 @@ class BaseLLMChatTest(ABC):
             messages=messages,
         )
         assert response is not None
+
+    @pytest.fixture
+    def pdf_messages(self):
+        import base64
+
+        import requests
+
+        # URL of the file
+        url = "https://storage.googleapis.com/cloud-samples-data/generative-ai/pdf/2403.05530.pdf"
+
+        response = requests.get(url)
+        file_data = response.content
+
+        encoded_file = base64.b64encode(file_data).decode("utf-8")
+        url = f"data:application/pdf;base64,{encoded_file}"
+
+        image_content = [
+            {"type": "text", "text": "What's this file about?"},
+            {
+                "type": "image_url",
+                "image_url": {"url": url},
+            },
+        ]
+
+        image_messages = [{"role": "user", "content": image_content}]
+
+        return image_messages
diff --git a/tests/llm_translation/test_anthropic_completion.py b/tests/llm_translation/test_anthropic_completion.py
index 46f01e0ec..9d7c9af73 100644
--- a/tests/llm_translation/test_anthropic_completion.py
+++ b/tests/llm_translation/test_anthropic_completion.py
@@ -36,6 +36,7 @@ from litellm.types.llms.anthropic import AnthropicResponse
 from litellm.llms.anthropic.common_utils import process_anthropic_headers
 from httpx import Headers
 
+from base_llm_unit_tests import BaseLLMChatTest
 
 
 def test_anthropic_completion_messages_translation():
@@ -624,3 +625,40 @@ def test_anthropic_tool_helper(cache_control_location):
     tool = AnthropicConfig()._map_tool_helper(tool=tool)
 
     assert tool["cache_control"] == {"type": "ephemeral"}
+
+
+from litellm import completion
+
+
+class TestAnthropicCompletion(BaseLLMChatTest):
+    def get_base_completion_call_args(self) -> dict:
+        return {"model": "claude-3-haiku-20240307"}
+
+    def test_pdf_handling(self, pdf_messages):
+        from litellm.llms.custom_httpx.http_handler import HTTPHandler
+        from litellm.types.llms.anthropic import AnthropicMessagesDocumentParam
+        import json
+
+        client = HTTPHandler()
+
+        with patch.object(client, "post", new=MagicMock()) as mock_client:
+            response = completion(
+                model="claude-3-5-sonnet-20241022",
+                messages=pdf_messages,
+                client=client,
+            )
+
+            mock_client.assert_called_once()
+
+            json_data = json.loads(mock_client.call_args.kwargs["data"])
+            headers = mock_client.call_args.kwargs["headers"]
+
+            assert headers["anthropic-beta"] == "pdfs-2024-09-25"
+
+            assert json_data["messages"][0]["role"] == "user"
+            _document_validation = AnthropicMessagesDocumentParam(
+                **json_data["messages"][0]["content"][1]
+            )
+            assert _document_validation["type"] == "document"
+            assert _document_validation["source"]["media_type"] == "application/pdf"
+            assert _document_validation["source"]["type"] == "base64"
diff --git a/tests/local_testing/test_get_llm_provider.py b/tests/local_testing/test_get_llm_provider.py
index f7126cec0..6654c10c2 100644
--- a/tests/local_testing/test_get_llm_provider.py
+++ 
b/tests/local_testing/test_get_llm_provider.py @@ -169,3 +169,11 @@ def test_get_llm_provider_hosted_vllm(): assert custom_llm_provider == "hosted_vllm" assert model == "llama-3.1-70b-instruct" assert dynamic_api_key == "" + + +def test_get_llm_provider_watson_text(): + model, custom_llm_provider, dynamic_api_key, api_base = litellm.get_llm_provider( + model="watsonx_text/watson-text-to-speech", + ) + assert custom_llm_provider == "watsonx_text" + assert model == "watson-text-to-speech" diff --git a/tests/local_testing/test_get_model_list.py b/tests/local_testing/test_get_model_list.py deleted file mode 100644 index 7663eebf5..000000000 --- a/tests/local_testing/test_get_model_list.py +++ /dev/null @@ -1,11 +0,0 @@ -import os, sys, traceback - -sys.path.insert( - 0, os.path.abspath("../..") -) # Adds the parent directory to the system path -import litellm -from litellm import get_model_list - -print(get_model_list()) -print(get_model_list()) -# print(litellm.model_list) diff --git a/tests/local_testing/test_opentelemetry_unit_tests.py b/tests/local_testing/test_opentelemetry_unit_tests.py deleted file mode 100644 index 530adc6ab..000000000 --- a/tests/local_testing/test_opentelemetry_unit_tests.py +++ /dev/null @@ -1,41 +0,0 @@ -# What is this? -## Unit tests for opentelemetry integration - -# What is this? -## Unit test for presidio pii masking -import sys, os, asyncio, time, random -from datetime import datetime -import traceback -from dotenv import load_dotenv - -load_dotenv() -import os -import asyncio - -sys.path.insert( - 0, os.path.abspath("../..") -) # Adds the parent directory to the system path -import pytest -import litellm -from unittest.mock import patch, MagicMock, AsyncMock - - -@pytest.mark.asyncio -async def test_opentelemetry_integration(): - """ - Unit test to confirm the parent otel span is ended - """ - - parent_otel_span = MagicMock() - litellm.callbacks = ["otel"] - - await litellm.acompletion( - model="gpt-3.5-turbo", - messages=[{"role": "user", "content": "Hello, world!"}], - mock_response="Hey!", - metadata={"litellm_parent_otel_span": parent_otel_span}, - ) - - await asyncio.sleep(1) - - parent_otel_span.end.assert_called_once() diff --git a/tests/local_testing/test_utils.py b/tests/local_testing/test_utils.py index 5aa3b610c..b3f8208bf 100644 --- a/tests/local_testing/test_utils.py +++ b/tests/local_testing/test_utils.py @@ -943,3 +943,24 @@ def test_validate_chat_completion_user_messages(messages, expected_bool): ## Invalid message with pytest.raises(Exception): validate_chat_completion_user_messages(messages=messages) + + +def test_models_by_provider(): + """ + Make sure all providers from model map are in the valid providers list + """ + from litellm import models_by_provider + + providers = set() + for k, v in litellm.model_cost.items(): + if "_" in v["litellm_provider"] and "-" in v["litellm_provider"]: + continue + elif k == "sample_spec": + continue + elif v["litellm_provider"] == "sagemaker": + continue + else: + providers.add(v["litellm_provider"]) + + for provider in providers: + assert provider in models_by_provider.keys() diff --git a/tests/logging_callback_tests/base_test.py b/tests/logging_callback_tests/base_test.py new file mode 100644 index 000000000..0d1e7dfcf --- /dev/null +++ b/tests/logging_callback_tests/base_test.py @@ -0,0 +1,100 @@ +import asyncio +import httpx +import json +import pytest +import sys +from typing import Any, Dict, List +from unittest.mock import MagicMock, Mock, patch +import os + +sys.path.insert( + 0, 
os.path.abspath("../..") +) # Adds the parent directory to the system path +import litellm +from litellm.exceptions import BadRequestError +from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler, HTTPHandler +from litellm.utils import CustomStreamWrapper +from litellm.types.utils import ModelResponse + +# test_example.py +from abc import ABC, abstractmethod + + +class BaseLoggingCallbackTest(ABC): + """ + Abstract base test class that enforces a common test across all test classes. + """ + + @pytest.fixture + def mock_response_obj(self): + from litellm.types.utils import ( + ModelResponse, + Choices, + Message, + ChatCompletionMessageToolCall, + Function, + Usage, + CompletionTokensDetailsWrapper, + PromptTokensDetailsWrapper, + ) + + # Create a mock response object with the structure you need + return ModelResponse( + id="chatcmpl-ASId3YJWagBpBskWfoNEMPFSkmrEw", + created=1731308157, + model="gpt-4o-mini-2024-07-18", + object="chat.completion", + system_fingerprint="fp_0ba0d124f1", + choices=[ + Choices( + finish_reason="tool_calls", + index=0, + message=Message( + content=None, + role="assistant", + tool_calls=[ + ChatCompletionMessageToolCall( + function=Function( + arguments='{"city": "New York"}', name="get_weather" + ), + id="call_PngsQS5YGmIZKnswhnUOnOVb", + type="function", + ), + ChatCompletionMessageToolCall( + function=Function( + arguments='{"city": "New York"}', name="get_news" + ), + id="call_1zsDThBu0VSK7KuY7eCcJBnq", + type="function", + ), + ], + function_call=None, + ), + ) + ], + usage=Usage( + completion_tokens=46, + prompt_tokens=86, + total_tokens=132, + completion_tokens_details=CompletionTokensDetailsWrapper( + accepted_prediction_tokens=0, + audio_tokens=0, + reasoning_tokens=0, + rejected_prediction_tokens=0, + text_tokens=None, + ), + prompt_tokens_details=PromptTokensDetailsWrapper( + audio_tokens=0, cached_tokens=0, text_tokens=None, image_tokens=None + ), + ), + service_tier=None, + ) + + @abstractmethod + def test_parallel_tool_calls(self, mock_response_obj: ModelResponse): + """ + Check if parallel tool calls are correctly logged by Logging callback + + Relevant issue - https://github.com/BerriAI/litellm/issues/6677 + """ + pass diff --git a/tests/logging_callback_tests/test_opentelemetry_unit_tests.py b/tests/logging_callback_tests/test_opentelemetry_unit_tests.py new file mode 100644 index 000000000..b0d09562c --- /dev/null +++ b/tests/logging_callback_tests/test_opentelemetry_unit_tests.py @@ -0,0 +1,58 @@ +# What is this? +## Unit tests for opentelemetry integration + +# What is this? 
+## Unit tests for the OpenTelemetry logging callback
+import sys, os, asyncio, time, random
+from datetime import datetime
+import traceback
+from dotenv import load_dotenv
+
+load_dotenv()
+import os
+import asyncio
+
+sys.path.insert(
+    0, os.path.abspath("../..")
+)  # Adds the parent directory to the system path
+import pytest
+import litellm
+from unittest.mock import patch, MagicMock, AsyncMock
+from base_test import BaseLoggingCallbackTest
+from litellm.types.utils import ModelResponse
+
+
+class TestOpentelemetryUnitTests(BaseLoggingCallbackTest):
+    def test_parallel_tool_calls(self, mock_response_obj: ModelResponse):
+        tool_calls = mock_response_obj.choices[0].message.tool_calls
+        from litellm.integrations.opentelemetry import OpenTelemetry
+        from litellm.proxy._types import SpanAttributes
+
+        kv_pair_dict = OpenTelemetry._tool_calls_kv_pair(tool_calls)
+
+        assert kv_pair_dict == {
+            f"{SpanAttributes.LLM_COMPLETIONS}.0.function_call.arguments": '{"city": "New York"}',
+            f"{SpanAttributes.LLM_COMPLETIONS}.0.function_call.name": "get_weather",
+            f"{SpanAttributes.LLM_COMPLETIONS}.1.function_call.arguments": '{"city": "New York"}',
+            f"{SpanAttributes.LLM_COMPLETIONS}.1.function_call.name": "get_news",
+        }
+
+    @pytest.mark.asyncio
+    async def test_opentelemetry_integration(self):
+        """
+        Unit test to confirm the parent otel span is ended
+        """
+
+        parent_otel_span = MagicMock()
+        litellm.callbacks = ["otel"]
+
+        await litellm.acompletion(
+            model="gpt-3.5-turbo",
+            messages=[{"role": "user", "content": "Hello, world!"}],
+            mock_response="Hey!",
+            metadata={"litellm_parent_otel_span": parent_otel_span},
+        )
+
+        await asyncio.sleep(1)
+
+        parent_otel_span.end.assert_called_once()