From b09ecb986ef681684a0b7cd3b7832146de0e3f10 Mon Sep 17 00:00:00 2001
From: Krrish Dholakia
Date: Mon, 11 Dec 2023 12:59:49 -0800
Subject: [PATCH] test(test_custom_callback_input.py): add bedrock testing

---
 litellm/llms/bedrock.py                     |   2 +-
 litellm/main.py                             |  72 +++++++++---
 litellm/tests/test_custom_callback_input.py | 120 +++++++++++++++++---
 litellm/utils.py                            |   7 +-
 4 files changed, 165 insertions(+), 36 deletions(-)

diff --git a/litellm/llms/bedrock.py b/litellm/llms/bedrock.py
index 9b65104006..5b3659f881 100644
--- a/litellm/llms/bedrock.py
+++ b/litellm/llms/bedrock.py
@@ -482,7 +482,7 @@ def completion(
             logging_obj.post_call(
                 input=prompt,
                 api_key="",
-                original_response=response_body,
+                original_response=json.dumps(response_body),
                 additional_args={"complete_input_dict": data},
             )
             print_verbose(f"raw model_response: {response}")
diff --git a/litellm/main.py b/litellm/main.py
index e92e5e7463..3b50c075b6 100644
--- a/litellm/main.py
+++ b/litellm/main.py
@@ -683,9 +683,14 @@ def completion(
                 logger_fn=logger_fn
             )
 
-            # if "stream" in optional_params and optional_params["stream"] == True:
-            #     response = CustomStreamWrapper(model_response, model, custom_llm_provider="text-completion-openai", logging_obj=logging)
-            #     return response
+            if optional_params.get("stream", False) or acompletion == True:
+                ## LOGGING
+                logging.post_call(
+                    input=messages,
+                    api_key=api_key,
+                    original_response=model_response,
+                    additional_args={"headers": headers},
+                )
             response = model_response
         elif (
             "replicate" in model or
@@ -730,8 +735,16 @@ def completion(
             )
             if "stream" in optional_params and optional_params["stream"] == True:
                 # don't try to access stream object,
-                response = CustomStreamWrapper(model_response, model, logging_obj=logging, custom_llm_provider="replicate")
-                return response
+                model_response = CustomStreamWrapper(model_response, model, logging_obj=logging, custom_llm_provider="replicate") # type: ignore
+
+            if optional_params.get("stream", False) or acompletion == True:
+                ## LOGGING
+                logging.post_call(
+                    input=messages,
+                    api_key=replicate_key,
+                    original_response=model_response,
+                )
+
             response = model_response
 
         elif custom_llm_provider=="anthropic":
@@ -751,7 +764,7 @@ def completion(
                 custom_prompt_dict
                 or litellm.custom_prompt_dict
             )
-            model_response = anthropic.completion(
+            response = anthropic.completion(
                 model=model,
                 messages=messages,
                 api_base=api_base,
@@ -767,9 +780,16 @@ def completion(
             )
             if "stream" in optional_params and optional_params["stream"] == True:
                 # don't try to access stream object,
-                response = CustomStreamWrapper(model_response, model, custom_llm_provider="anthropic", logging_obj=logging)
-                return response
-            response = model_response
+                response = CustomStreamWrapper(response, model, custom_llm_provider="anthropic", logging_obj=logging)
+
+            if optional_params.get("stream", False) or acompletion == True:
+                ## LOGGING
+                logging.post_call(
+                    input=messages,
+                    api_key=api_key,
+                    original_response=response,
+                )
+            response = response
         elif custom_llm_provider == "nlp_cloud":
             nlp_cloud_key = (
                 api_key or litellm.nlp_cloud_key or get_secret("NLP_CLOUD_API_KEY") or litellm.api_key
@@ -782,7 +802,7 @@ def completion(
                 or "https://api.nlpcloud.io/v1/gpu/"
             )
 
-            model_response = nlp_cloud.completion(
+            response = nlp_cloud.completion(
                 model=model,
                 messages=messages,
                 api_base=api_base,
@@ -798,9 +818,17 @@ def completion(
 
             if "stream" in optional_params and optional_params["stream"] == True:
                 # don't try to access stream object,
-                response = CustomStreamWrapper(model_response, model, custom_llm_provider="nlp_cloud", logging_obj=logging)
custom_llm_provider="nlp_cloud", logging_obj=logging) - return response - response = model_response + response = CustomStreamWrapper(response, model, custom_llm_provider="nlp_cloud", logging_obj=logging) + + if optional_params.get("stream", False) or acompletion == True: + ## LOGGING + logging.post_call( + input=messages, + api_key=api_key, + original_response=response, + ) + + response = response elif custom_llm_provider == "aleph_alpha": aleph_alpha_key = ( api_key or litellm.aleph_alpha_key or get_secret("ALEPH_ALPHA_API_KEY") or get_secret("ALEPHALPHA_API_KEY") or litellm.api_key @@ -1202,7 +1230,7 @@ def completion( custom_prompt_dict or litellm.custom_prompt_dict ) - model_response = bedrock.completion( + response = bedrock.completion( model=model, messages=messages, custom_prompt_dict=litellm.custom_prompt_dict, @@ -1220,16 +1248,24 @@ def completion( # don't try to access stream object, if "ai21" in model: response = CustomStreamWrapper( - model_response, model, custom_llm_provider="bedrock", logging_obj=logging + response, model, custom_llm_provider="bedrock", logging_obj=logging ) else: response = CustomStreamWrapper( - iter(model_response), model, custom_llm_provider="bedrock", logging_obj=logging + iter(response), model, custom_llm_provider="bedrock", logging_obj=logging ) - return response + + if optional_params.get("stream", False): + ## LOGGING + logging.post_call( + input=messages, + api_key=None, + original_response=response, + ) + ## RESPONSE OBJECT - response = model_response + response = response elif custom_llm_provider == "vllm": model_response = vllm.completion( model=model, diff --git a/litellm/tests/test_custom_callback_input.py b/litellm/tests/test_custom_callback_input.py index 428d2e4d88..0102f6eb7c 100644 --- a/litellm/tests/test_custom_callback_input.py +++ b/litellm/tests/test_custom_callback_input.py @@ -53,6 +53,9 @@ class CompletionCustomHandler(CustomLogger): # https://docs.litellm.ai/docs/obse def log_post_api_call(self, kwargs, response_obj, start_time, end_time): try: + print("IN POST CALL API") + print(f"kwargs input: {kwargs['input']}") + print(f"kwargs original response: {kwargs['original_response']}") ## START TIME assert isinstance(start_time, datetime) ## END TIME @@ -67,8 +70,8 @@ class CompletionCustomHandler(CustomLogger): # https://docs.litellm.ai/docs/obse assert isinstance(kwargs['start_time'], Optional[datetime]) assert isinstance(kwargs['stream'], bool) assert isinstance(kwargs['user'], Optional[str]) - assert isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict) - assert isinstance(kwargs['api_key'], str) + assert (isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict)) or isinstance(kwargs['input'], (dict, str)) + assert isinstance(kwargs['api_key'], Optional[str]) assert isinstance(kwargs['original_response'], (str, litellm.CustomStreamWrapper)) or inspect.iscoroutine(kwargs['original_response']) or inspect.isasyncgen(kwargs['original_response']) assert isinstance(kwargs['additional_args'], Optional[dict]) assert isinstance(kwargs['log_event_type'], str) @@ -92,9 +95,9 @@ class CompletionCustomHandler(CustomLogger): # https://docs.litellm.ai/docs/obse assert isinstance(kwargs['start_time'], Optional[datetime]) assert isinstance(kwargs['stream'], bool) assert isinstance(kwargs['user'], Optional[str]) - assert isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict) - assert isinstance(kwargs['api_key'], str) - assert inspect.isasyncgen(kwargs['original_response']) or 
+            assert (isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict)) or isinstance(kwargs['input'], (dict, str))
+            assert isinstance(kwargs['api_key'], Optional[str])
+            assert isinstance(kwargs['original_response'], (str, litellm.CustomStreamWrapper)) or inspect.isasyncgen(kwargs['original_response']) or inspect.iscoroutine(kwargs['original_response'])
             assert isinstance(kwargs['additional_args'], Optional[dict])
             assert isinstance(kwargs['log_event_type'], str)
         except:
@@ -117,8 +120,8 @@ class CompletionCustomHandler(CustomLogger): # https://docs.litellm.ai/docs/obse
             assert isinstance(kwargs['start_time'], Optional[datetime])
             assert isinstance(kwargs['stream'], bool)
             assert isinstance(kwargs['user'], Optional[str])
-            assert isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict)
-            assert isinstance(kwargs['api_key'], str)
+            assert (isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict)) or isinstance(kwargs['input'], (dict, str))
+            assert isinstance(kwargs['api_key'], Optional[str])
             assert isinstance(kwargs['original_response'], (str, litellm.CustomStreamWrapper))
             assert isinstance(kwargs['additional_args'], Optional[dict])
             assert isinstance(kwargs['log_event_type'], str)
@@ -142,8 +145,8 @@ class CompletionCustomHandler(CustomLogger): # https://docs.litellm.ai/docs/obse
             assert isinstance(kwargs['start_time'], Optional[datetime])
             assert isinstance(kwargs['stream'], bool)
             assert isinstance(kwargs['user'], Optional[str])
-            assert isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict)
-            assert isinstance(kwargs['api_key'], str)
+            assert (isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict)) or isinstance(kwargs['input'], (dict, str))
+            assert isinstance(kwargs['api_key'], Optional[str])
             assert isinstance(kwargs['original_response'], (str, litellm.CustomStreamWrapper)) or kwargs["original_response"] == None
             assert isinstance(kwargs['additional_args'], Optional[dict])
             assert isinstance(kwargs['log_event_type'], str)
@@ -185,9 +188,9 @@ class CompletionCustomHandler(CustomLogger): # https://docs.litellm.ai/docs/obse
             assert isinstance(kwargs['start_time'], Optional[datetime])
             assert isinstance(kwargs['stream'], bool)
             assert isinstance(kwargs['user'], Optional[str])
-            assert isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict)
-            assert isinstance(kwargs['api_key'], str)
-            assert isinstance(kwargs['original_response'], str) or inspect.isasyncgen(kwargs['original_response']) or inspect.iscoroutine(kwargs['original_response'])
+            assert (isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict)) or isinstance(kwargs['input'], (dict, str))
+            assert isinstance(kwargs['api_key'], Optional[str])
+            assert isinstance(kwargs['original_response'], (str, litellm.CustomStreamWrapper)) or inspect.isasyncgen(kwargs['original_response']) or inspect.iscoroutine(kwargs['original_response'])
             assert isinstance(kwargs['additional_args'], Optional[dict])
             assert isinstance(kwargs['log_event_type'], str)
         except:
@@ -210,8 +213,8 @@ class CompletionCustomHandler(CustomLogger): # https://docs.litellm.ai/docs/obse
             assert isinstance(kwargs['start_time'], Optional[datetime])
             assert isinstance(kwargs['stream'], bool)
             assert isinstance(kwargs['user'], Optional[str])
-            assert isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict)
-            assert isinstance(kwargs['api_key'], str)
+            assert (isinstance(kwargs['input'], list) and isinstance(kwargs['input'][0], dict)) or isinstance(kwargs['input'], (dict, str))
+            assert isinstance(kwargs['api_key'], Optional[str])
             assert isinstance(kwargs['original_response'], (str, litellm.CustomStreamWrapper)) or inspect.isasyncgen(kwargs['original_response'])
             assert isinstance(kwargs['additional_args'], Optional[dict])
             assert isinstance(kwargs['log_event_type'], str)
@@ -343,7 +346,7 @@ def test_chat_azure_stream():
 
 # test_chat_azure_stream()
 
-## Test OpenAI + Async
+## Test Azure + Async
 @pytest.mark.asyncio
 async def test_async_chat_azure_stream():
     try:
@@ -383,4 +386,89 @@ async def test_async_chat_azure_stream():
     except Exception as e:
         pytest.fail(f"An exception occurred: {str(e)}")
 
-# asyncio.run(test_async_chat_azure_stream())
\ No newline at end of file
+# asyncio.run(test_async_chat_azure_stream())
+
+## Test Bedrock + sync
+def test_chat_bedrock_stream():
+    try:
+        customHandler = CompletionCustomHandler()
+        litellm.callbacks = [customHandler]
+        response = litellm.completion(model="bedrock/anthropic.claude-v1",
+                              messages=[{
+                                  "role": "user",
+                                  "content": "Hi 👋 - i'm sync bedrock"
+                              }])
+        # test streaming
+        response = litellm.completion(model="bedrock/anthropic.claude-v1",
+                              messages=[{
+                                  "role": "user",
+                                  "content": "Hi 👋 - i'm sync bedrock"
+                              }],
+                              stream=True)
+        for chunk in response:
+            continue
+        # test failure callback
+        try:
+            response = litellm.completion(model="bedrock/anthropic.claude-v1",
+                                messages=[{
+                                    "role": "user",
+                                    "content": "Hi 👋 - i'm sync bedrock"
+                                }],
+                                aws_region_name="my-bad-region",
+                                stream=True)
+            for chunk in response:
+                continue
+        except:
+            pass
+        time.sleep(1)
+        print(f"customHandler.errors: {customHandler.errors}")
+        assert len(customHandler.errors) == 0
+        litellm.callbacks = []
+    except Exception as e:
+        pytest.fail(f"An exception occurred: {str(e)}")
+
+# test_chat_bedrock_stream()
+
+## Test Bedrock + Async
+@pytest.mark.asyncio
+async def test_async_chat_bedrock_stream():
+    try:
+        customHandler = CompletionCustomHandler()
+        litellm.callbacks = [customHandler]
+        response = await litellm.acompletion(model="bedrock/anthropic.claude-v1",
+                              messages=[{
+                                  "role": "user",
+                                  "content": "Hi 👋 - i'm async bedrock"
+                              }])
+        # test streaming
+        response = await litellm.acompletion(model="bedrock/anthropic.claude-v1",
+                              messages=[{
+                                  "role": "user",
+                                  "content": "Hi 👋 - i'm async bedrock"
+                              }],
+                              stream=True)
+        print(f"response: {response}")
+        async for chunk in response:
+            print(f"chunk: {chunk}")
+            continue
+        ## test failure callback
+        try:
+            response = await litellm.acompletion(model="bedrock/anthropic.claude-v1",
+                                messages=[{
+                                    "role": "user",
+                                    "content": "Hi 👋 - i'm async bedrock"
+                                }],
+                                aws_region_name="my-bad-key",
+                                stream=True)
+            async for chunk in response:
+                continue
+        except:
+            pass
+        time.sleep(1)
+        print(f"customHandler.errors: {customHandler.errors}")
+        assert len(customHandler.errors) == 0
+        litellm.callbacks = []
+    except Exception as e:
+        pytest.fail(f"An exception occurred: {str(e)}")
+
+# asyncio.run(test_async_chat_bedrock_stream())
\ No newline at end of file
diff --git a/litellm/utils.py b/litellm/utils.py
index 0c48c83a87..1e61d79897 100644
--- a/litellm/utils.py
+++ b/litellm/utils.py
@@ -815,6 +815,7 @@ class Logging:
             print_verbose(
                 f"Logging Details LiteLLM-Success Call"
             )
+            # print(f"original response in success handler: {self.model_call_details['original_response']}")
             try:
                 print_verbose(f"success callbacks: {litellm.success_callback}")
                 ## BUILD COMPLETE STREAMED RESPONSE
@@ -1191,7 +1192,7 @@ class Logging:
                         print_verbose=print_verbose,
                         callback_func=callback
                     )
-                elif isinstance(callback, CustomLogger): # custom logger class
+                elif isinstance(callback, CustomLogger) and self.model_call_details.get("litellm_params", {}).get("acompletion", False) == False: # custom logger class
                     callback.log_failure_event(
                         start_time=start_time,
                         end_time=end_time,
@@ -5712,6 +5713,10 @@ class CustomStreamWrapper:
             processed_chunk = next(self)
             asyncio.create_task(self.logging_obj.async_success_handler(processed_chunk,))
             return processed_chunk
+        except StopAsyncIteration:
+            raise
+        except StopIteration:
+            raise StopAsyncIteration # translate StopIteration into StopAsyncIteration for the async iterator protocol
         except Exception as e:
             traceback_exception = traceback.format_exc()
             # Handle any exceptions that might occur during streaming
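
--
Usage sketch (not part of the patch): with the hunks above, log_post_api_call now
fires for streaming and async completions too, so kwargs['original_response'] may
be a str, a litellm.CustomStreamWrapper, a coroutine, or an async generator, and
kwargs['api_key'] may be None (the bedrock path logs post_call with api_key=None).
Below is a minimal callback that tolerates every shape the relaxed test assertions
allow; the class name PostCallProbe and the probe prints are illustrative, not
part of this commit.

    import inspect
    import litellm
    from litellm.integrations.custom_logger import CustomLogger

    class PostCallProbe(CustomLogger):
        def log_post_api_call(self, kwargs, response_obj, start_time, end_time):
            # input may be a messages list, a dict, or a plain prompt string
            print(f"input: {kwargs['input']}")
            # api_key is Optional[str]; the bedrock path logs None here
            print(f"api_key: {kwargs['api_key']}")
            original = kwargs["original_response"]
            if isinstance(original, litellm.CustomStreamWrapper):
                # streaming calls now log the wrapper before iteration starts
                print("got a stream wrapper")
            elif inspect.iscoroutine(original) or inspect.isasyncgen(original):
                # async completions log their awaitable / async generator
                print("got an async response object")
            else:
                print(f"raw response: {original}")

    litellm.callbacks = [PostCallProbe()]
    response = litellm.completion(
        model="bedrock/anthropic.claude-v1",
        messages=[{"role": "user", "content": "Hi 👋"}],
        stream=True,
    )
    for chunk in response:
        pass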