diff --git a/litellm/llms/vertex_ai.py b/litellm/llms/vertex_ai.py index fdbc1625e..f4447a9e9 100644 --- a/litellm/llms/vertex_ai.py +++ b/litellm/llms/vertex_ai.py @@ -1000,12 +1000,15 @@ async def async_streaming( if stream: response = TextStreamer(completion_response) + logging_obj.post_call(input=prompt, api_key=None, original_response=response) + streamwrapper = CustomStreamWrapper( completion_stream=response, model=model, custom_llm_provider="vertex_ai", logging_obj=logging_obj, ) + return streamwrapper diff --git a/litellm/tests/test_custom_callback_input.py b/litellm/tests/test_custom_callback_input.py index 5da46ffee..ca1fe19a9 100644 --- a/litellm/tests/test_custom_callback_input.py +++ b/litellm/tests/test_custom_callback_input.py @@ -600,6 +600,81 @@ async def test_async_chat_sagemaker_stream(): pytest.fail(f"An exception occurred: {str(e)}") +## Test Vertex AI + Async +import json +import tempfile + + +def load_vertex_ai_credentials(): + # Define the path to the vertex_key.json file + print("loading vertex ai credentials") + filepath = os.path.dirname(os.path.abspath(__file__)) + vertex_key_path = filepath + "/vertex_key.json" + + # Read the existing content of the file or create an empty dictionary + try: + with open(vertex_key_path, "r") as file: + # Read the file content + print("Read vertexai file path") + content = file.read() + + # If the file is empty or not valid JSON, create an empty dictionary + if not content or not content.strip(): + service_account_key_data = {} + else: + # Attempt to load the existing JSON content + file.seek(0) + service_account_key_data = json.load(file) + except FileNotFoundError: + # If the file doesn't exist, create an empty dictionary + service_account_key_data = {} + + # Update the service_account_key_data with environment variables + private_key_id = os.environ.get("VERTEX_AI_PRIVATE_KEY_ID", "") + private_key = os.environ.get("VERTEX_AI_PRIVATE_KEY", "") + private_key = private_key.replace("\\n", "\n") + service_account_key_data["private_key_id"] = private_key_id + service_account_key_data["private_key"] = private_key + + # Create a temporary file + with tempfile.NamedTemporaryFile(mode="w+", delete=False) as temp_file: + # Write the updated content to the temporary file + json.dump(service_account_key_data, temp_file, indent=2) + + # Export the temporary file as GOOGLE_APPLICATION_CREDENTIALS + os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = os.path.abspath(temp_file.name) + + +@pytest.mark.asyncio +async def test_async_chat_vertex_ai_stream(): + try: + load_vertex_ai_credentials() + customHandler = CompletionCustomHandler() + litellm.callbacks = [customHandler] + # test streaming + response = await litellm.acompletion( + model="gemini-pro", + messages=[ + { + "role": "user", + "content": f"Hi 👋 - i'm async vertex_ai {uuid.uuid4()}", + } + ], + stream=True, + ) + print(f"response: {response}") + async for chunk in response: + print(f"chunk: {chunk}") + continue + print(f"customHandler.states: {customHandler.states}") + assert ( + customHandler.states.count("async_success") == 1 + ) # pre, post, success, pre, post, failure + assert len(customHandler.states) >= 3 # pre, post, success + except Exception as e: + pytest.fail(f"An exception occurred: {str(e)}") + + # Text Completion