# # this tests if the router is initialized correctly # import asyncio # import os # import sys # import time # import traceback # import pytest # sys.path.insert( # 0, os.path.abspath("../..") # ) # Adds the parent directory to the system path # from collections import defaultdict # from concurrent.futures import ThreadPoolExecutor # from dotenv import load_dotenv # import litellm # from litellm import Router # load_dotenv() # # every time we load the router we should have 4 clients: # # Async # # Sync # # Async + Stream # # Sync + Stream # def test_init_clients(): # litellm.set_verbose = True # import logging # from litellm._logging import verbose_router_logger # verbose_router_logger.setLevel(logging.DEBUG) # try: # print("testing init 4 clients with diff timeouts") # model_list = [ # { # "model_name": "gpt-3.5-turbo", # "litellm_params": { # "model": "azure/chatgpt-v-3", # "api_key": os.getenv("AZURE_API_KEY"), # "api_version": os.getenv("AZURE_API_VERSION"), # "api_base": os.getenv("AZURE_API_BASE"), # "timeout": 0.01, # "stream_timeout": 0.000_001, # "max_retries": 7, # }, # }, # ] # router = Router(model_list=model_list, set_verbose=True) # for elem in router.model_list: # model_id = elem["model_info"]["id"] # assert router.cache.get_cache(f"{model_id}_client") is not None # assert router.cache.get_cache(f"{model_id}_async_client") is not None # assert router.cache.get_cache(f"{model_id}_stream_client") is not None # assert router.cache.get_cache(f"{model_id}_stream_async_client") is not None # # check if timeout for stream/non stream clients is set correctly # async_client = router.cache.get_cache(f"{model_id}_async_client") # stream_async_client = router.cache.get_cache( # f"{model_id}_stream_async_client" # ) # assert async_client.timeout == 0.01 # assert stream_async_client.timeout == 0.000_001 # print(vars(async_client)) # print() # print(async_client._base_url) # assert ( # async_client._base_url # == "https://openai-gpt-4-test-v-1.openai.azure.com/openai/" # ) # assert ( # stream_async_client._base_url # == "https://openai-gpt-4-test-v-1.openai.azure.com/openai/" # ) # print("PASSED !") # except Exception as e: # traceback.print_exc() # pytest.fail(f"Error occurred: {e}") # # test_init_clients() # def test_init_clients_basic(): # litellm.set_verbose = True # try: # print("Test basic client init") # model_list = [ # { # "model_name": "gpt-3.5-turbo", # "litellm_params": { # "model": "azure/chatgpt-v-3", # "api_key": os.getenv("AZURE_API_KEY"), # "api_version": os.getenv("AZURE_API_VERSION"), # "api_base": os.getenv("AZURE_API_BASE"), # }, # }, # ] # router = Router(model_list=model_list) # for elem in router.model_list: # model_id = elem["model_info"]["id"] # assert router.cache.get_cache(f"{model_id}_client") is not None # assert router.cache.get_cache(f"{model_id}_async_client") is not None # assert router.cache.get_cache(f"{model_id}_stream_client") is not None # assert router.cache.get_cache(f"{model_id}_stream_async_client") is not None # print("PASSED !") # # see if we can init clients without timeout or max retries set # except Exception as e: # traceback.print_exc() # pytest.fail(f"Error occurred: {e}") # # test_init_clients_basic() # def test_init_clients_basic_azure_cloudflare(): # # init azure + cloudflare # # init OpenAI gpt-3.5 # # init OpenAI text-embedding # # init OpenAI comptaible - Mistral/mistral-medium # # init OpenAI compatible - xinference/bge # litellm.set_verbose = True # try: # print("Test basic client init") # model_list = [ # { # "model_name": "azure-cloudflare", # "litellm_params": { # "model": "azure/chatgpt-v-3", # "api_key": os.getenv("AZURE_API_KEY"), # "api_version": os.getenv("AZURE_API_VERSION"), # "api_base": "https://gateway.ai.cloudflare.com/v1/0399b10e77ac6668c80404a5ff49eb37/litellm-test/azure-openai/openai-gpt-4-test-v-1", # }, # }, # { # "model_name": "gpt-openai", # "litellm_params": { # "model": "gpt-3.5-turbo", # "api_key": os.getenv("OPENAI_API_KEY"), # }, # }, # { # "model_name": "text-embedding-ada-002", # "litellm_params": { # "model": "text-embedding-ada-002", # "api_key": os.getenv("OPENAI_API_KEY"), # }, # }, # { # "model_name": "mistral", # "litellm_params": { # "model": "mistral/mistral-tiny", # "api_key": os.getenv("MISTRAL_API_KEY"), # }, # }, # { # "model_name": "bge-base-en", # "litellm_params": { # "model": "xinference/bge-base-en", # "api_base": "http://127.0.0.1:9997/v1", # "api_key": os.getenv("OPENAI_API_KEY"), # }, # }, # ] # router = Router(model_list=model_list) # for elem in router.model_list: # model_id = elem["model_info"]["id"] # assert router.cache.get_cache(f"{model_id}_client") is not None # assert router.cache.get_cache(f"{model_id}_async_client") is not None # assert router.cache.get_cache(f"{model_id}_stream_client") is not None # assert router.cache.get_cache(f"{model_id}_stream_async_client") is not None # print("PASSED !") # # see if we can init clients without timeout or max retries set # except Exception as e: # traceback.print_exc() # pytest.fail(f"Error occurred: {e}") # # test_init_clients_basic_azure_cloudflare() # def test_timeouts_router(): # """ # Test the timeouts of the router with multiple clients. This HASas to raise a timeout error # """ # import openai # litellm.set_verbose = True # try: # print("testing init 4 clients with diff timeouts") # model_list = [ # { # "model_name": "gpt-3.5-turbo", # "litellm_params": { # "model": "azure/chatgpt-v-3", # "api_key": os.getenv("AZURE_API_KEY"), # "api_version": os.getenv("AZURE_API_VERSION"), # "api_base": os.getenv("AZURE_API_BASE"), # "timeout": 0.000001, # "stream_timeout": 0.000_001, # }, # }, # ] # router = Router(model_list=model_list, num_retries=0) # print("PASSED !") # async def test(): # try: # await router.acompletion( # model="gpt-3.5-turbo", # messages=[ # {"role": "user", "content": "hello, write a 20 pg essay"} # ], # ) # except Exception as e: # raise e # asyncio.run(test()) # except openai.APITimeoutError as e: # print( # "Passed: Raised correct exception. Got openai.APITimeoutError\nGood Job", e # ) # print(type(e)) # pass # except Exception as e: # pytest.fail( # f"Did not raise error `openai.APITimeoutError`. Instead raised error type: {type(e)}, Error: {e}" # ) # # test_timeouts_router() # def test_stream_timeouts_router(): # """ # Test the stream timeouts router. See if it selected the correct client with stream timeout # """ # import openai # litellm.set_verbose = True # try: # print("testing init 4 clients with diff timeouts") # model_list = [ # { # "model_name": "gpt-3.5-turbo", # "litellm_params": { # "model": "azure/chatgpt-v-3", # "api_key": os.getenv("AZURE_API_KEY"), # "api_version": os.getenv("AZURE_API_VERSION"), # "api_base": os.getenv("AZURE_API_BASE"), # "timeout": 200, # regular calls will not timeout, stream calls will # "stream_timeout": 10, # }, # }, # ] # router = Router(model_list=model_list) # print("PASSED !") # data = { # "model": "gpt-3.5-turbo", # "messages": [{"role": "user", "content": "hello, write a 20 pg essay"}], # "stream": True, # } # selected_client = router._get_client( # deployment=router.model_list[0], # kwargs=data, # client_type=None, # ) # print("Select client timeout", selected_client.timeout) # assert selected_client.timeout == 10 # # make actual call # response = router.completion(**data) # for chunk in response: # print(f"chunk: {chunk}") # except openai.APITimeoutError as e: # print( # "Passed: Raised correct exception. Got openai.APITimeoutError\nGood Job", e # ) # print(type(e)) # pass # except Exception as e: # pytest.fail( # f"Did not raise error `openai.APITimeoutError`. Instead raised error type: {type(e)}, Error: {e}" # ) # # test_stream_timeouts_router() # def test_xinference_embedding(): # # [Test Init Xinference] this tests if we init xinference on the router correctly # # [Test Exception Mapping] tests that xinference is an openai comptiable provider # print("Testing init xinference") # print( # "this tests if we create an OpenAI client for Xinference, with the correct API BASE" # ) # model_list = [ # { # "model_name": "xinference", # "litellm_params": { # "model": "xinference/bge-base-en", # "api_base": "os.environ/XINFERENCE_API_BASE", # }, # } # ] # router = Router(model_list=model_list) # print(router.model_list) # print(router.model_list[0]) # assert ( # router.model_list[0]["litellm_params"]["api_base"] == "http://0.0.0.0:9997" # ) # set in env # openai_client = router._get_client( # deployment=router.model_list[0], # kwargs={"input": ["hello"], "model": "xinference"}, # ) # assert openai_client._base_url == "http://0.0.0.0:9997" # assert "xinference" in litellm.openai_compatible_providers # print("passed") # # test_xinference_embedding() # def test_router_init_gpt_4_vision_enhancements(): # try: # # tests base_url set when any base_url with /openai/deployments passed to router # print("Testing Azure GPT_Vision enhancements") # model_list = [ # { # "model_name": "gpt-4-vision-enhancements", # "litellm_params": { # "model": "azure/gpt-4-vision", # "api_key": os.getenv("AZURE_API_KEY"), # "base_url": "https://gpt-4-vision-resource.openai.azure.com/openai/deployments/gpt-4-vision/extensions/", # "dataSources": [ # { # "type": "AzureComputerVision", # "parameters": { # "endpoint": "os.environ/AZURE_VISION_ENHANCE_ENDPOINT", # "key": "os.environ/AZURE_VISION_ENHANCE_KEY", # }, # } # ], # }, # } # ] # router = Router(model_list=model_list) # print(router.model_list) # print(router.model_list[0]) # assert ( # router.model_list[0]["litellm_params"]["base_url"] # == "https://gpt-4-vision-resource.openai.azure.com/openai/deployments/gpt-4-vision/extensions/" # ) # set in env # assert ( # router.model_list[0]["litellm_params"]["dataSources"][0]["parameters"][ # "endpoint" # ] # == os.environ["AZURE_VISION_ENHANCE_ENDPOINT"] # ) # assert ( # router.model_list[0]["litellm_params"]["dataSources"][0]["parameters"][ # "key" # ] # == os.environ["AZURE_VISION_ENHANCE_KEY"] # ) # azure_client = router._get_client( # deployment=router.model_list[0], # kwargs={"stream": True, "model": "gpt-4-vision-enhancements"}, # client_type="async", # ) # assert ( # azure_client._base_url # == "https://gpt-4-vision-resource.openai.azure.com/openai/deployments/gpt-4-vision/extensions/" # ) # print("passed") # except Exception as e: # pytest.fail(f"Error occurred: {e}") # @pytest.mark.parametrize("sync_mode", [True, False]) # @pytest.mark.asyncio # async def test_openai_with_organization(sync_mode): # try: # print("Testing OpenAI with organization") # model_list = [ # { # "model_name": "openai-bad-org", # "litellm_params": { # "model": "gpt-3.5-turbo", # "organization": "org-ikDc4ex8NB", # }, # }, # { # "model_name": "openai-good-org", # "litellm_params": {"model": "gpt-3.5-turbo"}, # }, # ] # router = Router(model_list=model_list) # print(router.model_list) # print(router.model_list[0]) # if sync_mode: # openai_client = router._get_client( # deployment=router.model_list[0], # kwargs={"input": ["hello"], "model": "openai-bad-org"}, # ) # print(vars(openai_client)) # assert openai_client.organization == "org-ikDc4ex8NB" # # bad org raises error # try: # response = router.completion( # model="openai-bad-org", # messages=[{"role": "user", "content": "this is a test"}], # ) # pytest.fail( # "Request should have failed - This organization does not exist" # ) # except Exception as e: # print("Got exception: " + str(e)) # assert "header should match organization for API key" in str( # e # ) or "No such organization" in str(e) # # good org works # response = router.completion( # model="openai-good-org", # messages=[{"role": "user", "content": "this is a test"}], # max_tokens=5, # ) # else: # openai_client = router._get_client( # deployment=router.model_list[0], # kwargs={"input": ["hello"], "model": "openai-bad-org"}, # client_type="async", # ) # print(vars(openai_client)) # assert openai_client.organization == "org-ikDc4ex8NB" # # bad org raises error # try: # response = await router.acompletion( # model="openai-bad-org", # messages=[{"role": "user", "content": "this is a test"}], # ) # pytest.fail( # "Request should have failed - This organization does not exist" # ) # except Exception as e: # print("Got exception: " + str(e)) # assert "header should match organization for API key" in str( # e # ) or "No such organization" in str(e) # # good org works # response = await router.acompletion( # model="openai-good-org", # messages=[{"role": "user", "content": "this is a test"}], # max_tokens=5, # ) # except Exception as e: # pytest.fail(f"Error occurred: {e}") # def test_init_clients_azure_command_r_plus(): # # This tests that the router uses the OpenAI client for Azure/Command-R+ # # For azure/command-r-plus we need to use openai.OpenAI because of how the Azure provider requires requests being sent # litellm.set_verbose = True # import logging # from litellm._logging import verbose_router_logger # verbose_router_logger.setLevel(logging.DEBUG) # try: # print("testing init 4 clients with diff timeouts") # model_list = [ # { # "model_name": "gpt-3.5-turbo", # "litellm_params": { # "model": "azure/command-r-plus", # "api_key": os.getenv("AZURE_COHERE_API_KEY"), # "api_base": os.getenv("AZURE_COHERE_API_BASE"), # "timeout": 0.01, # "stream_timeout": 0.000_001, # "max_retries": 7, # }, # }, # ] # router = Router(model_list=model_list, set_verbose=True) # for elem in router.model_list: # model_id = elem["model_info"]["id"] # async_client = router.cache.get_cache(f"{model_id}_async_client") # stream_async_client = router.cache.get_cache( # f"{model_id}_stream_async_client" # ) # # Assert the Async Clients used are OpenAI clients and not Azure # # For using Azure/Command-R-Plus and Azure/Mistral the clients NEED to be OpenAI clients used # # this is weirdness introduced on Azure's side # assert "openai.AsyncOpenAI" in str(async_client) # assert "openai.AsyncOpenAI" in str(stream_async_client) # print("PASSED !") # except Exception as e: # traceback.print_exc() # pytest.fail(f"Error occurred: {e}") # @pytest.mark.asyncio # async def test_aaaaatext_completion_with_organization(): # try: # print("Testing Text OpenAI with organization") # model_list = [ # { # "model_name": "openai-bad-org", # "litellm_params": { # "model": "text-completion-openai/gpt-3.5-turbo-instruct", # "api_key": os.getenv("OPENAI_API_KEY", None), # "organization": "org-ikDc4ex8NB", # }, # }, # { # "model_name": "openai-good-org", # "litellm_params": { # "model": "text-completion-openai/gpt-3.5-turbo-instruct", # "api_key": os.getenv("OPENAI_API_KEY", None), # "organization": os.getenv("OPENAI_ORGANIZATION", None), # }, # }, # ] # router = Router(model_list=model_list) # print(router.model_list) # print(router.model_list[0]) # openai_client = router._get_client( # deployment=router.model_list[0], # kwargs={"input": ["hello"], "model": "openai-bad-org"}, # ) # print(vars(openai_client)) # assert openai_client.organization == "org-ikDc4ex8NB" # # bad org raises error # try: # response = await router.atext_completion( # model="openai-bad-org", # prompt="this is a test", # ) # pytest.fail("Request should have failed - This organization does not exist") # except Exception as e: # print("Got exception: " + str(e)) # assert "header should match organization for API key" in str( # e # ) or "No such organization" in str(e) # # good org works # response = await router.atext_completion( # model="openai-good-org", # prompt="this is a test", # max_tokens=5, # ) # print("working response: ", response) # except Exception as e: # pytest.fail(f"Error occurred: {e}") # def test_init_clients_async_mode(): # litellm.set_verbose = True # import logging # from litellm._logging import verbose_router_logger # from litellm.types.router import RouterGeneralSettings # verbose_router_logger.setLevel(logging.DEBUG) # try: # print("testing init 4 clients with diff timeouts") # model_list = [ # { # "model_name": "gpt-3.5-turbo", # "litellm_params": { # "model": "azure/chatgpt-v-3", # "api_key": os.getenv("AZURE_API_KEY"), # "api_version": os.getenv("AZURE_API_VERSION"), # "api_base": os.getenv("AZURE_API_BASE"), # "timeout": 0.01, # "stream_timeout": 0.000_001, # "max_retries": 7, # }, # }, # ] # router = Router( # model_list=model_list, # set_verbose=True, # router_general_settings=RouterGeneralSettings(async_only_mode=True), # ) # for elem in router.model_list: # model_id = elem["model_info"]["id"] # # sync clients not initialized in async_only_mode=True # assert router.cache.get_cache(f"{model_id}_client") is None # assert router.cache.get_cache(f"{model_id}_stream_client") is None # # only async clients initialized in async_only_mode=True # assert router.cache.get_cache(f"{model_id}_async_client") is not None # assert router.cache.get_cache(f"{model_id}_stream_async_client") is not None # except Exception as e: # pytest.fail(f"Error occurred: {e}") # @pytest.mark.parametrize( # "environment,expected_models", # [ # ("development", ["gpt-3.5-turbo"]), # ("production", ["gpt-4", "gpt-3.5-turbo", "gpt-4o"]), # ], # ) # def test_init_router_with_supported_environments(environment, expected_models): # """ # Tests that the correct models are setup on router when LITELLM_ENVIRONMENT is set # """ # os.environ["LITELLM_ENVIRONMENT"] = environment # model_list = [ # { # "model_name": "gpt-3.5-turbo", # "litellm_params": { # "model": "azure/chatgpt-v-3", # "api_key": os.getenv("AZURE_API_KEY"), # "api_version": os.getenv("AZURE_API_VERSION"), # "api_base": os.getenv("AZURE_API_BASE"), # "timeout": 0.01, # "stream_timeout": 0.000_001, # "max_retries": 7, # }, # "model_info": {"supported_environments": ["development", "production"]}, # }, # { # "model_name": "gpt-4", # "litellm_params": { # "model": "openai/gpt-4", # "api_key": os.getenv("OPENAI_API_KEY"), # "timeout": 0.01, # "stream_timeout": 0.000_001, # "max_retries": 7, # }, # "model_info": {"supported_environments": ["production"]}, # }, # { # "model_name": "gpt-4o", # "litellm_params": { # "model": "openai/gpt-4o", # "api_key": os.getenv("OPENAI_API_KEY"), # "timeout": 0.01, # "stream_timeout": 0.000_001, # "max_retries": 7, # }, # "model_info": {"supported_environments": ["production"]}, # }, # ] # router = Router(model_list=model_list, set_verbose=True) # _model_list = router.get_model_names() # print("model_list: ", _model_list) # print("expected_models: ", expected_models) # assert set(_model_list) == set(expected_models) # os.environ.pop("LITELLM_ENVIRONMENT")