fix(utils.py): fix sync streaming

Krrish Dholakia 2023-11-09 18:47:11 -08:00
parent 76d8ea674e
commit 3d4c5e10a7
2 changed files with 162 additions and 158 deletions


@@ -733,7 +733,7 @@ def test_completion_azure_deployment_id():
         print(response)
     except Exception as e:
         pytest.fail(f"Error occurred: {e}")
-test_completion_azure_deployment_id()
+# test_completion_azure_deployment_id()
 # Only works for local endpoint
 # def test_completion_anthropic_openai_proxy():
@@ -1169,7 +1169,7 @@ def test_mistral_anyscale_stream():
     for chunk in response:
         # print(chunk)
         print(chunk["choices"][0]["delta"].get("content", ""), end="")
-# test_mistral_anyscale_stream()
+test_mistral_anyscale_stream()
 # test_completion_anyscale_2()
 # def test_completion_with_fallbacks_multiple_keys():
 #     print(f"backup key 1: {os.getenv('BACKUP_OPENAI_API_KEY_1')}")


@@ -4113,166 +4113,164 @@ class CustomStreamWrapper:
     def chunk_creator(self, chunk):
         model_response = ModelResponse(stream=True, model=self.model)
         try:
-            while True: # loop until a non-empty string is found
             # return this for all models
             completion_obj = {"content": ""}
             if self.custom_llm_provider and self.custom_llm_provider == "anthropic":
                 response_obj = self.handle_anthropic_chunk(chunk)
                 completion_obj["content"] = response_obj["text"]
                 if response_obj["is_finished"]:
                     model_response.choices[0].finish_reason = response_obj["finish_reason"]
             elif self.model == "replicate" or self.custom_llm_provider == "replicate":
                 response_obj = self.handle_replicate_chunk(chunk)
                 completion_obj["content"] = response_obj["text"]
                 if response_obj["is_finished"]:
                     model_response.choices[0].finish_reason = response_obj["finish_reason"]
             elif (
                 self.custom_llm_provider and self.custom_llm_provider == "together_ai"):
                 response_obj = self.handle_together_ai_chunk(chunk)
                 completion_obj["content"] = response_obj["text"]
                 if response_obj["is_finished"]:
                     model_response.choices[0].finish_reason = response_obj["finish_reason"]
             elif self.custom_llm_provider and self.custom_llm_provider == "huggingface":
                 response_obj = self.handle_huggingface_chunk(chunk)
                 completion_obj["content"] = response_obj["text"]
                 if response_obj["is_finished"]:
                     model_response.choices[0].finish_reason = response_obj["finish_reason"]
             elif self.custom_llm_provider and self.custom_llm_provider == "baseten": # baseten doesn't provide streaming
                 completion_obj["content"] = self.handle_baseten_chunk(chunk)
             elif self.custom_llm_provider and self.custom_llm_provider == "ai21": # ai21 doesn't provide streaming
                 response_obj = self.handle_ai21_chunk(chunk)
                 completion_obj["content"] = response_obj["text"]
                 if response_obj["is_finished"]:
                     model_response.choices[0].finish_reason = response_obj["finish_reason"]
             elif self.custom_llm_provider and self.custom_llm_provider == "azure":
                 response_obj = self.handle_azure_chunk(chunk)
                 completion_obj["content"] = response_obj["text"]
                 if response_obj["is_finished"]:
                     model_response.choices[0].finish_reason = response_obj["finish_reason"]
             elif self.custom_llm_provider and self.custom_llm_provider == "maritalk":
                 response_obj = self.handle_maritalk_chunk(chunk)
                 completion_obj["content"] = response_obj["text"]
                 if response_obj["is_finished"]:
                     model_response.choices[0].finish_reason = response_obj["finish_reason"]
             elif self.custom_llm_provider and self.custom_llm_provider == "vllm":
                 completion_obj["content"] = chunk[0].outputs[0].text
             elif self.custom_llm_provider and self.custom_llm_provider == "aleph_alpha": # aleph alpha doesn't provide streaming
                 response_obj = self.handle_aleph_alpha_chunk(chunk)
                 completion_obj["content"] = response_obj["text"]
                 if response_obj["is_finished"]:
                     model_response.choices[0].finish_reason = response_obj["finish_reason"]
             elif self.model in litellm.nlp_cloud_models or self.custom_llm_provider == "nlp_cloud":
                 try:
                     response_obj = self.handle_nlp_cloud_chunk(chunk)
                     completion_obj["content"] = response_obj["text"]
                     if response_obj["is_finished"]:
                         model_response.choices[0].finish_reason = response_obj["finish_reason"]
                 except Exception as e:
                     if self.sent_last_chunk:
                         raise e
                     else:
                         if self.sent_first_chunk is False:
                             raise Exception("An unknown error occurred with the stream")
                         model_response.choices[0].finish_reason = "stop"
                         self.sent_last_chunk = True
             elif self.custom_llm_provider and self.custom_llm_provider == "vertex_ai":
                 try:
                     completion_obj["content"] = str(chunk)
                 except StopIteration as e:
                     if self.sent_last_chunk:
                         raise e
                     else:
                         model_response.choices[0].finish_reason = "stop"
                         self.sent_last_chunk = True
             elif self.custom_llm_provider == "cohere":
                 response_obj = self.handle_cohere_chunk(chunk)
                 completion_obj["content"] = response_obj["text"]
                 if response_obj["is_finished"]:
                     model_response.choices[0].finish_reason = response_obj["finish_reason"]
             elif self.custom_llm_provider == "bedrock":
                 response_obj = self.handle_bedrock_stream(chunk)
                 completion_obj["content"] = response_obj["text"]
                 if response_obj["is_finished"]:
                     model_response.choices[0].finish_reason = response_obj["finish_reason"]
             elif self.custom_llm_provider == "sagemaker":
                 if len(self.completion_stream) == 0:
                     if self.sent_last_chunk:
                         raise StopIteration
                     else:
                         model_response.choices[0].finish_reason = "stop"
                         self.sent_last_chunk = True
                 chunk_size = 30
                 new_chunk = self.completion_stream[:chunk_size]
                 completion_obj["content"] = new_chunk
                 self.completion_stream = self.completion_stream[chunk_size:]
                 time.sleep(0.05)
             elif self.custom_llm_provider == "petals":
                 if len(self.completion_stream) == 0:
                     if self.sent_last_chunk:
                         raise StopIteration
                     else:
                         model_response.choices[0].finish_reason = "stop"
                         self.sent_last_chunk = True
                 chunk_size = 30
                 new_chunk = self.completion_stream[:chunk_size]
                 completion_obj["content"] = new_chunk
                 self.completion_stream = self.completion_stream[chunk_size:]
                 time.sleep(0.05)
             elif self.custom_llm_provider == "palm":
                 # fake streaming
                 if len(self.completion_stream) == 0:
                     if self.sent_last_chunk:
                         raise StopIteration
                     else:
                         model_response.choices[0].finish_reason = "stop"
                         self.sent_last_chunk = True
                 chunk_size = 30
                 new_chunk = self.completion_stream[:chunk_size]
                 completion_obj["content"] = new_chunk
                 self.completion_stream = self.completion_stream[chunk_size:]
                 time.sleep(0.05)
             elif self.custom_llm_provider == "ollama":
                 if "error" in chunk:
                     exception_type(model=self.model, custom_llm_provider=self.custom_llm_provider, original_exception=chunk["error"])
                 completion_obj = chunk
-            elif self.custom_llm_provider == "openai":
-                response_obj = self.handle_openai_chat_completion_chunk(chunk)
+            elif self.custom_llm_provider == "text-completion-openai":
+                response_obj = self.handle_openai_text_completion_chunk(chunk)
                 completion_obj["content"] = response_obj["text"]
                 print_verbose(f"completion obj content: {completion_obj['content']}")
                 if response_obj["is_finished"]:
                     model_response.choices[0].finish_reason = response_obj["finish_reason"]
-            elif self.custom_llm_provider == "text-completion-openai":
-                response_obj = self.handle_openai_text_completion_chunk(chunk)
+            else: # openai chat model
+                response_obj = self.handle_openai_chat_completion_chunk(chunk)
                 completion_obj["content"] = response_obj["text"]
                 print_verbose(f"completion obj content: {completion_obj['content']}")
                 if response_obj["is_finished"]:
                     model_response.choices[0].finish_reason = response_obj["finish_reason"]
-            else: # openai chat/azure models
-                raise Exception("Unmapped Model Error")
             model_response.model = self.model
             if len(completion_obj["content"]) > 0: # cannot set content of an OpenAI Object to be an empty string
                 hold, model_response_str = self.check_special_tokens(completion_obj["content"])
                 if hold is False:
                     completion_obj["content"] = model_response_str
                     if self.sent_first_chunk == False:
                         completion_obj["role"] = "assistant"
                         self.sent_first_chunk = True
                     model_response.choices[0].delta = Delta(**completion_obj)
                     # LOGGING
                     threading.Thread(target=self.logging_obj.success_handler, args=(model_response,)).start()
                     return model_response
                 else:
                     return
             elif model_response.choices[0].finish_reason:
                 model_response.choices[0].finish_reason = map_finish_reason(model_response.choices[0].finish_reason) # ensure consistent output to openai
                 # LOGGING
                 threading.Thread(target=self.logging_obj.success_handler, args=(model_response,)).start()
                 return model_response
             else:
                 return
         except StopIteration:
             raise StopIteration
         except Exception as e:
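
The net effect of the dispatch change above: "text-completion-openai" keeps an explicit branch, and any provider without a mapped branch now falls through to the OpenAI chat-completion handler instead of raising "Unmapped Model Error". A condensed sketch of that fallback shape (handler names are copied from the diff; the pick_handler helper itself is hypothetical, not litellm code):

def pick_handler(custom_llm_provider: str) -> str:
    # explicit providers keep dedicated branches (abbreviated here)
    explicit = {
        "anthropic": "handle_anthropic_chunk",
        "huggingface": "handle_huggingface_chunk",
        "text-completion-openai": "handle_openai_text_completion_chunk",
    }
    # fallback: treat unmapped providers as OpenAI-compatible chat streams
    return explicit.get(custom_llm_provider, "handle_openai_chat_completion_chunk")

assert pick_handler("openai") == "handle_openai_chat_completion_chunk"
assert pick_handler("text-completion-openai") == "handle_openai_text_completion_chunk"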
@@ -4284,8 +4282,14 @@ class CustomStreamWrapper:
     ## needs to handle the empty string case (even starting chunk can be an empty string)
     def __next__(self):
-        chunk = next(self.completion_stream)
-        return self.chunk_creator(chunk=chunk)
+        while True: # loop until a non-empty string is found
+            if isinstance(self.completion_stream, str):
+                chunk = self.completion_stream
+            else:
+                chunk = next(self.completion_stream)
+            response = self.chunk_creator(chunk=chunk)
+            if response is not None:
+                return response

     async def __anext__(self):
         try:
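
Why moving the while loop from chunk_creator into __next__ fixes sync streaming: inside chunk_creator the loop could only re-examine the single chunk it was handed, so an empty or held-back chunk escaped to callers as None; __next__ now owns the loop, pulls a fresh chunk from the underlying stream on every pass (or reuses the string buffer for fake-streaming providers), and only surfaces non-empty responses. A self-contained toy illustration of that pattern (simplified; not litellm code):

class StreamWrapper:
    def __init__(self, completion_stream):
        self.completion_stream = completion_stream

    def chunk_creator(self, chunk):
        # normalize one raw chunk, or return None when there is nothing to emit
        text = chunk.strip()
        return {"choices": [{"delta": {"content": text}}]} if text else None

    def __iter__(self):
        return self

    def __next__(self):
        while True:  # keep pulling until chunk_creator yields something
            chunk = next(self.completion_stream)  # raises StopIteration at end
            response = self.chunk_creator(chunk=chunk)
            if response is not None:
                return response

# usage: empty chunks are skipped instead of surfacing as None mid-stream
for piece in StreamWrapper(iter(["Hello", "", " world"])):
    print(piece["choices"][0]["delta"]["content"], end="")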