litellm-mirror/litellm/llms/databricks/streaming_utils.py
Krish Dholakia 1bef6457c7
Litellm dev 11 07 2024 (#6649)
* fix(streaming_handler.py): save finish_reasons which might show up mid-stream (store last received one)

Fixes https://github.com/BerriAI/litellm/issues/6104

* refactor: add readme to litellm_core_utils/

make it easier to navigate

* fix(team_endpoints.py): return team id + object for invalid team in `/team/list`

* fix(streaming_handler.py): remove import

* fix(pattern_match_deployments.py): default to user input if unable to map based on wildcards (#6646)

* fix(pattern_match_deployments.py): default to user input if unable to… (#6632)

* fix(pattern_match_deployments.py): default to user input if unable to map based on wildcards

* test: fix test

* test: reset test name

* test: update conftest to reload proxy server module between tests

* ci(config.yml): move langfuse out of local_testing

reduce ci/cd time

* ci(config.yml): cleanup langfuse ci/cd tests

* fix: update test to not use global proxy_server app module

* ci: move caching to a separate test pipeline

speed up ci pipeline

* test: update conftest to check if proxy_server attr exists before reloading

* build(conftest.py): don't block on inability to reload proxy_server

* ci(config.yml): update caching unit test filter to work on 'cache' keyword as well

* fix(encrypt_decrypt_utils.py): use function to get salt key

* test: mark flaky test

* test: handle anthropic overloaded errors

* refactor: create separate ci/cd pipeline for proxy unit tests

make ci/cd faster

* ci(config.yml): add litellm_proxy_unit_testing to build_and_test jobs

* ci(config.yml): generate prisma binaries for proxy unit tests

* test: readd vertex_key.json

* ci(config.yml): remove `-s` from proxy_unit_test cmd

speed up test

* ci: remove any 'debug' logging flag

speed up ci pipeline

* test: fix test

* test(test_braintrust.py): rerun

* test: add delay for braintrust test

* chore: comment for maritalk (#6607)

* Update gpt-4o-2024-08-06, and o1-preview, o1-mini models in model cost map  (#6654)

* Adding supports_response_schema to gpt-4o-2024-08-06 models

* o1 models do not support vision

---------

Co-authored-by: Emerson Gomes <emerson.gomes@thalesgroup.com>

* (QOL improvement) add unit testing for all static_methods in litellm_logging.py  (#6640)

* add unit testing for standard logging payload

* unit testing for static methods in litellm_logging

* add code coverage check for litellm_logging

* litellm_logging_code_coverage

* test_get_final_response_obj

* fix validate_redacted_message_span_attributes

* test validate_redacted_message_span_attributes

* (feat) log error class, function_name on prometheus service failure hook + only log DB related failures on DB service hook  (#6650)

* log error on prometheus service failure hook

* use a more accurate function name for wrapper that handles logging db metrics

* fix log_db_metrics

* test_log_db_metrics_failure_error_types

* fix linting

* fix auth checks

* Update several Azure AI models in model cost map (#6655)

* Adding Azure Phi 3/3.5 models to model cost map

* Update gpt-4o-mini models

* Adding missing Azure Mistral models to model cost map

* Adding Azure Llama3.2 models to model cost map

* Fix Gemini-1.5-flash pricing

* Fix Gemini-1.5-flash output pricing

* Fix Gemini-1.5-pro prices

* Fix Gemini-1.5-flash output prices

* Correct gemini-1.5-pro prices

* Correction on Vertex Llama3.2 entry

---------

Co-authored-by: Emerson Gomes <emerson.gomes@thalesgroup.com>

* fix(streaming_handler.py): fix linting error

* test: remove duplicate test

causes gemini ratelimit error

---------

Co-authored-by: nobuo kawasaki <nobu007@users.noreply.github.com>
Co-authored-by: Emerson Gomes <emerson.gomes@gmail.com>
Co-authored-by: Emerson Gomes <emerson.gomes@thalesgroup.com>
Co-authored-by: Ishaan Jaff <ishaanjaffer0324@gmail.com>
2024-11-08 19:34:22 +05:30

170 lines
6.1 KiB
Python

import json
from typing import List, Optional
import litellm
from litellm import verbose_logger
from litellm.types.llms.openai import (
ChatCompletionDeltaChunk,
ChatCompletionResponseMessage,
ChatCompletionToolCallChunk,
ChatCompletionToolCallFunctionChunk,
ChatCompletionUsageBlock,
)
from litellm.types.utils import GenericStreamingChunk, ModelResponse, Usage
class ModelResponseIterator:
def __init__(self, streaming_response, sync_stream: bool):
self.streaming_response = streaming_response
def chunk_parser(self, chunk: dict) -> GenericStreamingChunk:
try:
processed_chunk = litellm.ModelResponse(**chunk, stream=True) # type: ignore
text = ""
tool_use: Optional[ChatCompletionToolCallChunk] = None
is_finished = False
finish_reason = ""
usage: Optional[ChatCompletionUsageBlock] = None
if processed_chunk.choices[0].delta.content is not None: # type: ignore
text = processed_chunk.choices[0].delta.content # type: ignore
if (
processed_chunk.choices[0].delta.tool_calls is not None # type: ignore
and len(processed_chunk.choices[0].delta.tool_calls) > 0 # type: ignore
and processed_chunk.choices[0].delta.tool_calls[0].function is not None # type: ignore
and processed_chunk.choices[0].delta.tool_calls[0].function.arguments # type: ignore
is not None
):
tool_use = ChatCompletionToolCallChunk(
id=processed_chunk.choices[0].delta.tool_calls[0].id, # type: ignore
type="function",
function=ChatCompletionToolCallFunctionChunk(
name=processed_chunk.choices[0]
.delta.tool_calls[0] # type: ignore
.function.name,
arguments=processed_chunk.choices[0]
.delta.tool_calls[0] # type: ignore
.function.arguments,
),
index=processed_chunk.choices[0].index,
)
if processed_chunk.choices[0].finish_reason is not None:
is_finished = True
finish_reason = processed_chunk.choices[0].finish_reason
usage_chunk: Optional[litellm.Usage] = getattr(
processed_chunk, "usage", None
)
if usage_chunk is not None:
usage = ChatCompletionUsageBlock(
prompt_tokens=usage_chunk.prompt_tokens,
completion_tokens=usage_chunk.completion_tokens,
total_tokens=usage_chunk.total_tokens,
)
return GenericStreamingChunk(
text=text,
tool_use=tool_use,
is_finished=is_finished,
finish_reason=finish_reason,
usage=usage,
index=0,
)
except json.JSONDecodeError:
raise ValueError(f"Failed to decode JSON from chunk: {chunk}")
# Sync iterator
def __iter__(self):
self.response_iterator = self.streaming_response
return self
def __next__(self):
if not hasattr(self, "response_iterator"):
self.response_iterator = self.streaming_response
try:
chunk = self.response_iterator.__next__()
except StopIteration:
raise StopIteration
except ValueError as e:
raise RuntimeError(f"Error receiving chunk from stream: {e}")
try:
chunk = chunk.replace("data:", "")
chunk = chunk.strip()
if len(chunk) > 0:
json_chunk = json.loads(chunk)
return self.chunk_parser(chunk=json_chunk)
else:
return GenericStreamingChunk(
text="",
is_finished=False,
finish_reason="",
usage=None,
index=0,
tool_use=None,
)
except StopIteration:
raise StopIteration
except ValueError as e:
verbose_logger.debug(
f"Error parsing chunk: {e},\nReceived chunk: {chunk}. Defaulting to empty chunk here."
)
return GenericStreamingChunk(
text="",
is_finished=False,
finish_reason="",
usage=None,
index=0,
tool_use=None,
)
# Async iterator
def __aiter__(self):
self.async_response_iterator = self.streaming_response.__aiter__()
return self
async def __anext__(self):
try:
chunk = await self.async_response_iterator.__anext__()
except StopAsyncIteration:
raise StopAsyncIteration
except ValueError as e:
raise RuntimeError(f"Error receiving chunk from stream: {e}")
except Exception as e:
raise RuntimeError(f"Error receiving chunk from stream: {e}")
try:
chunk = chunk.replace("data:", "")
chunk = chunk.strip()
if chunk == "[DONE]":
raise StopAsyncIteration
if len(chunk) > 0:
json_chunk = json.loads(chunk)
return self.chunk_parser(chunk=json_chunk)
else:
return GenericStreamingChunk(
text="",
is_finished=False,
finish_reason="",
usage=None,
index=0,
tool_use=None,
)
except StopAsyncIteration:
raise StopAsyncIteration
except ValueError as e:
verbose_logger.debug(
f"Error parsing chunk: {e},\nReceived chunk: {chunk}. Defaulting to empty chunk here."
)
return GenericStreamingChunk(
text="",
is_finished=False,
finish_reason="",
usage=None,
index=0,
tool_use=None,
)