import json
from typing import Callable, List, Optional, Union

from openai import AsyncOpenAI, OpenAI

import litellm
from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
from litellm.litellm_core_utils.streaming_handler import CustomStreamWrapper
from litellm.llms.base import BaseLLM
from litellm.types.llms.openai import AllMessageValues, OpenAITextCompletionUserMessage
from litellm.types.utils import LlmProviders, ModelResponse, TextCompletionResponse
from litellm.utils import ProviderConfigManager

from ..common_utils import OpenAIError
from .transformation import OpenAITextCompletionConfig

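
# OpenAITextCompletion wraps OpenAI's legacy text-completion ("/completions")
# endpoint for litellm. It covers four call paths: sync, async, sync
# streaming, and async streaming, and normalizes provider failures into
# OpenAIError in every path.
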
class OpenAITextCompletion(BaseLLM):
    openai_text_completion_global_config = OpenAITextCompletionConfig()

    def __init__(self) -> None:
        super().__init__()

    def validate_environment(self, api_key):
        headers = {
            "content-type": "application/json",
        }
        if api_key:
            headers["Authorization"] = f"Bearer {api_key}"
        return headers
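
    # completion() builds the request via the provider's text-completion
    # config, then dispatches to one of the four execution paths based on
    # the `acompletion` and `stream` flags.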
    def completion(
        self,
        model_response: ModelResponse,
        api_key: str,
        model: str,
        messages: Union[List[AllMessageValues], List[OpenAITextCompletionUserMessage]],
        timeout: float,
        custom_llm_provider: str,
        logging_obj: LiteLLMLoggingObj,
        optional_params: dict,
        print_verbose: Optional[Callable] = None,
        api_base: Optional[str] = None,
        acompletion: bool = False,
        litellm_params=None,
        logger_fn=None,
        client=None,
        organization: Optional[str] = None,
        headers: Optional[dict] = None,
    ):
        try:
            if headers is None:
                headers = self.validate_environment(api_key=api_key)
            if model is None or messages is None:
                raise OpenAIError(status_code=422, message="Missing model or messages")

            provider_config = ProviderConfigManager.get_provider_text_completion_config(
                model=model,
                provider=LlmProviders(custom_llm_provider),
            )

            data = provider_config.transform_text_completion_request(
                model=model,
                messages=messages,
                optional_params=optional_params,
                headers=headers,
            )
            # don't send max_retries to the API, if set
            max_retries = data.pop("max_retries", 2)

            ## LOGGING
            logging_obj.pre_call(
                input=messages,
                api_key=api_key,
                additional_args={
                    "headers": headers,
                    "api_base": api_base,
                    "complete_input_dict": data,
                },
            )

            if acompletion is True:
                if optional_params.get("stream", False):
                    return self.async_streaming(
                        logging_obj=logging_obj,
                        api_base=api_base,
                        api_key=api_key,
                        data=data,
                        headers=headers,
                        model_response=model_response,
                        model=model,
                        timeout=timeout,
                        max_retries=max_retries,
                        client=client,
                        organization=organization,
                    )
                else:
                    return self.acompletion(  # type: ignore
                        api_base=api_base,
                        data=data,
                        headers=headers,
                        model_response=model_response,
                        api_key=api_key,
                        logging_obj=logging_obj,
                        model=model,
                        timeout=timeout,
                        max_retries=max_retries,
                        organization=organization,
                        client=client,
                    )
            elif optional_params.get("stream", False):
                return self.streaming(
                    logging_obj=logging_obj,
                    api_base=api_base,
                    api_key=api_key,
                    data=data,
                    headers=headers,
                    model_response=model_response,
                    model=model,
                    timeout=timeout,
                    max_retries=max_retries,  # type: ignore
                    client=client,
                    organization=organization,
                )
            else:
                if client is None:
                    openai_client = OpenAI(
                        api_key=api_key,
                        base_url=api_base,
                        http_client=litellm.client_session,
                        timeout=timeout,
                        max_retries=max_retries,  # type: ignore
                        organization=organization,
                    )
                else:
                    openai_client = client

                raw_response = openai_client.completions.with_raw_response.create(**data)  # type: ignore
                response = raw_response.parse()
                response_json = response.model_dump()

                ## LOGGING
                logging_obj.post_call(
                    api_key=api_key,
                    original_response=response_json,
                    additional_args={
                        "headers": headers,
                        "api_base": api_base,
                    },
                )

                ## RESPONSE OBJECT
                return TextCompletionResponse(**response_json)
        except Exception as e:
            status_code = getattr(e, "status_code", 500)
            error_headers = getattr(e, "headers", None)
            error_text = getattr(e, "text", str(e))
            error_response = getattr(e, "response", None)
            if error_headers is None and error_response:
                error_headers = getattr(error_response, "headers", None)
            raise OpenAIError(
                status_code=status_code, message=error_text, headers=error_headers
            )
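
    # Async, non-streaming path. Mirrors the sync path with AsyncOpenAI, and
    # additionally stashes the raw provider response on
    # `_hidden_params.original_response` for downstream consumers.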
    async def acompletion(
        self,
        logging_obj,
        api_base: str,
        data: dict,
        headers: dict,
        model_response: ModelResponse,
        api_key: str,
        model: str,
        timeout: float,
        max_retries: int,
        organization: Optional[str] = None,
        client=None,
    ):
        try:
            if client is None:
                openai_aclient = AsyncOpenAI(
                    api_key=api_key,
                    base_url=api_base,
                    http_client=litellm.aclient_session,
                    timeout=timeout,
                    max_retries=max_retries,
                    organization=organization,
                )
            else:
                openai_aclient = client

            raw_response = await openai_aclient.completions.with_raw_response.create(
                **data
            )
            response = raw_response.parse()
            response_json = response.model_dump()

            ## LOGGING
            logging_obj.post_call(
                api_key=api_key,
                original_response=response,
                additional_args={
                    "headers": headers,
                    "api_base": api_base,
                },
            )
            ## RESPONSE OBJECT
            response_obj = TextCompletionResponse(**response_json)
            response_obj._hidden_params.original_response = json.dumps(response_json)
            return response_obj
        except Exception as e:
            status_code = getattr(e, "status_code", 500)
            error_headers = getattr(e, "headers", None)
            error_text = getattr(e, "text", str(e))
            error_response = getattr(e, "response", None)
            if error_headers is None and error_response:
                error_headers = getattr(error_response, "headers", None)
            raise OpenAIError(
                status_code=status_code, message=error_text, headers=error_headers
            )
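
    # Sync streaming path: the parsed SDK stream is wrapped in litellm's
    # CustomStreamWrapper so chunks are normalized before being yielded.
    # Failures are converted to OpenAIError both at request time and while
    # iterating the stream.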
    def streaming(
        self,
        logging_obj,
        api_key: str,
        data: dict,
        headers: dict,
        model_response: ModelResponse,
        model: str,
        timeout: float,
        api_base: Optional[str] = None,
        max_retries=None,
        client=None,
        organization=None,
    ):
        if client is None:
            openai_client = OpenAI(
                api_key=api_key,
                base_url=api_base,
                http_client=litellm.client_session,
                timeout=timeout,
                max_retries=max_retries,  # type: ignore
                organization=organization,
            )
        else:
            openai_client = client

        try:
            raw_response = openai_client.completions.with_raw_response.create(**data)
            response = raw_response.parse()
        except Exception as e:
            status_code = getattr(e, "status_code", 500)
            error_headers = getattr(e, "headers", None)
            error_text = getattr(e, "text", str(e))
            error_response = getattr(e, "response", None)
            if error_headers is None and error_response:
                error_headers = getattr(error_response, "headers", None)
            raise OpenAIError(
                status_code=status_code, message=error_text, headers=error_headers
            )
        streamwrapper = CustomStreamWrapper(
            completion_stream=response,
            model=model,
            custom_llm_provider="text-completion-openai",
            logging_obj=logging_obj,
            stream_options=data.get("stream_options", None),
        )

        try:
            for chunk in streamwrapper:
                yield chunk
        except Exception as e:
            status_code = getattr(e, "status_code", 500)
            error_headers = getattr(e, "headers", None)
            error_text = getattr(e, "text", str(e))
            error_response = getattr(e, "response", None)
            if error_headers is None and error_response:
                error_headers = getattr(error_response, "headers", None)
            raise OpenAIError(
                status_code=status_code, message=error_text, headers=error_headers
            )
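
    # Async counterpart of streaming(); yields transformed chunks from the
    # wrapped AsyncOpenAI stream.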
    async def async_streaming(
        self,
        logging_obj,
        api_key: str,
        data: dict,
        headers: dict,
        model_response: ModelResponse,
        model: str,
        timeout: float,
        max_retries: int,
        api_base: Optional[str] = None,
        client=None,
        organization=None,
    ):
        if client is None:
            openai_client = AsyncOpenAI(
                api_key=api_key,
                base_url=api_base,
                http_client=litellm.aclient_session,
                timeout=timeout,
                max_retries=max_retries,
                organization=organization,
            )
        else:
            openai_client = client

        raw_response = await openai_client.completions.with_raw_response.create(**data)
        response = raw_response.parse()
        streamwrapper = CustomStreamWrapper(
            completion_stream=response,
            model=model,
            custom_llm_provider="text-completion-openai",
            logging_obj=logging_obj,
            stream_options=data.get("stream_options", None),
        )

        try:
            async for transformed_chunk in streamwrapper:
                yield transformed_chunk
        except Exception as e:
            status_code = getattr(e, "status_code", 500)
            error_headers = getattr(e, "headers", None)
            error_text = getattr(e, "text", str(e))
            error_response = getattr(e, "response", None)
            if error_headers is None and error_response:
                error_headers = getattr(error_response, "headers", None)
            raise OpenAIError(
                status_code=status_code, message=error_text, headers=error_headers
            )
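

# --- Illustrative sketch, not part of the original module ---
# Every execution path above relies on the same OpenAI SDK pattern:
# `completions.with_raw_response.create(...)` returns the raw HTTP response,
# and `.parse()` recovers the typed Completion object. A minimal standalone
# demonstration of that pattern, assuming OPENAI_API_KEY is set in the
# environment and "gpt-3.5-turbo-instruct" is accessible to your account:
if __name__ == "__main__":
    demo_client = OpenAI()  # picks up OPENAI_API_KEY from the environment
    demo_raw = demo_client.completions.with_raw_response.create(
        model="gpt-3.5-turbo-instruct",
        prompt="Say hello",
        max_tokens=16,
    )
    demo_completion = demo_raw.parse()  # typed Completion object
    print(demo_completion.choices[0].text)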