Merge branch 'main' into main

Vincelwt 2024-03-30 13:21:53 +09:00 committed by GitHub
commit 1b84dfac91
301 changed files with 62646 additions and 3691 deletions

@@ -354,7 +354,10 @@ class Choices(OpenAIObject):
if message is None:
self.message = Message(content=None)
else:
self.message = message
if isinstance(message, Message):
self.message = message
elif isinstance(message, dict):
self.message = Message(**message)
if logprobs is not None:
self.logprobs = logprobs
if enhancements is not None:
@@ -422,8 +425,11 @@ class StreamingChoices(OpenAIObject):
else:
self.finish_reason = None
self.index = index
if delta:
self.delta = delta
if delta is not None:
if isinstance(delta, Delta):
self.delta = delta
if isinstance(delta, dict):
self.delta = Delta(**delta)
else:
self.delta = Delta()
if enhancements is not None:
@@ -491,13 +497,33 @@ class ModelResponse(OpenAIObject):
):
if stream is not None and stream == True:
object = "chat.completion.chunk"
choices = [StreamingChoices()]
if choices is not None and isinstance(choices, list):
new_choices = []
for choice in choices:
if isinstance(choice, StreamingChoices):
_new_choice = choice
elif isinstance(choice, dict):
_new_choice = StreamingChoices(**choice)
new_choices.append(_new_choice)
choices = new_choices
else:
choices = [StreamingChoices()]
else:
if model in litellm.open_ai_embedding_models:
object = "embedding"
else:
object = "chat.completion"
choices = [Choices()]
if choices is not None and isinstance(choices, list):
new_choices = []
for choice in choices:
if isinstance(choice, Choices):
_new_choice = choice
elif isinstance(choice, dict):
_new_choice = Choices(**choice)
new_choices.append(_new_choice)
choices = new_choices
else:
choices = [Choices()]
if id is None:
id = _generate_id()
else:
@@ -1475,6 +1501,7 @@ class Logging:
self.langfuse_public_key != langFuseLogger.public_key
and self.langfuse_secret != langFuseLogger.secret_key
):
print_verbose("Instantiates langfuse client")
langFuseLogger = LangFuseLogger(
langfuse_public_key=self.langfuse_public_key,
langfuse_secret=self.langfuse_secret,
@@ -1783,16 +1810,14 @@ class Logging:
end_time=end_time,
)
except Exception as e:
verbose_logger.debug(
print_verbose(
f"Error occurred building stream chunk: {traceback.format_exc()}"
)
complete_streaming_response = None
else:
self.streaming_chunks.append(result)
if complete_streaming_response is not None:
verbose_logger.debug(
"Async success callbacks: Got a complete streaming response"
)
print_verbose("Async success callbacks: Got a complete streaming response")
self.model_call_details["async_complete_streaming_response"] = (
complete_streaming_response
)
@@ -1833,7 +1858,7 @@ class Logging:
callbacks.append(callback)
else:
callbacks = litellm._async_success_callback
verbose_logger.debug(f"Async success callbacks: {callbacks}")
print_verbose(f"Async success callbacks: {callbacks}")
for callback in callbacks:
# check if callback can run for this request
litellm_params = self.model_call_details.get("litellm_params", {})
@@ -1903,10 +1928,6 @@ class Logging:
end_time=end_time,
)
if callable(callback): # custom logger functions
# print_verbose(
# f"Making async function logging call for {callback}, result={result} - {self.model_call_details}",
# logger_only=True,
# )
if self.stream:
if (
"async_complete_streaming_response"
@@ -2370,8 +2391,6 @@ def client(original_function):
)
if "logger_fn" in kwargs:
user_logger_fn = kwargs["logger_fn"]
# CRASH REPORTING TELEMETRY
crash_reporting(*args, **kwargs)
# INIT LOGGER - for user-specified integrations
model = args[0] if len(args) > 0 else kwargs.get("model", None)
call_type = original_function.__name__
@@ -2458,6 +2477,14 @@ def client(original_function):
)
raise e
def check_coroutine(value) -> bool:
if inspect.iscoroutine(value):
return True
elif inspect.iscoroutinefunction(value):
return True
else:
return False
def post_call_processing(original_response, model):
try:
if original_response is None:
@@ -2468,33 +2495,18 @@ def client(original_function):
call_type == CallTypes.completion.value
or call_type == CallTypes.acompletion.value
):
model_response = original_response["choices"][0]["message"][
"content"
]
### POST-CALL RULES ###
rules_obj.post_call_rules(input=model_response, model=model)
is_coroutine = check_coroutine(original_function)
if is_coroutine == True:
pass
else:
model_response = original_response["choices"][0]["message"][
"content"
]
### POST-CALL RULES ###
rules_obj.post_call_rules(input=model_response, model=model)
except Exception as e:
raise e
def crash_reporting(*args, **kwargs):
if litellm.telemetry:
try:
model = args[0] if len(args) > 0 else kwargs["model"]
exception = kwargs["exception"] if "exception" in kwargs else None
custom_llm_provider = (
kwargs["custom_llm_provider"]
if "custom_llm_provider" in kwargs
else None
)
safe_crash_reporting(
model=model,
exception=exception,
custom_llm_provider=custom_llm_provider,
) # log usage-crash details. Do not log any user details. If you want to turn this off, set `litellm.telemetry=False`.
except:
# [Non-Blocking Error]
pass
@wraps(original_function)
def wrapper(*args, **kwargs):
# Prints Exactly what was passed to litellm function - don't execute any logic here - it should just print
@@ -2764,13 +2776,17 @@ def client(original_function):
"context_window_fallback_dict", {}
)
if num_retries:
_is_litellm_router_call = "model_group" in kwargs.get(
"metadata", {}
) # check if call from litellm.router/proxy
if (
num_retries and not _is_litellm_router_call
): # only enter this if call is not from litellm router/proxy. router has it's own logic for retrying
if (
isinstance(e, openai.APIError)
or isinstance(e, openai.Timeout)
or isinstance(e, openai.APIConnectionError)
):
print_verbose(f"RETRY TRIGGERED!")
kwargs["num_retries"] = num_retries
return litellm.completion_with_retries(*args, **kwargs)
elif (
@@ -2784,7 +2800,6 @@ def client(original_function):
kwargs["model"] = context_window_fallback_dict[model]
return original_function(*args, **kwargs)
traceback_exception = traceback.format_exc()
crash_reporting(*args, **kwargs, exception=traceback_exception)
end_time = datetime.datetime.now()
# LOG FAILURE - handle streaming failure logging in the _next_ object, remove `handle_failure` once it's deprecated
if logging_obj:
@@ -3206,7 +3221,6 @@ def client(original_function):
return result
except Exception as e:
traceback_exception = traceback.format_exc()
crash_reporting(*args, **kwargs, exception=traceback_exception)
end_time = datetime.datetime.now()
if logging_obj:
try:
@@ -3234,7 +3248,12 @@ def client(original_function):
"context_window_fallback_dict", {}
)
if num_retries:
_is_litellm_router_call = "model_group" in kwargs.get(
"metadata", {}
) # check if call from litellm.router/proxy
if (
num_retries and not _is_litellm_router_call
): # only enter this if call is not from litellm router/proxy. router has it's own logic for retrying
try:
kwargs["num_retries"] = num_retries
kwargs["original_function"] = original_function
@@ -3590,6 +3609,8 @@ def token_counter(
raise ValueError("text and messages cannot both be None")
elif isinstance(text, List):
text = "".join(t for t in text if isinstance(t, str))
elif isinstance(text, str):
count_response_tokens = True # user just trying to count tokens for a text. don't add the chat_ml +3 tokens to this
if model is not None:
tokenizer_json = _select_tokenizer(model=model)
@@ -3630,7 +3651,7 @@ def token_counter(
count_response_tokens=count_response_tokens,
)
else:
num_tokens = len(encoding.encode(text)) # type: ignore
num_tokens = len(encoding.encode(text, disallowed_special=())) # type: ignore
return num_tokens
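The str branch above now marks the call as a plain text count and encodes with disallowed_special=(), so text containing literal special tokens (for example "<|endoftext|>") should no longer raise inside the tokenizer. A rough usage sketch, assuming the public litellm.token_counter keyword arguments shown in this hunk:

import litellm

# plain text: counted as-is, without the chat_ml formatting overhead
n_text = litellm.token_counter(model="gpt-3.5-turbo", text="hello <|endoftext|> world")

# chat messages: per-message formatting tokens are still included
n_chat = litellm.token_counter(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "hello"}],
)
print(n_text, n_chat)
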
@@ -6228,7 +6249,8 @@ def set_callbacks(callback_list, function_id=None):
elif callback == "datadog":
dataDogLogger = DataDogLogger()
elif callback == "prometheus":
prometheusLogger = PrometheusLogger()
if prometheusLogger is None:
prometheusLogger = PrometheusLogger()
elif callback == "dynamodb":
dynamoLogger = DyanmoDBLogger()
elif callback == "s3":
@@ -7227,13 +7249,23 @@ def exception_type(
message=f"AnthropicException - {original_exception.message}",
llm_provider="anthropic",
model=model,
response=original_exception.response,
response=(
original_exception.response
if hasattr(original_exception, "response")
else httpx.Response(
status_code=500,
request=httpx.Request(
method="POST",
url="https://docs.anthropic.com/claude/reference/messages_post",
),
)
),
)
else:
exception_mapping_worked = True
raise APIError(
status_code=original_exception.status_code,
message=f"AnthropicException - {original_exception.message}",
message=f"AnthropicException - {original_exception.message}. Handle with `litellm.APIError`.",
llm_provider="anthropic",
model=model,
request=original_exception.request,
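For callers, the practical effect is that mapped Anthropic exceptions expose a response attribute (falling back to a synthetic httpx.Response when the original error had none), and unmapped status codes surface as litellm.APIError. A hedged consumer-side sketch; the model name is illustrative only:

import litellm

try:
    litellm.completion(
        model="claude-3-opus-20240229",
        messages=[{"role": "user", "content": "hi"}],
    )
except litellm.APIError as e:
    # unmapped Anthropic status codes are re-raised as APIError,
    # as the updated message text suggests
    print(e.status_code, e.message)
except Exception as e:
    # other mapped Anthropic exceptions should now carry a response,
    # even when the upstream error object lacked one
    print(getattr(e, "response", None))
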
@@ -8281,17 +8313,6 @@ def exception_type(
raise original_exception
####### CRASH REPORTING ################
def safe_crash_reporting(model=None, exception=None, custom_llm_provider=None):
data = {
"model": model,
"exception": str(exception),
"custom_llm_provider": custom_llm_provider,
}
executor.submit(litellm_telemetry, data)
# threading.Thread(target=litellm_telemetry, args=(data,), daemon=True).start()
def get_or_generate_uuid():
temp_dir = os.path.join(os.path.abspath(os.sep), "tmp")
uuid_file = os.path.join(temp_dir, "litellm_uuid.txt")
@@ -8326,34 +8347,6 @@ def get_or_generate_uuid():
return uuid_value
def litellm_telemetry(data):
# Load or generate the UUID
uuid_value = ""
try:
uuid_value = get_or_generate_uuid()
except:
uuid_value = str(uuid.uuid4())
try:
# Prepare the data to send to litellm logging api
try:
pkg_version = importlib.metadata.version("litellm")
except:
pkg_version = None
if "model" not in data:
data["model"] = None
payload = {"uuid": uuid_value, "data": data, "version:": pkg_version}
# Make the POST request to litellm logging api
response = requests.post(
"https://litellm-logging.onrender.com/logging",
headers={"Content-Type": "application/json"},
json=payload,
)
response.raise_for_status() # Raise an exception for HTTP errors
except:
# [Non-Blocking Error]
return
######### Secret Manager ############################
# checks if user has passed in a secret manager client
# if passed in then checks the secret there
@@ -8485,6 +8478,8 @@ class CustomStreamWrapper:
self.completion_stream = completion_stream
self.sent_first_chunk = False
self.sent_last_chunk = False
self.system_fingerprint: Optional[str] = None
self.received_finish_reason: Optional[str] = None
self.special_tokens = ["<|assistant|>", "<|system|>", "<|user|>", "<s>", "</s>"]
self.holding_chunk = ""
self.complete_response = ""
@@ -8925,7 +8920,6 @@ class CustomStreamWrapper:
if data_json["choices"][0].get("finish_reason", None):
is_finished = True
finish_reason = data_json["choices"][0]["finish_reason"]
self.sent_last_chunk = True
print_verbose(
f"text: {text}; is_finished: {is_finished}; finish_reason: {finish_reason}"
)
@@ -9114,6 +9108,17 @@ class CustomStreamWrapper:
if stop_reason != None:
is_finished = True
finish_reason = stop_reason
######## bedrock.mistral mappings ###############
elif "outputs" in chunk_data:
if (
len(chunk_data["outputs"]) == 1
and chunk_data["outputs"][0].get("text", None) is not None
):
text = chunk_data["outputs"][0]["text"]
stop_reason = chunk_data.get("stop_reason", None)
if stop_reason != None:
is_finished = True
finish_reason = stop_reason
######## bedrock.cohere mappings ###############
# meta mapping
elif "generation" in chunk_data:
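The bedrock.mistral branch added in this hunk implies a stream event that carries an outputs list plus an optional stop_reason; the payload shape below is assumed from that parsing code, not from Bedrock documentation:

# assumed shape of a decoded bedrock.mistral stream event
chunk_data = {"outputs": [{"text": "Hello"}], "stop_reason": "stop"}

text, is_finished, finish_reason = "", False, None
if "outputs" in chunk_data:
    if len(chunk_data["outputs"]) == 1 and chunk_data["outputs"][0].get("text") is not None:
        text = chunk_data["outputs"][0]["text"]
    stop_reason = chunk_data.get("stop_reason")
    if stop_reason is not None:
        is_finished = True
        finish_reason = stop_reason

print({"text": text, "is_finished": is_finished, "finish_reason": finish_reason})
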
@@ -9158,16 +9163,32 @@ class CustomStreamWrapper:
"finish_reason": finish_reason,
}
def chunk_creator(self, chunk):
def model_response_creator(self):
model_response = ModelResponse(stream=True, model=self.model)
if self.response_id is not None:
model_response.id = self.response_id
else:
self.response_id = model_response.id
if self.system_fingerprint is not None:
model_response.system_fingerprint = self.system_fingerprint
model_response._hidden_params["custom_llm_provider"] = self.custom_llm_provider
model_response._hidden_params["created_at"] = time.time()
model_response.choices = [StreamingChoices()]
model_response.choices[0].finish_reason = None
return model_response
def is_delta_empty(self, delta: Delta) -> bool:
is_empty = True
if delta.content is not None:
is_empty = False
elif delta.tool_calls is not None:
is_empty = False
elif delta.function_call is not None:
is_empty = False
return is_empty
def chunk_creator(self, chunk):
model_response = self.model_response_creator()
response_obj = {}
try:
# return this for all models
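Before the per-provider handling continues below, a quick sketch of the two helpers introduced in this hunk: model_response_creator stamps each chunk with the shared response id / system_fingerprint, and is_delta_empty treats a delta as empty only when content, tool_calls and function_call are all None. Illustrative only, assuming Delta defaults those fields to None:

from litellm.utils import Delta

def delta_is_empty(delta: Delta) -> bool:
    # mirrors CustomStreamWrapper.is_delta_empty
    return (
        delta.content is None
        and delta.tool_calls is None
        and delta.function_call is None
    )

print(delta_is_empty(Delta()))                 # expected: True
print(delta_is_empty(Delta(content="hello")))  # expected: False
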
@@ -9176,30 +9197,22 @@ class CustomStreamWrapper:
response_obj = self.handle_anthropic_chunk(chunk)
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj[
"finish_reason"
]
self.received_finish_reason = response_obj["finish_reason"]
elif self.model == "replicate" or self.custom_llm_provider == "replicate":
response_obj = self.handle_replicate_chunk(chunk)
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj[
"finish_reason"
]
self.received_finish_reason = response_obj["finish_reason"]
elif self.custom_llm_provider and self.custom_llm_provider == "together_ai":
response_obj = self.handle_together_ai_chunk(chunk)
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj[
"finish_reason"
]
self.received_finish_reason = response_obj["finish_reason"]
elif self.custom_llm_provider and self.custom_llm_provider == "huggingface":
response_obj = self.handle_huggingface_chunk(chunk)
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj[
"finish_reason"
]
self.received_finish_reason = response_obj["finish_reason"]
elif (
self.custom_llm_provider and self.custom_llm_provider == "baseten"
): # baseten doesn't provide streaming
@@ -9210,16 +9223,12 @@ class CustomStreamWrapper:
response_obj = self.handle_ai21_chunk(chunk)
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj[
"finish_reason"
]
self.received_finish_reason = response_obj["finish_reason"]
elif self.custom_llm_provider and self.custom_llm_provider == "maritalk":
response_obj = self.handle_maritalk_chunk(chunk)
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj[
"finish_reason"
]
self.received_finish_reason = response_obj["finish_reason"]
elif self.custom_llm_provider and self.custom_llm_provider == "vllm":
completion_obj["content"] = chunk[0].outputs[0].text
elif (
@@ -9228,152 +9237,116 @@ class CustomStreamWrapper:
response_obj = self.handle_aleph_alpha_chunk(chunk)
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj[
"finish_reason"
]
self.received_finish_reason = response_obj["finish_reason"]
elif self.custom_llm_provider == "nlp_cloud":
try:
response_obj = self.handle_nlp_cloud_chunk(chunk)
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj[
"finish_reason"
]
self.received_finish_reason = response_obj["finish_reason"]
except Exception as e:
if self.sent_last_chunk:
if self.received_finish_reason:
raise e
else:
if self.sent_first_chunk is False:
raise Exception("An unknown error occurred with the stream")
model_response.choices[0].finish_reason = "stop"
self.sent_last_chunk = True
self.received_finish_reason = "stop"
elif self.custom_llm_provider == "gemini":
try:
if hasattr(chunk, "parts") == True:
try:
if len(chunk.parts) > 0:
completion_obj["content"] = chunk.parts[0].text
if hasattr(chunk.parts[0], "finish_reason"):
model_response.choices[0].finish_reason = (
map_finish_reason(chunk.parts[0].finish_reason.name)
)
except:
if chunk.parts[0].finish_reason.name == "SAFETY":
raise Exception(
f"The response was blocked by VertexAI. {str(chunk)}"
)
else:
completion_obj["content"] = str(chunk)
except StopIteration as e:
if self.sent_last_chunk:
raise e
else:
model_response.choices[0].finish_reason = "stop"
self.sent_last_chunk = True
if hasattr(chunk, "parts") == True:
try:
if len(chunk.parts) > 0:
completion_obj["content"] = chunk.parts[0].text
if hasattr(chunk.parts[0], "finish_reason"):
self.received_finish_reason = chunk.parts[
0
].finish_reason.name
except:
if chunk.parts[0].finish_reason.name == "SAFETY":
raise Exception(
f"The response was blocked by VertexAI. {str(chunk)}"
)
else:
completion_obj["content"] = str(chunk)
elif self.custom_llm_provider and (self.custom_llm_provider == "vertex_ai"):
try:
if hasattr(chunk, "candidates") == True:
if hasattr(chunk, "candidates") == True:
try:
try:
try:
completion_obj["content"] = chunk.text
except Exception as e:
if "Part has no text." in str(e):
## check for function calling
function_call = (
chunk.candidates[0]
.content.parts[0]
.function_call
)
args_dict = {}
for k, v in function_call.args.items():
args_dict[k] = v
args_str = json.dumps(args_dict)
_delta_obj = litellm.utils.Delta(
content=None,
tool_calls=[
{
"id": f"call_{str(uuid.uuid4())}",
"function": {
"arguments": args_str,
"name": function_call.name,
},
"type": "function",
}
],
)
_streaming_response = StreamingChoices(
delta=_delta_obj
)
_model_response = ModelResponse(stream=True)
_model_response.choices = [_streaming_response]
response_obj = {"original_chunk": _model_response}
else:
raise e
if (
hasattr(chunk.candidates[0], "finish_reason")
and chunk.candidates[0].finish_reason.name
!= "FINISH_REASON_UNSPECIFIED"
): # every non-final chunk in vertex ai has this
model_response.choices[0].finish_reason = (
map_finish_reason(
chunk.candidates[0].finish_reason.name
)
)
completion_obj["content"] = chunk.text
except Exception as e:
if chunk.candidates[0].finish_reason.name == "SAFETY":
raise Exception(
f"The response was blocked by VertexAI. {str(chunk)}"
if "Part has no text." in str(e):
## check for function calling
function_call = (
chunk.candidates[0].content.parts[0].function_call
)
else:
completion_obj["content"] = str(chunk)
except StopIteration as e:
if self.sent_last_chunk:
raise e
else:
model_response.choices[0].finish_reason = "stop"
self.sent_last_chunk = True
args_dict = {}
for k, v in function_call.args.items():
args_dict[k] = v
args_str = json.dumps(args_dict)
_delta_obj = litellm.utils.Delta(
content=None,
tool_calls=[
{
"id": f"call_{str(uuid.uuid4())}",
"function": {
"arguments": args_str,
"name": function_call.name,
},
"type": "function",
}
],
)
_streaming_response = StreamingChoices(delta=_delta_obj)
_model_response = ModelResponse(stream=True)
_model_response.choices = [_streaming_response]
response_obj = {"original_chunk": _model_response}
else:
raise e
if (
hasattr(chunk.candidates[0], "finish_reason")
and chunk.candidates[0].finish_reason.name
!= "FINISH_REASON_UNSPECIFIED"
): # every non-final chunk in vertex ai has this
self.received_finish_reason = chunk.candidates[
0
].finish_reason.name
except Exception as e:
if chunk.candidates[0].finish_reason.name == "SAFETY":
raise Exception(
f"The response was blocked by VertexAI. {str(chunk)}"
)
else:
completion_obj["content"] = str(chunk)
elif self.custom_llm_provider == "cohere":
response_obj = self.handle_cohere_chunk(chunk)
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj[
"finish_reason"
]
self.received_finish_reason = response_obj["finish_reason"]
elif self.custom_llm_provider == "cohere_chat":
response_obj = self.handle_cohere_chat_chunk(chunk)
if response_obj is None:
return
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj[
"finish_reason"
]
self.received_finish_reason = response_obj["finish_reason"]
elif self.custom_llm_provider == "bedrock":
if self.sent_last_chunk:
if self.received_finish_reason is not None:
raise StopIteration
response_obj = self.handle_bedrock_stream(chunk)
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj[
"finish_reason"
]
self.sent_last_chunk = True
self.received_finish_reason = response_obj["finish_reason"]
elif self.custom_llm_provider == "sagemaker":
verbose_logger.debug(f"ENTERS SAGEMAKER STREAMING for chunk {chunk}")
print_verbose(f"ENTERS SAGEMAKER STREAMING for chunk {chunk}")
response_obj = self.handle_sagemaker_stream(chunk)
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj[
"finish_reason"
]
self.sent_last_chunk = True
self.received_finish_reason = response_obj["finish_reason"]
elif self.custom_llm_provider == "petals":
if len(self.completion_stream) == 0:
if self.sent_last_chunk:
if self.received_finish_reason is not None:
raise StopIteration
else:
model_response.choices[0].finish_reason = "stop"
self.sent_last_chunk = True
self.received_finish_reason = "stop"
chunk_size = 30
new_chunk = self.completion_stream[:chunk_size]
completion_obj["content"] = new_chunk
@@ -9383,11 +9356,10 @@ class CustomStreamWrapper:
# fake streaming
response_obj = {}
if len(self.completion_stream) == 0:
if self.sent_last_chunk:
if self.received_finish_reason is not None:
raise StopIteration
else:
model_response.choices[0].finish_reason = "stop"
self.sent_last_chunk = True
self.received_finish_reason = "stop"
chunk_size = 30
new_chunk = self.completion_stream[:chunk_size]
completion_obj["content"] = new_chunk
@@ -9398,41 +9370,31 @@ class CustomStreamWrapper:
completion_obj["content"] = response_obj["text"]
print_verbose(f"completion obj content: {completion_obj['content']}")
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj[
"finish_reason"
]
self.received_finish_reason = response_obj["finish_reason"]
elif self.custom_llm_provider == "ollama_chat":
response_obj = self.handle_ollama_chat_stream(chunk)
completion_obj["content"] = response_obj["text"]
print_verbose(f"completion obj content: {completion_obj['content']}")
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj[
"finish_reason"
]
self.received_finish_reason = response_obj["finish_reason"]
elif self.custom_llm_provider == "cloudflare":
response_obj = self.handle_cloudlfare_stream(chunk)
completion_obj["content"] = response_obj["text"]
print_verbose(f"completion obj content: {completion_obj['content']}")
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj[
"finish_reason"
]
self.received_finish_reason = response_obj["finish_reason"]
elif self.custom_llm_provider == "text-completion-openai":
response_obj = self.handle_openai_text_completion_chunk(chunk)
completion_obj["content"] = response_obj["text"]
print_verbose(f"completion obj content: {completion_obj['content']}")
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj[
"finish_reason"
]
self.received_finish_reason = response_obj["finish_reason"]
elif self.custom_llm_provider == "azure_text":
response_obj = self.handle_azure_text_completion_chunk(chunk)
completion_obj["content"] = response_obj["text"]
print_verbose(f"completion obj content: {completion_obj['content']}")
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj[
"finish_reason"
]
self.received_finish_reason = response_obj["finish_reason"]
elif self.custom_llm_provider == "cached_response":
response_obj = {
"text": chunk.choices[0].delta.content,
@@ -9445,10 +9407,11 @@ class CustomStreamWrapper:
print_verbose(f"completion obj content: {completion_obj['content']}")
if hasattr(chunk, "id"):
model_response.id = chunk.id
self.response_id = chunk.id
if hasattr(chunk, "system_fingerprint"):
self.system_fingerprint = chunk.system_fingerprint
if response_obj["is_finished"]:
model_response.choices[0].finish_reason = response_obj[
"finish_reason"
]
self.received_finish_reason = response_obj["finish_reason"]
else: # openai / azure chat model
if self.custom_llm_provider == "azure":
if hasattr(chunk, "model"):
@@ -9464,21 +9427,24 @@ class CustomStreamWrapper:
raise Exception(
"Mistral API raised a streaming error - finish_reason: error, no content string given."
)
model_response.choices[0].finish_reason = response_obj[
"finish_reason"
]
self.received_finish_reason = response_obj["finish_reason"]
if response_obj.get("original_chunk", None) is not None:
model_response.system_fingerprint = getattr(
response_obj["original_chunk"], "system_fingerprint", None
)
if hasattr(response_obj["original_chunk"], "id"):
model_response.id = response_obj["original_chunk"].id
self.response_id = model_response.id
if hasattr(response_obj["original_chunk"], "system_fingerprint"):
model_response.system_fingerprint = response_obj[
"original_chunk"
].system_fingerprint
self.system_fingerprint = response_obj[
"original_chunk"
].system_fingerprint
if response_obj["logprobs"] is not None:
model_response.choices[0].logprobs = response_obj["logprobs"]
model_response.model = self.model
print_verbose(
f"model_response finish reason 3: {model_response.choices[0].finish_reason}; response_obj={response_obj}"
f"model_response finish reason 3: {self.received_finish_reason}; response_obj={response_obj}"
)
## FUNCTION CALL PARSING
if (
@@ -9488,6 +9454,7 @@ class CustomStreamWrapper:
# enter this branch when no content has been passed in response
original_chunk = response_obj.get("original_chunk", None)
model_response.id = original_chunk.id
self.response_id = original_chunk.id
if len(original_chunk.choices) > 0:
if (
original_chunk.choices[0].delta.function_call is not None
@@ -9569,6 +9536,7 @@ class CustomStreamWrapper:
original_chunk = response_obj.get("original_chunk", None)
if original_chunk:
model_response.id = original_chunk.id
self.response_id = original_chunk.id
if len(original_chunk.choices) > 0:
try:
delta = dict(original_chunk.choices[0].delta)
@@ -9607,7 +9575,10 @@ class CustomStreamWrapper:
return model_response
else:
return
elif model_response.choices[0].finish_reason is not None:
elif self.received_finish_reason is not None:
if self.sent_last_chunk == True:
raise StopIteration
# flush any remaining holding chunk
if len(self.holding_chunk) > 0:
if model_response.choices[0].delta.content is None:
@@ -9617,10 +9588,18 @@ class CustomStreamWrapper:
self.holding_chunk + model_response.choices[0].delta.content
)
self.holding_chunk = ""
# get any function call arguments
model_response.choices[0].finish_reason = map_finish_reason(
model_response.choices[0].finish_reason
) # ensure consistent output to openai
# if delta is None
is_delta_empty = self.is_delta_empty(
delta=model_response.choices[0].delta
)
if is_delta_empty:
# get any function call arguments
model_response.choices[0].finish_reason = map_finish_reason(
finish_reason=self.received_finish_reason
) # ensure consistent output to openai
self.sent_last_chunk = True
return model_response
elif (
model_response.choices[0].delta.tool_calls is not None
@@ -9644,15 +9623,31 @@ class CustomStreamWrapper:
)
def set_logging_event_loop(self, loop):
"""
import litellm, asyncio
loop = asyncio.get_event_loop() # 👈 gets the current event loop
response = litellm.completion(.., stream=True)
response.set_logging_event_loop(loop=loop) # 👈 enables async_success callbacks for sync logging
for chunk in response:
...
"""
self.logging_loop = loop
async def your_async_function(self):
# Your asynchronous code here
return "Your asynchronous code is running"
def run_success_logging_in_thread(self, processed_chunk):
# Create an event loop for the new thread
if litellm.disable_streaming_logging == True:
"""
[NOT RECOMMENDED]
Set this via `litellm.disable_streaming_logging = True`.
Disables streaming logging.
"""
return
## ASYNC LOGGING
# Create an event loop for the new thread
if self.logging_loop is not None:
future = asyncio.run_coroutine_threadsafe(
self.logging_obj.async_success_handler(processed_chunk),
@@ -9664,6 +9659,16 @@ class CustomStreamWrapper:
## SYNC LOGGING
self.logging_obj.success_handler(processed_chunk)
def finish_reason_handler(self):
model_response = self.model_response_creator()
if self.received_finish_reason is not None:
model_response.choices[0].finish_reason = map_finish_reason(
finish_reason=self.received_finish_reason
)
else:
model_response.choices[0].finish_reason = "stop"
return model_response
## needs to handle the empty string case (even starting chunk can be an empty string)
def __next__(self):
try:
@@ -9698,14 +9703,30 @@ class CustomStreamWrapper:
# RETURN RESULT
return response
except StopIteration:
raise # Re-raise StopIteration
if self.sent_last_chunk == True:
raise # Re-raise StopIteration
else:
self.sent_last_chunk = True
processed_chunk = self.finish_reason_handler()
## LOGGING
threading.Thread(
target=self.logging_obj.success_handler, args=(processed_chunk,)
).start() # log response
return processed_chunk
except Exception as e:
traceback_exception = traceback.format_exc()
# LOG FAILURE - handle streaming failure logging in the _next_ object, remove `handle_failure` once it's deprecated
threading.Thread(
target=self.logging_obj.failure_handler, args=(e, traceback_exception)
).start()
raise e
if isinstance(e, OpenAIError):
raise e
else:
raise exception_type(
model=self.model,
original_exception=e,
custom_llm_provider=self.custom_llm_provider,
)
async def __anext__(self):
try:
@@ -9803,9 +9824,37 @@ class CustomStreamWrapper:
# RETURN RESULT
return processed_chunk
except StopAsyncIteration:
raise
if self.sent_last_chunk == True:
raise # Re-raise StopIteration
else:
self.sent_last_chunk = True
processed_chunk = self.finish_reason_handler()
## LOGGING
threading.Thread(
target=self.logging_obj.success_handler, args=(processed_chunk,)
).start() # log response
asyncio.create_task(
self.logging_obj.async_success_handler(
processed_chunk,
)
)
return processed_chunk
except StopIteration:
raise StopAsyncIteration # Re-raise StopIteration
if self.sent_last_chunk == True:
raise StopAsyncIteration
else:
self.sent_last_chunk = True
processed_chunk = self.finish_reason_handler()
## LOGGING
threading.Thread(
target=self.logging_obj.success_handler, args=(processed_chunk,)
).start() # log response
asyncio.create_task(
self.logging_obj.async_success_handler(
processed_chunk,
)
)
return processed_chunk
except Exception as e:
traceback_exception = traceback.format_exc()
# Handle any exceptions that might occur during streaming
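Taken together, the streaming changes mean StopIteration / StopAsyncIteration from the underlying provider no longer ends the stream abruptly: the wrapper emits one final chunk built by finish_reason_handler (defaulting finish_reason to "stop") and only then re-raises. A rough consumer-side sketch; the model name is illustrative:

import litellm

response = litellm.completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "say hi"}],
    stream=True,
)

last_finish_reason = None
for chunk in response:
    delta = chunk.choices[0].delta
    if delta.content:
        print(delta.content, end="")
    if chunk.choices[0].finish_reason is not None:
        # only the final chunk is expected to carry a finish_reason
        last_finish_reason = chunk.choices[0].finish_reason

print("\nfinish_reason:", last_finish_reason)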