fix exception mapping for streaming

Krrish Dholakia 2023-09-23 15:04:34 -07:00
parent f984e5f380
commit 889679a0dd
8 changed files with 766 additions and 100 deletions


@@ -2,6 +2,7 @@ import sys
import dotenv, json, traceback, threading
import subprocess, os
import litellm, openai
import itertools
import random, uuid, requests
import datetime, time
import tiktoken
@@ -1915,7 +1916,6 @@ def exception_type(
):
global user_logger_fn, liteDebuggerClient
exception_mapping_worked = False
if litellm.set_verbose == True:
litellm.error_logs['EXCEPTION'] = original_exception
litellm.error_logs['KWARGS'] = completion_kwargs
@@ -1970,7 +1970,7 @@ def exception_type(
exception_type = type(original_exception).__name__
else:
exception_type = ""
if "claude" in model: # one of the anthropics
if custom_llm_provider == "anthropic": # one of the anthropics
if hasattr(original_exception, "message"):
if "prompt is too long" in original_exception.message:
exception_mapping_worked = True
@@ -1979,6 +1979,13 @@ def exception_type(
model=model,
llm_provider="anthropic"
)
if "Invalid API Key" in original_exception.message:
exception_mapping_worked = True
raise AuthenticationError(
message=original_exception.message,
model=model,
llm_provider="anthropic"
)
if hasattr(original_exception, "status_code"):
print_verbose(f"status_code: {original_exception.status_code}")
if original_exception.status_code == 401:
@@ -2031,7 +2038,7 @@ def exception_type(
llm_provider="anthropic",
model=model
)
elif "replicate" in model:
elif custom_llm_provider == "replicate":
if "Incorrect authentication token" in error_str:
exception_mapping_worked = True
raise AuthenticationError(
@@ -2068,7 +2075,7 @@ def exception_type(
llm_provider="replicate",
model=model
)
elif original_exception.status_code == 400:
elif original_exception.status_code == 400 or original_exception.status_code == 422:
exception_mapping_worked = True
raise InvalidRequestError(
message=f"ReplicateException - {original_exception.message}",
@@ -2110,7 +2117,31 @@ def exception_type(
llm_provider="replicate",
model=model
)
elif model in litellm.cohere_models or custom_llm_provider == "cohere": # Cohere
elif custom_llm_provider == "bedrock":
if "Unable to locate credentials" in error_str:
exception_mapping_worked = True
raise InvalidRequestError(
message=f"BedrockException - {error_str}",
model=model,
llm_provider="bedrock"
)
elif custom_llm_provider == "sagemaker":
if "Unable to locate credentials" in error_str:
exception_mapping_worked = True
raise InvalidRequestError(
message=f"SagemakerException - {error_str}",
model=model,
llm_provider="sagemaker"
)
elif custom_llm_provider == "vertex_ai":
if "Vertex AI API has not been used in project" in error_str or "Unable to find your project" in error_str:
exception_mapping_worked = True
raise InvalidRequestError(
message=f"VertexAIException - {error_str}",
model=model,
llm_provider="vertex_ai"
)
elif custom_llm_provider == "cohere": # Cohere
if (
"invalid api token" in error_str
or "No API key provided." in error_str
@@ -2184,6 +2215,13 @@ def exception_type(
model=model,
llm_provider="huggingface"
)
elif "A valid user token is required" in error_str:
exception_mapping_worked = True
raise InvalidRequestError(
message=error_str,
llm_provider="huggingface",
model=model
)
if hasattr(original_exception, "status_code"):
if original_exception.status_code == 401:
exception_mapping_worked = True
@@ -2221,6 +2259,8 @@ def exception_type(
llm_provider="huggingface",
model=model
)
exception_mapping_worked = True
raise APIError(status_code=500, message=error_str, model=model, llm_provider=custom_llm_provider)
elif custom_llm_provider == "ai21":
if hasattr(original_exception, "message"):
if "Prompt has too many tokens" in original_exception.message:
@@ -2230,6 +2270,13 @@ def exception_type(
model=model,
llm_provider="ai21"
)
if "Bad or missing API token." in original_exception.message:
exception_mapping_worked = True
raise InvalidRequestError(
message=f"AI21Exception - {original_exception.message}",
model=model,
llm_provider="ai21"
)
if hasattr(original_exception, "status_code"):
if original_exception.status_code == 401:
exception_mapping_worked = True
@@ -2266,7 +2313,7 @@ def exception_type(
llm_provider="ai21",
model=model
)
elif model in litellm.nlp_cloud_models or custom_llm_provider == "nlp_cloud":
elif custom_llm_provider == "nlp_cloud":
if "detail" in error_str:
if "Input text length should not exceed" in error_str:
exception_mapping_worked = True
@@ -2342,6 +2389,7 @@ def exception_type(
model=model
)
elif custom_llm_provider == "together_ai":
import json
error_response = json.loads(error_str)
if "error" in error_response and "`inputs` tokens + `max_new_tokens` must be <=" in error_response["error"]:
exception_mapping_worked = True
@@ -2364,6 +2412,13 @@ def exception_type(
model=model,
llm_provider="together_ai"
)
elif "error" in error_response and "API key doesn't match expected format." in error_response["error"]:
exception_mapping_worked = True
raise InvalidRequestError(
message=f"TogetherAIException - {error_response['error']}",
model=model,
llm_provider="together_ai"
)
elif "error_type" in error_response and error_response["error_type"] == "validation":
exception_mapping_worked = True
raise InvalidRequestError(
@@ -2393,7 +2448,7 @@ def exception_type(
llm_provider="together_ai",
model=model
)
elif model in litellm.aleph_alpha_models:
elif custom_llm_provider == "aleph_alpha":
if "This is longer than the model's maximum context length" in error_str:
exception_mapping_worked = True
raise ContextWindowExceededError(
@@ -2401,6 +2456,13 @@ def exception_type(
llm_provider="aleph_alpha",
model=model
)
elif "InvalidToken" in error_str or "No token provided" in error_str:
exception_mapping_worked = True
raise InvalidRequestError(
message=f"AlephAlphaException - {original_exception.message}",
llm_provider="aleph_alpha",
model=model
)
elif hasattr(original_exception, "status_code"):
print(f"status code: {original_exception.status_code}")
if original_exception.status_code == 401:
@@ -2445,7 +2507,8 @@ def exception_type(
elif custom_llm_provider == "ollama":
if "no attribute 'async_get_ollama_response_stream" in error_str:
raise ImportError("Import error - trying to use async for ollama. import async_generator failed. Try 'pip install async_generator'")
raise original_exception
exception_mapping_worked = True
raise APIError(status_code=500, message=str(original_exception), llm_provider=custom_llm_provider, model=model)
except Exception as e:
# LOGGING
exception_logging(
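Note on the hunks above: exception_type now branches on custom_llm_provider instead of substring checks against the model name, and gains mappings for bedrock, sagemaker, and vertex_ai plus catch-all APIError branches. A minimal, self-contained sketch of that dispatch pattern follows; the exception classes are stand-ins for litellm's own, and the sample error is hypothetical.

# Condensed sketch of the provider-keyed dispatch this commit moves to.
# AuthenticationError / InvalidRequestError / APIError are stand-ins for
# litellm's exception classes, not the real ones.
class AuthenticationError(Exception):
    pass

class InvalidRequestError(Exception):
    pass

class APIError(Exception):
    pass

def map_provider_exception(original_exception, model, custom_llm_provider):
    error_str = str(original_exception)
    if custom_llm_provider == "anthropic":
        if "Invalid API Key" in error_str:
            raise AuthenticationError(f"AnthropicException - {error_str}")
    elif custom_llm_provider == "bedrock":
        if "Unable to locate credentials" in error_str:
            raise InvalidRequestError(f"BedrockException - {error_str}")
    elif custom_llm_provider == "vertex_ai":
        if "Unable to find your project" in error_str:
            raise InvalidRequestError(f"VertexAIException - {error_str}")
    # anything unmapped falls through to a generic APIError, mirroring the
    # new catch-all branches in the diff
    raise APIError(f"{custom_llm_provider}Exception - {error_str}")

# illustrative usage with a hypothetical provider error
try:
    map_provider_exception(Exception("Unable to locate credentials"), "some-model", "bedrock")
except InvalidRequestError as e:
    print(type(e).__name__, e)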
@@ -2563,6 +2626,7 @@ class CustomStreamWrapper:
self.logging_obj = logging_obj
self.completion_stream = completion_stream
self.sent_first_chunk = False
self.sent_last_chunk = False
if self.logging_obj:
# Log the type of the received item
self.logging_obj.post_call(str(type(completion_stream)))
@@ -2579,41 +2643,71 @@ class CustomStreamWrapper:
def handle_anthropic_chunk(self, chunk):
str_line = chunk.decode("utf-8") # Convert bytes to string
print(f"str_line: {str_line}")
text = ""
is_finished = False
finish_reason = None
if str_line.startswith("data:"):
data_json = json.loads(str_line[5:])
return data_json.get("completion", "")
return ""
text = data_json.get("completion", "")
if data_json.get("stop_reason", None):
is_finished = True
finish_reason = data_json["stop_reason"]
return {"text": text, "is_finished": is_finished, "finish_reason": finish_reason}
elif "error" in str_line:
raise ValueError(f"Unable to parse response. Original response: {str_line}")
else:
return {"text": text, "is_finished": is_finished, "finish_reason": finish_reason}
def handle_together_ai_chunk(self, chunk):
chunk = chunk.decode("utf-8")
text_index = chunk.find('"text":"') # this checks if text: exists
text_start = text_index + len('"text":"')
text_end = chunk.find('"}', text_start)
if text_index != -1 and text_end != -1:
extracted_text = chunk[text_start:text_end]
return extracted_text
text = ""
is_finished = False
finish_reason = None
if "text" in chunk:
text_index = chunk.find('"text":"') # this checks if text: exists
text_start = text_index + len('"text":"')
text_end = chunk.find('"}', text_start)
if text_index != -1 and text_end != -1:
extracted_text = chunk[text_start:text_end]
text = extracted_text
return {"text": text, "is_finished": is_finished, "finish_reason": finish_reason}
elif "[DONE]" in chunk:
return {"text": text, "is_finished": True, "finish_reason": "stop"}
elif "error" in chunk:
raise ValueError(chunk)
else:
return ""
return {"text": text, "is_finished": is_finished, "finish_reason": finish_reason}
def handle_huggingface_chunk(self, chunk):
chunk = chunk.decode("utf-8")
text = ""
is_finished = False
finish_reason = ""
if chunk.startswith("data:"):
data_json = json.loads(chunk[5:])
print(f"data json: {data_json}")
if "token" in data_json and "text" in data_json["token"]:
text = data_json["token"]["text"]
if "meta-llama/Llama-2" in self.model: #clean eos tokens like </s> from the returned output text
if any(token in text for token in llama_2_special_tokens):
text = text.replace("<s>", "").replace("</s>", "")
return text
else:
return ""
return ""
if data_json.get("details", False) and data_json["details"].get("finish_reason", False):
is_finished = True
finish_reason = data_json["details"]["finish_reason"]
return {"text": text, "is_finished": is_finished, "finish_reason": finish_reason}
elif "error" in chunk:
raise ValueError(chunk)
return {"text": text, "is_finished": is_finished, "finish_reason": finish_reason}
def handle_ai21_chunk(self, chunk):
def handle_ai21_chunk(self, chunk): # fake streaming
chunk = chunk.decode("utf-8")
data_json = json.loads(chunk)
try:
return data_json["completions"][0]["data"]["text"]
text = data_json["completions"][0]["data"]["text"]
is_finished = True
finish_reason = "stop"
return {"text": text, "is_finished": is_finished, "finish_reason": finish_reason}
except:
raise ValueError(f"Unable to parse response. Original response: {chunk}")
@@ -2621,8 +2715,10 @@ class CustomStreamWrapper:
chunk = chunk.decode("utf-8")
data_json = json.loads(chunk)
try:
print(f"data json: {data_json}")
return data_json["generated_text"]
text = data_json["generated_text"]
is_finished = True
finish_reason = "stop"
return {"text": text, "is_finished": is_finished, "finish_reason": finish_reason}
except:
raise ValueError(f"Unable to parse response. Original response: {chunk}")
@@ -2630,7 +2726,10 @@ class CustomStreamWrapper:
chunk = chunk.decode("utf-8")
data_json = json.loads(chunk)
try:
return data_json["completions"][0]["completion"]
text = data_json["completions"][0]["completion"]
is_finished = True
finish_reason = "stop"
return {"text": text, "is_finished": is_finished, "finish_reason": finish_reason}
except:
raise ValueError(f"Unable to parse response. Original response: {chunk}")
@@ -2638,7 +2737,35 @@ class CustomStreamWrapper:
chunk = chunk.decode("utf-8")
data_json = json.loads(chunk)
try:
return data_json["text"]
text = ""
is_finished = False
finish_reason = ""
if "text" in data_json:
text = data_json["text"]
elif "is_finished" in data_json:
is_finished = data_json["is_finished"]
finish_reason = data_json["finish_reason"]
else:
raise Exception(data_json)
return {"text": text, "is_finished": is_finished, "finish_reason": finish_reason}
except:
raise ValueError(f"Unable to parse response. Original response: {chunk}")
def handle_replicate_chunk(self, chunk):
print(f"chunk: {chunk}")
try:
text = ""
is_finished = False
finish_reason = ""
if "output" in chunk:
text = chunk['output']
if "status" in chunk:
if chunk["status"] == "succeeded":
is_finished = True
finish_reason = "stop"
elif chunk.get("error", None):
raise Exception(chunk["error"])
return {"text": text, "is_finished": is_finished, "finish_reason": finish_reason}
except:
raise ValueError(f"Unable to parse response. Original response: {chunk}")
@@ -2683,13 +2810,21 @@ class CustomStreamWrapper:
traceback.print_exc()
return ""
def handle_bedrock_stream(self):
if self.completion_stream:
event = next(self.completion_stream)
chunk = event.get('chunk')
if chunk:
chunk_data = json.loads(chunk.get('bytes').decode())
return chunk_data['outputText']
def handle_bedrock_stream(self, chunk):
chunk = chunk.get('chunk')
if chunk:
chunk_data = json.loads(chunk.get('bytes').decode())
text = ""
is_finished = False
finish_reason = ""
if "outputText" in chunk_data:
text = chunk_data['outputText']
if chunk_data.get("completionReason", None):
is_finished = True
finish_reason = chunk_data["completionReason"]
elif chunk.get("error", None):
raise Exception(chunk["error"])
return {"text": text, "is_finished": is_finished, "finish_reason": finish_reason}
return ""
## needs to handle the empty string case (even starting chunk can be an empty string)
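Note on the handlers above: each provider chunk handler now returns a dict with "text", "is_finished", and "finish_reason" rather than a bare string, so the streaming wrapper can propagate a finish reason. A condensed, self-contained sketch of that contract, based on the Anthropic-style handler in the diff and a made-up SSE line:

import json

def handle_anthropic_chunk(chunk: bytes) -> dict:
    # Condensed from the handler above: return the text plus explicit
    # completion state instead of a raw string.
    str_line = chunk.decode("utf-8")
    text, is_finished, finish_reason = "", False, None
    if str_line.startswith("data:"):
        data_json = json.loads(str_line[5:])
        text = data_json.get("completion", "")
        if data_json.get("stop_reason", None):
            is_finished = True
            finish_reason = data_json["stop_reason"]
    elif "error" in str_line:
        raise ValueError(f"Unable to parse response. Original response: {str_line}")
    return {"text": text, "is_finished": is_finished, "finish_reason": finish_reason}

# hypothetical SSE lines, for illustration only
print(handle_anthropic_chunk(b'data: {"completion": " Hello", "stop_reason": null}'))
print(handle_anthropic_chunk(b'data: {"completion": "", "stop_reason": "stop_sequence"}'))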
@@ -2701,49 +2836,94 @@ class CustomStreamWrapper:
completion_obj = {"content": ""}
if self.custom_llm_provider and self.custom_llm_provider == "anthropic":
chunk = next(self.completion_stream)
completion_obj["content"] = self.handle_anthropic_chunk(chunk)
response_obj = self.handle_anthropic_chunk(chunk)
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj["finish_reason"]
elif self.model == "replicate" or self.custom_llm_provider == "replicate":
chunk = next(self.completion_stream)
completion_obj["content"] = chunk
response_obj = self.handle_replicate_chunk(chunk)
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj["finish_reason"]
elif (
self.custom_llm_provider and self.custom_llm_provider == "together_ai"):
chunk = next(self.completion_stream)
text_data = self.handle_together_ai_chunk(chunk)
if text_data == "":
return self.__next__()
completion_obj["content"] = text_data
response_obj = self.handle_together_ai_chunk(chunk)
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj["finish_reason"]
elif self.custom_llm_provider and self.custom_llm_provider == "huggingface":
chunk = next(self.completion_stream)
completion_obj["content"] = self.handle_huggingface_chunk(chunk)
response_obj = self.handle_huggingface_chunk(chunk)
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj["finish_reason"]
elif self.custom_llm_provider and self.custom_llm_provider == "baseten": # baseten doesn't provide streaming
chunk = next(self.completion_stream)
completion_obj["content"] = self.handle_baseten_chunk(chunk)
elif self.custom_llm_provider and self.custom_llm_provider == "ai21": #ai21 doesn't provide streaming
chunk = next(self.completion_stream)
completion_obj["content"] = self.handle_ai21_chunk(chunk)
response_obj = self.handle_ai21_chunk(chunk)
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj["finish_reason"]
elif self.custom_llm_provider and self.custom_llm_provider == "vllm":
chunk = next(self.completion_stream)
completion_obj["content"] = chunk[0].outputs[0].text
elif self.custom_llm_provider and self.custom_llm_provider == "aleph-alpha": #aleph alpha doesn't provide streaming
elif self.custom_llm_provider and self.custom_llm_provider == "aleph_alpha": #aleph alpha doesn't provide streaming
chunk = next(self.completion_stream)
completion_obj["content"] = self.handle_aleph_alpha_chunk(chunk)
response_obj = self.handle_aleph_alpha_chunk(chunk)
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj["finish_reason"]
elif self.custom_llm_provider and self.custom_llm_provider == "text-completion-openai":
chunk = next(self.completion_stream)
completion_obj["content"] = self.handle_openai_text_completion_chunk(chunk)
elif self.model in litellm.nlp_cloud_models or self.custom_llm_provider == "nlp_cloud":
chunk = next(self.completion_stream)
completion_obj["content"] = self.handle_nlp_cloud_chunk(chunk)
elif self.model in (litellm.vertex_chat_models + litellm.vertex_code_chat_models + litellm.vertex_text_models + litellm.vertex_code_text_models):
chunk = next(self.completion_stream)
completion_obj["content"] = str(chunk)
try:
chunk = next(self.completion_stream)
response_obj = self.handle_nlp_cloud_chunk(chunk)
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj["finish_reason"]
except Exception as e:
if self.sent_last_chunk:
raise e
else:
if self.sent_first_chunk is False:
raise Exception("An unknown error occurred with the stream")
model_response.choices[0].finish_reason = "stop"
self.sent_last_chunk = True
elif self.custom_llm_provider and self.custom_llm_provider == "vertex_ai":
try:
chunk = next(self.completion_stream)
completion_obj["content"] = str(chunk)
except StopIteration as e:
if self.sent_last_chunk:
raise e
else:
model_response.choices[0].finish_reason = "stop"
self.sent_last_chunk = True
elif self.custom_llm_provider == "cohere":
chunk = next(self.completion_stream)
completion_obj["content"] = self.handle_cohere_chunk(chunk)
response_obj = self.handle_cohere_chunk(chunk)
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj["finish_reason"]
elif self.custom_llm_provider == "bedrock":
completion_obj["content"] = self.handle_bedrock_stream()
chunk = next(self.completion_stream)
response_obj = self.handle_bedrock_stream(chunk)
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj["finish_reason"]
elif self.custom_llm_provider == "sagemaker":
if len(self.completion_stream)==0:
raise StopIteration
if self.sent_last_chunk:
raise StopIteration
else:
model_response.choices[0].finish_reason = "stop"
self.sent_last_chunk = True
chunk_size = 30
new_chunk = self.completion_stream[:chunk_size]
completion_obj["content"] = new_chunk
@@ -2765,11 +2945,13 @@ class CustomStreamWrapper:
self.sent_first_chunk = True
model_response.choices[0].delta = Delta(**completion_obj)
return model_response
elif model_response.choices[0].finish_reason:
return model_response
except StopIteration:
raise StopIteration
except Exception as e:
model_response.choices[0].finish_reason = "stop"
return model_response
except Exception as e:
e.message = str(e)
return exception_type(model=self.model, custom_llm_provider=self.custom_llm_provider, original_exception=e)
async def __anext__(self):
try:
@@ -2796,7 +2978,6 @@ def read_config_args(config_path) -> dict:
# read keys/ values from config file and return them
return config
except Exception as e:
print("An error occurred while reading config:", str(e))
raise e
########## experimental completion variants ############################
@@ -2899,7 +3080,6 @@ def get_model_split_test(models, completion_call_id):
try:
# make the api call
last_fetched_at = time.time()
print(f"last_fetched_at: {last_fetched_at}")
response = requests.post(
#http://api.litellm.ai
url="http://api.litellm.ai/get_model_split_test", # get the updated dict from table or update the table with the dict