diff --git a/litellm/tests/test_completion.py b/litellm/tests/test_completion.py
index 8d97d6aa8e..c397f6581b 100644
--- a/litellm/tests/test_completion.py
+++ b/litellm/tests/test_completion.py
@@ -733,7 +733,7 @@ def test_completion_azure_deployment_id():
         print(response)
     except Exception as e:
         pytest.fail(f"Error occurred: {e}")
-test_completion_azure_deployment_id()
+# test_completion_azure_deployment_id() # Only works for local endpoint


 # def test_completion_anthropic_openai_proxy():
@@ -1169,7 +1169,7 @@ def test_mistral_anyscale_stream():
     for chunk in response:
         # print(chunk)
         print(chunk["choices"][0]["delta"].get("content", ""), end="")
-# test_mistral_anyscale_stream()
+test_mistral_anyscale_stream()
 # test_completion_anyscale_2()
 # def test_completion_with_fallbacks_multiple_keys():
 #     print(f"backup key 1: {os.getenv('BACKUP_OPENAI_API_KEY_1')}")
diff --git a/litellm/utils.py b/litellm/utils.py
index 66ec0c5dfc..53c7e7117f 100644
--- a/litellm/utils.py
+++ b/litellm/utils.py
@@ -4113,166 +4113,164 @@ class CustomStreamWrapper:
     def chunk_creator(self, chunk):
         model_response = ModelResponse(stream=True, model=self.model)
         try:
