import json
import os
import sys
import traceback

import openai
import pytest
from dotenv import load_dotenv

load_dotenv()
sys.path.insert(
    0, os.path.abspath("../..")
)  # Adds the parent directory to the system path
from unittest.mock import AsyncMock, MagicMock, patch

import litellm
from litellm import completion, completion_cost, embedding

litellm.set_verbose = False


def _assert_litellm_keys_match_openai_sdk(model, **extra_params):
    """Compare the top-level keys of a litellm embedding response with the OpenAI SDK's.

    Sends the same two-item input through ``litellm.embedding`` and through
    ``openai.OpenAI().embeddings.create`` and asserts that both responses
    expose the same set of keys (ignoring litellm's ``_response_ms``) and that
    litellm returned one embedding per input item.

    Args:
        model: Embedding model name passed to both clients.
        **extra_params: Extra keyword arguments (e.g. ``dimensions``) forwarded
            unchanged to both calls.
    """
    litellm.set_verbose = True
    inputs = ["good morning from litellm", "this is another item"]

    response = embedding(
        model=model,
        input=inputs,
        metadata={"anything": "good day"},
        **extra_params,
    )
    print("response:", response)
    litellm_response = dict(response)
    litellm_response_keys = set(litellm_response)
    # `_response_ms` is only discarded on the litellm side before comparing.
    litellm_response_keys.discard("_response_ms")

    print(litellm_response_keys)
    print("LiteLLM Response\n")

    # same request with OpenAI 1.0+
    client = openai.OpenAI(api_key=os.environ["OPENAI_API_KEY"])
    openai_response = dict(
        client.embeddings.create(model=model, input=inputs, **extra_params)
    )
    openai_response_keys = set(openai_response)
    print(openai_response_keys)

    # ENSURE the keys in the litellm response are exactly what the openai package returns
    assert litellm_response_keys == openai_response_keys
    # expect two embedding responses from litellm since the input had two items
    assert len(litellm_response["data"]) == 2


def test_openai_embedding():
    """litellm and the OpenAI SDK return the same keys for text-embedding-ada-002."""
    try:
        _assert_litellm_keys_match_openai_sdk("text-embedding-ada-002")
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")


# test_openai_embedding()


def test_openai_embedding_3():
    """Same key comparison for text-embedding-3-small with ``dimensions=5``."""
    try:
        _assert_litellm_keys_match_openai_sdk("text-embedding-3-small", dimensions=5)
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")


@pytest.mark.parametrize(
    "model, api_base, api_key",
    [
        # ("azure/azure-embedding-model", None, None),
        ("together_ai/togethercomputer/m2-bert-80M-8k-retrieval", None, None),
    ],
)
@pytest.mark.parametrize("sync_mode", [True, False])
@pytest.mark.asyncio
async def test_openai_azure_embedding_simple(model, api_base, api_key, sync_mode):
    """Embed one string (sync or async) and check response keys, cost and usage type."""
    try:
        # Use the bundled cost map instead of fetching one from a URL.
        os.environ["LITELLM_LOCAL_MODEL_COST_MAP"] = "True"
        litellm.model_cost = litellm.get_model_cost_map(url="")
        # litellm.set_verbose = True
        request_kwargs = {
            "model": model,
            "input": ["good morning from litellm"],
            "api_base": api_base,
            "api_key": api_key,
        }
        if sync_mode:
            response = embedding(**request_kwargs)
        else:
            response = await litellm.aembedding(**request_kwargs)
        # print(await response)
        print(response)
        print(response._hidden_params)

        response_keys = set(dict(response))
        response_keys.discard("_response_ms")
        # assert litellm response has expected keys from OpenAI embedding response
        assert {"usage", "model", "object", "data"} == response_keys

        request_cost = litellm.completion_cost(
            completion_response=response, call_type="embedding"
        )
        print("Calculated request cost=", request_cost)

        assert isinstance(response.usage, litellm.Usage)
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")
# test_openai_azure_embedding_simple()
import base64

import requests

litellm.set_verbose = True
url = "https://dummyimage.com/100/100/fff&text=Test+image"
# NOTE: this download runs at import time; the timeout keeps test collection
# from hanging indefinitely if the host is unreachable.
response = requests.get(url, timeout=30)
file_data = response.content

encoded_file = base64.b64encode(file_data).decode("utf-8")
base64_image = f"data:image/png;base64,{encoded_file}"

from openai.types.embedding import Embedding


def _azure_ai_image_mock_response(*args, **kwargs):
    """Return a MagicMock HTTP response carrying a single-embedding JSON payload."""
    new_response = MagicMock()
    new_response.headers = {"azureml-model-group": "offer-cohere-embed-multili-paygo"}

    new_response.json.return_value = {
        "data": [Embedding(embedding=[1234], index=0, object="embedding")],
        "model": "",
        "object": "list",
        "usage": {"prompt_tokens": 1, "total_tokens": 2},
    }

    return new_response


@pytest.mark.parametrize(
    "model, api_base, api_key",
    [
        (
            "azure_ai/Cohere-embed-v3-multilingual-jzu",
            "https://Cohere-embed-v3-multilingual-jzu.eastus2.models.ai.azure.com",
            os.getenv("AZURE_AI_COHERE_API_KEY_2"),
        )
    ],
)
@pytest.mark.parametrize("sync_mode", [True])  # , False
@pytest.mark.asyncio
async def test_azure_ai_embedding_image(model, api_base, api_key, sync_mode):
    """Embed a base64 image through a mocked HTTP client and check the response shape."""
    try:
        os.environ["LITELLM_LOCAL_MODEL_COST_MAP"] = "True"
        litellm.model_cost = litellm.get_model_cost_map(url="")
        image_input = base64_image
        client = HTTPHandler() if sync_mode else AsyncHTTPHandler()
        with patch.object(
            client, "post", side_effect=_azure_ai_image_mock_response
        ) as mock_client:
            request_kwargs = {
                "model": model,
                "input": [image_input],
                "api_base": api_base,
                "api_key": api_key,
                "client": client,
            }
            if sync_mode:
                response = embedding(**request_kwargs)
            else:
                response = await litellm.aembedding(**request_kwargs)
        print(response)

        assert len(response.data) == 1

        print(response._hidden_params)
        response_keys = set(dict(response))
        response_keys.discard("_response_ms")
        # assert litellm response has expected keys from OpenAI embedding response
        assert {"usage", "model", "object", "data"} == response_keys

        request_cost = litellm.completion_cost(completion_response=response)
        print("Calculated request cost=", request_cost)

        assert isinstance(response.usage, litellm.Usage)
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")


def test_openai_azure_embedding_timeouts():
    """A tiny timeout on an Azure embedding call must surface as ``openai.APITimeoutError``."""
    try:
        response = embedding(
            model="azure/azure-embedding-model",
            input=["good morning from litellm"],
            timeout=0.00001,
        )
        print(response)
    except openai.APITimeoutError:
        print("Good job got timeout error!")
    except Exception as e:
        pytest.fail(
            f"Expected timeout error, did not get the correct error. Instead got {e}"
        )


# test_openai_azure_embedding_timeouts()


def test_openai_embedding_timeouts():
    """A tiny timeout on an OpenAI embedding call must surface as ``openai.APITimeoutError``."""
    try:
        response = embedding(
            model="text-embedding-ada-002",
            input=["good morning from litellm"],
            timeout=0.00001,
        )
        print(response)
    except openai.APITimeoutError:
        print("Good job got OpenAI timeout error!")
    except Exception as e:
        pytest.fail(
            f"Expected timeout error, did not get the correct error. Instead got {e}"
        )


# test_openai_embedding_timeouts()


def test_openai_azure_embedding():
    """Azure credentials passed as arguments work even when the env vars are blank."""
    try:
        api_key = os.environ["AZURE_API_KEY"]
        api_base = os.environ["AZURE_API_BASE"]
        api_version = os.environ["AZURE_API_VERSION"]

        blanked_env = {
            "AZURE_API_VERSION": "",
            "AZURE_API_BASE": "",
            "AZURE_API_KEY": "",
        }
        # patch.dict restores the original values on exit, including when the
        # embedding call raises, so later tests never see blank credentials.
        with patch.dict(os.environ, blanked_env):
            response = embedding(
                model="azure/azure-embedding-model",
                input=["good morning from litellm", "this is another item"],
                api_key=api_key,
                api_base=api_base,
                api_version=api_version,
            )
            print(response)
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")


@pytest.mark.skipif(
    os.environ.get("CIRCLE_OIDC_TOKEN") is None,
    reason="Cannot run without being in CircleCI Runner",
)
def test_openai_azure_embedding_with_oidc_and_cf():
    """Azure embedding call authenticated with a CircleCI OIDC token and no API key."""
    # TODO: Switch to our own Azure account, currently using ai.moda's account
    os.environ["AZURE_TENANT_ID"] = "17c0a27a-1246-4aa1-a3b6-d294e80e783c"
    os.environ["AZURE_CLIENT_ID"] = "4faf5422-b2bd-45e8-a6d7-46543a38acd0"

    old_key = os.environ["AZURE_API_KEY"]
    os.environ.pop("AZURE_API_KEY", None)

    try:
        response = embedding(
            model="azure/text-embedding-ada-002",
            input=["Hello"],
            azure_ad_token="oidc/circleci/",
            api_base="https://eastus2-litellm.openai.azure.com/",
            api_version="2024-06-01",
        )
        print(response)

    except Exception as e:
        pytest.fail(f"Error occurred: {e}")

    finally:
        os.environ["AZURE_API_KEY"] = old_key


from openai.types.embedding import Embedding


def _openai_mock_response(*args, **kwargs):
    """Return a MagicMock whose ``parse()`` yields a fixed ``CreateEmbeddingResponse``."""
    new_response = MagicMock()
    new_response.headers = {"hello": "world"}

    new_response.parse.return_value = (
        openai.types.create_embedding_response.CreateEmbeddingResponse(
            data=[Embedding(embedding=[1234, 45667], index=0, object="embedding")],
            model="azure/test",
            object="list",
            usage=openai.types.create_embedding_response.Usage(
                prompt_tokens=1, total_tokens=2
            ),
        )
    )
    return new_response


def test_openai_azure_embedding_optional_arg():
    """``azure_ad_token`` must not be forwarded to the OpenAI SDK ``create`` call."""
    with patch.object(
        openai.resources.embeddings.Embeddings,
        "create",
        side_effect=_openai_mock_response,
    ) as mock_client:
        _ = litellm.embedding(
            model="azure/test",
            input=["test"],
            api_version="test",
            api_base="test",
            azure_ad_token="test",
        )

        # `mock.called_once_with(...)` is not an assertion: it is always truthy
        # on older Pythons and raises AttributeError on 3.12+. Use the real
        # assertion and inspect the forwarded kwargs explicitly.
        mock_client.assert_called_once()
        forwarded_kwargs = mock_client.call_args.kwargs
        assert forwarded_kwargs["model"] == "test"
        assert forwarded_kwargs["input"] == ["test"]
        assert "azure_ad_token" not in forwarded_kwargs


# test_openai_azure_embedding()

# test_openai_embedding()


@pytest.mark.parametrize(
    "model, api_base",
    [
        ("embed-english-v2.0", None),
    ],
)
@pytest.mark.parametrize("sync_mode", [True, False])
@pytest.mark.asyncio
async def test_cohere_embedding(sync_mode, model, api_base):
    """Cohere embedding (sync or async) returns a ``litellm.Usage`` object."""
    try:
        # litellm.set_verbose=True
        data = {
            "model": model,
            "input": ["good morning from litellm", "this is another item"],
            "input_type": "search_query",
            "api_base": api_base,
        }
        if sync_mode:
            response = embedding(**data)
        else:
            response = await litellm.aembedding(**data)

        print("response:", response)

        assert isinstance(response.usage, litellm.Usage)
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")


# test_cohere_embedding()


@pytest.mark.parametrize("custom_llm_provider", ["cohere", "cohere_chat"])
@pytest.mark.asyncio()
async def test_cohere_embedding3(custom_llm_provider):
    """Async embed-english-v3.0 call succeeds under both Cohere provider prefixes."""
    try:
        litellm.set_verbose = True
        response = await litellm.aembedding(
            model=f"{custom_llm_provider}/embed-english-v3.0",
            input=["good morning from litellm", "this is another item"],
            timeout=None,
            max_retries=0,
        )
        print("response:", response)
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")


# test_cohere_embedding3()


@pytest.mark.parametrize(
    "model",
    [
        "bedrock/amazon.titan-embed-text-v1",
        "bedrock/amazon.titan-embed-image-v1",
        "bedrock/amazon.titan-embed-text-v2:0",
    ],
)
@pytest.mark.parametrize("sync_mode", [True, False])  # ,
@pytest.mark.asyncio
async def test_bedrock_embedding_titan(model, sync_mode):
    """Bedrock Titan accepts a plain ``str`` input and returns a list of floats."""
    try:
        # this tests if we support str input for bedrock embedding
        litellm.set_verbose = True
        litellm.enable_cache()
        import time

        current_time = str(time.time())
        # DO NOT MAKE THE INPUT A LIST in this test
        request_kwargs = {
            "model": model,
            # input should always be a string in this test
            "input": f"good morning from litellm, attempting to embed data {current_time}",
            "aws_region_name": "us-west-2",
        }
        if sync_mode:
            response = embedding(**request_kwargs)
        else:
            response = await litellm.aembedding(**request_kwargs)

        print("response:", response)
        assert isinstance(
            response["data"][0]["embedding"], list
        ), "Expected response to be a list"
        print("type of first embedding:", type(response["data"][0]["embedding"][0]))
        assert all(
            isinstance(x, float) for x in response["data"][0]["embedding"]
        ), "Expected response to be a list of floats"
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")
    finally:
        # Do not leave the global cache switched on for unrelated tests.
        litellm.disable_cache()


@pytest.mark.parametrize(
    "model",
    [
        "bedrock/amazon.titan-embed-text-v1",
        "bedrock/amazon.titan-embed-image-v1",
        "bedrock/amazon.titan-embed-text-v2:0",
    ],
)
@pytest.mark.parametrize("sync_mode", [True])  # True,
@pytest.mark.asyncio
async def test_bedrock_embedding_titan_caching(model, sync_mode):
    """A repeated Bedrock Titan request is answered from cache in under 0.1 seconds."""
    try:
        # this tests if we support str input for bedrock embedding
        litellm.set_verbose = True
        litellm.enable_cache()
        import time

        current_time = str(time.time())
        # DO NOT MAKE THE INPUT A LIST in this test
        # input should always be a string in this test
        text = f"good morning from litellm, attempting to embed data {current_time}"
        if sync_mode:
            response = embedding(
                model=model,
                input=text,
                aws_region_name="us-west-2",
            )
        else:
            response = await litellm.aembedding(
                model=model,
                input=text,
                aws_region_name="us-west-2",
            )
        print("response:", response)
        assert isinstance(
            response["data"][0]["embedding"], list
        ), "Expected response to be a list"
        print("type of first embedding:", type(response["data"][0]["embedding"][0]))
        assert all(
            isinstance(x, float) for x in response["data"][0]["embedding"]
        ), "Expected response to be a list of floats"

        # this also tests if we can return a cache response for this scenario
        start_time = time.time()

        if sync_mode:
            response = embedding(
                model=model,
                input=text,
            )
        else:
            response = await litellm.aembedding(
                model=model,
                input=text,
            )
        print(response)

        end_time = time.time()
        print(response._hidden_params)
        print(f"Embedding 2 response time: {end_time - start_time} seconds")

        assert end_time - start_time < 0.1

        assert isinstance(response.usage, litellm.Usage)
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")
    finally:
        # Runs on failure too, so a failed assertion cannot leave caching on.
        litellm.disable_cache()


# test_bedrock_embedding_titan()


def test_bedrock_embedding_cohere():
    """Bedrock Cohere multilingual embedding returns a list of floats and a Usage object."""
    try:
        litellm.set_verbose = False
        response = embedding(
            model="cohere.embed-multilingual-v3",
            input=[
                "good morning from litellm, attempting to embed data",
                "lets test a second string for good measure",
            ],
            aws_region_name="os.environ/AWS_REGION_NAME_2",
        )
        assert isinstance(
            response["data"][0]["embedding"], list
        ), "Expected response to be a list"
        print("type of first embedding:", type(response["data"][0]["embedding"][0]))
        assert all(
            isinstance(x, float) for x in response["data"][0]["embedding"]
        ), "Expected response to be a list of floats"
        # print(f"response:", response)

        assert isinstance(response.usage, litellm.Usage)
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")


# test_bedrock_embedding_cohere()


def test_demo_tokens_as_input_to_embeddings_fails_for_titan():
    """Token (integer) inputs to Bedrock Titan raise ``litellm.BadRequestError``."""
    litellm.set_verbose = True

    with pytest.raises(
        litellm.BadRequestError,
        match='litellm.BadRequestError: BedrockException - {"message":"Malformed input request: expected type: String, found: JSONArray, please reformat your input and try again."}',
    ):
        litellm.embedding(model="amazon.titan-embed-text-v1", input=[[1]])

    with pytest.raises(
        litellm.BadRequestError,
        match='litellm.BadRequestError: BedrockException - {"message":"Malformed input request: expected type: String, found: Integer, please reformat your input and try again."}',
    ):
        litellm.embedding(
            model="amazon.titan-embed-text-v1",
            input=[1],
        )


# comment out hf tests - since hf endpoints are unstable
def test_hf_embedding():
    """Best-effort Hugging Face embedding call; failures are deliberately ignored."""
    try:
        # huggingface/microsoft/codebert-base
        # huggingface/facebook/bart-large
        response = embedding(
            model="huggingface/sentence-transformers/all-MiniLM-L6-v2",
            input=["good morning from litellm", "this is another item"],
        )
        print("response:", response)

        assert isinstance(response.usage, litellm.Usage)
    except Exception:
        # Note: Huggingface inference API is unstable and fails with "model loading errors all the time"
        # Swallowing here is intentional so endpoint flakiness does not fail the suite.
        pass


# test_hf_embedding()

from unittest.mock import MagicMock, patch


def tgi_mock_post(*args, **kwargs):
    """Mock ``post`` that validates the sentence-similarity payload and returns one score."""
    import json

    expected_data = {
        "inputs": {
            "source_sentence": "good morning from litellm",
            "sentences": ["this is another item"],
        }
    }
    assert (
        json.loads(kwargs["data"]) == expected_data
    ), "Data does not match the expected data"
    mock_response = MagicMock()
    mock_response.status_code = 200
    mock_response.headers = {"Content-Type": "application/json"}
    mock_response.json.return_value = [0.7708950042724609]
    return mock_response


from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler, HTTPHandler


@pytest.mark.parametrize("sync_mode", [True, False])
@pytest.mark.asyncio
async def test_hf_embedding_sentence_sim(sync_mode):
    """Hugging Face sentence-similarity request is posted exactly once via the mocked client."""
    # huggingface/microsoft/codebert-base
    # huggingface/facebook/bart-large
    if sync_mode is True:
        client = HTTPHandler(concurrent_limit=1)
    else:
        client = AsyncHTTPHandler(concurrent_limit=1)
    with patch.object(client, "post", side_effect=tgi_mock_post) as mock_client:
        data = {
            "model": "huggingface/TaylorAI/bge-micro-v2",
            "input": ["good morning from litellm", "this is another item"],
            "client": client,
        }
        if sync_mode is True:
            response = embedding(**data)
        else:
            response = await litellm.aembedding(**data)

        print("response:", response)

        mock_client.assert_called_once()

    assert isinstance(response.usage, litellm.Usage)


# test async embeddings
def test_aembedding():
    """Async OpenAI embedding call returns a response whose computed cost is 1e-06."""
    try:
        import asyncio

        async def embedding_call():
            try:
                response = await litellm.aembedding(
                    model="text-embedding-ada-002",
                    input=["good morning from litellm", "this is another item"],
                )
                print(response)
                return response
            except Exception as e:
                pytest.fail(f"Error occurred: {e}")

        response = asyncio.run(embedding_call())
        print("Before caclulating cost, response", response)

        cost = litellm.completion_cost(completion_response=response)

        print("COST=", cost)
        assert cost == float("1e-06")
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")


# test_aembedding()


def test_aembedding_azure():
    """Async Azure embedding call reports ``custom_llm_provider == "azure"``."""
    try:
        import asyncio

        async def embedding_call():
            try:
                response = await litellm.aembedding(
                    model="azure/azure-embedding-model",
                    input=["good morning from litellm", "this is another item"],
                )
                print(response)

                print(
                    "hidden params - custom_llm_provider",
                    response._hidden_params["custom_llm_provider"],
                )
                assert response._hidden_params["custom_llm_provider"] == "azure"

                assert isinstance(response.usage, litellm.Usage)
            except Exception as e:
                pytest.fail(f"Error occurred: {e}")

        asyncio.run(embedding_call())
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")


# test_aembedding_azure()


@pytest.mark.skip(reason="AWS Suspended Account")
def test_sagemaker_embeddings():
    """Sync SageMaker embedding call yields a cost strictly between $0 and $1."""
    try:
        response = litellm.embedding(
            model="sagemaker/berri-benchmarking-gpt-j-6b-fp16",
            input=["good morning from litellm", "this is another item"],
            input_cost_per_second=0.000420,
        )
        print(f"response: {response}")
        cost = completion_cost(completion_response=response)
        # should never be > $1 for a single embedding call
        assert 0.0 < cost < 1.0
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")


@pytest.mark.skip(reason="AWS Suspended Account")
@pytest.mark.asyncio
async def test_sagemaker_aembeddings():
    """Async SageMaker embedding call yields a cost strictly between $0 and $1."""
    try:
        response = await litellm.aembedding(
            model="sagemaker/berri-benchmarking-gpt-j-6b-fp16",
            input=["good morning from litellm", "this is another item"],
            input_cost_per_second=0.000420,
        )
        print(f"response: {response}")
        cost = completion_cost(completion_response=response)
        # should never be > $1 for a single embedding call
        assert 0.0 < cost < 1.0
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")


def test_mistral_embeddings():
    """Mistral embedding call returns a ``litellm.Usage`` object."""
    try:
        litellm.set_verbose = True
        response = litellm.embedding(
            model="mistral/mistral-embed",
            input=["good morning from litellm"],
        )
        print(f"response: {response}")
        assert isinstance(response.usage, litellm.Usage)
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")


def test_fireworks_embeddings():
    """Fireworks embedding call returns usage plus a positive computed and hidden cost."""
    try:
        litellm.set_verbose = True
        response = litellm.embedding(
            model="fireworks_ai/nomic-ai/nomic-embed-text-v1.5",
            input=["good morning from litellm"],
        )
        print(f"response: {response}")
        assert isinstance(response.usage, litellm.Usage)
        cost = completion_cost(completion_response=response)
        print("cost", cost)
        assert cost > 0.0
        print(response._hidden_params)
        assert response._hidden_params["response_cost"] > 0.0
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")


def test_watsonx_embeddings():
    """watsonx embedding through a patched ``requests.request`` returns a Usage object."""

    def mock_wx_embed_request(method: str, url: str, **kwargs):
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.headers = {"Content-Type": "application/json"}
        mock_response.json.return_value = {
            "model_id": "ibm/slate-30m-english-rtrvr",
            "created_at": "2024-01-01T00:00:00.00Z",
            "results": [{"embedding": [0.0] * 254}],
            "input_token_count": 8,
        }
        return mock_response

    try:
        litellm.set_verbose = True
        with patch("requests.request", side_effect=mock_wx_embed_request):
            response = litellm.embedding(
                model="watsonx/ibm/slate-30m-english-rtrvr",
                input=["good morning from litellm"],
                token="secret-token",
            )
        print(f"response: {response}")
        assert isinstance(response.usage, litellm.Usage)
    except litellm.RateLimitError:
        pass
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")


@pytest.mark.asyncio
async def test_watsonx_aembeddings():
    """Async watsonx embedding through a patched ``httpx.AsyncClient`` returns a Usage object."""

    def mock_async_client(*args, **kwargs):
        mocked_client = MagicMock()

        async def mock_send(request, *args, stream: bool = False, **kwargs):
            mock_response = MagicMock()
            mock_response.status_code = 200
            mock_response.headers = {"Content-Type": "application/json"}
            mock_response.json.return_value = {
                "model_id": "ibm/slate-30m-english-rtrvr",
                "created_at": "2024-01-01T00:00:00.00Z",
                "results": [{"embedding": [0.0] * 254}],
                "input_token_count": 8,
            }
            mock_response.is_error = False
            return mock_response

        mocked_client.send = mock_send

        return mocked_client

    try:
        litellm.set_verbose = True
        with patch("httpx.AsyncClient", side_effect=mock_async_client):
            response = await litellm.aembedding(
                model="watsonx/ibm/slate-30m-english-rtrvr",
                input=["good morning from litellm"],
                token="secret-token",
            )
        print(f"response: {response}")
        assert isinstance(response.usage, litellm.Usage)
    except litellm.RateLimitError:
        pass
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")


# test_mistral_embeddings()


@pytest.mark.skip(
    reason="Community maintained embedding provider - they are quite unstable"
)
def test_voyage_embeddings():
    """Voyage embedding call completes without raising."""
    try:
        litellm.set_verbose = True
        response = litellm.embedding(
            model="voyage/voyage-01",
            input=["good morning from litellm"],
        )
        print(f"response: {response}")
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")


@pytest.mark.asyncio
async def test_triton_embeddings():
    """Triton embedding against the stub endpoint returns the expected vector."""
    try:
        litellm.set_verbose = True
        response = await litellm.aembedding(
            model="triton/my-triton-model",
            api_base="https://exampleopenaiendpoint-production.up.railway.app/triton/embeddings",
            input=["good morning from litellm"],
        )
        print(f"response: {response}")

        # stubbed endpoint is setup to return this
        assert response.data[0]["embedding"] == [0.1, 0.2]
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")


@pytest.mark.parametrize("sync_mode", [True, False])
@pytest.mark.parametrize(
    "input", ["good morning from litellm", ["good morning from litellm"]]
)
@pytest.mark.asyncio
async def test_gemini_embeddings(sync_mode, input):
    """Gemini embedding accepts ``str`` or ``list`` input and reports prompt tokens."""
    try:
        litellm.set_verbose = True
        if sync_mode:
            response = litellm.embedding(
                model="gemini/text-embedding-004",
                input=input,
            )
        else:
            response = await litellm.aembedding(
                model="gemini/text-embedding-004",
                input=input,
            )
        print(f"response: {response}")

        # stubbed endpoint is setup to return this
        assert isinstance(response.data[0]["embedding"], list)
        assert response.usage.prompt_tokens > 0
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")


@pytest.mark.parametrize("sync_mode", [True, False])
@pytest.mark.asyncio
async def test_databricks_embeddings(sync_mode):
    """Databricks embedding response validates strictly as ``CreateEmbeddingResponse``."""
    try:
        litellm.set_verbose = True
        litellm.drop_params = True

        request_kwargs = {
            "model": "databricks/databricks-bge-large-en",
            "input": ["good morning from litellm"],
            "instruction": "Represent this sentence for searching relevant passages:",
        }
        if sync_mode:
            response = litellm.embedding(**request_kwargs)
        else:
            response = await litellm.aembedding(**request_kwargs)

        print(f"response: {response}")

        openai.types.CreateEmbeddingResponse.model_validate(
            response.model_dump(), strict=True
        )
        # stubbed endpoint is setup to return this
        # assert response.data[0]["embedding"] == [0.1, 0.2, 0.3]
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")


# test_voyage_embeddings()
# def test_xinference_embeddings():
#     try:
#         litellm.set_verbose = True
#         response = litellm.embedding(
#             model="xinference/bge-base-en",
#             input=["good morning from litellm"],
#         )
#         print(f"response: {response}")
#     except Exception as e:
#         pytest.fail(f"Error occurred: {e}")
# test_xinference_embeddings()

# test_sagemaker_embeddings()
# def local_proxy_embeddings():
#     litellm.set_verbose=True
#     response = embedding(
#             model="openai/custom_embedding",
#             input=["good morning from litellm"],
#             api_base="http://0.0.0.0:8000/"
#         )
#     print(response)

# local_proxy_embeddings()


@pytest.mark.parametrize("sync_mode", [True, False])
@pytest.mark.asyncio
async def test_hf_embedddings_with_optional_params(sync_mode):
    """Optional params are serialized into the Hugging Face request body.

    ``wait_for_model`` must appear under ``options`` and ``top_p``/``top_k``
    under ``parameters`` of the JSON payload sent to the mocked ``post``.
    """
    litellm.set_verbose = True

    if sync_mode:
        client = HTTPHandler(concurrent_limit=1)
        mock_obj = MagicMock()
    else:
        client = AsyncHTTPHandler(concurrent_limit=1)
        mock_obj = AsyncMock()

    with patch.object(client, "post", new=mock_obj) as mock_client:
        request_kwargs = {
            "model": "huggingface/jinaai/jina-embeddings-v2-small-en",
            "input": ["good morning from litellm"],
            "top_p": 10,
            "top_k": 10,
            "wait_for_model": True,
            "client": client,
        }
        try:
            if sync_mode:
                response = embedding(**request_kwargs)
            else:
                response = await litellm.aembedding(**request_kwargs)
        except Exception:
            # The mock returns no usable response; only the outgoing request matters.
            pass

        mock_client.assert_called_once()

        print(f"mock_client.call_args.kwargs: {mock_client.call_args.kwargs}")
        assert "options" in mock_client.call_args.kwargs["data"]
        json_data = json.loads(mock_client.call_args.kwargs["data"])
        assert "wait_for_model" in json_data["options"]
        assert json_data["options"]["wait_for_model"] is True
        assert json_data["parameters"]["top_p"] == 10
        assert json_data["parameters"]["top_k"] == 10


@pytest.mark.parametrize(
    "model",
    [
        "text-embedding-ada-002",
        "azure/azure-embedding-model",
    ],
)
def test_embedding_response_ratelimit_headers(model):
    """Embedding responses expose positive remaining-request/token rate-limit headers."""
    response = embedding(
        model=model,
        input=["Hello world"],
    )
    hidden_params = response._hidden_params
    additional_headers = hidden_params.get("additional_headers", {})

    print(additional_headers)
    assert "x-ratelimit-remaining-requests" in additional_headers
    assert int(additional_headers["x-ratelimit-remaining-requests"]) > 0
    assert "x-ratelimit-remaining-tokens" in additional_headers
    assert int(additional_headers["x-ratelimit-remaining-tokens"]) > 0


@pytest.mark.parametrize(
    "input, input_type",
    [
        (
            # NOTE(review): the image payload is an empty string here; it looks
            # like an inline base64 image was stripped - confirm before relying
            # on the "image" case.
            [""],
            "image",
        ),
        (["hello world"], "text"),
    ],
)
def test_cohere_img_embeddings(input, input_type):
    """Cohere v3 usage details report image tokens or text tokens to match the input type."""
    litellm.set_verbose = True
    response = embedding(
        model="cohere/embed-english-v3.0",
        input=input,
    )

    if input_type == "image":
        assert response.usage.prompt_tokens_details.image_tokens > 0
    else:
        assert response.usage.prompt_tokens_details.text_tokens > 0


@pytest.mark.parametrize("sync_mode", [True, False])
@pytest.mark.asyncio
async def test_embedding_with_extra_headers(sync_mode):
    """``extra_headers`` passed to embedding reach the HTTP client's ``post`` call."""
    input = ["hello world"]
    from litellm.llms.custom_httpx.http_handler import HTTPHandler, AsyncHTTPHandler

    client = HTTPHandler() if sync_mode else AsyncHTTPHandler()

    data = {
        "model": "cohere/embed-english-v3.0",
        "input": input,
        "extra_headers": {"my-test-param": "hello-world"},
        "client": client,
    }
    with patch.object(client, "post") as mock_post:
        try:
            if sync_mode:
                embedding(**data)
            else:
                await litellm.aembedding(**data)
        except Exception as e:
            # The patched post returns a bare mock; only the request headers matter.
            print(e)

        mock_post.assert_called_once()
        assert "my-test-param" in mock_post.call_args.kwargs["headers"]