diff --git a/litellm/llms/openai.py b/litellm/llms/openai.py
index 6bfd1aad3..67572fde4 100644
--- a/litellm/llms/openai.py
+++ b/litellm/llms/openai.py
@@ -5,7 +5,8 @@ from .base import BaseLLM
 from litellm.utils import ModelResponse, Choices, Message, CustomStreamWrapper, convert_to_model_response_object, Usage
 from typing import Callable, Optional
 import aiohttp, requests
-import litellm, openai
+import litellm
+from openai import OpenAI, AsyncOpenAI

 class OpenAIError(Exception):
     def __init__(self, status_code, message, request: Optional[httpx.Request]=None, response: Optional[httpx.Response]=None):
@@ -154,46 +155,9 @@ class OpenAITextCompletionConfig():
                 and v is not None}

 class OpenAIChatCompletion(BaseLLM):
-    openai_client: openai.Client
-    openai_aclient: openai.AsyncClient

     def __init__(self) -> None:
         super().__init__()
-        self.openai_client = openai.OpenAI()
-        self.openai_aclient = openai.AsyncOpenAI()
-
-    def validate_environment(self, api_key, api_base, headers):
-        if headers is None:
-            headers = {
-                "content-type": "application/json",
-            }
-        if api_key:
-            headers["Authorization"] = f"Bearer {api_key}"
-
-        self.openai_client.api_key = api_key
-        self.openai_aclient.api_key = api_key
-        if api_base:
-            if self.openai_client.base_url is None or self.openai_client.base_url != api_base:
-                if api_base.endswith("/"):
-                    self.openai_client._base_url = httpx.URL(url=api_base)
-                else:
-                    self.openai_client._base_url = httpx.URL(url=api_base+"/")
-            if self.openai_aclient.base_url is None or self.openai_aclient.base_url != api_base:
-                if api_base.endswith("/"):
-                    self.openai_aclient._base_url = httpx.URL(url=api_base)
-                else:
-                    self.openai_aclient._base_url = httpx.URL(url=api_base+"/")
-
-        return headers
-
-    def _retry_request(self, *args, **kwargs):
-        self._num_retry_httpx_errors -= 1
-
-        time.sleep(1)
-
-        original_function = kwargs.pop("original_function")
-
-        return original_function(*args, **kwargs)

     def completion(self,
                model_response: ModelResponse,
@@ -211,7 +175,8 @@ class OpenAIChatCompletion(BaseLLM):
         super().completion()
         exception_mapping_worked = False
         try:
-            headers = self.validate_environment(api_key=api_key, api_base=api_base, headers=headers)
+            if headers:
+                optional_params["extra_headers"] = headers
             if model is None or messages is None:
                 raise OpenAIError(status_code=422, message=f"Missing model or messages")
@@ -232,13 +197,14 @@ class OpenAIChatCompletion(BaseLLM):
             try:
                 if acompletion is True:
                     if optional_params.get("stream", False):
-                        return self.async_streaming(logging_obj=logging_obj, data=data, model=model)
+                        return self.async_streaming(logging_obj=logging_obj, data=data, model=model, api_base=api_base, api_key=api_key)
                     else:
-                        return self.acompletion(data=data, model_response=model_response)
+                        return self.acompletion(data=data, model_response=model_response, api_base=api_base, api_key=api_key)
                 elif optional_params.get("stream", False):
-                    return self.streaming(logging_obj=logging_obj, data=data, model=model)
+                    return self.streaming(logging_obj=logging_obj, data=data, model=model, api_base=api_base, api_key=api_key)
                 else:
-                    response = self.openai_client.chat.completions.create(**data) # type: ignore
+                    openai_client = OpenAI(api_key=api_key, base_url=api_base)
+                    response = openai_client.chat.completions.create(**data) # type: ignore
                     return convert_to_model_response_object(response_object=json.loads(response.model_dump_json()), model_response_object=model_response)
             except Exception as e:
                 if "Conversation roles must alternate user/assistant" in str(e) or "user and assistant roles should be alternating" in str(e):
@@ -267,10 +233,13 @@ class OpenAIChatCompletion(BaseLLM):
     async def acompletion(self,
                           data: dict,
-                          model_response: ModelResponse):
+                          model_response: ModelResponse,
+                          api_base: str,
+                          api_key: str):
         response = None
         try:
-            response = await self.openai_aclient.chat.completions.create(**data)
+            openai_aclient = AsyncOpenAI(api_key=api_key, base_url=api_base)
+            response = await openai_aclient.chat.completions.create(**data)
             return convert_to_model_response_object(response_object=json.loads(response.model_dump_json()), model_response_object=model_response)
         except Exception as e:
             if response and hasattr(response, "text"):
@@ -281,9 +250,12 @@ class OpenAIChatCompletion(BaseLLM):
     def streaming(self,
                   logging_obj,
                   data: dict,
-                  model: str
+                  model: str,
+                  api_key: str,
+                  api_base: str
     ):
-        response = self.openai_client.chat.completions.create(**data)
+        openai_client = OpenAI(api_key=api_key, base_url=api_base)
+        response = openai_client.chat.completions.create(**data)
         streamwrapper = CustomStreamWrapper(completion_stream=response, model=model, custom_llm_provider="openai", logging_obj=logging_obj)
         for transformed_chunk in streamwrapper:
             yield transformed_chunk
@@ -291,8 +263,11 @@ class OpenAIChatCompletion(BaseLLM):
     async def async_streaming(self,
                               logging_obj,
                               data: dict,
-                              model: str):
-        response = await self.openai_aclient.chat.completions.create(**data)
+                              model: str,
+                              api_key: str,
+                              api_base: str):
+        openai_aclient = AsyncOpenAI(api_key=api_key, base_url=api_base)
+        response = await openai_aclient.chat.completions.create(**data)
         streamwrapper = CustomStreamWrapper(completion_stream=response, model=model, custom_llm_provider="openai", logging_obj=logging_obj)
         async for transformed_chunk in streamwrapper:
             yield transformed_chunk
@@ -309,8 +284,7 @@ class OpenAIChatCompletion(BaseLLM):
         super().embedding()
         exception_mapping_worked = False
         try:
-            headers = self.validate_environment(api_key, api_base=api_base, headers=None)
-            api_base = f"{api_base}/embeddings"
+            openai_client = OpenAI(api_key=api_key, base_url=api_base)
             model = model
             data = {
                 "model": model,
@@ -325,7 +299,7 @@ class OpenAIChatCompletion(BaseLLM):
                 additional_args={"complete_input_dict": data},
             )
             ## COMPLETION CALL
-            response = self.openai_client.embeddings.create(**data) # type: ignore
+            response = openai_client.embeddings.create(**data) # type: ignore
             ## LOGGING
             logging_obj.post_call(
                 input=input,
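The openai.py change above drops the shared module-level clients (and the
validate_environment/_retry_request plumbing around them) in favor of
constructing a fresh OpenAI/AsyncOpenAI client per request, with any
caller-supplied headers forwarded through optional_params["extra_headers"].
A minimal sketch of that per-request pattern, assuming openai>=1.0.0 (the
key, base URL, model, and header values below are placeholders):

    from openai import OpenAI

    client = OpenAI(api_key="sk-...", base_url="https://api.openai.com/v1")
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hello"}],
        # per-request headers ride along instead of being baked into the client
        extra_headers={"X-Title": "my-app"},
    )
    print(response.choices[0].message.content)
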
diff --git a/litellm/main.py b/litellm/main.py
index 0fd84fe43..3fbbcdd6e 100644
--- a/litellm/main.py
+++ b/litellm/main.py
@@ -941,8 +941,6 @@ def completion(
                 {
                     "HTTP-Referer": openrouter_site_url,
                     "X-Title": openrouter_app_name,
-                    "Content-Type": "application/json",
-                    "Authorization": f"Bearer {api_key}"
                 }
             )
diff --git a/litellm/tests/test_async_fn.py b/litellm/tests/test_async_fn.py
index 09e7aba3c..943b9a695 100644
--- a/litellm/tests/test_async_fn.py
+++ b/litellm/tests/test_async_fn.py
@@ -50,6 +50,8 @@ def test_async_response_openai():
     asyncio.run(test_get_response())

+# test_async_response_openai()
+
 def test_async_response_azure():
     import asyncio
     litellm.set_verbose = True
@@ -80,6 +82,8 @@ def test_async_anyscale_response():
     asyncio.run(test_get_response())

+# test_async_anyscale_response()
+
 def test_get_response_streaming():
     import asyncio
     async def test_async_call():
@@ -87,7 +91,7 @@ def test_get_response_streaming():
         messages = [{"content": user_message, "role": "user"}]
         try:
             litellm.set_verbose = True
-            response = await acompletion(model="azure/chatgpt-v-2", messages=messages, stream=True)
+            response = await acompletion(model="gpt-3.5-turbo", messages=messages, stream=True)
             print(type(response))

             import inspect
@@ -110,7 +114,7 @@ def test_get_response_streaming():
     asyncio.run(test_async_call())

-test_get_response_streaming()
+# test_get_response_streaming()

 def test_get_response_non_openai_streaming():
     import asyncio
@@ -141,3 +145,5 @@ def test_get_response_non_openai_streaming():
             pytest.fail(f"An exception occurred: {e}")
         return response
     asyncio.run(test_async_call())
+
+test_get_response_non_openai_streaming()
\ No newline at end of file
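The test_async_fn.py edits flip which smoke tests run at import time and
point the streaming test at gpt-3.5-turbo instead of azure/chatgpt-v-2.
Roughly what that streaming path exercises, as a standalone sketch (model
and prompt are illustrative):

    import asyncio
    from litellm import acompletion

    async def stream_once():
        # with stream=True, acompletion resolves to an async iterator of chunks
        response = await acompletion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Hello, how are you?"}],
            stream=True,
        )
        async for chunk in response:
            print(chunk)

    asyncio.run(stream_once())
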
diff --git a/litellm/tests/test_completion.py b/litellm/tests/test_completion.py
index eb7f05065..e48cbf81a 100644
--- a/litellm/tests/test_completion.py
+++ b/litellm/tests/test_completion.py
@@ -494,7 +494,7 @@ def test_completion_openrouter1():
         print(response)
     except Exception as e:
         pytest.fail(f"Error occurred: {e}")
-# test_completion_openrouter1()
+test_completion_openrouter1()

 def test_completion_hf_model_no_provider():
     try:
@@ -562,7 +562,7 @@ def test_completion_azure():
     except Exception as e:
         pytest.fail(f"Error occurred: {e}")

-test_completion_azure()
+# test_completion_azure()

 def test_azure_openai_ad_token():
     # this tests if the azure ad token is set in the request header
diff --git a/litellm/tests/test_loadtest_router.py b/litellm/tests/test_loadtest_router.py
index da031be69..325164515 100644
--- a/litellm/tests/test_loadtest_router.py
+++ b/litellm/tests/test_loadtest_router.py
@@ -1,69 +1,69 @@
-# import sys, os
-# import traceback
-# from dotenv import load_dotenv
-# import copy
+import sys, os
+import traceback
+from dotenv import load_dotenv
+import copy

-# load_dotenv()
-# sys.path.insert(
-#     0, os.path.abspath("../..")
-# )  # Adds the parent directory to the system path
-# import asyncio
-# from litellm import Router, Timeout
+load_dotenv()
+sys.path.insert(
+    0, os.path.abspath("../..")
+)  # Adds the parent directory to the system path
+import asyncio
+from litellm import Router, Timeout

-# async def call_acompletion(semaphore, router: Router, input_data):
-#     async with semaphore:
-#         try:
-#             # Use asyncio.wait_for to set a timeout for the task
-#             response = await router.acompletion(**input_data)
-#             # Handle the response as needed
-#             return response
-#         except Timeout:
-#             print(f"Task timed out: {input_data}")
-#             return None # You may choose to return something else or raise an exception
+async def call_acompletion(semaphore, router: Router, input_data):
+    async with semaphore:
+        try:
+            # Use asyncio.wait_for to set a timeout for the task
+            response = await router.acompletion(**input_data)
+            # Handle the response as needed
+            return response
+        except Timeout:
+            print(f"Task timed out: {input_data}")
+            return None # You may choose to return something else or raise an exception

-# async def main():
-#     # Initialize the Router
-#     model_list= [{
-#         "model_name": "gpt-3.5-turbo",
-#         "litellm_params": {
-#             "model": "gpt-3.5-turbo",
-#             "api_key": os.getenv("OPENAI_API_KEY"),
-#         },
-#     }, {
-#         "model_name": "gpt-3.5-turbo",
-#         "litellm_params": {
-#             "model": "azure/chatgpt-v-2",
-#             "api_key": os.getenv("AZURE_API_KEY"),
-#             "api_base": os.getenv("AZURE_API_BASE"),
-#             "api_version": os.getenv("AZURE_API_VERSION")
-#         },
-#     }, {
-#         "model_name": "gpt-3.5-turbo",
-#         "litellm_params": {
-#             "model": "azure/chatgpt-functioncalling",
-#             "api_key": os.getenv("AZURE_API_KEY"),
-#             "api_base": os.getenv("AZURE_API_BASE"),
-#             "api_version": os.getenv("AZURE_API_VERSION")
-#         },
-#     }]
-#     router = Router(model_list=model_list, num_retries=3, timeout=10)
+async def main():
+    # Initialize the Router
+    model_list= [{
+        "model_name": "gpt-3.5-turbo",
+        "litellm_params": {
+            "model": "gpt-3.5-turbo",
+            "api_key": os.getenv("OPENAI_API_KEY"),
+        },
+    }, {
+        "model_name": "gpt-3.5-turbo",
+        "litellm_params": {
+            "model": "azure/chatgpt-v-2",
+            "api_key": os.getenv("AZURE_API_KEY"),
+            "api_base": os.getenv("AZURE_API_BASE"),
+            "api_version": os.getenv("AZURE_API_VERSION")
+        },
+    }, {
+        "model_name": "gpt-3.5-turbo",
+        "litellm_params": {
+            "model": "azure/chatgpt-functioncalling",
+            "api_key": os.getenv("AZURE_API_KEY"),
+            "api_base": os.getenv("AZURE_API_BASE"),
+            "api_version": os.getenv("AZURE_API_VERSION")
+        },
+    }]
+    router = Router(model_list=model_list, num_retries=3, timeout=10)

-# # Create a semaphore with a capacity of 100
-# semaphore = asyncio.Semaphore(100)
+    # Create a semaphore with a capacity of 100
+    semaphore = asyncio.Semaphore(100)

-# # List to hold all task references
-# tasks = []
+    # List to hold all task references
+    tasks = []

-# # Launch 1000 tasks
-# for _ in range(1000):
-#     task = asyncio.create_task(call_acompletion(semaphore, router, {"model": "gpt-3.5-turbo", "messages": [{"role":"user", "content": "Hey, how's it going?"}]}))
-#     tasks.append(task)
+    # Launch 1000 tasks
+    for _ in range(1000):
+        task = asyncio.create_task(call_acompletion(semaphore, router, {"model": "gpt-3.5-turbo", "messages": [{"role":"user", "content": "Hey, how's it going?"}]}))
+        tasks.append(task)

-# # Wait for all tasks to complete
-# responses = await asyncio.gather(*tasks)
-# # Process responses as needed
-# print(f"NUMBER OF COMPLETED TASKS: {len(responses)}")
-# # Run the main function
-# asyncio.run(main())
+    # Wait for all tasks to complete
+    responses = await asyncio.gather(*tasks)
+    # Process responses as needed
+    print(f"NUMBER OF COMPLETED TASKS: {len(responses)}")
+# Run the main function
+asyncio.run(main())
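Uncommenting test_loadtest_router.py re-enables its semaphore-gated fan-out:
a Semaphore(100) caps in-flight calls while asyncio.gather drives 1000 tasks
through the Router's three deployments. The concurrency shape, reduced to its
essentials (the sleep is a stand-in for router.acompletion):

    import asyncio

    async def worker(sem: asyncio.Semaphore, i: int):
        async with sem:                    # at most 100 workers in here at once
            await asyncio.sleep(0.01)      # stand-in for router.acompletion(...)
            return i

    async def main():
        sem = asyncio.Semaphore(100)
        tasks = [asyncio.create_task(worker(sem, i)) for i in range(1000)]
        responses = await asyncio.gather(*tasks)
        print(f"NUMBER OF COMPLETED TASKS: {len(responses)}")

    asyncio.run(main())
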
diff --git a/litellm/utils.py b/litellm/utils.py
index 79579c84c..3463831f2 100644
--- a/litellm/utils.py
+++ b/litellm/utils.py
@@ -506,6 +506,8 @@ class Logging:
             # User Logging -> if you pass in a custom logging function
             headers = additional_args.get("headers", {})
+            if headers is None:
+                headers = {}
             data = additional_args.get("complete_input_dict", {})
             api_base = additional_args.get("api_base", "")
             masked_headers = {k: v[:-40] + '*' * 40 if len(v) > 40 else v for k, v in headers.items()}
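The utils.py guard is needed because dict.get only falls back to the default
when the key is absent: an explicit headers=None in additional_args slips
through and would crash the masking comprehension that follows. A quick
illustration of the failure mode and the fix:

    additional_args = {"headers": None}

    headers = additional_args.get("headers", {})   # -> None, not {}
    if headers is None:
        headers = {}
    # without the guard, headers.items() raises AttributeError on None
    masked_headers = {k: v[:-40] + '*' * 40 if len(v) > 40 else v for k, v in headers.items()}
    print(masked_headers)  # {}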