-            while True: # loop until a non-empty string is found
-                # return this for all models
-                completion_obj = {"content": ""}
-                if self.custom_llm_provider and self.custom_llm_provider == "anthropic":
-                    response_obj = self.handle_anthropic_chunk(chunk)
-                    completion_obj["content"] = response_obj["text"]
-                    if response_obj["is_finished"]:
-                        model_response.choices[0].finish_reason = response_obj["finish_reason"]
-                elif self.model == "replicate" or self.custom_llm_provider == "replicate":
-                    response_obj = self.handle_replicate_chunk(chunk)
-                    completion_obj["content"] = response_obj["text"]
-                    if response_obj["is_finished"]:
-                        model_response.choices[0].finish_reason = response_obj["finish_reason"]
-                elif (
-                    self.custom_llm_provider and self.custom_llm_provider == "together_ai"):
-                    response_obj = self.handle_together_ai_chunk(chunk)
-                    completion_obj["content"] = response_obj["text"]
-                    if response_obj["is_finished"]:
-                        model_response.choices[0].finish_reason = response_obj["finish_reason"]
-                elif self.custom_llm_provider and self.custom_llm_provider == "huggingface":
-                    response_obj = self.handle_huggingface_chunk(chunk)
-                    completion_obj["content"] = response_obj["text"]
-                    if response_obj["is_finished"]:
-                        model_response.choices[0].finish_reason = response_obj["finish_reason"]
-                elif self.custom_llm_provider and self.custom_llm_provider == "baseten": # baseten doesn't provide streaming
-                    completion_obj["content"] = self.handle_baseten_chunk(chunk)
-                elif self.custom_llm_provider and self.custom_llm_provider == "ai21": #ai21 doesn't provide streaming
-                    response_obj = self.handle_ai21_chunk(chunk)
-                    completion_obj["content"] = response_obj["text"]
-                    if response_obj["is_finished"]:
-                        model_response.choices[0].finish_reason = response_obj["finish_reason"]
-                elif self.custom_llm_provider and self.custom_llm_provider == "azure":
-                    response_obj = self.handle_azure_chunk(chunk)
-                    completion_obj["content"] = response_obj["text"]
-                    if response_obj["is_finished"]:
-                        model_response.choices[0].finish_reason = response_obj["finish_reason"]
-                elif self.custom_llm_provider and self.custom_llm_provider == "maritalk":
-                    response_obj = self.handle_maritalk_chunk(chunk)
-                    completion_obj["content"] = response_obj["text"]
-                    if response_obj["is_finished"]:
-                        model_response.choices[0].finish_reason = response_obj["finish_reason"]
-                elif self.custom_llm_provider and self.custom_llm_provider == "vllm":
-                    completion_obj["content"] = chunk[0].outputs[0].text
-                elif self.custom_llm_provider and self.custom_llm_provider == "aleph_alpha": #aleph alpha doesn't provide streaming
-                    response_obj = self.handle_aleph_alpha_chunk(chunk)
-                    completion_obj["content"] = response_obj["text"]
-                    if response_obj["is_finished"]:
-                        model_response.choices[0].finish_reason = response_obj["finish_reason"]
-                elif self.model in litellm.nlp_cloud_models or self.custom_llm_provider == "nlp_cloud":
-                    try:
+
+            # return this for all models
+            completion_obj = {"content": ""}
+            if self.custom_llm_provider and self.custom_llm_provider == "anthropic":
+                response_obj = self.handle_anthropic_chunk(chunk)
+                completion_obj["content"] = response_obj["text"]
+                if response_obj["is_finished"]:
+                    model_response.choices[0].finish_reason = response_obj["finish_reason"]
+            elif self.model == "replicate" or self.custom_llm_provider == "replicate":
+                response_obj = self.handle_replicate_chunk(chunk)
+                completion_obj["content"] = response_obj["text"]
+                if response_obj["is_finished"]:
+                    model_response.choices[0].finish_reason = response_obj["finish_reason"]
+            elif (
+                self.custom_llm_provider and self.custom_llm_provider == "together_ai"):
+                response_obj = self.handle_together_ai_chunk(chunk)
+                completion_obj["content"] = response_obj["text"]
+                if response_obj["is_finished"]:
+                    model_response.choices[0].finish_reason = response_obj["finish_reason"]
+            elif self.custom_llm_provider and self.custom_llm_provider == "huggingface":
+                response_obj = self.handle_huggingface_chunk(chunk)
+                completion_obj["content"] = response_obj["text"]
+                if response_obj["is_finished"]:
+                    model_response.choices[0].finish_reason = response_obj["finish_reason"]
+            elif self.custom_llm_provider and self.custom_llm_provider == "baseten": # baseten doesn't provide streaming
+                completion_obj["content"] = self.handle_baseten_chunk(chunk)
+            elif self.custom_llm_provider and self.custom_llm_provider == "ai21": #ai21 doesn't provide streaming
+                response_obj = self.handle_ai21_chunk(chunk)
+                completion_obj["content"] = response_obj["text"]
+                if response_obj["is_finished"]:
+                    model_response.choices[0].finish_reason = response_obj["finish_reason"]
+            elif self.custom_llm_provider and self.custom_llm_provider == "azure":
+                response_obj = self.handle_azure_chunk(chunk)
+                completion_obj["content"] = response_obj["text"]
+                if response_obj["is_finished"]:
+                    model_response.choices[0].finish_reason = response_obj["finish_reason"]
+            elif self.custom_llm_provider and self.custom_llm_provider == "maritalk":
+                response_obj = self.handle_maritalk_chunk(chunk)
+                completion_obj["content"] = response_obj["text"]
+                if response_obj["is_finished"]:
+                    model_response.choices[0].finish_reason = response_obj["finish_reason"]
+            elif self.custom_llm_provider and self.custom_llm_provider == "vllm":
+                completion_obj["content"] = chunk[0].outputs[0].text
+            elif self.custom_llm_provider and self.custom_llm_provider == "aleph_alpha": #aleph alpha doesn't provide streaming
+                response_obj = self.handle_aleph_alpha_chunk(chunk)
+                completion_obj["content"] = response_obj["text"]
+                if response_obj["is_finished"]:
+                    model_response.choices[0].finish_reason = response_obj["finish_reason"]
+            elif self.model in litellm.nlp_cloud_models or self.custom_llm_provider == "nlp_cloud":
+                try:
-                        response_obj = self.handle_nlp_cloud_chunk(chunk)
-                        completion_obj["content"] = response_obj["text"]
-                        if response_obj["is_finished"]:
-                            model_response.choices[0].finish_reason = response_obj["finish_reason"]
-                    except Exception as e:
-                        if self.sent_last_chunk:
-                            raise e
-                        else:
-                            if self.sent_first_chunk is False:
-                                raise Exception("An unknown error occurred with the stream")
-                            model_response.choices[0].finish_reason = "stop"
-                            self.sent_last_chunk = True
-                elif self.custom_llm_provider and self.custom_llm_provider == "vertex_ai":
-                    try:
+                    response_obj = self.handle_nlp_cloud_chunk(chunk)
+                    completion_obj["content"] = response_obj["text"]
+                    if response_obj["is_finished"]:
+                        model_response.choices[0].finish_reason = response_obj["finish_reason"]
+                except Exception as e:
+                    if self.sent_last_chunk:
+                        raise e
+                    else:
+                        if self.sent_first_chunk is False:
+                            raise Exception("An unknown error occurred with the stream")
+                        model_response.choices[0].finish_reason = "stop"
+                        self.sent_last_chunk = True
+            elif self.custom_llm_provider and self.custom_llm_provider == "vertex_ai":
+                try:
-                        completion_obj["content"] = str(chunk)
-                    except StopIteration as e:
-                        if self.sent_last_chunk:
-                            raise e
-                        else:
-                            model_response.choices[0].finish_reason = "stop"
-                            self.sent_last_chunk = True
-                elif self.custom_llm_provider == "cohere":
-                    response_obj = self.handle_cohere_chunk(chunk)
-                    completion_obj["content"] = response_obj["text"]
-                    if response_obj["is_finished"]:
-                        model_response.choices[0].finish_reason = response_obj["finish_reason"]
-                elif self.custom_llm_provider == "bedrock":
-                    response_obj = self.handle_bedrock_stream(chunk)
-                    completion_obj["content"] = response_obj["text"]
-                    if response_obj["is_finished"]:
-                        model_response.choices[0].finish_reason = response_obj["finish_reason"]
-                elif self.custom_llm_provider == "sagemaker":
-                    if len(self.completion_stream)==0:
-                        if self.sent_last_chunk:
-                            raise StopIteration
-                        else:
-                            model_response.choices[0].finish_reason = "stop"
-                            self.sent_last_chunk = True
-                    chunk_size = 30
-                    new_chunk = self.completion_stream[:chunk_size]
-                    completion_obj["content"] = new_chunk
-                    self.completion_stream = self.completion_stream[chunk_size:]
-                    time.sleep(0.05)
-                elif self.custom_llm_provider == "petals":
-                    if len(self.completion_stream)==0:
-                        if self.sent_last_chunk:
-                            raise StopIteration
-                        else:
-                            model_response.choices[0].finish_reason = "stop"
-                            self.sent_last_chunk = True
-                    chunk_size = 30
-                    new_chunk = self.completion_stream[:chunk_size]
-                    completion_obj["content"] = new_chunk
-                    self.completion_stream = self.completion_stream[chunk_size:]
-                    time.sleep(0.05)
-                elif self.custom_llm_provider == "palm":
-                    # fake streaming
-                    if len(self.completion_stream)==0:
-                        if self.sent_last_chunk:
-                            raise StopIteration
-                        else:
-                            model_response.choices[0].finish_reason = "stop"
-                            self.sent_last_chunk = True
-                    chunk_size = 30
-                    new_chunk = self.completion_stream[:chunk_size]
-                    completion_obj["content"] = new_chunk
-                    self.completion_stream = self.completion_stream[chunk_size:]
-                    time.sleep(0.05)
-                elif self.custom_llm_provider == "ollama":
-                    if "error" in chunk:
-                        exception_type(model=self.model, custom_llm_provider=self.custom_llm_provider, original_exception=chunk["error"])
-                    completion_obj = chunk
-                elif self.custom_llm_provider == "openai":
-                    response_obj = self.handle_openai_chat_completion_chunk(chunk)
-                    completion_obj["content"] = response_obj["text"]
-                    print_verbose(f"completion obj content: {completion_obj['content']}")
-                    if response_obj["is_finished"]:
-                        model_response.choices[0].finish_reason = response_obj["finish_reason"]
-                elif self.custom_llm_provider == "text-completion-openai":
-                    response_obj = self.handle_openai_text_completion_chunk(chunk)
-                    completion_obj["content"] = response_obj["text"]
-                    print_verbose(f"completion obj content: {completion_obj['content']}")
-                    if response_obj["is_finished"]:
-                        model_response.choices[0].finish_reason = response_obj["finish_reason"]
-                else: # openai chat/azure models
-                    raise Exception("Unmapped Model Error")
-
-                model_response.model = self.model
-                if len(completion_obj["content"]) > 0: # cannot set content of an OpenAI Object to be an empty string
-                    hold, model_response_str = self.check_special_tokens(completion_obj["content"])
-                    if hold is False:
-                        completion_obj["content"] = model_response_str
-                        if self.sent_first_chunk == False:
-                            completion_obj["role"] = "assistant"
-                            self.sent_first_chunk = True
-                        model_response.choices[0].delta = Delta(**completion_obj)
-                        # LOGGING
-                        threading.Thread(target=self.logging_obj.success_handler, args=(model_response,)).start()
-                        return model_response
-                    else:
-                        return
-                elif model_response.choices[0].finish_reason:
-                    model_response.choices[0].finish_reason = map_finish_reason(model_response.choices[0].finish_reason) # ensure consistent output to openai
+                    completion_obj["content"] = str(chunk)
+                except StopIteration as e:
+                    if self.sent_last_chunk:
+                        raise e
+                    else:
+                        model_response.choices[0].finish_reason = "stop"
+                        self.sent_last_chunk = True
+            elif self.custom_llm_provider == "cohere":
+                response_obj = self.handle_cohere_chunk(chunk)
+                completion_obj["content"] = response_obj["text"]
+                if response_obj["is_finished"]:
+                    model_response.choices[0].finish_reason = response_obj["finish_reason"]
+            elif self.custom_llm_provider == "bedrock":
+                response_obj = self.handle_bedrock_stream(chunk)
+                completion_obj["content"] = response_obj["text"]
+                if response_obj["is_finished"]:
+                    model_response.choices[0].finish_reason = response_obj["finish_reason"]
+            elif self.custom_llm_provider == "sagemaker":
+                if len(self.completion_stream)==0:
+                    if self.sent_last_chunk:
+                        raise StopIteration
+                    else:
+                        model_response.choices[0].finish_reason = "stop"
+                        self.sent_last_chunk = True
+                chunk_size = 30
+                new_chunk = self.completion_stream[:chunk_size]
+                completion_obj["content"] = new_chunk
+                self.completion_stream = self.completion_stream[chunk_size:]
+                time.sleep(0.05)
+            elif self.custom_llm_provider == "petals":
+                if len(self.completion_stream)==0:
+                    if self.sent_last_chunk:
+                        raise StopIteration
+                    else:
+                        model_response.choices[0].finish_reason = "stop"
+                        self.sent_last_chunk = True
+                chunk_size = 30
+                new_chunk = self.completion_stream[:chunk_size]
+                completion_obj["content"] = new_chunk
+                self.completion_stream = self.completion_stream[chunk_size:]
+                time.sleep(0.05)
+            elif self.custom_llm_provider == "palm":
+                # fake streaming
+                if len(self.completion_stream)==0:
+                    if self.sent_last_chunk:
+                        raise StopIteration
+                    else:
+                        model_response.choices[0].finish_reason = "stop"
+                        self.sent_last_chunk = True
+                chunk_size = 30
+                new_chunk = self.completion_stream[:chunk_size]
+                completion_obj["content"] = new_chunk
+                self.completion_stream = self.completion_stream[chunk_size:]
+                time.sleep(0.05)
+            elif self.custom_llm_provider == "ollama":
+                if "error" in chunk:
+                    exception_type(model=self.model, custom_llm_provider=self.custom_llm_provider, original_exception=chunk["error"])
+                completion_obj = chunk
+            elif self.custom_llm_provider == "text-completion-openai":
+                response_obj = self.handle_openai_text_completion_chunk(chunk)
+                completion_obj["content"] = response_obj["text"]
+                print_verbose(f"completion obj content: {completion_obj['content']}")
+                if response_obj["is_finished"]:
+                    model_response.choices[0].finish_reason = response_obj["finish_reason"]
+            else: # openai chat model
+                response_obj = self.handle_openai_chat_completion_chunk(chunk)
+                completion_obj["content"] = response_obj["text"]
+                print_verbose(f"completion obj content: {completion_obj['content']}")
+                if response_obj["is_finished"]:
+                    model_response.choices[0].finish_reason = response_obj["finish_reason"]
+
+            model_response.model = self.model
+            if len(completion_obj["content"]) > 0: # cannot set content of an OpenAI Object to be an empty string
+                hold, model_response_str = self.check_special_tokens(completion_obj["content"])
+                if hold is False:
+                    completion_obj["content"] = model_response_str
+                    if self.sent_first_chunk == False:
+                        completion_obj["role"] = "assistant"
+                        self.sent_first_chunk = True
+                    model_response.choices[0].delta = Delta(**completion_obj)
                     # LOGGING
                     threading.Thread(target=self.logging_obj.success_handler, args=(model_response,)).start()
                     return model_response
                 else:
-                    return
+                    return
+            elif model_response.choices[0].finish_reason:
+                model_response.choices[0].finish_reason = map_finish_reason(model_response.choices[0].finish_reason) # ensure consistent output to openai
+                # LOGGING
+                threading.Thread(target=self.logging_obj.success_handler, args=(model_response,)).start()
+                return model_response
+            else:
+                return
         except StopIteration:
             raise StopIteration
         except Exception as e:
@@ -4284,8 +4282,14 @@ class CustomStreamWrapper:

     ## needs to handle the empty string case (even starting chunk can be an empty string)
     def __next__(self):
-        chunk = next(self.completion_stream)
-        return self.chunk_creator(chunk=chunk)
+        while True: # loop until a non-empty string is found
+            if isinstance(self.completion_stream, str):
+                chunk = self.completion_stream
+            else:
+                chunk = next(self.completion_stream)
+            response = self.chunk_creator(chunk=chunk)
+            if response is not None:
+                return response

     async def __anext__(self):
         try: