mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-26 19:24:27 +00:00
451 lines
15 KiB
Python
451 lines
15 KiB
Python
# What is this?
|
||
## Handler file for calling claude-3 on vertex ai
|
||
import copy
|
||
import json
|
||
import os
|
||
import time
|
||
import types
|
||
import uuid
|
||
from enum import Enum
|
||
from typing import Any, Callable, List, Optional, Tuple, Union
|
||
|
||
import httpx # type: ignore
|
||
import requests # type: ignore
|
||
|
||
import litellm
|
||
from litellm.litellm_core_utils.core_helpers import map_finish_reason
|
||
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler, HTTPHandler
|
||
from litellm.types.llms.anthropic import (
|
||
AnthropicMessagesTool,
|
||
AnthropicMessagesToolChoice,
|
||
)
|
||
from litellm.types.llms.openai import (
|
||
ChatCompletionToolParam,
|
||
ChatCompletionToolParamFunctionChunk,
|
||
)
|
||
from litellm.types.utils import ResponseFormatChunk
|
||
from litellm.utils import CustomStreamWrapper, ModelResponse, Usage
|
||
|
||
from .prompt_templates.factory import (
|
||
construct_tool_use_system_prompt,
|
||
contains_tag,
|
||
custom_prompt,
|
||
extract_between_tags,
|
||
parse_xml_params,
|
||
prompt_factory,
|
||
response_schema_prompt,
|
||
)
|
||
|
||
|
||
class VertexAIError(Exception):
|
||
def __init__(self, status_code, message):
|
||
self.status_code = status_code
|
||
self.message = message
|
||
self.request = httpx.Request(
|
||
method="POST", url=" https://cloud.google.com/vertex-ai/"
|
||
)
|
||
self.response = httpx.Response(status_code=status_code, request=self.request)
|
||
super().__init__(
|
||
self.message
|
||
) # Call the base class constructor with the parameters it needs
|
||
|
||
|
||
class VertexAIAnthropicConfig:
|
||
"""
|
||
Reference:https://docs.anthropic.com/claude/reference/messages_post
|
||
|
||
Note that the API for Claude on Vertex differs from the Anthropic API documentation in the following ways:
|
||
|
||
- `model` is not a valid parameter. The model is instead specified in the Google Cloud endpoint URL.
|
||
- `anthropic_version` is a required parameter and must be set to "vertex-2023-10-16".
|
||
|
||
The class `VertexAIAnthropicConfig` provides configuration for the VertexAI's Anthropic API interface. Below are the parameters:
|
||
|
||
- `max_tokens` Required (integer) max tokens,
|
||
- `anthropic_version` Required (string) version of anthropic for bedrock - e.g. "bedrock-2023-05-31"
|
||
- `system` Optional (string) the system prompt, conversion from openai format to this is handled in factory.py
|
||
- `temperature` Optional (float) The amount of randomness injected into the response
|
||
- `top_p` Optional (float) Use nucleus sampling.
|
||
- `top_k` Optional (int) Only sample from the top K options for each subsequent token
|
||
- `stop_sequences` Optional (List[str]) Custom text sequences that cause the model to stop generating
|
||
|
||
Note: Please make sure to modify the default parameters as required for your use case.
|
||
"""
|
||
|
||
max_tokens: Optional[int] = (
|
||
4096 # anthropic max - setting this doesn't impact response, but is required by anthropic.
|
||
)
|
||
system: Optional[str] = None
|
||
temperature: Optional[float] = None
|
||
top_p: Optional[float] = None
|
||
top_k: Optional[int] = None
|
||
stop_sequences: Optional[List[str]] = None
|
||
|
||
def __init__(
|
||
self,
|
||
max_tokens: Optional[int] = None,
|
||
anthropic_version: Optional[str] = None,
|
||
) -> None:
|
||
locals_ = locals()
|
||
for key, value in locals_.items():
|
||
if key == "max_tokens" and value is None:
|
||
value = self.max_tokens
|
||
if key != "self" and value is not None:
|
||
setattr(self.__class__, key, value)
|
||
|
||
@classmethod
|
||
def get_config(cls):
|
||
return {
|
||
k: v
|
||
for k, v in cls.__dict__.items()
|
||
if not k.startswith("__")
|
||
and not isinstance(
|
||
v,
|
||
(
|
||
types.FunctionType,
|
||
types.BuiltinFunctionType,
|
||
classmethod,
|
||
staticmethod,
|
||
),
|
||
)
|
||
and v is not None
|
||
}
|
||
|
||
def get_supported_openai_params(self):
|
||
return [
|
||
"max_tokens",
|
||
"tools",
|
||
"tool_choice",
|
||
"stream",
|
||
"stop",
|
||
"temperature",
|
||
"top_p",
|
||
"response_format",
|
||
]
|
||
|
||
def map_openai_params(self, non_default_params: dict, optional_params: dict):
|
||
for param, value in non_default_params.items():
|
||
if param == "max_tokens":
|
||
optional_params["max_tokens"] = value
|
||
if param == "tools":
|
||
optional_params["tools"] = value
|
||
if param == "tool_choice":
|
||
_tool_choice: Optional[AnthropicMessagesToolChoice] = None
|
||
if value == "auto":
|
||
_tool_choice = {"type": "auto"}
|
||
elif value == "required":
|
||
_tool_choice = {"type": "any"}
|
||
elif isinstance(value, dict):
|
||
_tool_choice = {"type": "tool", "name": value["function"]["name"]}
|
||
|
||
if _tool_choice is not None:
|
||
optional_params["tool_choice"] = _tool_choice
|
||
if param == "stream":
|
||
optional_params["stream"] = value
|
||
if param == "stop":
|
||
optional_params["stop_sequences"] = value
|
||
if param == "temperature":
|
||
optional_params["temperature"] = value
|
||
if param == "top_p":
|
||
optional_params["top_p"] = value
|
||
if param == "response_format" and "response_schema" in value:
|
||
"""
|
||
When using tools in this way: - https://docs.anthropic.com/en/docs/build-with-claude/tool-use#json-mode
|
||
- You usually want to provide a single tool
|
||
- You should set tool_choice (see Forcing tool use) to instruct the model to explicitly use that tool
|
||
- Remember that the model will pass the input to the tool, so the name of the tool and description should be from the model’s perspective.
|
||
"""
|
||
_tool_choice = None
|
||
_tool_choice = {"name": "json_tool_call", "type": "tool"}
|
||
|
||
_tool = AnthropicMessagesTool(
|
||
name="json_tool_call",
|
||
input_schema={
|
||
"type": "object",
|
||
"properties": {"values": value["response_schema"]}, # type: ignore
|
||
},
|
||
)
|
||
|
||
optional_params["tools"] = [_tool]
|
||
optional_params["tool_choice"] = _tool_choice
|
||
optional_params["json_mode"] = True
|
||
|
||
return optional_params
|
||
|
||
|
||
"""
|
||
- Run client init
|
||
- Support async completion, streaming
|
||
"""
|
||
|
||
|
||
def refresh_auth(
|
||
credentials,
|
||
) -> str: # used when user passes in credentials as json string
|
||
from google.auth.transport.requests import Request # type: ignore[import-untyped]
|
||
|
||
if credentials.token is None:
|
||
credentials.refresh(Request())
|
||
|
||
if not credentials.token:
|
||
raise RuntimeError("Could not resolve API token from the credentials")
|
||
|
||
return credentials.token
|
||
|
||
|
||
def get_vertex_client(
|
||
client: Any,
|
||
vertex_project: Optional[str],
|
||
vertex_location: Optional[str],
|
||
vertex_credentials: Optional[str],
|
||
) -> Tuple[Any, Optional[str]]:
|
||
args = locals()
|
||
from litellm.llms.vertex_httpx import VertexLLM
|
||
|
||
try:
|
||
from anthropic import AnthropicVertex
|
||
except Exception:
|
||
raise VertexAIError(
|
||
status_code=400,
|
||
message="""vertexai import failed please run `pip install -U google-cloud-aiplatform "anthropic[vertex]"`""",
|
||
)
|
||
|
||
access_token: Optional[str] = None
|
||
|
||
if client is None:
|
||
_credentials, cred_project_id = VertexLLM().load_auth(
|
||
credentials=vertex_credentials, project_id=vertex_project
|
||
)
|
||
|
||
vertex_ai_client = AnthropicVertex(
|
||
project_id=vertex_project or cred_project_id,
|
||
region=vertex_location or "us-central1",
|
||
access_token=_credentials.token,
|
||
)
|
||
access_token = _credentials.token
|
||
else:
|
||
vertex_ai_client = client
|
||
access_token = client.access_token
|
||
|
||
return vertex_ai_client, access_token
|
||
|
||
|
||
def create_vertex_anthropic_url(
|
||
vertex_location: str, vertex_project: str, model: str, stream: bool
|
||
) -> str:
|
||
if stream is True:
|
||
return f"https://{vertex_location}-aiplatform.googleapis.com/v1/projects/{vertex_project}/locations/{vertex_location}/publishers/anthropic/models/{model}:streamRawPredict"
|
||
else:
|
||
return f"https://{vertex_location}-aiplatform.googleapis.com/v1/projects/{vertex_project}/locations/{vertex_location}/publishers/anthropic/models/{model}:rawPredict"
|
||
|
||
|
||
def completion(
|
||
model: str,
|
||
messages: list,
|
||
model_response: ModelResponse,
|
||
print_verbose: Callable,
|
||
encoding,
|
||
logging_obj,
|
||
optional_params: dict,
|
||
custom_prompt_dict: dict,
|
||
headers: Optional[dict],
|
||
timeout: Union[float, httpx.Timeout],
|
||
vertex_project=None,
|
||
vertex_location=None,
|
||
vertex_credentials=None,
|
||
litellm_params=None,
|
||
logger_fn=None,
|
||
acompletion: bool = False,
|
||
client=None,
|
||
):
|
||
try:
|
||
import vertexai
|
||
from anthropic import AnthropicVertex
|
||
|
||
from litellm.llms.anthropic import AnthropicChatCompletion
|
||
from litellm.llms.vertex_httpx import VertexLLM
|
||
except:
|
||
raise VertexAIError(
|
||
status_code=400,
|
||
message="""vertexai import failed please run `pip install -U google-cloud-aiplatform "anthropic[vertex]"`""",
|
||
)
|
||
|
||
if not (
|
||
hasattr(vertexai, "preview") or hasattr(vertexai.preview, "language_models")
|
||
):
|
||
raise VertexAIError(
|
||
status_code=400,
|
||
message="""Upgrade vertex ai. Run `pip install "google-cloud-aiplatform>=1.38"`""",
|
||
)
|
||
try:
|
||
|
||
vertex_httpx_logic = VertexLLM()
|
||
|
||
access_token, project_id = vertex_httpx_logic._ensure_access_token(
|
||
credentials=vertex_credentials, project_id=vertex_project
|
||
)
|
||
|
||
anthropic_chat_completions = AnthropicChatCompletion()
|
||
|
||
## Load Config
|
||
config = litellm.VertexAIAnthropicConfig.get_config()
|
||
for k, v in config.items():
|
||
if k not in optional_params:
|
||
optional_params[k] = v
|
||
|
||
## CONSTRUCT API BASE
|
||
stream = optional_params.get("stream", False)
|
||
|
||
api_base = create_vertex_anthropic_url(
|
||
vertex_location=vertex_location or "us-central1",
|
||
vertex_project=vertex_project or project_id,
|
||
model=model,
|
||
stream=stream,
|
||
)
|
||
|
||
if headers is not None:
|
||
vertex_headers = headers
|
||
else:
|
||
vertex_headers = {}
|
||
|
||
vertex_headers.update({"Authorization": "Bearer {}".format(access_token)})
|
||
|
||
optional_params.update(
|
||
{"anthropic_version": "vertex-2023-10-16", "is_vertex_request": True}
|
||
)
|
||
|
||
return anthropic_chat_completions.completion(
|
||
model=model,
|
||
messages=messages,
|
||
api_base=api_base,
|
||
custom_prompt_dict=custom_prompt_dict,
|
||
model_response=model_response,
|
||
print_verbose=print_verbose,
|
||
encoding=encoding,
|
||
api_key=access_token,
|
||
logging_obj=logging_obj,
|
||
optional_params=optional_params,
|
||
acompletion=acompletion,
|
||
litellm_params=litellm_params,
|
||
logger_fn=logger_fn,
|
||
headers=vertex_headers,
|
||
client=client,
|
||
timeout=timeout,
|
||
)
|
||
|
||
except Exception as e:
|
||
raise VertexAIError(status_code=500, message=str(e))
|
||
|
||
|
||
async def async_completion(
|
||
model: str,
|
||
messages: list,
|
||
data: dict,
|
||
model_response: ModelResponse,
|
||
print_verbose: Callable,
|
||
logging_obj,
|
||
vertex_project=None,
|
||
vertex_location=None,
|
||
optional_params=None,
|
||
client=None,
|
||
access_token=None,
|
||
):
|
||
from anthropic import AsyncAnthropicVertex
|
||
|
||
if client is None:
|
||
vertex_ai_client = AsyncAnthropicVertex(
|
||
project_id=vertex_project, region=vertex_location, access_token=access_token
|
||
)
|
||
else:
|
||
vertex_ai_client = client
|
||
|
||
## LOGGING
|
||
logging_obj.pre_call(
|
||
input=messages,
|
||
api_key=None,
|
||
additional_args={
|
||
"complete_input_dict": optional_params,
|
||
},
|
||
)
|
||
message = await vertex_ai_client.messages.create(**data) # type: ignore
|
||
text_content = message.content[0].text
|
||
## TOOL CALLING - OUTPUT PARSE
|
||
if text_content is not None and contains_tag("invoke", text_content):
|
||
function_name = extract_between_tags("tool_name", text_content)[0]
|
||
function_arguments_str = extract_between_tags("invoke", text_content)[0].strip()
|
||
function_arguments_str = f"<invoke>{function_arguments_str}</invoke>"
|
||
function_arguments = parse_xml_params(function_arguments_str)
|
||
_message = litellm.Message(
|
||
tool_calls=[
|
||
{
|
||
"id": f"call_{uuid.uuid4()}",
|
||
"type": "function",
|
||
"function": {
|
||
"name": function_name,
|
||
"arguments": json.dumps(function_arguments),
|
||
},
|
||
}
|
||
],
|
||
content=None,
|
||
)
|
||
model_response.choices[0].message = _message # type: ignore
|
||
else:
|
||
model_response.choices[0].message.content = text_content # type: ignore
|
||
model_response.choices[0].finish_reason = map_finish_reason(message.stop_reason)
|
||
|
||
## CALCULATING USAGE
|
||
prompt_tokens = message.usage.input_tokens
|
||
completion_tokens = message.usage.output_tokens
|
||
|
||
model_response.created = int(time.time())
|
||
model_response.model = model
|
||
usage = Usage(
|
||
prompt_tokens=prompt_tokens,
|
||
completion_tokens=completion_tokens,
|
||
total_tokens=prompt_tokens + completion_tokens,
|
||
)
|
||
setattr(model_response, "usage", usage)
|
||
return model_response
|
||
|
||
|
||
async def async_streaming(
|
||
model: str,
|
||
messages: list,
|
||
data: dict,
|
||
model_response: ModelResponse,
|
||
print_verbose: Callable,
|
||
logging_obj,
|
||
vertex_project=None,
|
||
vertex_location=None,
|
||
optional_params=None,
|
||
client=None,
|
||
access_token=None,
|
||
):
|
||
from anthropic import AsyncAnthropicVertex
|
||
|
||
if client is None:
|
||
vertex_ai_client = AsyncAnthropicVertex(
|
||
project_id=vertex_project, region=vertex_location, access_token=access_token
|
||
)
|
||
else:
|
||
vertex_ai_client = client
|
||
|
||
## LOGGING
|
||
logging_obj.pre_call(
|
||
input=messages,
|
||
api_key=None,
|
||
additional_args={
|
||
"complete_input_dict": optional_params,
|
||
},
|
||
)
|
||
response = await vertex_ai_client.messages.create(**data, stream=True) # type: ignore
|
||
logging_obj.post_call(input=messages, api_key=None, original_response=response)
|
||
|
||
streamwrapper = CustomStreamWrapper(
|
||
completion_stream=response,
|
||
model=model,
|
||
custom_llm_provider="vertex_ai",
|
||
logging_obj=logging_obj,
|
||
)
|
||
|
||
return streamwrapper
|