feat(databricks.py): add embedding model support

This commit is contained in:
Krrish Dholakia 2024-05-23 18:22:03 -07:00
parent d2229dcd21
commit 43353c28b3
7 changed files with 310 additions and 18 deletions

View file

@ -5,8 +5,14 @@ import json
from enum import Enum
import requests, copy # type: ignore
import time
from typing import Callable, Optional, List, Union, Tuple
from litellm.utils import ModelResponse, Usage, map_finish_reason, CustomStreamWrapper
from typing import Callable, Optional, List, Union, Tuple, Literal
from litellm.utils import (
ModelResponse,
Usage,
map_finish_reason,
CustomStreamWrapper,
EmbeddingResponse,
)
import litellm
from .prompt_templates.factory import prompt_factory, custom_prompt
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler, HTTPHandler
@ -155,6 +161,48 @@ class DatabricksConfig:
raise e
class DatabricksEmbeddingConfig:
"""
Reference: https://learn.microsoft.com/en-us/azure/databricks/machine-learning/foundation-models/api-reference#--embedding-task
"""
instruction: Optional[str] = (
None # An optional instruction to pass to the embedding model. BGE Authors recommend 'Represent this sentence for searching relevant passages:' for retrieval queries
)
def __init__(self, instruction: Optional[str] = None) -> None:
locals_ = locals()
for key, value in locals_.items():
if key != "self" and value is not None:
setattr(self.__class__, key, value)
@classmethod
def get_config(cls):
return {
k: v
for k, v in cls.__dict__.items()
if not k.startswith("__")
and not isinstance(
v,
(
types.FunctionType,
types.BuiltinFunctionType,
classmethod,
staticmethod,
),
)
and v is not None
}
def get_supported_openai_params(
self,
): # no optional openai embedding params supported
return []
def map_openai_params(self, non_default_params: dict, optional_params: dict):
return optional_params
class DatabricksChatCompletion(BaseLLM):
def __init__(self) -> None:
super().__init__()
@ -162,7 +210,10 @@ class DatabricksChatCompletion(BaseLLM):
# makes headers for API call
def _validate_environment(
self, api_key: Optional[str], api_base: Optional[str]
self,
api_key: Optional[str],
api_base: Optional[str],
endpoint_type: Literal["chat_completions", "embeddings"],
) -> Tuple[str, dict]:
if api_key is None:
raise DatabricksError(
@ -181,7 +232,10 @@ class DatabricksChatCompletion(BaseLLM):
"Content-Type": "application/json",
}
api_base = "{}/chat/completions".format(api_base)
if endpoint_type == "chat_completions":
api_base = "{}/chat/completions".format(api_base)
elif endpoint_type == "embeddings":
api_base = "{}/embeddings".format(api_base)
return api_base, headers
def process_response(
@ -374,7 +428,7 @@ class DatabricksChatCompletion(BaseLLM):
client: Optional[Union[HTTPHandler, AsyncHTTPHandler]] = None,
):
api_base, headers = self._validate_environment(
api_base=api_base, api_key=api_key
api_base=api_base, api_key=api_key, endpoint_type="chat_completions"
)
## Load Config
config = litellm.DatabricksConfig().get_config()
@ -501,6 +555,124 @@ class DatabricksChatCompletion(BaseLLM):
return ModelResponse(**response_json)
def embedding(self):
# logic for parsing in - calling - parsing out model embedding calls
pass
async def aembedding(
self,
input: list,
data: dict,
model_response: ModelResponse,
timeout: float,
api_key: str,
api_base: str,
logging_obj,
headers: dict,
client=None,
) -> EmbeddingResponse:
response = None
try:
if client is None or isinstance(client, AsyncHTTPHandler):
self.async_client = AsyncHTTPHandler(timeout=timeout) # type: ignore
else:
self.async_client = client
try:
response = await self.async_client.post(
api_base,
headers=headers,
data=json.dumps(data),
) # type: ignore
response.raise_for_status()
response_json = response.json()
except httpx.HTTPStatusError as e:
raise DatabricksError(
status_code=e.response.status_code,
message=response.text if response else str(e),
)
except httpx.TimeoutException as e:
raise DatabricksError(
status_code=408, message="Timeout error occurred."
)
except Exception as e:
raise DatabricksError(status_code=500, message=str(e))
## LOGGING
logging_obj.post_call(
input=input,
api_key=api_key,
additional_args={"complete_input_dict": data},
original_response=response_json,
)
return EmbeddingResponse(**response_json)
except Exception as e:
## LOGGING
logging_obj.post_call(
input=input,
api_key=api_key,
original_response=str(e),
)
raise e
def embedding(
self,
model: str,
input: list,
timeout: float,
logging_obj,
api_key: Optional[str],
api_base: Optional[str],
optional_params: dict,
model_response: Optional[litellm.utils.EmbeddingResponse] = None,
client=None,
aembedding=None,
) -> EmbeddingResponse:
api_base, headers = self._validate_environment(
api_base=api_base, api_key=api_key, endpoint_type="embeddings"
)
model = model
data = {"model": model, "input": input, **optional_params}
## LOGGING
logging_obj.pre_call(
input=input,
api_key=api_key,
additional_args={"complete_input_dict": data, "api_base": api_base},
)
if aembedding == True:
return self.aembedding(data=data, input=input, logging_obj=logging_obj, model_response=model_response, api_base=api_base, api_key=api_key, timeout=timeout, client=client, headers=headers) # type: ignore
if client is None or isinstance(client, AsyncHTTPHandler):
self.client = HTTPHandler(timeout=timeout) # type: ignore
else:
self.client = client
## EMBEDDING CALL
try:
response = self.client.post(
api_base,
headers=headers,
data=json.dumps(data),
) # type: ignore
response.raise_for_status() # type: ignore
response_json = response.json() # type: ignore
except httpx.HTTPStatusError as e:
raise DatabricksError(
status_code=e.response.status_code,
message=response.text if response else str(e),
)
except httpx.TimeoutException as e:
raise DatabricksError(status_code=408, message="Timeout error occurred.")
except Exception as e:
raise DatabricksError(status_code=500, message=str(e))
## LOGGING
logging_obj.post_call(
input=input,
api_key=api_key,
additional_args={"complete_input_dict": data},
original_response=response_json,
)
return litellm.EmbeddingResponse(**response_json)