Mirror of https://github.com/BerriAI/litellm.git (synced 2025-04-26 19:24:27 +00:00)
* refactor: cleanup unused variables + fix pyright errors
* feat(health_check.py): Closes https://github.com/BerriAI/litellm/issues/5686
* fix(o1_reasoning.py): add stricter check for o-1 reasoning model
* refactor(mistral/): make it easier to see mistral transformation logic
* fix(openai.py): fix openai o-1 model param mapping. Fixes https://github.com/BerriAI/litellm/issues/5685
* feat(main.py): infer finetuned gemini model from base model. Fixes https://github.com/BerriAI/litellm/issues/5678
* docs(vertex.md): update docs to call finetuned gemini models
* feat(proxy_server.py): allow admin to hide proxy model aliases. Closes https://github.com/BerriAI/litellm/issues/5692
* docs(load_balancing.md): add docs on hiding alias models from proxy config
* fix(base.py): don't raise notimplemented error
* fix(user_api_key_auth.py): fix model max budget check
* fix(router.py): fix elif
* fix(user_api_key_auth.py): don't set team_id to empty str
* fix(team_endpoints.py): fix response type
* test(test_completion.py): handle predibase error
* test(test_proxy_server.py): fix test
* fix(o1_transformation.py): fix max_completion_token mapping
* test(test_image_generation.py): mark flaky test
89 lines
2.6 KiB
Python
## This is a template base class to be used for adding new LLM providers via API calls

from typing import Any, Optional, Union

import httpx
import requests

import litellm


class BaseLLM:
    _client_session: Optional[httpx.Client] = None

    def process_response(
        self,
        model: str,
        response: Union[requests.Response, httpx.Response],
        model_response: litellm.utils.ModelResponse,
        stream: bool,
        logging_obj: Any,
        optional_params: dict,
        api_key: str,
        data: Union[dict, str],
        messages: list,
        print_verbose,
        encoding,
    ) -> Union[litellm.utils.ModelResponse, litellm.utils.CustomStreamWrapper]:
        """
        Helper function to process the response across sync + async completion calls
        """
        return model_response

    def process_text_completion_response(
        self,
        model: str,
        response: Union[requests.Response, httpx.Response],
        model_response: litellm.utils.TextCompletionResponse,
        stream: bool,
        logging_obj: Any,
        optional_params: dict,
        api_key: str,
        data: Union[dict, str],
        messages: list,
        print_verbose,
        encoding,
    ) -> Union[litellm.utils.TextCompletionResponse, litellm.utils.CustomStreamWrapper]:
        """
        Helper function to process the response across sync + async completion calls
        """
        return model_response

    def create_client_session(self):
        if litellm.client_session:
            _client_session = litellm.client_session
        else:
            _client_session = httpx.Client()

        return _client_session

    def create_aclient_session(self):
        if litellm.aclient_session:
            _aclient_session = litellm.aclient_session
        else:
            _aclient_session = httpx.AsyncClient()

        return _aclient_session

    def __exit__(self):
        if hasattr(self, "_client_session") and self._client_session is not None:
            self._client_session.close()

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if hasattr(self, "_aclient_session"):
            await self._aclient_session.aclose()  # type: ignore

    def validate_environment(
        self, *args, **kwargs
    ) -> Optional[Any]:  # set up the environment required to run the model
        return None

    def completion(
        self, *args, **kwargs
    ) -> Any:  # logic for parsing in - calling - parsing out model completion calls
        return None

    def embedding(
        self, *args, **kwargs
    ) -> Any:  # logic for parsing in - calling - parsing out model embedding calls
        return None
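

# --- Illustrative usage (not part of base.py) ---
# A minimal sketch of how a new provider might subclass BaseLLM, reusing the shared
# session and response helpers above. "MyProviderLLM" and the endpoint URL are
# hypothetical placeholders for illustration, not a real litellm provider.
class MyProviderLLM(BaseLLM):
    def completion(self, *args, **kwargs) -> Any:
        api_base = kwargs.get("api_base", "https://example.com/v1/chat")  # hypothetical endpoint
        data = kwargs.get("data", {})
        # Reuse litellm.client_session when one is configured, otherwise a fresh httpx.Client.
        client = self.create_client_session()
        response = client.post(api_base, json=data)
        # Hand the raw httpx.Response to the shared post-processing helper.
        return self.process_response(
            model=kwargs.get("model", ""),
            response=response,
            model_response=litellm.utils.ModelResponse(),
            stream=kwargs.get("stream", False),
            logging_obj=kwargs.get("logging_obj"),
            optional_params=kwargs.get("optional_params", {}),
            api_key=kwargs.get("api_key", ""),
            data=data,
            messages=kwargs.get("messages", []),
            print_verbose=kwargs.get("print_verbose", print),
            encoding=kwargs.get("encoding", None),
        )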