litellm-mirror/litellm/llms/snowflake/chat/transformation.py
2025-03-11 22:21:12 -04:00

217 lines
No EOL
6.9 KiB
Python

'''
Support for Snowflake REST API
'''
import httpx
from typing import List, Optional, Tuple, Any, TYPE_CHECKING
from litellm.types.llms.openai import AllMessageValues
from litellm.utils import get_secret
from litellm.types.utils import ModelResponse
from litellm.types.llms.openai import ChatCompletionAssistantMessage
from litellm.llms.databricks.streaming_utils import ModelResponseIterator
from ...openai_like.chat.transformation import OpenAIGPTConfig
if TYPE_CHECKING:
from litellm.litellm_core_utils.litellm_logging import Logging as _LiteLLMLoggingObj
LiteLLMLoggingObj = _LiteLLMLoggingObj
else:
LiteLLMLoggingObj = Any
class SnowflakeConfig(OpenAIGPTConfig):
"""
source: https://docs.snowflake.com/en/sql-reference/functions/complete-snowflake-cortex
"""
@classmethod
def get_config(cls):
return super().get_config()
def get_supported_openai_params(self, model: str) -> List:
return [
"temperature",
"max_tokens",
"top_p",
"response_format"
]
def map_openai_params(
self,
non_default_params: dict,
optional_params: dict,
model: str,
drop_params: bool,
) -> dict:
"""
If any supported_openai_params are in non_default_params, add them to optional_params, so they are used in API call
Args:
non_default_params (dict): Non-default parameters to filter.
optional_params (dict): Optional parameters to update.
model (str): Model name for parameter support check.
Returns:
dict: Updated optional_params with supported non-default parameters.
"""
supported_openai_params = self.get_supported_openai_params(model)
for param, value in non_default_params.items():
if param in supported_openai_params:
optional_params[param] = value
return optional_params
def _convert_tool_response_to_message(
message: ChatCompletionAssistantMessage, json_mode: bool
) -> ChatCompletionAssistantMessage:
"""
if json_mode is true, convert the returned tool call response to a content with json str
e.g. input:
{"role": "assistant", "tool_calls": [{"id": "call_5ms4", "type": "function", "function": {"name": "json_tool_call", "arguments": "{\"key\": \"question\", \"value\": \"What is the capital of France?\"}"}}]}
output:
{"role": "assistant", "content": "{\"key\": \"question\", \"value\": \"What is the capital of France?\"}"}
"""
if not json_mode:
return message
_tool_calls = message.get("tool_calls")
if _tool_calls is None or len(_tool_calls) != 1:
return message
message["content"] = _tool_calls[0]["function"].get("arguments") or ""
message["tool_calls"] = None
return message
@staticmethod
def transform_response(
model: str,
raw_response: httpx.Response,
model_response: ModelResponse,
logging_obj: LiteLLMLoggingObj,
request_data: dict,
messages: List[AllMessageValues],
optional_params: dict,
litellm_params: dict,
encoding: Any,
api_key: Optional[str] = None,
json_mode: Optional[bool] = None,
) -> ModelResponse:
response_json = raw_response.json()
logging_obj.post_call(
input=messages,
api_key="",
original_response=response_json,
additional_args={"complete_input_dict": request_data},
)
if json_mode:
for choice in response_json["choices"]:
message = SnowflakeConfig._convert_tool_response_to_message(
choice.get("message"), json_mode
)
choice["message"] = message
returned_response = ModelResponse(**response_json)
returned_response.model = (
"snowflake/" + (returned_response.model or "")
)
if model is not None:
returned_response._hidden_params["model"] = model
return returned_response
def validate_environment(
self,
headers: dict,
model: str,
api_base: str = None,
api_key: Optional[str] = None,
messages: dict = None,
optional_params: dict = None,
) -> dict:
"""
Return headers to use for Snowflake completion request
Snowflake REST API Ref: https://docs.snowflake.com/en/user-guide/snowflake-cortex/cortex-llm-rest-api#api-reference
Expected headers:
{
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": "Bearer " + <JWT>,
"X-Snowflake-Authorization-Token-Type": "KEYPAIR_JWT"
}
"""
if api_key is None:
raise ValueError(
"Missing Snowflake JWT key"
)
headers.update(
{
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": "Bearer " + api_key,
"X-Snowflake-Authorization-Token-Type": "KEYPAIR_JWT"
}
)
return headers
def _get_openai_compatible_provider_info(
self, api_base: Optional[str], api_key: Optional[str]
) -> Tuple[Optional[str], Optional[str]]:
api_base = (
api_base
or f"""https://{get_secret("SNOWFLAKE_ACCOUNT_ID")}.snowflakecomputing.com/api/v2/cortex/inference:complete"""
or get_secret("SNOWFLAKE_API_BASE")
) # type: ignore
dynamic_api_key = api_key or get_secret("SNOWFLAKE_JWT")
return api_base, dynamic_api_key
def get_complete_url(
self,
api_base: Optional[str],
model: str,
optional_params: dict,
stream: Optional[bool] = None,
) -> str:
"""
If api_base is not provided, use the default DeepSeek /chat/completions endpoint.
"""
if not api_base:
api_base = f"""https://{get_secret("SNOWFLAKE_ACCOUNT_ID")}.snowflakecomputing.com/api/v2/cortex/inference:complete"""
return api_base
def transform_request(
self,
model: str,
messages: dict ,
optional_params: dict,
litellm_params: dict,
headers: dict
) -> dict:
stream: bool = optional_params.pop("stream", None) or False
extra_body = optional_params.pop("extra_body", {})
return {
"model": model,
"messages": messages,
"stream": stream,
**optional_params,
**extra_body,
}
def get_model_response_iterator(
self,
streaming_response: ModelResponse,
sync_stream: bool,
):
return ModelResponseIterator(streaming_response=streaming_response, sync_stream=sync_stream)