Merge pull request #3888 from BerriAI/litellm_add_files_proxy

[Feat] LiteLLM Proxy Add `POST /v1/files` and `GET /v1/files`
Ishaan Jaff 2024-05-28 22:36:33 -07:00 committed by GitHub
commit d8245cbccb
6 changed files with 680 additions and 2 deletions


@@ -30,6 +30,8 @@ from ..types.llms.openai import (
FileTypes,
FileObject,
Batch,
FileContentRequest,
HttpxBinaryResponseContent,
)
####### ENVIRONMENT VARIABLES ###################
@@ -170,6 +172,134 @@ def create_file(
raise e
async def afile_content(
file_id: str,
custom_llm_provider: Literal["openai"] = "openai",
extra_headers: Optional[Dict[str, str]] = None,
extra_body: Optional[Dict[str, str]] = None,
**kwargs,
) -> HttpxBinaryResponseContent:
"""
Async: Get the contents of a file.
LiteLLM equivalent of GET https://api.openai.com/v1/files/{file_id}/content
"""
try:
loop = asyncio.get_event_loop()
kwargs["afile_content"] = True
# Use a partial function to pass your keyword arguments
func = partial(
file_content,
file_id,
custom_llm_provider,
extra_headers,
extra_body,
**kwargs,
)
# Add the context to the function
ctx = contextvars.copy_context()
func_with_context = partial(ctx.run, func)
init_response = await loop.run_in_executor(None, func_with_context)
if asyncio.iscoroutine(init_response):
response = await init_response
else:
response = init_response # type: ignore
return response
except Exception as e:
raise e
def file_content(
file_id: str,
custom_llm_provider: Literal["openai"] = "openai",
extra_headers: Optional[Dict[str, str]] = None,
extra_body: Optional[Dict[str, str]] = None,
**kwargs,
) -> Union[HttpxBinaryResponseContent, Coroutine[Any, Any, HttpxBinaryResponseContent]]:
"""
Returns the contents of the specified file.
LiteLLM equivalent of GET https://api.openai.com/v1/files/{file_id}/content
"""
try:
optional_params = GenericLiteLLMParams(**kwargs)
if custom_llm_provider == "openai":
# for deepinfra/perplexity/anyscale/groq we check in get_llm_provider and pass in the api base from there
api_base = (
optional_params.api_base
or litellm.api_base
or os.getenv("OPENAI_API_BASE")
or "https://api.openai.com/v1"
)
organization = (
optional_params.organization
or litellm.organization
or os.getenv("OPENAI_ORGANIZATION", None)
or None # default - https://github.com/openai/openai-python/blob/284c1799070c723c6a553337134148a7ab088dd8/openai/util.py#L105
)
# set API KEY
api_key = (
optional_params.api_key
or litellm.api_key # for deepinfra/perplexity/anyscale we check in get_llm_provider and pass in the api key from there
or litellm.openai_key
or os.getenv("OPENAI_API_KEY")
)
### TIMEOUT LOGIC ###
timeout = (
optional_params.timeout or kwargs.get("request_timeout", 600) or 600
)
# set timeout for 10 minutes by default
if (
timeout is not None
and isinstance(timeout, httpx.Timeout)
and not supports_httpx_timeout(custom_llm_provider)
):
read_timeout = timeout.read or 600
timeout = read_timeout # default 10 min timeout
elif timeout is not None and not isinstance(timeout, httpx.Timeout):
timeout = float(timeout) # type: ignore
elif timeout is None:
timeout = 600.0
_file_content_request = FileContentRequest(
file_id=file_id,
extra_headers=extra_headers,
extra_body=extra_body,
)
_is_async = kwargs.pop("afile_content", False) is True
response = openai_files_instance.file_content(
_is_async=_is_async,
file_content_request=_file_content_request,
api_base=api_base,
api_key=api_key,
timeout=timeout,
max_retries=optional_params.max_retries,
organization=organization,
)
else:
raise litellm.exceptions.BadRequestError(
message="LiteLLM doesn't support {} for 'create_batch'. Only 'openai' is supported.".format(
custom_llm_provider
),
model="n/a",
llm_provider=custom_llm_provider,
response=httpx.Response(
status_code=400,
content="Unsupported provider",
request=httpx.Request(method="create_thread", url="https://github.com/BerriAI/litellm"), # type: ignore
),
)
return response
except Exception as e:
raise e
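For orientation, a minimal usage sketch of the new helpers (assuming `OPENAI_API_KEY` is set in the environment; the file ID is a placeholder):
```python
import asyncio
import litellm

# synchronous: returns an HttpxBinaryResponseContent wrapping the raw bytes
resp = litellm.file_content(file_id="file-abc123", custom_llm_provider="openai")
print(resp.content[:100])  # first 100 bytes of the file

# asynchronous variant
async def main():
    resp = await litellm.afile_content(
        file_id="file-abc123", custom_llm_provider="openai"
    )
    print(resp.content[:100])

asyncio.run(main())
```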
async def acreate_batch(
completion_window: Literal["24h"],
endpoint: Literal["/v1/chat/completions", "/v1/embeddings", "/v1/completions"],


@@ -1585,6 +1585,54 @@ class OpenAIFilesAPI(BaseLLM):
response = openai_client.files.create(**create_file_data)
return response
async def afile_content(
self,
file_content_request: FileContentRequest,
openai_client: AsyncOpenAI,
) -> HttpxBinaryResponseContent:
response = await openai_client.files.content(**file_content_request)
return response
def file_content(
self,
_is_async: bool,
file_content_request: FileContentRequest,
api_base: str,
api_key: Optional[str],
timeout: Union[float, httpx.Timeout],
max_retries: Optional[int],
organization: Optional[str],
client: Optional[Union[OpenAI, AsyncOpenAI]] = None,
) -> Union[
HttpxBinaryResponseContent, Coroutine[Any, Any, HttpxBinaryResponseContent]
]:
openai_client: Optional[Union[OpenAI, AsyncOpenAI]] = self.get_openai_client(
api_key=api_key,
api_base=api_base,
timeout=timeout,
max_retries=max_retries,
organization=organization,
client=client,
_is_async=_is_async,
)
if openai_client is None:
raise ValueError(
"OpenAI client is not initialized. Make sure api_key is passed or OPENAI_API_KEY is set in the environment."
)
if _is_async is True:
if not isinstance(openai_client, AsyncOpenAI):
raise ValueError(
"OpenAI client is not an instance of AsyncOpenAI. Make sure you passed an AsyncOpenAI client."
)
return self.afile_content( # type: ignore
file_content_request=file_content_request,
openai_client=openai_client,
)
response = openai_client.files.content(**file_content_request)
return response
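Note the dispatch style above: `file_content` is a single sync entrypoint that either performs the call directly or returns the un-awaited coroutine from `afile_content` for the caller to await. A generic, self-contained sketch of that pattern (names here are illustrative, not part of LiteLLM):
```python
from typing import Any, Coroutine, Union

async def _fetch_async(x: int) -> int:
    # stand-in for the real async API call
    return x * 2

def fetch(x: int, _is_async: bool = False) -> Union[int, Coroutine[Any, Any, int]]:
    if _is_async:
        return _fetch_async(x)  # un-awaited coroutine; the caller must await it
    return x * 2  # stand-in for the real sync API call
```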
class OpenAIBatchesAPI(BaseLLM):
"""


@@ -99,6 +99,14 @@ class LiteLLMRoutes(enum.Enum):
# moderations
"/moderations",
"/v1/moderations",
# batches
"/v1/batches",
"/batches",
"/v1/batches{batch_id}",
"/batches{batch_id}",
# files
"/v1/files",
"/files",
# models
"/models",
"/v1/models",
@@ -1215,6 +1223,7 @@ class InvitationModel(LiteLLMBase):
updated_at: datetime
updated_by: str

class ConfigFieldInfo(LiteLLMBase):
field_name: str
field_value: Any


@@ -100,6 +100,13 @@ from litellm.proxy.utils import (
encrypt_value,
decrypt_value,
)
from litellm import (
CreateBatchRequest,
RetrieveBatchRequest,
ListBatchRequest,
CancelBatchRequest,
CreateFileRequest,
)
from litellm.proxy.secret_managers.google_kms import load_google_kms
from litellm.proxy.secret_managers.aws_secret_manager import load_aws_secret_manager
import pydantic
@@ -142,6 +149,7 @@ from fastapi import (
Request,
HTTPException,
status,
Path,
Depends,
Header,
Response,
@@ -5027,6 +5035,447 @@ async def audio_transcriptions(
)
######################################################################
# /v1/batches Endpoints
######################################################################
@router.post(
"/v1/batches",
dependencies=[Depends(user_api_key_auth)],
tags=["batch"],
)
@router.post(
"/batches",
dependencies=[Depends(user_api_key_auth)],
tags=["batch"],
)
async def create_batch(
request: Request,
fastapi_response: Response,
user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
):
"""
Create large batches of API requests for asynchronous processing.
This is the equivalent of POST https://api.openai.com/v1/batches
Supports Identical Params as: https://platform.openai.com/docs/api-reference/batch
Example Curl
```
curl http://localhost:4000/v1/batches \
-H "Authorization: Bearer sk-1234" \
-H "Content-Type: application/json" \
-d '{
"input_file_id": "file-abc123",
"endpoint": "/v1/chat/completions",
"completion_window": "24h"
}'
```
"""
global proxy_logging_obj
data: Dict = {}
try:
# parse the request payload from form data
form_data = await request.form()
data = {key: value for key, value in form_data.items() if key != "file"}
# Include original request and headers in the data
data["proxy_server_request"] = { # type: ignore
"url": str(request.url),
"method": request.method,
"headers": dict(request.headers),
"body": copy.copy(data), # use copy instead of deepcopy
}
if data.get("user", None) is None and user_api_key_dict.user_id is not None:
data["user"] = user_api_key_dict.user_id
if "metadata" not in data:
data["metadata"] = {}
data["metadata"]["user_api_key"] = user_api_key_dict.api_key
data["metadata"]["user_api_key_metadata"] = user_api_key_dict.metadata
_headers = dict(request.headers)
_headers.pop(
"authorization", None
) # do not store the original `sk-..` api key in the db
data["metadata"]["headers"] = _headers
data["metadata"]["user_api_key_alias"] = getattr(
user_api_key_dict, "key_alias", None
)
data["metadata"]["user_api_key_user_id"] = user_api_key_dict.user_id
data["metadata"]["user_api_key_team_id"] = getattr(
user_api_key_dict, "team_id", None
)
data["metadata"]["global_max_parallel_requests"] = general_settings.get(
"global_max_parallel_requests", None
)
data["metadata"]["user_api_key_team_alias"] = getattr(
user_api_key_dict, "team_alias", None
)
data["metadata"]["endpoint"] = str(request.url)
### TEAM-SPECIFIC PARAMS ###
if user_api_key_dict.team_id is not None:
team_config = await proxy_config.load_team_config(
team_id=user_api_key_dict.team_id
)
if len(team_config) == 0:
pass
else:
team_id = team_config.pop("team_id", None)
data["metadata"]["team_id"] = team_id
data = {
**team_config,
**data,
} # add the team-specific configs to the completion call
_create_batch_data = CreateBatchRequest(**data)
# for now use custom_llm_provider=="openai" -> this will change as LiteLLM adds more providers for acreate_batch
response = await litellm.acreate_batch(
custom_llm_provider="openai", **_create_batch_data
)
### ALERTING ###
data["litellm_status"] = "success" # used for alerting
### RESPONSE HEADERS ###
hidden_params = getattr(response, "_hidden_params", {}) or {}
model_id = hidden_params.get("model_id", None) or ""
cache_key = hidden_params.get("cache_key", None) or ""
api_base = hidden_params.get("api_base", None) or ""
fastapi_response.headers.update(
get_custom_headers(
user_api_key_dict=user_api_key_dict,
model_id=model_id,
cache_key=cache_key,
api_base=api_base,
version=version,
model_region=getattr(user_api_key_dict, "allowed_model_region", ""),
)
)
return response
except Exception as e:
data["litellm_status"] = "fail" # used for alerting
await proxy_logging_obj.post_call_failure_hook(
user_api_key_dict=user_api_key_dict, original_exception=e, request_data=data
)
traceback.print_exc()
if isinstance(e, HTTPException):
raise ProxyException(
message=getattr(e, "message", str(e.detail)),
type=getattr(e, "type", "None"),
param=getattr(e, "param", "None"),
code=getattr(e, "status_code", status.HTTP_400_BAD_REQUEST),
)
else:
error_traceback = traceback.format_exc()
error_msg = f"{str(e)}"
raise ProxyException(
message=getattr(e, "message", error_msg),
type=getattr(e, "type", "None"),
param=getattr(e, "param", "None"),
code=getattr(e, "status_code", 500),
)
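For reference, the same call through the OpenAI Python SDK pointed at the proxy (the base URL, key, and file ID below are placeholders):
```python
from openai import OpenAI

client = OpenAI(api_key="sk-1234", base_url="http://localhost:4000")
batch = client.batches.create(
    input_file_id="file-abc123",
    endpoint="/v1/chat/completions",
    completion_window="24h",
)
print(batch.id)
```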
@router.get(
"/v1/batches/{batch_id}",
dependencies=[Depends(user_api_key_auth)],
tags=["batch"],
)
@router.get(
"/batches/{batch_id}",
dependencies=[Depends(user_api_key_auth)],
tags=["batch"],
)
async def retrieve_batch(
request: Request,
fastapi_response: Response,
user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
batch_id: str = Path(
title="Batch ID to retrieve", description="The ID of the batch to retrieve"
),
):
"""
Retrieves a batch.
This is the equivalent of GET https://api.openai.com/v1/batches/{batch_id}
Supports Identical Params as: https://platform.openai.com/docs/api-reference/batch/retrieve
Example Curl
```
curl http://localhost:4000/v1/batches/batch_abc123 \
-H "Authorization: Bearer sk-1234" \
-H "Content-Type: application/json" \
```
"""
global proxy_logging_obj
data: Dict = {}
try:
# parse any additional fields sent with the request as form data
form_data = await request.form()
data = {key: value for key, value in form_data.items() if key != "file"}
# Include original request and headers in the data
data["proxy_server_request"] = { # type: ignore
"url": str(request.url),
"method": request.method,
"headers": dict(request.headers),
"body": copy.copy(data), # use copy instead of deepcopy
}
if data.get("user", None) is None and user_api_key_dict.user_id is not None:
data["user"] = user_api_key_dict.user_id
if "metadata" not in data:
data["metadata"] = {}
data["metadata"]["user_api_key"] = user_api_key_dict.api_key
data["metadata"]["user_api_key_metadata"] = user_api_key_dict.metadata
_headers = dict(request.headers)
_headers.pop(
"authorization", None
) # do not store the original `sk-..` api key in the db
data["metadata"]["headers"] = _headers
data["metadata"]["user_api_key_alias"] = getattr(
user_api_key_dict, "key_alias", None
)
data["metadata"]["user_api_key_user_id"] = user_api_key_dict.user_id
data["metadata"]["user_api_key_team_id"] = getattr(
user_api_key_dict, "team_id", None
)
data["metadata"]["global_max_parallel_requests"] = general_settings.get(
"global_max_parallel_requests", None
)
data["metadata"]["user_api_key_team_alias"] = getattr(
user_api_key_dict, "team_alias", None
)
data["metadata"]["endpoint"] = str(request.url)
### TEAM-SPECIFIC PARAMS ###
if user_api_key_dict.team_id is not None:
team_config = await proxy_config.load_team_config(
team_id=user_api_key_dict.team_id
)
if len(team_config) == 0:
pass
else:
team_id = team_config.pop("team_id", None)
data["metadata"]["team_id"] = team_id
data = {
**team_config,
**data,
} # add the team-specific configs to the completion call
_retrieve_batch_request = RetrieveBatchRequest(
batch_id=batch_id,
)
# for now use custom_llm_provider=="openai" -> this will change as LiteLLM adds more providers for acreate_batch
response = await litellm.aretrieve_batch(
custom_llm_provider="openai", **_retrieve_batch_request
)
### ALERTING ###
data["litellm_status"] = "success" # used for alerting
### RESPONSE HEADERS ###
hidden_params = getattr(response, "_hidden_params", {}) or {}
model_id = hidden_params.get("model_id", None) or ""
cache_key = hidden_params.get("cache_key", None) or ""
api_base = hidden_params.get("api_base", None) or ""
fastapi_response.headers.update(
get_custom_headers(
user_api_key_dict=user_api_key_dict,
model_id=model_id,
cache_key=cache_key,
api_base=api_base,
version=version,
model_region=getattr(user_api_key_dict, "allowed_model_region", ""),
)
)
return response
except Exception as e:
data["litellm_status"] = "fail" # used for alerting
await proxy_logging_obj.post_call_failure_hook(
user_api_key_dict=user_api_key_dict, original_exception=e, request_data=data
)
traceback.print_exc()
if isinstance(e, HTTPException):
raise ProxyException(
message=getattr(e, "message", str(e.detail)),
type=getattr(e, "type", "None"),
param=getattr(e, "param", "None"),
code=getattr(e, "status_code", status.HTTP_400_BAD_REQUEST),
)
else:
error_traceback = traceback.format_exc()
error_msg = f"{str(e)}"
raise ProxyException(
message=getattr(e, "message", error_msg),
type=getattr(e, "type", "None"),
param=getattr(e, "param", "None"),
code=getattr(e, "status_code", 500),
)
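And retrieval through the SDK against the proxy (placeholder base URL, key, and batch ID):
```python
from openai import OpenAI

client = OpenAI(api_key="sk-1234", base_url="http://localhost:4000")
retrieved = client.batches.retrieve("batch_abc123")
print(retrieved.status)
```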
######################################################################
# END OF /v1/batches Endpoints Implementation
######################################################################
######################################################################
# /v1/files Endpoints
######################################################################
@router.post(
"/v1/files",
dependencies=[Depends(user_api_key_auth)],
tags=["files"],
)
@router.post(
"/files",
dependencies=[Depends(user_api_key_auth)],
tags=["files"],
)
async def create_file(
request: Request,
fastapi_response: Response,
user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
):
"""
Upload a file that can be used across the Assistants API and Batch API.
This is the equivalent of POST https://api.openai.com/v1/files
Supports Identical Params as: https://platform.openai.com/docs/api-reference/files/create
Example Curl
```
curl http://localhost:4000/v1/files \
-H "Authorization: Bearer sk-1234" \
-F purpose="batch" \
-F file="@mydata.jsonl"
```
"""
global proxy_logging_obj
data: Dict = {}
try:
# parse the multipart form data; the raw `file` field is handled separately below
form_data = await request.form()
data = {key: value for key, value in form_data.items() if key != "file"}
# Include original request and headers in the data
data["proxy_server_request"] = { # type: ignore
"url": str(request.url),
"method": request.method,
"headers": dict(request.headers),
"body": copy.copy(data), # use copy instead of deepcopy
}
if data.get("user", None) is None and user_api_key_dict.user_id is not None:
data["user"] = user_api_key_dict.user_id
if "metadata" not in data:
data["metadata"] = {}
data["metadata"]["user_api_key"] = user_api_key_dict.api_key
data["metadata"]["user_api_key_metadata"] = user_api_key_dict.metadata
_headers = dict(request.headers)
_headers.pop(
"authorization", None
) # do not store the original `sk-..` api key in the db
data["metadata"]["headers"] = _headers
data["metadata"]["user_api_key_alias"] = getattr(
user_api_key_dict, "key_alias", None
)
data["metadata"]["user_api_key_user_id"] = user_api_key_dict.user_id
data["metadata"]["user_api_key_team_id"] = getattr(
user_api_key_dict, "team_id", None
)
data["metadata"]["global_max_parallel_requests"] = general_settings.get(
"global_max_parallel_requests", None
)
data["metadata"]["user_api_key_team_alias"] = getattr(
user_api_key_dict, "team_alias", None
)
data["metadata"]["endpoint"] = str(request.url)
### TEAM-SPECIFIC PARAMS ###
if user_api_key_dict.team_id is not None:
team_config = await proxy_config.load_team_config(
team_id=user_api_key_dict.team_id
)
if len(team_config) == 0:
pass
else:
team_id = team_config.pop("team_id", None)
data["metadata"]["team_id"] = team_id
data = {
**team_config,
**data,
} # add the team-specific configs to the completion call
# build the request from the uploaded file (excluded from `data` above); assumes a `file` part was sent
_upload = form_data.get("file")
_create_file_request = CreateFileRequest(
file=(_upload.filename, await _upload.read(), _upload.content_type),
purpose=data.get("purpose"),
)
# for now use custom_llm_provider=="openai" -> this will change as LiteLLM adds more providers for acreate_file
response = await litellm.acreate_file(
custom_llm_provider="openai", **_create_file_request
)
### ALERTING ###
data["litellm_status"] = "success" # used for alerting
### RESPONSE HEADERS ###
hidden_params = getattr(response, "_hidden_params", {}) or {}
model_id = hidden_params.get("model_id", None) or ""
cache_key = hidden_params.get("cache_key", None) or ""
api_base = hidden_params.get("api_base", None) or ""
fastapi_response.headers.update(
get_custom_headers(
user_api_key_dict=user_api_key_dict,
model_id=model_id,
cache_key=cache_key,
api_base=api_base,
version=version,
model_region=getattr(user_api_key_dict, "allowed_model_region", ""),
)
)
return response
except Exception as e:
data["litellm_status"] = "fail" # used for alerting
await proxy_logging_obj.post_call_failure_hook(
user_api_key_dict=user_api_key_dict, original_exception=e, request_data=data
)
traceback.print_exc()
if isinstance(e, HTTPException):
raise ProxyException(
message=getattr(e, "message", str(e.detail)),
type=getattr(e, "type", "None"),
param=getattr(e, "param", "None"),
code=getattr(e, "status_code", status.HTTP_400_BAD_REQUEST),
)
else:
error_traceback = traceback.format_exc()
error_msg = f"{str(e)}"
raise ProxyException(
message=getattr(e, "message", error_msg),
type=getattr(e, "type", "None"),
param=getattr(e, "param", "None"),
code=getattr(e, "status_code", 500),
)
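Through the OpenAI SDK pointed at the proxy, the upload looks like this (base URL and key are placeholders; `mydata.jsonl` is any JSONL batch input file):
```python
from openai import OpenAI

client = OpenAI(api_key="sk-1234", base_url="http://localhost:4000")
with open("mydata.jsonl", "rb") as f:
    uploaded = client.files.create(file=f, purpose="batch")
print(uploaded.id)  # e.g. file-abc123, usable as a batch input_file_id
```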
@router.post(
"/v1/moderations",
dependencies=[Depends(user_api_key_auth)],


@@ -60,8 +60,6 @@ def test_create_batch():
create_batch_response.input_file_id == batch_input_file_id
), f"Failed to create batch, expected input_file_id to be {batch_input_file_id} but got {create_batch_response.input_file_id}"
time.sleep(30)
retrieved_batch = litellm.retrieve_batch(
batch_id=create_batch_response.id, custom_llm_provider="openai"
)
@@ -70,6 +68,17 @@ def test_create_batch():
assert retrieved_batch.id == create_batch_response.id
file_content = litellm.file_content(
file_id=batch_input_file_id, custom_llm_provider="openai"
)
result = file_content.content
result_file_name = "batch_job_results_furniture.jsonl"
with open(result_file_name, "wb") as file:
file.write(result)
pass
@@ -127,6 +136,18 @@ async def test_async_create_batch():
assert retrieved_batch.id == create_batch_response.id
# try to get file content for our original file
file_content = await litellm.afile_content(
file_id=batch_input_file_id, custom_llm_provider="openai"
)
print("file content = ", file_content)
# # write this file content to a file
# with open("file_content.json", "w") as f:
# json.dump(file_content, f)
def test_retrieve_batch():
pass


@@ -20,6 +20,7 @@ from openai.types.beta.assistant import Assistant
from openai.pagination import SyncCursorPage
from os import PathLike
from openai.types import FileObject, Batch
from openai._legacy_response import HttpxBinaryResponseContent
from typing import TypedDict, List, Optional, Tuple, Mapping, IO
@@ -186,6 +187,26 @@ class CreateFileRequest(TypedDict, total=False):
timeout: Optional[float]
class FileContentRequest(TypedDict, total=False):
"""
FileContentRequest
Used by Assistants API, Batches API, and Fine-Tunes API
Required Params:
file_id: str
Optional Params:
extra_headers: Optional[Dict[str, str]]
extra_body: Optional[Dict[str, str]]
timeout: Optional[float]
"""
file_id: str
extra_headers: Optional[Dict[str, str]]
extra_body: Optional[Dict[str, str]]
timeout: Optional[float]
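A quick sketch of populating this TypedDict (assuming the import path shown in the diff above; the extra header is purely illustrative):
```python
from litellm.types.llms.openai import FileContentRequest

# only file_id is required; total=False makes the remaining keys optional
req = FileContentRequest(
    file_id="file-abc123",
    extra_headers={"x-request-source": "docs-example"},
)
```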
# OpenAI Batches Types
class CreateBatchRequest(TypedDict, total=False):
"""