fix(ollama.py): fix ollama async streaming for /completions calls

Krrish Dholakia 2023-12-15 09:28:18 -08:00
parent c9fb4ba88c
commit cab870f73a
3 changed files with 70 additions and 50 deletions
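For context, the bug surfaced when streaming an Ollama model through litellm's OpenAI-compatible /completions path. A minimal reproduction sketch, assuming litellm's atext_completion entry point and a local Ollama server with a pulled llama2 model (the model name and prompt are illustrative):

import asyncio
import litellm

async def main():
    # /completions-style call against an Ollama model, with streaming enabled
    response = await litellm.atext_completion(
        model="ollama/llama2",
        prompt="Why is the sky blue?",
        stream=True,
    )
    # each chunk is a text-completion-shaped delta, not a chat delta
    async for chunk in response:
        print(chunk["choices"][0]["text"] or "", end="")

asyncio.run(main())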


@@ -5626,6 +5626,30 @@ class CustomStreamWrapper:
             traceback.print_exc()
             return ""
 
+    def handle_ollama_stream(self, chunk):
+        try:
+            json_chunk = json.loads(chunk)
+            if "error" in json_chunk:
+                raise Exception(f"Ollama Error - {json_chunk}")
+            text = ""
+            is_finished = False
+            finish_reason = None
+            if json_chunk["done"] == True:
+                text = ""
+                is_finished = True
+                finish_reason = "stop"
+                return {"text": text, "is_finished": is_finished, "finish_reason": finish_reason}
+            elif json_chunk["response"]:
+                print_verbose(f"delta content: {json_chunk}")
+                text = json_chunk["response"]
+                return {"text": text, "is_finished": is_finished, "finish_reason": finish_reason}
+            else:
+                raise Exception(f"Ollama Error - {json_chunk}")
+        except Exception as e:
+            raise e
+
     def handle_bedrock_stream(self, chunk):
         if hasattr(chunk, "get"):
             chunk = chunk.get('chunk')
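The new handler normalizes Ollama's newline-delimited JSON stream into litellm's common {text, is_finished, finish_reason} shape. A standalone sketch of that mapping, with illustrative sample chunks (Ollama's /api/generate puts the token delta under "response" and signals completion with "done": true):

import json

def parse_ollama_chunk(chunk: str) -> dict:
    json_chunk = json.loads(chunk)
    if "error" in json_chunk:
        raise Exception(f"Ollama Error - {json_chunk}")
    if json_chunk["done"]:
        # terminal chunk: no text, mark the stream finished
        return {"text": "", "is_finished": True, "finish_reason": "stop"}
    elif json_chunk["response"]:
        # intermediate chunk: the token delta lives under "response"
        return {"text": json_chunk["response"], "is_finished": False, "finish_reason": None}
    else:
        raise Exception(f"Ollama Error - {json_chunk}")

print(parse_ollama_chunk('{"response": "Hello", "done": false}'))
# {'text': 'Hello', 'is_finished': False, 'finish_reason': None}
print(parse_ollama_chunk('{"response": "", "done": true}'))
# {'text': '', 'is_finished': True, 'finish_reason': 'stop'}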
@@ -5800,9 +5824,11 @@ class CustomStreamWrapper:
                         self.completion_stream = self.completion_stream[chunk_size:]
                         time.sleep(0.05)
                 elif self.custom_llm_provider == "ollama":
-                    if "error" in chunk:
-                        exception_type(model=self.model, custom_llm_provider=self.custom_llm_provider, original_exception=chunk["error"])
-                    completion_obj = chunk
+                    response_obj = self.handle_ollama_stream(chunk)
+                    completion_obj["content"] = response_obj["text"]
+                    print_verbose(f"completion obj content: {completion_obj['content']}")
+                    if response_obj["is_finished"]:
+                        model_response.choices[0].finish_reason = response_obj["finish_reason"]
                 elif self.custom_llm_provider == "text-completion-openai":
                     response_obj = self.handle_openai_text_completion_chunk(chunk)
                     completion_obj["content"] = response_obj["text"]
@@ -5894,7 +5920,7 @@ class CustomStreamWrapper:
     ## needs to handle the empty string case (even starting chunk can be an empty string)
     def __next__(self):
         try:
-            while True: 
+            while True:
                 if isinstance(self.completion_stream, str) or isinstance(self.completion_stream, bytes):
                     chunk = self.completion_stream
                 else:
@@ -5912,7 +5938,7 @@ class CustomStreamWrapper:
         except StopIteration:
             raise  # Re-raise StopIteration
         except Exception as e:
-            print_verbose(f"HITS AN ERROR: {str(e)}")
+            print_verbose(f"HITS AN ERROR: {str(e)}\n\n {traceback.format_exc()}")
             traceback_exception = traceback.format_exc()
             # LOG FAILURE - handle streaming failure logging in the _next_ object, remove `handle_failure` once it's deprecated
             threading.Thread(target=self.logging_obj.failure_handler, args=(e, traceback_exception)).start()
@@ -5969,17 +5995,22 @@ class TextCompletionStreamWrapper:
         return self
 
     def convert_to_text_completion_object(self, chunk: ModelResponse):
-        response = TextCompletionResponse()
-        response["id"] = chunk.get("id", None)
-        response["object"] = "text_completion"
-        response["created"] = response.get("created", None)
-        response["model"] = response.get("model", None)
-        text_choices = TextChoices()
-        text_choices["text"] = chunk["choices"][0]["delta"]["content"]
-        text_choices["index"] = response["choices"][0]["index"]
-        text_choices["finish_reason"] = response["choices"][0]["finish_reason"]
-        response["choices"] = [text_choices]
-        return response
+        try:
+            response = TextCompletionResponse()
+            response["id"] = chunk.get("id", None)
+            response["object"] = "text_completion"
+            response["created"] = response.get("created", None)
+            response["model"] = response.get("model", None)
+            text_choices = TextChoices()
+            if isinstance(chunk, Choices):  # chunk should always be of type StreamingChoices
+                raise Exception
+            text_choices["text"] = chunk["choices"][0]["delta"]["content"]
+            text_choices["index"] = response["choices"][0]["index"]
+            text_choices["finish_reason"] = response["choices"][0]["finish_reason"]
+            response["choices"] = [text_choices]
+            return response
+        except Exception as e:
+            raise Exception(f"Error occurred converting to text completion object - chunk: {chunk}; Error: {str(e)}")
 
     def __next__(self):
         # model_response = ModelResponse(stream=True, model=self.model)
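The try/except added above wraps a chat-to-text-completion shape conversion: a streaming chunk carrying choices[0].delta.content is re-packed so the text lands under choices[0].text. A plain-dict sketch of that mapping (field names follow the OpenAI wire formats; the sample chunk is illustrative):

chat_chunk = {
    "id": "chatcmpl-123",
    "choices": [
        {"index": 0, "delta": {"content": "Hello"}, "finish_reason": None}
    ],
}

text_chunk = {
    "id": chat_chunk.get("id", None),
    "object": "text_completion",
    "choices": [
        {
            "text": chat_chunk["choices"][0]["delta"]["content"],
            "index": chat_chunk["choices"][0]["index"],
            "finish_reason": chat_chunk["choices"][0]["finish_reason"],
        }
    ],
}

assert text_chunk["choices"][0]["text"] == "Hello"

One caveat: the committed code reads "index" and "finish_reason" off the freshly constructed TextCompletionResponse's default choice rather than off the incoming chunk; the sketch takes them from the chunk to show the intended mapping.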