mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-25 18:54:30 +00:00
fix(openai.py): handling extra headers
This commit is contained in:
parent
9e072f87bd
commit
a94c09c13c
6 changed files with 98 additions and 118 deletions
|
@ -5,7 +5,8 @@ from .base import BaseLLM
|
||||||
from litellm.utils import ModelResponse, Choices, Message, CustomStreamWrapper, convert_to_model_response_object, Usage
|
from litellm.utils import ModelResponse, Choices, Message, CustomStreamWrapper, convert_to_model_response_object, Usage
|
||||||
from typing import Callable, Optional
|
from typing import Callable, Optional
|
||||||
import aiohttp, requests
|
import aiohttp, requests
|
||||||
import litellm, openai
|
import litellm
|
||||||
|
from openai import OpenAI, AsyncOpenAI
|
||||||
|
|
||||||
class OpenAIError(Exception):
|
class OpenAIError(Exception):
|
||||||
def __init__(self, status_code, message, request: Optional[httpx.Request]=None, response: Optional[httpx.Response]=None):
|
def __init__(self, status_code, message, request: Optional[httpx.Request]=None, response: Optional[httpx.Response]=None):
|
||||||
|
@ -154,46 +155,9 @@ class OpenAITextCompletionConfig():
|
||||||
and v is not None}
|
and v is not None}
|
||||||
|
|
||||||
class OpenAIChatCompletion(BaseLLM):
|
class OpenAIChatCompletion(BaseLLM):
|
||||||
openai_client: openai.Client
|
|
||||||
openai_aclient: openai.AsyncClient
|
|
||||||
|
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.openai_client = openai.OpenAI()
|
|
||||||
self.openai_aclient = openai.AsyncOpenAI()
|
|
||||||
|
|
||||||
def validate_environment(self, api_key, api_base, headers):
|
|
||||||
if headers is None:
|
|
||||||
headers = {
|
|
||||||
"content-type": "application/json",
|
|
||||||
}
|
|
||||||
if api_key:
|
|
||||||
headers["Authorization"] = f"Bearer {api_key}"
|
|
||||||
|
|
||||||
self.openai_client.api_key = api_key
|
|
||||||
self.openai_aclient.api_key = api_key
|
|
||||||
if api_base:
|
|
||||||
if self.openai_client.base_url is None or self.openai_client.base_url != api_base:
|
|
||||||
if api_base.endswith("/"):
|
|
||||||
self.openai_client._base_url = httpx.URL(url=api_base)
|
|
||||||
else:
|
|
||||||
self.openai_client._base_url = httpx.URL(url=api_base+"/")
|
|
||||||
if self.openai_aclient.base_url is None or self.openai_aclient.base_url != api_base:
|
|
||||||
if api_base.endswith("/"):
|
|
||||||
self.openai_aclient._base_url = httpx.URL(url=api_base)
|
|
||||||
else:
|
|
||||||
self.openai_aclient._base_url = httpx.URL(url=api_base+"/")
|
|
||||||
|
|
||||||
return headers
|
|
||||||
|
|
||||||
def _retry_request(self, *args, **kwargs):
|
|
||||||
self._num_retry_httpx_errors -= 1
|
|
||||||
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
original_function = kwargs.pop("original_function")
|
|
||||||
|
|
||||||
return original_function(*args, **kwargs)
|
|
||||||
|
|
||||||
def completion(self,
|
def completion(self,
|
||||||
model_response: ModelResponse,
|
model_response: ModelResponse,
|
||||||
|
@ -211,7 +175,8 @@ class OpenAIChatCompletion(BaseLLM):
|
||||||
super().completion()
|
super().completion()
|
||||||
exception_mapping_worked = False
|
exception_mapping_worked = False
|
||||||
try:
|
try:
|
||||||
headers = self.validate_environment(api_key=api_key, api_base=api_base, headers=headers)
|
if headers:
|
||||||
|
optional_params["extra_headers"] = headers
|
||||||
if model is None or messages is None:
|
if model is None or messages is None:
|
||||||
raise OpenAIError(status_code=422, message=f"Missing model or messages")
|
raise OpenAIError(status_code=422, message=f"Missing model or messages")
|
||||||
|
|
||||||
|
@ -232,13 +197,14 @@ class OpenAIChatCompletion(BaseLLM):
|
||||||
try:
|
try:
|
||||||
if acompletion is True:
|
if acompletion is True:
|
||||||
if optional_params.get("stream", False):
|
if optional_params.get("stream", False):
|
||||||
return self.async_streaming(logging_obj=logging_obj, data=data, model=model)
|
return self.async_streaming(logging_obj=logging_obj, data=data, model=model, api_base=api_base, api_key=api_key)
|
||||||
else:
|
else:
|
||||||
return self.acompletion(data=data, model_response=model_response)
|
return self.acompletion(data=data, model_response=model_response, api_base=api_base, api_key=api_key)
|
||||||
elif optional_params.get("stream", False):
|
elif optional_params.get("stream", False):
|
||||||
return self.streaming(logging_obj=logging_obj, data=data, model=model)
|
return self.streaming(logging_obj=logging_obj, data=data, model=model, api_base=api_base, api_key=api_key)
|
||||||
else:
|
else:
|
||||||
response = self.openai_client.chat.completions.create(**data) # type: ignore
|
openai_client = OpenAI(api_key=api_key, base_url=api_base)
|
||||||
|
response = openai_client.chat.completions.create(**data) # type: ignore
|
||||||
return convert_to_model_response_object(response_object=json.loads(response.model_dump_json()), model_response_object=model_response)
|
return convert_to_model_response_object(response_object=json.loads(response.model_dump_json()), model_response_object=model_response)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if "Conversation roles must alternate user/assistant" in str(e) or "user and assistant roles should be alternating" in str(e):
|
if "Conversation roles must alternate user/assistant" in str(e) or "user and assistant roles should be alternating" in str(e):
|
||||||
|
@ -267,10 +233,13 @@ class OpenAIChatCompletion(BaseLLM):
|
||||||
|
|
||||||
async def acompletion(self,
|
async def acompletion(self,
|
||||||
data: dict,
|
data: dict,
|
||||||
model_response: ModelResponse):
|
model_response: ModelResponse,
|
||||||
|
api_base: str,
|
||||||
|
api_key: str):
|
||||||
response = None
|
response = None
|
||||||
try:
|
try:
|
||||||
response = await self.openai_aclient.chat.completions.create(**data)
|
openai_aclient = AsyncOpenAI(api_key=api_key, base_url=api_base)
|
||||||
|
response = await openai_aclient.chat.completions.create(**data)
|
||||||
return convert_to_model_response_object(response_object=json.loads(response.model_dump_json()), model_response_object=model_response)
|
return convert_to_model_response_object(response_object=json.loads(response.model_dump_json()), model_response_object=model_response)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if response and hasattr(response, "text"):
|
if response and hasattr(response, "text"):
|
||||||
|
@ -281,9 +250,12 @@ class OpenAIChatCompletion(BaseLLM):
|
||||||
def streaming(self,
|
def streaming(self,
|
||||||
logging_obj,
|
logging_obj,
|
||||||
data: dict,
|
data: dict,
|
||||||
model: str
|
model: str,
|
||||||
|
api_key: str,
|
||||||
|
api_base: str
|
||||||
):
|
):
|
||||||
response = self.openai_client.chat.completions.create(**data)
|
openai_client = OpenAI(api_key=api_key, base_url=api_base)
|
||||||
|
response = openai_client.chat.completions.create(**data)
|
||||||
streamwrapper = CustomStreamWrapper(completion_stream=response, model=model, custom_llm_provider="openai",logging_obj=logging_obj)
|
streamwrapper = CustomStreamWrapper(completion_stream=response, model=model, custom_llm_provider="openai",logging_obj=logging_obj)
|
||||||
for transformed_chunk in streamwrapper:
|
for transformed_chunk in streamwrapper:
|
||||||
yield transformed_chunk
|
yield transformed_chunk
|
||||||
|
@ -291,8 +263,11 @@ class OpenAIChatCompletion(BaseLLM):
|
||||||
async def async_streaming(self,
|
async def async_streaming(self,
|
||||||
logging_obj,
|
logging_obj,
|
||||||
data: dict,
|
data: dict,
|
||||||
model: str):
|
model: str,
|
||||||
response = await self.openai_aclient.chat.completions.create(**data)
|
api_key: str,
|
||||||
|
api_base: str):
|
||||||
|
openai_aclient = AsyncOpenAI(api_key=api_key, base_url=api_base)
|
||||||
|
response = await openai_aclient.chat.completions.create(**data)
|
||||||
streamwrapper = CustomStreamWrapper(completion_stream=response, model=model, custom_llm_provider="openai",logging_obj=logging_obj)
|
streamwrapper = CustomStreamWrapper(completion_stream=response, model=model, custom_llm_provider="openai",logging_obj=logging_obj)
|
||||||
async for transformed_chunk in streamwrapper:
|
async for transformed_chunk in streamwrapper:
|
||||||
yield transformed_chunk
|
yield transformed_chunk
|
||||||
|
@ -309,8 +284,7 @@ class OpenAIChatCompletion(BaseLLM):
|
||||||
super().embedding()
|
super().embedding()
|
||||||
exception_mapping_worked = False
|
exception_mapping_worked = False
|
||||||
try:
|
try:
|
||||||
headers = self.validate_environment(api_key, api_base=api_base, headers=None)
|
openai_client = OpenAI(api_key=api_key, api_base=api_base)
|
||||||
api_base = f"{api_base}/embeddings"
|
|
||||||
model = model
|
model = model
|
||||||
data = {
|
data = {
|
||||||
"model": model,
|
"model": model,
|
||||||
|
@ -325,7 +299,7 @@ class OpenAIChatCompletion(BaseLLM):
|
||||||
additional_args={"complete_input_dict": data},
|
additional_args={"complete_input_dict": data},
|
||||||
)
|
)
|
||||||
## COMPLETION CALL
|
## COMPLETION CALL
|
||||||
response = self.openai_client.embeddings.create(**data) # type: ignore
|
response = openai_client.embeddings.create(**data) # type: ignore
|
||||||
## LOGGING
|
## LOGGING
|
||||||
logging_obj.post_call(
|
logging_obj.post_call(
|
||||||
input=input,
|
input=input,
|
||||||
|
|
|
@ -941,8 +941,6 @@ def completion(
|
||||||
{
|
{
|
||||||
"HTTP-Referer": openrouter_site_url,
|
"HTTP-Referer": openrouter_site_url,
|
||||||
"X-Title": openrouter_app_name,
|
"X-Title": openrouter_app_name,
|
||||||
"Content-Type": "application/json",
|
|
||||||
"Authorization": f"Bearer {api_key}"
|
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -50,6 +50,8 @@ def test_async_response_openai():
|
||||||
|
|
||||||
asyncio.run(test_get_response())
|
asyncio.run(test_get_response())
|
||||||
|
|
||||||
|
# test_async_response_openai()
|
||||||
|
|
||||||
def test_async_response_azure():
|
def test_async_response_azure():
|
||||||
import asyncio
|
import asyncio
|
||||||
litellm.set_verbose = True
|
litellm.set_verbose = True
|
||||||
|
@ -80,6 +82,8 @@ def test_async_anyscale_response():
|
||||||
|
|
||||||
asyncio.run(test_get_response())
|
asyncio.run(test_get_response())
|
||||||
|
|
||||||
|
# test_async_anyscale_response()
|
||||||
|
|
||||||
def test_get_response_streaming():
|
def test_get_response_streaming():
|
||||||
import asyncio
|
import asyncio
|
||||||
async def test_async_call():
|
async def test_async_call():
|
||||||
|
@ -87,7 +91,7 @@ def test_get_response_streaming():
|
||||||
messages = [{"content": user_message, "role": "user"}]
|
messages = [{"content": user_message, "role": "user"}]
|
||||||
try:
|
try:
|
||||||
litellm.set_verbose = True
|
litellm.set_verbose = True
|
||||||
response = await acompletion(model="azure/chatgpt-v-2", messages=messages, stream=True)
|
response = await acompletion(model="gpt-3.5-turbo", messages=messages, stream=True)
|
||||||
print(type(response))
|
print(type(response))
|
||||||
|
|
||||||
import inspect
|
import inspect
|
||||||
|
@ -110,7 +114,7 @@ def test_get_response_streaming():
|
||||||
asyncio.run(test_async_call())
|
asyncio.run(test_async_call())
|
||||||
|
|
||||||
|
|
||||||
test_get_response_streaming()
|
# test_get_response_streaming()
|
||||||
|
|
||||||
def test_get_response_non_openai_streaming():
|
def test_get_response_non_openai_streaming():
|
||||||
import asyncio
|
import asyncio
|
||||||
|
@ -141,3 +145,5 @@ def test_get_response_non_openai_streaming():
|
||||||
pytest.fail(f"An exception occurred: {e}")
|
pytest.fail(f"An exception occurred: {e}")
|
||||||
return response
|
return response
|
||||||
asyncio.run(test_async_call())
|
asyncio.run(test_async_call())
|
||||||
|
|
||||||
|
test_get_response_non_openai_streaming()
|
|
@ -494,7 +494,7 @@ def test_completion_openrouter1():
|
||||||
print(response)
|
print(response)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
pytest.fail(f"Error occurred: {e}")
|
pytest.fail(f"Error occurred: {e}")
|
||||||
# test_completion_openrouter1()
|
test_completion_openrouter1()
|
||||||
|
|
||||||
def test_completion_hf_model_no_provider():
|
def test_completion_hf_model_no_provider():
|
||||||
try:
|
try:
|
||||||
|
@ -562,7 +562,7 @@ def test_completion_azure():
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
pytest.fail(f"Error occurred: {e}")
|
pytest.fail(f"Error occurred: {e}")
|
||||||
|
|
||||||
test_completion_azure()
|
# test_completion_azure()
|
||||||
|
|
||||||
def test_azure_openai_ad_token():
|
def test_azure_openai_ad_token():
|
||||||
# this tests if the azure ad token is set in the request header
|
# this tests if the azure ad token is set in the request header
|
||||||
|
|
|
@ -1,69 +1,69 @@
|
||||||
# import sys, os
|
import sys, os
|
||||||
# import traceback
|
import traceback
|
||||||
# from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
# import copy
|
import copy
|
||||||
|
|
||||||
# load_dotenv()
|
load_dotenv()
|
||||||
# sys.path.insert(
|
sys.path.insert(
|
||||||
# 0, os.path.abspath("../..")
|
0, os.path.abspath("../..")
|
||||||
# ) # Adds the parent directory to the system path
|
) # Adds the parent directory to the system path
|
||||||
# import asyncio
|
import asyncio
|
||||||
# from litellm import Router, Timeout
|
from litellm import Router, Timeout
|
||||||
|
|
||||||
|
|
||||||
# async def call_acompletion(semaphore, router: Router, input_data):
|
async def call_acompletion(semaphore, router: Router, input_data):
|
||||||
# async with semaphore:
|
async with semaphore:
|
||||||
# try:
|
try:
|
||||||
# # Use asyncio.wait_for to set a timeout for the task
|
# Use asyncio.wait_for to set a timeout for the task
|
||||||
# response = await router.acompletion(**input_data)
|
response = await router.acompletion(**input_data)
|
||||||
# # Handle the response as needed
|
# Handle the response as needed
|
||||||
# return response
|
return response
|
||||||
# except Timeout:
|
except Timeout:
|
||||||
# print(f"Task timed out: {input_data}")
|
print(f"Task timed out: {input_data}")
|
||||||
# return None # You may choose to return something else or raise an exception
|
return None # You may choose to return something else or raise an exception
|
||||||
|
|
||||||
|
|
||||||
# async def main():
|
async def main():
|
||||||
# # Initialize the Router
|
# Initialize the Router
|
||||||
# model_list= [{
|
model_list= [{
|
||||||
# "model_name": "gpt-3.5-turbo",
|
"model_name": "gpt-3.5-turbo",
|
||||||
# "litellm_params": {
|
"litellm_params": {
|
||||||
# "model": "gpt-3.5-turbo",
|
"model": "gpt-3.5-turbo",
|
||||||
# "api_key": os.getenv("OPENAI_API_KEY"),
|
"api_key": os.getenv("OPENAI_API_KEY"),
|
||||||
# },
|
},
|
||||||
# }, {
|
}, {
|
||||||
# "model_name": "gpt-3.5-turbo",
|
"model_name": "gpt-3.5-turbo",
|
||||||
# "litellm_params": {
|
"litellm_params": {
|
||||||
# "model": "azure/chatgpt-v-2",
|
"model": "azure/chatgpt-v-2",
|
||||||
# "api_key": os.getenv("AZURE_API_KEY"),
|
"api_key": os.getenv("AZURE_API_KEY"),
|
||||||
# "api_base": os.getenv("AZURE_API_BASE"),
|
"api_base": os.getenv("AZURE_API_BASE"),
|
||||||
# "api_version": os.getenv("AZURE_API_VERSION")
|
"api_version": os.getenv("AZURE_API_VERSION")
|
||||||
# },
|
},
|
||||||
# }, {
|
}, {
|
||||||
# "model_name": "gpt-3.5-turbo",
|
"model_name": "gpt-3.5-turbo",
|
||||||
# "litellm_params": {
|
"litellm_params": {
|
||||||
# "model": "azure/chatgpt-functioncalling",
|
"model": "azure/chatgpt-functioncalling",
|
||||||
# "api_key": os.getenv("AZURE_API_KEY"),
|
"api_key": os.getenv("AZURE_API_KEY"),
|
||||||
# "api_base": os.getenv("AZURE_API_BASE"),
|
"api_base": os.getenv("AZURE_API_BASE"),
|
||||||
# "api_version": os.getenv("AZURE_API_VERSION")
|
"api_version": os.getenv("AZURE_API_VERSION")
|
||||||
# },
|
},
|
||||||
# }]
|
}]
|
||||||
# router = Router(model_list=model_list, num_retries=3, timeout=10)
|
router = Router(model_list=model_list, num_retries=3, timeout=10)
|
||||||
|
|
||||||
# # Create a semaphore with a capacity of 100
|
# Create a semaphore with a capacity of 100
|
||||||
# semaphore = asyncio.Semaphore(100)
|
semaphore = asyncio.Semaphore(100)
|
||||||
|
|
||||||
# # List to hold all task references
|
# List to hold all task references
|
||||||
# tasks = []
|
tasks = []
|
||||||
|
|
||||||
# # Launch 1000 tasks
|
# Launch 1000 tasks
|
||||||
# for _ in range(1000):
|
for _ in range(1000):
|
||||||
# task = asyncio.create_task(call_acompletion(semaphore, router, {"model": "gpt-3.5-turbo", "messages": [{"role":"user", "content": "Hey, how's it going?"}]}))
|
task = asyncio.create_task(call_acompletion(semaphore, router, {"model": "gpt-3.5-turbo", "messages": [{"role":"user", "content": "Hey, how's it going?"}]}))
|
||||||
# tasks.append(task)
|
tasks.append(task)
|
||||||
|
|
||||||
# # Wait for all tasks to complete
|
# Wait for all tasks to complete
|
||||||
# responses = await asyncio.gather(*tasks)
|
responses = await asyncio.gather(*tasks)
|
||||||
# # Process responses as needed
|
# Process responses as needed
|
||||||
# print(f"NUMBER OF COMPLETED TASKS: {len(responses)}")
|
print(f"NUMBER OF COMPLETED TASKS: {len(responses)}")
|
||||||
# # Run the main function
|
# Run the main function
|
||||||
# asyncio.run(main())
|
asyncio.run(main())
|
||||||
|
|
|
@ -506,6 +506,8 @@ class Logging:
|
||||||
|
|
||||||
# User Logging -> if you pass in a custom logging function
|
# User Logging -> if you pass in a custom logging function
|
||||||
headers = additional_args.get("headers", {})
|
headers = additional_args.get("headers", {})
|
||||||
|
if headers is None:
|
||||||
|
headers = {}
|
||||||
data = additional_args.get("complete_input_dict", {})
|
data = additional_args.get("complete_input_dict", {})
|
||||||
api_base = additional_args.get("api_base", "")
|
api_base = additional_args.get("api_base", "")
|
||||||
masked_headers = {k: v[:-40] + '*' * 40 if len(v) > 40 else v for k, v in headers.items()}
|
masked_headers = {k: v[:-40] + '*' * 40 if len(v) > 40 else v for k, v in headers.items()}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue