forked from phoenix/litellm-mirror

test(test_custom_callback_input.py): add bedrock testing

commit b09ecb986e (parent 6a3ba74183)
4 changed files with 165 additions and 36 deletions
@@ -482,7 +482,7 @@ def completion(
         logging_obj.post_call(
             input=prompt,
             api_key="",
-            original_response=response_body,
+            original_response=json.dumps(response_body),
             additional_args={"complete_input_dict": data},
         )
         print_verbose(f"raw model_response: {response}")
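The hunk above serializes the raw provider body before handing it to the post-call hook. The point (borne out by the test changes further down) is that callbacks can then assume `original_response` is a string or a stream wrapper rather than a provider-specific dict. A minimal sketch of the difference, using a made-up bedrock-style body:

```python
import json

response_body = {"completion": " Hello!", "stop_reason": "stop_sequence"}  # hypothetical body

# Passing the dict through json.dumps gives every callback a plain string,
# so a check like isinstance(original_response, str) holds uniformly.
original_response = json.dumps(response_body)
assert isinstance(original_response, str)
```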
@@ -683,9 +683,14 @@ def completion(
                 logger_fn=logger_fn
             )
 
-            # if "stream" in optional_params and optional_params["stream"] == True:
-            #     response = CustomStreamWrapper(model_response, model, custom_llm_provider="text-completion-openai", logging_obj=logging)
-            #     return response
+            if optional_params.get("stream", False) or acompletion == True:
+                ## LOGGING
+                logging.post_call(
+                    input=messages,
+                    api_key=api_key,
+                    original_response=model_response,
+                    additional_args={"headers": headers},
+                )
             response = model_response
         elif (
             "replicate" in model or
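The new guard reads `optional_params.get("stream", False) or acompletion == True`. Using `dict.get` with a default folds the old two-step membership-plus-truthiness test into one expression and cannot raise `KeyError`. A quick sketch:

```python
optional_params = {"stream": True}

old_style = "stream" in optional_params and optional_params["stream"] == True
new_style = optional_params.get("stream", False)
assert old_style == new_style

# When the key is absent, .get falls back to the default instead of raising.
assert {}.get("stream", False) is False
```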
@@ -730,8 +735,16 @@ def completion(
             )
             if "stream" in optional_params and optional_params["stream"] == True:
                 # don't try to access stream object,
-                response = CustomStreamWrapper(model_response, model, logging_obj=logging, custom_llm_provider="replicate")
-                return response
+                model_response = CustomStreamWrapper(model_response, model, logging_obj=logging, custom_llm_provider="replicate") # type: ignore
+
+            if optional_params.get("stream", False) or acompletion == True:
+                ## LOGGING
+                logging.post_call(
+                    input=messages,
+                    api_key=replicate_key,
+                    original_response=model_response,
+                )
+
             response = model_response
 
         elif custom_llm_provider=="anthropic":
@@ -751,7 +764,7 @@ def completion(
                 custom_prompt_dict
                 or litellm.custom_prompt_dict
             )
-            model_response = anthropic.completion(
+            response = anthropic.completion(
                 model=model,
                 messages=messages,
                 api_base=api_base,
@@ -767,9 +780,16 @@ def completion(
             )
             if "stream" in optional_params and optional_params["stream"] == True:
                 # don't try to access stream object,
-                response = CustomStreamWrapper(model_response, model, custom_llm_provider="anthropic", logging_obj=logging)
-                return response
-            response = model_response
+                response = CustomStreamWrapper(response, model, custom_llm_provider="anthropic", logging_obj=logging)
+
+            if optional_params.get("stream", False) or acompletion == True:
+                ## LOGGING
+                logging.post_call(
+                    input=messages,
+                    api_key=api_key,
+                    original_response=response,
+                )
+            response = response
         elif custom_llm_provider == "nlp_cloud":
             nlp_cloud_key = (
                 api_key or litellm.nlp_cloud_key or get_secret("NLP_CLOUD_API_KEY") or litellm.api_key
@@ -782,7 +802,7 @@ def completion(
                 or "https://api.nlpcloud.io/v1/gpu/"
             )
 
-            model_response = nlp_cloud.completion(
+            response = nlp_cloud.completion(
                 model=model,
                 messages=messages,
                 api_base=api_base,
@@ -798,9 +818,17 @@ def completion(
 
             if "stream" in optional_params and optional_params["stream"] == True:
                 # don't try to access stream object,
-                response = CustomStreamWrapper(model_response, model, custom_llm_provider="nlp_cloud", logging_obj=logging)
-                return response
-            response = model_response
+                response = CustomStreamWrapper(response, model, custom_llm_provider="nlp_cloud", logging_obj=logging)
+
+            if optional_params.get("stream", False) or acompletion == True:
+                ## LOGGING
+                logging.post_call(
+                    input=messages,
+                    api_key=api_key,
+                    original_response=response,
+                )
+
+            response = response
         elif custom_llm_provider == "aleph_alpha":
             aleph_alpha_key = (
                 api_key or litellm.aleph_alpha_key or get_secret("ALEPH_ALPHA_API_KEY") or get_secret("ALEPHALPHA_API_KEY") or litellm.api_key
@@ -1202,7 +1230,7 @@ def completion(
                 custom_prompt_dict
                 or litellm.custom_prompt_dict
             )
-            model_response = bedrock.completion(
+            response = bedrock.completion(
                 model=model,
                 messages=messages,
                 custom_prompt_dict=litellm.custom_prompt_dict,
@@ -1220,16 +1248,24 @@ def completion(
                 # don't try to access stream object,
                 if "ai21" in model:
                     response = CustomStreamWrapper(
-                        model_response, model, custom_llm_provider="bedrock", logging_obj=logging
+                        response, model, custom_llm_provider="bedrock", logging_obj=logging
                     )
                 else:
                     response = CustomStreamWrapper(
-                        iter(model_response), model, custom_llm_provider="bedrock", logging_obj=logging
+                        iter(response), model, custom_llm_provider="bedrock", logging_obj=logging
                    )
                 return response
 
+            if optional_params.get("stream", False):
+                ## LOGGING
+                logging.post_call(
+                    input=messages,
+                    api_key=None,
+                    original_response=response,
+                )
+
             ## RESPONSE OBJECT
-            response = model_response
+            response = response
         elif custom_llm_provider == "vllm":
             model_response = vllm.completion(
                 model=model,
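In the bedrock hunk, non-AI21 responses are passed through `iter(...)` before wrapping. `CustomStreamWrapper` pulls chunks with `next()`, which only works on an iterator, not on a bare iterable; `iter()` converts the latter and is a no-op on the former. A minimal illustration (plain Python, not litellm's types):

```python
chunks = ["chunk-1", "chunk-2"]   # a plain iterable: next(chunks) raises TypeError
stream = iter(chunks)             # now next() works
assert next(stream) == "chunk-1"
assert iter(stream) is stream     # iter() on an iterator returns it unchanged
```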
@@ -53,6 +53,9 @@ class CompletionCustomHandler(CustomLogger): # https://docs.litellm.ai/docs/obse
 
     def log_post_api_call(self, kwargs, response_obj, start_time, end_time):
         try:
+            print("IN POST CALL API")
+            print(f"kwargs input: {kwargs['input']}")
+            print(f"kwargs original response: {kwargs['original_response']}")
             ## START TIME
             assert isinstance(start_time, datetime)
             ## END TIME
@@ -67,8 +70,8 @@ class CompletionCustomHandler(CustomLogger): # https://docs.litellm.ai/docs/obse
             assert isinstance(kwargs['start_time'], Optional[datetime])
             assert isinstance(kwargs['stream'], bool)
             assert isinstance(kwargs['user'], Optional[str])
-            assert isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict)
-            assert isinstance(kwargs['api_key'], str)
+            assert (isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict)) or isinstance(kwargs['input'], (dict, str))
+            assert isinstance(kwargs['api_key'], Optional[str])
             assert isinstance(kwargs['original_response'], (str, litellm.CustomStreamWrapper)) or inspect.iscoroutine(kwargs['original_response']) or inspect.isasyncgen(kwargs['original_response'])
             assert isinstance(kwargs['additional_args'], Optional[dict])
             assert isinstance(kwargs['log_event_type'], str)
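A side note on these assertions: `isinstance(x, Optional[str])` only works on Python 3.10+, where `isinstance` gained support for parameterized `Union` types; on older interpreters it raises `TypeError`. The portable equivalent spells out the tuple:

```python
from typing import Optional

user = None
assert isinstance(user, Optional[str])       # Python 3.10+ only
assert isinstance(user, (str, type(None)))   # works on any supported version
```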
@@ -92,9 +95,9 @@ class CompletionCustomHandler(CustomLogger): # https://docs.litellm.ai/docs/obse
             assert isinstance(kwargs['start_time'], Optional[datetime])
             assert isinstance(kwargs['stream'], bool)
             assert isinstance(kwargs['user'], Optional[str])
-            assert isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict)
-            assert isinstance(kwargs['api_key'], str)
-            assert inspect.isasyncgen(kwargs['original_response']) or inspect.iscoroutine(kwargs['original_response'])
+            assert (isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict)) or isinstance(kwargs['input'], (dict, str))
+            assert isinstance(kwargs['api_key'], Optional[str])
+            assert isinstance(kwargs['original_response'], (str, litellm.CustomStreamWrapper)) or inspect.isasyncgen(kwargs['original_response']) or inspect.iscoroutine(kwargs['original_response'])
             assert isinstance(kwargs['additional_args'], Optional[dict])
             assert isinstance(kwargs['log_event_type'], str)
         except:
@@ -117,8 +120,8 @@ class CompletionCustomHandler(CustomLogger): # https://docs.litellm.ai/docs/obse
             assert isinstance(kwargs['start_time'], Optional[datetime])
             assert isinstance(kwargs['stream'], bool)
             assert isinstance(kwargs['user'], Optional[str])
-            assert isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict)
-            assert isinstance(kwargs['api_key'], str)
+            assert (isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict)) or isinstance(kwargs['input'], (dict, str))
+            assert isinstance(kwargs['api_key'], Optional[str])
             assert isinstance(kwargs['original_response'], (str, litellm.CustomStreamWrapper))
             assert isinstance(kwargs['additional_args'], Optional[dict])
             assert isinstance(kwargs['log_event_type'], str)
@@ -142,8 +145,8 @@ class CompletionCustomHandler(CustomLogger): # https://docs.litellm.ai/docs/obse
             assert isinstance(kwargs['start_time'], Optional[datetime])
             assert isinstance(kwargs['stream'], bool)
             assert isinstance(kwargs['user'], Optional[str])
-            assert isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict)
-            assert isinstance(kwargs['api_key'], str)
+            assert (isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict)) or isinstance(kwargs['input'], (dict, str))
+            assert isinstance(kwargs['api_key'], Optional[str])
             assert isinstance(kwargs['original_response'], (str, litellm.CustomStreamWrapper)) or kwargs["original_response"] == None
             assert isinstance(kwargs['additional_args'], Optional[dict])
             assert isinstance(kwargs['log_event_type'], str)
@@ -185,9 +188,9 @@ class CompletionCustomHandler(CustomLogger): # https://docs.litellm.ai/docs/obse
             assert isinstance(kwargs['start_time'], Optional[datetime])
             assert isinstance(kwargs['stream'], bool)
             assert isinstance(kwargs['user'], Optional[str])
-            assert isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict)
-            assert isinstance(kwargs['api_key'], str)
-            assert isinstance(kwargs['original_response'], str) or inspect.isasyncgen(kwargs['original_response']) or inspect.iscoroutine(kwargs['original_response'])
+            assert (isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict)) or isinstance(kwargs['input'], (dict, str))
+            assert isinstance(kwargs['api_key'], Optional[str])
+            assert isinstance(kwargs['original_response'], (str, litellm.CustomStreamWrapper)) or inspect.isasyncgen(kwargs['original_response']) or inspect.iscoroutine(kwargs['original_response'])
             assert isinstance(kwargs['additional_args'], Optional[dict])
             assert isinstance(kwargs['log_event_type'], str)
         except:
@@ -210,8 +213,8 @@ class CompletionCustomHandler(CustomLogger): # https://docs.litellm.ai/docs/obse
             assert isinstance(kwargs['start_time'], Optional[datetime])
             assert isinstance(kwargs['stream'], bool)
             assert isinstance(kwargs['user'], Optional[str])
-            assert isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict)
-            assert isinstance(kwargs['api_key'], str)
+            assert (isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict)) or isinstance(kwargs['input'], (dict, str))
+            assert isinstance(kwargs['api_key'], Optional[str])
             assert isinstance(kwargs['original_response'], (str, litellm.CustomStreamWrapper)) or inspect.isasyncgen(kwargs['original_response'])
             assert isinstance(kwargs['additional_args'], Optional[dict])
             assert isinstance(kwargs['log_event_type'], str)
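The widened `original_response` assertions accept live async objects, not just strings and stream wrappers. `inspect.isasyncgen` and `inspect.iscoroutine` are the stdlib predicates for the two shapes an un-consumed async response can take:

```python
import inspect

async def agen():
    yield "chunk"

async def coro():
    return "done"

gen_obj, coro_obj = agen(), coro()
assert inspect.isasyncgen(gen_obj)    # async-generator object (consumed via async for)
assert inspect.iscoroutine(coro_obj)  # coroutine object awaiting execution
coro_obj.close()                      # silence the "never awaited" warning
```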
@@ -343,7 +346,7 @@ def test_chat_azure_stream():
 
 # test_chat_azure_stream()
 
-## Test OpenAI + Async
+## Test Azure + Async
 @pytest.mark.asyncio
 async def test_async_chat_azure_stream():
     try:
@@ -384,3 +387,88 @@ async def test_async_chat_azure_stream():
         pytest.fail(f"An exception occurred: {str(e)}")
 
 # asyncio.run(test_async_chat_azure_stream())
+
+## Test Bedrock + sync
+def test_chat_bedrock_stream():
+    try:
+        customHandler = CompletionCustomHandler()
+        litellm.callbacks = [customHandler]
+        response = litellm.completion(model="bedrock/anthropic.claude-v1",
+                                      messages=[{
+                                          "role": "user",
+                                          "content": "Hi 👋 - i'm sync bedrock"
+                                      }])
+        # test streaming
+        response = litellm.completion(model="bedrock/anthropic.claude-v1",
+                                      messages=[{
+                                          "role": "user",
+                                          "content": "Hi 👋 - i'm sync bedrock"
+                                      }],
+                                      stream=True)
+        for chunk in response:
+            continue
+        # test failure callback
+        try:
+            response = litellm.completion(model="bedrock/anthropic.claude-v1",
+                                          messages=[{
+                                              "role": "user",
+                                              "content": "Hi 👋 - i'm sync bedrock"
+                                          }],
+                                          aws_region_name="my-bad-region",
+                                          stream=True)
+            for chunk in response:
+                continue
+        except:
+            pass
+        time.sleep(1)
+        print(f"customHandler.errors: {customHandler.errors}")
+        assert len(customHandler.errors) == 0
+        litellm.callbacks = []
+    except Exception as e:
+        pytest.fail(f"An exception occurred: {str(e)}")
+
+# test_chat_bedrock_stream()
+
+## Test Bedrock + Async
+@pytest.mark.asyncio
+async def test_async_chat_bedrock_stream():
+    try:
+        customHandler = CompletionCustomHandler()
+        litellm.callbacks = [customHandler]
+        response = await litellm.acompletion(model="bedrock/anthropic.claude-v1",
+                                             messages=[{
+                                                 "role": "user",
+                                                 "content": "Hi 👋 - i'm async bedrock"
+                                             }])
+        # test streaming
+        response = await litellm.acompletion(model="bedrock/anthropic.claude-v1",
+                                             messages=[{
+                                                 "role": "user",
+                                                 "content": "Hi 👋 - i'm async bedrock"
+                                             }],
+                                             stream=True)
+        print(f"response: {response}")
+        async for chunk in response:
+            print(f"chunk: {chunk}")
+            continue
+        ## test failure callback
+        try:
+            response = await litellm.acompletion(model="bedrock/anthropic.claude-v1",
+                                                 messages=[{
+                                                     "role": "user",
+                                                     "content": "Hi 👋 - i'm async bedrock"
+                                                 }],
+                                                 aws_region_name="my-bad-key",
+                                                 stream=True)
+            async for chunk in response:
+                continue
+        except:
+            pass
+        time.sleep(1)
+        print(f"customHandler.errors: {customHandler.errors}")
+        assert len(customHandler.errors) == 0
+        litellm.callbacks = []
+    except Exception as e:
+        pytest.fail(f"An exception occurred: {str(e)}")
+
+# asyncio.run(test_async_chat_bedrock_stream())
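For readers outside the repo, a minimal sketch of the callback pattern these tests exercise, assuming litellm's documented `CustomLogger` interface (the import path follows the litellm docs; the handler below is a hypothetical stand-in for `CompletionCustomHandler`, not the real class):

```python
import litellm
from litellm.integrations.custom_logger import CustomLogger

class ErrorCollectingHandler(CustomLogger):
    """Collect failures instead of raising inside the logging hooks."""
    def __init__(self):
        super().__init__()
        self.errors = []

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        try:
            assert isinstance(kwargs['model'], str)
        except Exception:
            self.errors.append("success-event assertion failed")

handler = ErrorCollectingHandler()
litellm.callbacks = [handler]
# ... run litellm.completion(...) calls here ...
assert len(handler.errors) == 0
```

Collecting errors in a list and asserting on it afterwards, rather than raising inside the hook, is what lets the tests above verify callback behavior without the logging machinery swallowing the failure.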
@@ -815,6 +815,7 @@ class Logging:
                 print_verbose(
                     f"Logging Details LiteLLM-Success Call"
                 )
+                # print(f"original response in success handler: {self.model_call_details['original_response']}")
                 try:
                     print_verbose(f"success callbacks: {litellm.success_callback}")
                     ## BUILD COMPLETE STREAMED RESPONSE
@@ -1191,7 +1192,7 @@ class Logging:
                         print_verbose=print_verbose,
                         callback_func=callback
                     )
-                elif isinstance(callback, CustomLogger): # custom logger class
+                elif isinstance(callback, CustomLogger) and self.model_call_details.get("litellm_params", {}).get("acompletion", False) == False: # custom logger class
                     callback.log_failure_event(
                         start_time=start_time,
                         end_time=end_time,
@@ -5712,6 +5713,10 @@ class CustomStreamWrapper:
                 processed_chunk = next(self)
+                asyncio.create_task(self.logging_obj.async_success_handler(processed_chunk,))
                 return processed_chunk
+        except StopAsyncIteration:
+            raise
         except StopIteration:
             raise StopAsyncIteration # Re-raise StopIteration
         except Exception as e:
             traceback_exception = traceback.format_exc()
+            # Handle any exceptions that might occur during streaming
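The `__anext__` change bridges a synchronous chunk iterator into the async-iteration protocol: `next(self)` drives the sync stream, `StopIteration` is translated to `StopAsyncIteration`, and the success callback is scheduled via `asyncio.create_task` so logging never blocks chunk delivery. A self-contained sketch of the same pattern (hypothetical class, not litellm's implementation):

```python
import asyncio

class SyncToAsyncStream:
    """Expose a synchronous iterator as an async iterator."""
    def __init__(self, sync_iterable, on_chunk):
        self._it = iter(sync_iterable)
        self._on_chunk = on_chunk  # async callable fired per chunk

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            chunk = next(self._it)      # drive the sync stream
        except StopIteration:
            raise StopAsyncIteration    # translate end-of-stream for async for
        # fire-and-forget logging; does not delay delivery of the chunk
        asyncio.create_task(self._on_chunk(chunk))
        return chunk

async def main():
    async def log(chunk):
        print("logged:", chunk)
    async for chunk in SyncToAsyncStream(["a", "b"], log):
        print("got:", chunk)
    await asyncio.sleep(0)  # give the fire-and-forget log tasks a chance to run

asyncio.run(main())
```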