mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-26 03:04:13 +00:00
feat(vertex_ai_partner.py): initial working commit for calling vertex ai mistral
Closes https://github.com/BerriAI/litellm/issues/4874
This commit is contained in:
parent
1a8f45e8da
commit
5b71421a7b
10 changed files with 343 additions and 140 deletions
|
@ -15,8 +15,14 @@ import requests # type: ignore
|
|||
import litellm
|
||||
from litellm.litellm_core_utils.core_helpers import map_finish_reason
|
||||
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler, HTTPHandler
|
||||
from litellm.types.llms.databricks import GenericStreamingChunk
|
||||
from litellm.types.utils import ProviderField
|
||||
from litellm.types.llms.openai import (
|
||||
ChatCompletionDeltaChunk,
|
||||
ChatCompletionResponseMessage,
|
||||
ChatCompletionToolCallChunk,
|
||||
ChatCompletionToolCallFunctionChunk,
|
||||
ChatCompletionUsageBlock,
|
||||
)
|
||||
from litellm.types.utils import GenericStreamingChunk, ProviderField
|
||||
from litellm.utils import CustomStreamWrapper, EmbeddingResponse, ModelResponse, Usage
|
||||
|
||||
from .base import BaseLLM
|
||||
|
@ -114,71 +120,6 @@ class DatabricksConfig:
|
|||
optional_params["stop"] = value
|
||||
return optional_params
|
||||
|
||||
def _chunk_parser(self, chunk_data: str) -> GenericStreamingChunk:
|
||||
try:
|
||||
text = ""
|
||||
is_finished = False
|
||||
finish_reason = None
|
||||
logprobs = None
|
||||
usage = None
|
||||
original_chunk = None # this is used for function/tool calling
|
||||
chunk_data = chunk_data.replace("data:", "")
|
||||
chunk_data = chunk_data.strip()
|
||||
if len(chunk_data) == 0 or chunk_data == "[DONE]":
|
||||
return {
|
||||
"text": "",
|
||||
"is_finished": is_finished,
|
||||
"finish_reason": finish_reason,
|
||||
}
|
||||
chunk_data_dict = json.loads(chunk_data)
|
||||
str_line = litellm.ModelResponse(**chunk_data_dict, stream=True)
|
||||
|
||||
if len(str_line.choices) > 0:
|
||||
if (
|
||||
str_line.choices[0].delta is not None # type: ignore
|
||||
and str_line.choices[0].delta.content is not None # type: ignore
|
||||
):
|
||||
text = str_line.choices[0].delta.content # type: ignore
|
||||
else: # function/tool calling chunk - when content is None. in this case we just return the original chunk from openai
|
||||
original_chunk = str_line
|
||||
if str_line.choices[0].finish_reason:
|
||||
is_finished = True
|
||||
finish_reason = str_line.choices[0].finish_reason
|
||||
if finish_reason == "content_filter":
|
||||
if hasattr(str_line.choices[0], "content_filter_result"):
|
||||
error_message = json.dumps(
|
||||
str_line.choices[0].content_filter_result # type: ignore
|
||||
)
|
||||
else:
|
||||
error_message = "Azure Response={}".format(
|
||||
str(dict(str_line))
|
||||
)
|
||||
raise litellm.AzureOpenAIError(
|
||||
status_code=400, message=error_message
|
||||
)
|
||||
|
||||
# checking for logprobs
|
||||
if (
|
||||
hasattr(str_line.choices[0], "logprobs")
|
||||
and str_line.choices[0].logprobs is not None
|
||||
):
|
||||
logprobs = str_line.choices[0].logprobs
|
||||
else:
|
||||
logprobs = None
|
||||
|
||||
usage = getattr(str_line, "usage", None)
|
||||
|
||||
return GenericStreamingChunk(
|
||||
text=text,
|
||||
is_finished=is_finished,
|
||||
finish_reason=finish_reason,
|
||||
logprobs=logprobs,
|
||||
original_chunk=original_chunk,
|
||||
usage=usage,
|
||||
)
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
|
||||
class DatabricksEmbeddingConfig:
|
||||
"""
|
||||
|
@ -236,7 +177,9 @@ async def make_call(
|
|||
if response.status_code != 200:
|
||||
raise DatabricksError(status_code=response.status_code, message=response.text)
|
||||
|
||||
completion_stream = response.aiter_lines()
|
||||
completion_stream = ModelResponseIterator(
|
||||
streaming_response=response.aiter_lines(), sync_stream=False
|
||||
)
|
||||
# LOGGING
|
||||
logging_obj.post_call(
|
||||
input=messages,
|
||||
|
@ -248,6 +191,38 @@ async def make_call(
|
|||
return completion_stream
|
||||
|
||||
|
||||
def make_sync_call(
|
||||
client: Optional[HTTPHandler],
|
||||
api_base: str,
|
||||
headers: dict,
|
||||
data: str,
|
||||
model: str,
|
||||
messages: list,
|
||||
logging_obj,
|
||||
):
|
||||
if client is None:
|
||||
client = HTTPHandler() # Create a new client if none provided
|
||||
|
||||
response = client.post(api_base, headers=headers, data=data, stream=True)
|
||||
|
||||
if response.status_code != 200:
|
||||
raise DatabricksError(status_code=response.status_code, message=response.read())
|
||||
|
||||
completion_stream = ModelResponseIterator(
|
||||
streaming_response=response.iter_lines(), sync_stream=True
|
||||
)
|
||||
|
||||
# LOGGING
|
||||
logging_obj.post_call(
|
||||
input=messages,
|
||||
api_key="",
|
||||
original_response="first stream response received",
|
||||
additional_args={"complete_input_dict": data},
|
||||
)
|
||||
|
||||
return completion_stream
|
||||
|
||||
|
||||
class DatabricksChatCompletion(BaseLLM):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
|
@ -259,6 +234,7 @@ class DatabricksChatCompletion(BaseLLM):
|
|||
api_key: Optional[str],
|
||||
api_base: Optional[str],
|
||||
endpoint_type: Literal["chat_completions", "embeddings"],
|
||||
custom_endpoint: Optional[bool],
|
||||
) -> Tuple[str, dict]:
|
||||
if api_key is None:
|
||||
raise DatabricksError(
|
||||
|
@ -277,9 +253,9 @@ class DatabricksChatCompletion(BaseLLM):
|
|||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
if endpoint_type == "chat_completions":
|
||||
if endpoint_type == "chat_completions" and custom_endpoint is not True:
|
||||
api_base = "{}/chat/completions".format(api_base)
|
||||
elif endpoint_type == "embeddings":
|
||||
elif endpoint_type == "embeddings" and custom_endpoint is not True:
|
||||
api_base = "{}/embeddings".format(api_base)
|
||||
return api_base, headers
|
||||
|
||||
|
@ -464,8 +440,12 @@ class DatabricksChatCompletion(BaseLLM):
|
|||
timeout: Optional[Union[float, httpx.Timeout]] = None,
|
||||
client: Optional[Union[HTTPHandler, AsyncHTTPHandler]] = None,
|
||||
):
|
||||
custom_endpoint: Optional[bool] = optional_params.pop("custom_endpoint", None)
|
||||
api_base, headers = self._validate_environment(
|
||||
api_base=api_base, api_key=api_key, endpoint_type="chat_completions"
|
||||
api_base=api_base,
|
||||
api_key=api_key,
|
||||
endpoint_type="chat_completions",
|
||||
custom_endpoint=custom_endpoint,
|
||||
)
|
||||
## Load Config
|
||||
config = litellm.DatabricksConfig().get_config()
|
||||
|
@ -475,7 +455,8 @@ class DatabricksChatCompletion(BaseLLM):
|
|||
): # completion(top_k=3) > anthropic_config(top_k=3) <- allows for dynamic variables to be passed in
|
||||
optional_params[k] = v
|
||||
|
||||
stream = optional_params.pop("stream", None)
|
||||
stream: bool = optional_params.pop("stream", None) or False
|
||||
optional_params["stream"] = stream
|
||||
|
||||
data = {
|
||||
"model": model,
|
||||
|
@ -539,41 +520,26 @@ class DatabricksChatCompletion(BaseLLM):
|
|||
timeout=timeout,
|
||||
)
|
||||
else:
|
||||
if client is None or isinstance(client, AsyncHTTPHandler):
|
||||
self.client = HTTPHandler(timeout=timeout) # type: ignore
|
||||
else:
|
||||
self.client = client
|
||||
if client is None or not isinstance(client, HTTPHandler):
|
||||
client = HTTPHandler(timeout=timeout) # type: ignore
|
||||
## COMPLETION CALL
|
||||
if (
|
||||
stream is not None and stream == True
|
||||
): # if function call - fake the streaming (need complete blocks for output parsing in openai format)
|
||||
print_verbose("makes dbrx streaming POST request")
|
||||
data["stream"] = stream
|
||||
try:
|
||||
response = self.client.post(
|
||||
api_base, headers=headers, data=json.dumps(data), stream=stream
|
||||
)
|
||||
response.raise_for_status()
|
||||
completion_stream = response.iter_lines()
|
||||
except httpx.HTTPStatusError as e:
|
||||
raise DatabricksError(
|
||||
status_code=e.response.status_code, message=response.text
|
||||
)
|
||||
except httpx.TimeoutException as e:
|
||||
raise DatabricksError(
|
||||
status_code=408, message="Timeout error occurred."
|
||||
)
|
||||
except Exception as e:
|
||||
raise DatabricksError(status_code=408, message=str(e))
|
||||
|
||||
streaming_response = CustomStreamWrapper(
|
||||
completion_stream=completion_stream,
|
||||
if stream is True:
|
||||
return CustomStreamWrapper(
|
||||
completion_stream=None,
|
||||
make_call=partial(
|
||||
make_sync_call,
|
||||
client=None,
|
||||
api_base=api_base,
|
||||
headers=headers, # type: ignore
|
||||
data=json.dumps(data),
|
||||
model=model,
|
||||
messages=messages,
|
||||
logging_obj=logging_obj,
|
||||
),
|
||||
model=model,
|
||||
custom_llm_provider="databricks",
|
||||
custom_llm_provider="vertex_ai_beta",
|
||||
logging_obj=logging_obj,
|
||||
)
|
||||
return streaming_response
|
||||
|
||||
else:
|
||||
try:
|
||||
response = self.client.post(
|
||||
|
@ -667,7 +633,10 @@ class DatabricksChatCompletion(BaseLLM):
|
|||
aembedding=None,
|
||||
) -> EmbeddingResponse:
|
||||
api_base, headers = self._validate_environment(
|
||||
api_base=api_base, api_key=api_key, endpoint_type="embeddings"
|
||||
api_base=api_base,
|
||||
api_key=api_key,
|
||||
endpoint_type="embeddings",
|
||||
custom_endpoint=False,
|
||||
)
|
||||
model = model
|
||||
data = {"model": model, "input": input, **optional_params}
|
||||
|
@ -716,3 +685,126 @@ class DatabricksChatCompletion(BaseLLM):
|
|||
)
|
||||
|
||||
return litellm.EmbeddingResponse(**response_json)
|
||||
|
||||
|
||||
class ModelResponseIterator:
|
||||
def __init__(self, streaming_response, sync_stream: bool):
|
||||
self.streaming_response = streaming_response
|
||||
|
||||
def chunk_parser(self, chunk: dict) -> GenericStreamingChunk:
|
||||
try:
|
||||
processed_chunk = litellm.ModelResponse(**chunk, stream=True) # type: ignore
|
||||
|
||||
text = ""
|
||||
tool_use: Optional[ChatCompletionToolCallChunk] = None
|
||||
is_finished = False
|
||||
finish_reason = ""
|
||||
usage: Optional[ChatCompletionUsageBlock] = None
|
||||
|
||||
if processed_chunk.choices[0].delta.content is not None: # type: ignore
|
||||
text = processed_chunk.choices[0].delta.content # type: ignore
|
||||
|
||||
if (
|
||||
processed_chunk.choices[0].delta.tool_calls is not None # type: ignore
|
||||
and len(processed_chunk.choices[0].delta.tool_calls) > 0 # type: ignore
|
||||
and processed_chunk.choices[0].delta.tool_calls[0].function is not None # type: ignore
|
||||
and processed_chunk.choices[0].delta.tool_calls[0].function.arguments # type: ignore
|
||||
is not None
|
||||
):
|
||||
tool_use = ChatCompletionToolCallChunk(
|
||||
id=processed_chunk.choices[0].delta.tool_calls[0].id, # type: ignore
|
||||
type="function",
|
||||
function=ChatCompletionToolCallFunctionChunk(
|
||||
name=processed_chunk.choices[0]
|
||||
.delta.tool_calls[0] # type: ignore
|
||||
.function.name,
|
||||
arguments=processed_chunk.choices[0]
|
||||
.delta.tool_calls[0] # type: ignore
|
||||
.function.arguments,
|
||||
),
|
||||
index=processed_chunk.choices[0].index,
|
||||
)
|
||||
|
||||
if processed_chunk.choices[0].finish_reason is not None:
|
||||
is_finished = True
|
||||
finish_reason = processed_chunk.choices[0].finish_reason
|
||||
|
||||
if hasattr(processed_chunk, "usage"):
|
||||
usage = processed_chunk.usage # type: ignore
|
||||
|
||||
return GenericStreamingChunk(
|
||||
text=text,
|
||||
tool_use=tool_use,
|
||||
is_finished=is_finished,
|
||||
finish_reason=finish_reason,
|
||||
usage=usage,
|
||||
index=0,
|
||||
)
|
||||
except json.JSONDecodeError:
|
||||
raise ValueError(f"Failed to decode JSON from chunk: {chunk}")
|
||||
|
||||
# Sync iterator
|
||||
def __iter__(self):
|
||||
self.response_iterator = self.streaming_response
|
||||
return self
|
||||
|
||||
def __next__(self):
|
||||
try:
|
||||
chunk = self.response_iterator.__next__()
|
||||
except StopIteration:
|
||||
raise StopIteration
|
||||
except ValueError as e:
|
||||
raise RuntimeError(f"Error receiving chunk from stream: {e}")
|
||||
|
||||
try:
|
||||
chunk = chunk.replace("data:", "")
|
||||
chunk = chunk.strip()
|
||||
if len(chunk) > 0:
|
||||
json_chunk = json.loads(chunk)
|
||||
return self.chunk_parser(chunk=json_chunk)
|
||||
else:
|
||||
return GenericStreamingChunk(
|
||||
text="",
|
||||
is_finished=False,
|
||||
finish_reason="",
|
||||
usage=None,
|
||||
index=0,
|
||||
tool_use=None,
|
||||
)
|
||||
except StopIteration:
|
||||
raise StopIteration
|
||||
except ValueError as e:
|
||||
raise RuntimeError(f"Error parsing chunk: {e},\nReceived chunk: {chunk}")
|
||||
|
||||
# Async iterator
|
||||
def __aiter__(self):
|
||||
self.async_response_iterator = self.streaming_response.__aiter__()
|
||||
return self
|
||||
|
||||
async def __anext__(self):
|
||||
try:
|
||||
chunk = await self.async_response_iterator.__anext__()
|
||||
except StopAsyncIteration:
|
||||
raise StopAsyncIteration
|
||||
except ValueError as e:
|
||||
raise RuntimeError(f"Error receiving chunk from stream: {e}")
|
||||
|
||||
try:
|
||||
chunk = chunk.replace("data:", "")
|
||||
chunk = chunk.strip()
|
||||
if len(chunk) > 0:
|
||||
json_chunk = json.loads(chunk)
|
||||
return self.chunk_parser(chunk=json_chunk)
|
||||
else:
|
||||
return GenericStreamingChunk(
|
||||
text="",
|
||||
is_finished=False,
|
||||
finish_reason="",
|
||||
usage=None,
|
||||
index=0,
|
||||
tool_use=None,
|
||||
)
|
||||
except StopAsyncIteration:
|
||||
raise StopAsyncIteration
|
||||
except ValueError as e:
|
||||
raise RuntimeError(f"Error parsing chunk: {e},\nReceived chunk: {chunk}")
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue