forked from phoenix/litellm-mirror
* fix(pattern_match_deployments.py): default to user input if unable to map based on wildcards * test: fix test * test: reset test name * test: update conftest to reload proxy server module between tests * ci(config.yml): move langfuse out of local_testing reduce ci/cd time * ci(config.yml): cleanup langfuse ci/cd tests * fix: update test to not use global proxy_server app module * ci: move caching to a separate test pipeline speed up ci pipeline * test: update conftest to check if proxy_server attr exists before reloading * build(conftest.py): don't block on inability to reload proxy_server * ci(config.yml): update caching unit test filter to work on 'cache' keyword as well * fix(encrypt_decrypt_utils.py): use function to get salt key * test: mark flaky test * test: handle anthropic overloaded errors * refactor: create separate ci/cd pipeline for proxy unit tests make ci/cd faster * ci(config.yml): add litellm_proxy_unit_testing to build_and_test jobs * ci(config.yml): generate prisma binaries for proxy unit tests * test: readd vertex_key.json * ci(config.yml): remove `-s` from proxy_unit_test cmd speed up test * ci: remove any 'debug' logging flag speed up ci pipeline * test: fix test * test(test_braintrust.py): rerun * test: add delay for braintrust test
299 lines
10 KiB
Python
299 lines
10 KiB
Python
import sys, os
|
|
import traceback
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv()
|
|
import os, io, asyncio
|
|
|
|
# this file is to test litellm/proxy
|
|
|
|
sys.path.insert(
|
|
0, os.path.abspath("../..")
|
|
) # Adds the parent directory to the system path
|
|
import pytest, time
|
|
import litellm
|
|
from litellm import embedding, completion, completion_cost, Timeout
|
|
from litellm import RateLimitError
|
|
import importlib, inspect
|
|
|
|
# test /chat/completion request to the proxy
|
|
from fastapi.testclient import TestClient
|
|
from fastapi import FastAPI
|
|
from litellm.proxy.proxy_server import (
|
|
router,
|
|
save_worker_config,
|
|
initialize,
|
|
startup_event,
|
|
) # Replace with the actual module where your FastAPI router is defined
|
|
|
|
filepath = os.path.dirname(os.path.abspath(__file__))
|
|
python_file_path = f"{filepath}/test_configs/custom_callbacks.py"
|
|
|
|
# @app.on_event("startup")
|
|
# async def wrapper_startup_event():
|
|
# initialize(config=config_fp)
|
|
|
|
# Use the app fixture in your client fixture
|
|
|
|
|
|
@pytest.fixture
|
|
def client():
|
|
filepath = os.path.dirname(os.path.abspath(__file__))
|
|
config_fp = f"{filepath}/test_configs/test_custom_logger.yaml"
|
|
app = FastAPI()
|
|
asyncio.run(initialize(config=config_fp))
|
|
app.include_router(router) # Include your router in the test app
|
|
return TestClient(app)
|
|
|
|
|
|
# Your bearer token
|
|
token = os.getenv("PROXY_MASTER_KEY")
|
|
|
|
headers = {"Authorization": f"Bearer {token}"}
|
|
|
|
|
|
print("Testing proxy custom logger")
|
|
|
|
|
|
def test_embedding(client):
|
|
try:
|
|
litellm.set_verbose = False
|
|
from litellm.proxy.utils import get_instance_fn
|
|
|
|
my_custom_logger = get_instance_fn(
|
|
value="custom_callbacks.my_custom_logger", config_file_path=python_file_path
|
|
)
|
|
print("id of initialized custom logger", id(my_custom_logger))
|
|
litellm.callbacks = [my_custom_logger]
|
|
# Your test data
|
|
print("initialized proxy")
|
|
# import the initialized custom logger
|
|
print(litellm.callbacks)
|
|
|
|
# assert len(litellm.callbacks) == 1 # assert litellm is initialized with 1 callback
|
|
print("my_custom_logger", my_custom_logger)
|
|
assert my_custom_logger.async_success_embedding is False
|
|
|
|
test_data = {"model": "azure-embedding-model", "input": ["hello"]}
|
|
response = client.post("/embeddings", json=test_data, headers=headers)
|
|
print("made request", response.status_code, response.text)
|
|
print(
|
|
"vars my custom logger /embeddings",
|
|
vars(my_custom_logger),
|
|
"id",
|
|
id(my_custom_logger),
|
|
)
|
|
assert (
|
|
my_custom_logger.async_success_embedding is True
|
|
) # checks if the status of async_success is True, only the async_log_success_event can set this to true
|
|
assert (
|
|
my_custom_logger.async_embedding_kwargs["model"] == "azure-embedding-model"
|
|
) # checks if kwargs passed to async_log_success_event are correct
|
|
kwargs = my_custom_logger.async_embedding_kwargs
|
|
litellm_params = kwargs.get("litellm_params")
|
|
metadata = litellm_params.get("metadata", None)
|
|
print("\n\n Metadata in custom logger kwargs", litellm_params.get("metadata"))
|
|
assert metadata is not None
|
|
assert "user_api_key" in metadata
|
|
assert "headers" in metadata
|
|
proxy_server_request = litellm_params.get("proxy_server_request")
|
|
model_info = litellm_params.get("model_info")
|
|
assert proxy_server_request == {
|
|
"url": "http://testserver/embeddings",
|
|
"method": "POST",
|
|
"headers": {
|
|
"host": "testserver",
|
|
"accept": "*/*",
|
|
"accept-encoding": "gzip, deflate",
|
|
"connection": "keep-alive",
|
|
"user-agent": "testclient",
|
|
"content-length": "54",
|
|
"content-type": "application/json",
|
|
},
|
|
"body": {"model": "azure-embedding-model", "input": ["hello"]},
|
|
}
|
|
assert model_info == {
|
|
"input_cost_per_token": 0.002,
|
|
"mode": "embedding",
|
|
"id": "hello",
|
|
"db_model": False,
|
|
}
|
|
result = response.json()
|
|
print(f"Received response: {result}")
|
|
print("Passed Embedding custom logger on proxy!")
|
|
except Exception as e:
|
|
pytest.fail(f"LiteLLM Proxy test failed. Exception {str(e)}")
|
|
|
|
|
|
def test_chat_completion(client):
|
|
try:
|
|
# Your test data
|
|
litellm.set_verbose = False
|
|
from litellm.proxy.utils import get_instance_fn
|
|
|
|
my_custom_logger = get_instance_fn(
|
|
value="custom_callbacks.my_custom_logger", config_file_path=python_file_path
|
|
)
|
|
|
|
print("id of initialized custom logger", id(my_custom_logger))
|
|
|
|
litellm.callbacks = [my_custom_logger]
|
|
# import the initialized custom logger
|
|
print(litellm.callbacks)
|
|
|
|
# assert len(litellm.callbacks) == 1 # assert litellm is initialized with 1 callback
|
|
|
|
print("LiteLLM Callbacks", litellm.callbacks)
|
|
print("my_custom_logger", my_custom_logger)
|
|
assert my_custom_logger.async_success == False
|
|
|
|
test_data = {
|
|
"model": "Azure OpenAI GPT-4 Canada",
|
|
"messages": [
|
|
{"role": "user", "content": "write a litellm poem"},
|
|
],
|
|
"max_tokens": 10,
|
|
}
|
|
|
|
response = client.post("/chat/completions", json=test_data, headers=headers)
|
|
print("made request", response.status_code, response.text)
|
|
print("LiteLLM Callbacks", litellm.callbacks)
|
|
time.sleep(1) # sleep while waiting for callback to run
|
|
|
|
print(
|
|
"my_custom_logger in /chat/completions",
|
|
my_custom_logger,
|
|
"id",
|
|
id(my_custom_logger),
|
|
)
|
|
print("vars my custom logger, ", vars(my_custom_logger))
|
|
assert (
|
|
my_custom_logger.async_success == True
|
|
) # checks if the status of async_success is True, only the async_log_success_event can set this to true
|
|
assert (
|
|
my_custom_logger.async_completion_kwargs["model"] == "chatgpt-v-2"
|
|
) # checks if kwargs passed to async_log_success_event are correct
|
|
print(
|
|
"\n\n Custom Logger Async Completion args",
|
|
my_custom_logger.async_completion_kwargs,
|
|
)
|
|
litellm_params = my_custom_logger.async_completion_kwargs.get("litellm_params")
|
|
metadata = litellm_params.get("metadata", None)
|
|
print("\n\n Metadata in custom logger kwargs", litellm_params.get("metadata"))
|
|
assert metadata is not None
|
|
assert "user_api_key" in metadata
|
|
assert "user_api_key_metadata" in metadata
|
|
assert "headers" in metadata
|
|
config_model_info = litellm_params.get("model_info")
|
|
proxy_server_request_object = litellm_params.get("proxy_server_request")
|
|
|
|
assert config_model_info == {
|
|
"id": "gm",
|
|
"input_cost_per_token": 0.0002,
|
|
"mode": "chat",
|
|
"db_model": False,
|
|
}
|
|
|
|
assert "authorization" not in proxy_server_request_object["headers"]
|
|
assert proxy_server_request_object == {
|
|
"url": "http://testserver/chat/completions",
|
|
"method": "POST",
|
|
"headers": {
|
|
"host": "testserver",
|
|
"accept": "*/*",
|
|
"accept-encoding": "gzip, deflate",
|
|
"connection": "keep-alive",
|
|
"user-agent": "testclient",
|
|
"content-length": "123",
|
|
"content-type": "application/json",
|
|
},
|
|
"body": {
|
|
"model": "Azure OpenAI GPT-4 Canada",
|
|
"messages": [{"role": "user", "content": "write a litellm poem"}],
|
|
"max_tokens": 10,
|
|
},
|
|
}
|
|
result = response.json()
|
|
print(f"Received response: {result}")
|
|
print("\nPassed /chat/completions with Custom Logger!")
|
|
except Exception as e:
|
|
pytest.fail(f"LiteLLM Proxy test failed. Exception {str(e)}")
|
|
|
|
|
|
def test_chat_completion_stream(client):
|
|
try:
|
|
# Your test data
|
|
litellm.set_verbose = False
|
|
from litellm.proxy.utils import get_instance_fn
|
|
|
|
my_custom_logger = get_instance_fn(
|
|
value="custom_callbacks.my_custom_logger", config_file_path=python_file_path
|
|
)
|
|
|
|
print("id of initialized custom logger", id(my_custom_logger))
|
|
|
|
litellm.callbacks = [my_custom_logger]
|
|
import json
|
|
|
|
print("initialized proxy")
|
|
# import the initialized custom logger
|
|
print(litellm.callbacks)
|
|
|
|
print("LiteLLM Callbacks", litellm.callbacks)
|
|
print("my_custom_logger", my_custom_logger)
|
|
|
|
assert (
|
|
my_custom_logger.streaming_response_obj == None
|
|
) # no streaming response obj is set pre call
|
|
|
|
test_data = {
|
|
"model": "Azure OpenAI GPT-4 Canada",
|
|
"messages": [
|
|
{"role": "user", "content": "write 1 line poem about LiteLLM"},
|
|
],
|
|
"max_tokens": 40,
|
|
"stream": True, # streaming call
|
|
}
|
|
|
|
response = client.post("/chat/completions", json=test_data, headers=headers)
|
|
print("made request", response.status_code, response.text)
|
|
complete_response = ""
|
|
for line in response.iter_lines():
|
|
if line:
|
|
# Process the streaming data line here
|
|
print("\n\n Line", line)
|
|
print(line)
|
|
line = str(line)
|
|
|
|
json_data = line.replace("data: ", "")
|
|
|
|
if "[DONE]" in json_data:
|
|
break
|
|
|
|
# Parse the JSON string
|
|
data = json.loads(json_data)
|
|
|
|
print("\n\n decode_data", data)
|
|
|
|
# Access the content of choices[0]['message']['content']
|
|
content = data["choices"][0]["delta"].get("content", None) or ""
|
|
|
|
# Process the content as needed
|
|
print("Content:", content)
|
|
|
|
complete_response += content
|
|
|
|
print("\n\nHERE is the complete streaming response string", complete_response)
|
|
print("\n\nHERE IS the streaming Response from callback\n\n")
|
|
print(my_custom_logger.streaming_response_obj)
|
|
import time
|
|
|
|
time.sleep(0.5)
|
|
|
|
streamed_response = my_custom_logger.streaming_response_obj
|
|
assert (
|
|
complete_response == streamed_response["choices"][0]["message"]["content"]
|
|
)
|
|
|
|
except Exception as e:
|
|
pytest.fail(f"LiteLLM Proxy test failed. Exception {str(e)}")
|