litellm-mirror/litellm/llms/databricks/streaming_utils.py
Krish Dholakia 2e5c46ef6d
LiteLLM Minor Fixes & Improvements (10/04/2024) (#6064)
* fix(litellm_logging.py): ensure cache hits are scrubbed if 'turn_off_message_logging' is enabled

* fix(sagemaker.py): fix streaming to raise error immediately

Fixes https://github.com/BerriAI/litellm/issues/6054

* (fixes)  gcs bucket key based logging  (#6044)

* fixes for gcs bucket logging

* fix StandardCallbackDynamicParams

* fix - gcs logging when payload is not serializable

* add test_add_callback_via_key_litellm_pre_call_utils_gcs_bucket

* working success callbacks

* linting fixes

* fix linting error

* add type hints to functions

* fixes for dynamic success and failure logging

* fix for test_async_chat_openai_stream

* fix handle case when key based logging vars are set as os.environ/ vars

* fix prometheus track cooldown events on custom logger (#6060)

* (docs) add 1k rps load test doc  (#6059)

* docs 1k rps load test

* docs load testing

* docs load testing litellm

* docs load testing

* clean up load test doc

* docs prom metrics for load testing

* docs using prometheus on load testing

* doc load testing with prometheus

* (fixes) docs + qa - gcs key based logging  (#6061)

* fixes for required values for gcs bucket

* docs gcs bucket logging

* bump: version 1.48.12 → 1.48.13

* ci/cd run again

* bump: version 1.48.13 → 1.48.14

* update load test doc

* (docs) router settings - on litellm config  (#6037)

* add yaml with all router settings

* add docs for router settings

* docs router settings litellm settings

* (feat)  OpenAI prompt caching models to model cost map (#6063)

* add prompt caching for latest models

* add cache_read_input_token_cost for prompt caching models

* fix(litellm_logging.py): check if param is iterable

Fixes https://github.com/BerriAI/litellm/issues/6025#issuecomment-2393929946

* fix(factory.py): support passing an 'assistant_continue_message' to prevent bedrock error

Fixes https://github.com/BerriAI/litellm/issues/6053

* fix(databricks/chat): handle streaming responses

* fix(factory.py): fix linting error

* fix(utils.py): unify anthropic + deepseek prompt caching information to openai format

Fixes https://github.com/BerriAI/litellm/issues/6069

* test: fix test

* fix(types/utils.py): support all openai roles

Fixes https://github.com/BerriAI/litellm/issues/6052

* test: fix test

---------

Co-authored-by: Ishaan Jaff <ishaanjaffer0324@gmail.com>
2024-10-04 21:28:53 -04:00


"""Iterator utilities for Databricks streaming chat responses.

Each SSE `data:` line is decoded and parsed into a litellm
`GenericStreamingChunk`; both sync and async iteration are supported.
"""

import json
from typing import Optional

import litellm
from litellm.types.llms.openai import (
    ChatCompletionDeltaChunk,
    ChatCompletionResponseMessage,
    ChatCompletionToolCallChunk,
    ChatCompletionToolCallFunctionChunk,
    ChatCompletionUsageBlock,
)
from litellm.types.utils import GenericStreamingChunk


class ModelResponseIterator:
    def __init__(self, streaming_response, sync_stream: bool):
        self.streaming_response = streaming_response

    def chunk_parser(self, chunk: dict) -> GenericStreamingChunk:
        try:
            # The chunk is already OpenAI-format JSON, so it can be validated
            # directly as a streaming ModelResponse.
            processed_chunk = litellm.ModelResponse(**chunk, stream=True)  # type: ignore

            text = ""
            tool_use: Optional[ChatCompletionToolCallChunk] = None
            is_finished = False
            finish_reason = ""
            usage: Optional[ChatCompletionUsageBlock] = None

            if processed_chunk.choices[0].delta.content is not None:  # type: ignore
                text = processed_chunk.choices[0].delta.content  # type: ignore

            if (
                processed_chunk.choices[0].delta.tool_calls is not None  # type: ignore
                and len(processed_chunk.choices[0].delta.tool_calls) > 0  # type: ignore
                and processed_chunk.choices[0].delta.tool_calls[0].function is not None  # type: ignore
                and processed_chunk.choices[0].delta.tool_calls[0].function.arguments  # type: ignore
                is not None
            ):
                tool_use = ChatCompletionToolCallChunk(
                    id=processed_chunk.choices[0].delta.tool_calls[0].id,  # type: ignore
                    type="function",
                    function=ChatCompletionToolCallFunctionChunk(
                        name=processed_chunk.choices[0]
                        .delta.tool_calls[0]  # type: ignore
                        .function.name,
                        arguments=processed_chunk.choices[0]
                        .delta.tool_calls[0]  # type: ignore
                        .function.arguments,
                    ),
                    index=processed_chunk.choices[0].index,
                )

            if processed_chunk.choices[0].finish_reason is not None:
                is_finished = True
                finish_reason = processed_chunk.choices[0].finish_reason

            # Usage is optional; when the provider includes it, forward it on
            # the returned chunk.
            usage_chunk: Optional[litellm.Usage] = getattr(
                processed_chunk, "usage", None
            )
            if usage_chunk is not None:
                usage = ChatCompletionUsageBlock(
                    prompt_tokens=usage_chunk.prompt_tokens,
                    completion_tokens=usage_chunk.completion_tokens,
                    total_tokens=usage_chunk.total_tokens,
                )

            return GenericStreamingChunk(
                text=text,
                tool_use=tool_use,
                is_finished=is_finished,
                finish_reason=finish_reason,
                usage=usage,
                index=0,
            )
        except json.JSONDecodeError:
            raise ValueError(f"Failed to decode JSON from chunk: {chunk}")
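
    # Illustrative note (not from the original source): a delta chunk such as
    #   {"choices": [{"index": 0, "delta": {"content": "Hi"}, "finish_reason": null}], ...}
    # parses to a GenericStreamingChunk with text="Hi", is_finished=False,
    # finish_reason="", usage=None, tool_use=None, and index=0.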

    # Sync iterator
    def __iter__(self):
        self.response_iterator = self.streaming_response
        return self

    def __next__(self):
        if not hasattr(self, "response_iterator"):
            self.response_iterator = self.streaming_response
        try:
            chunk = self.response_iterator.__next__()
        except StopIteration:
            raise StopIteration
        except ValueError as e:
            raise RuntimeError(f"Error receiving chunk from stream: {e}")

        try:
            # Strip the SSE "data:" prefix before decoding the JSON payload.
            chunk = chunk.replace("data:", "")
            chunk = chunk.strip()
            if len(chunk) > 0:
                json_chunk = json.loads(chunk)
                return self.chunk_parser(chunk=json_chunk)
            else:
                # Blank lines are surfaced as empty chunks.
                return GenericStreamingChunk(
                    text="",
                    is_finished=False,
                    finish_reason="",
                    usage=None,
                    index=0,
                    tool_use=None,
                )
        except StopIteration:
            raise StopIteration
        except ValueError as e:
            raise RuntimeError(f"Error parsing chunk: {e},\nReceived chunk: {chunk}")

    # Async iterator
    def __aiter__(self):
        self.async_response_iterator = self.streaming_response.__aiter__()
        return self

    async def __anext__(self):
        try:
            chunk = await self.async_response_iterator.__anext__()
        except StopAsyncIteration:
            raise StopAsyncIteration
        except ValueError as e:
            raise RuntimeError(f"Error receiving chunk from stream: {e}")

        try:
            chunk = chunk.replace("data:", "")
            chunk = chunk.strip()
            # A literal "[DONE]" sentinel marks the end of the stream.
            if chunk == "[DONE]":
                raise StopAsyncIteration
            if len(chunk) > 0:
                json_chunk = json.loads(chunk)
                return self.chunk_parser(chunk=json_chunk)
            else:
                return GenericStreamingChunk(
                    text="",
                    is_finished=False,
                    finish_reason="",
                    usage=None,
                    index=0,
                    tool_use=None,
                )
        except StopAsyncIteration:
            raise StopAsyncIteration
        except ValueError as e:
            raise RuntimeError(f"Error parsing chunk: {e},\nReceived chunk: {chunk}")
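

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the original module).
# It assumes the iterator is fed SSE-formatted strings like the "data: {...}"
# lines an OpenAI-compatible Databricks endpoint returns; the payloads and
# model name below are made up for demonstration.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    sample_stream = iter(
        [
            'data: {"id": "chatcmpl-1", "object": "chat.completion.chunk", '
            '"created": 1, "model": "databricks-dbrx-instruct", "choices": '
            '[{"index": 0, "delta": {"content": "Hello"}, "finish_reason": null}]}',
            'data: {"id": "chatcmpl-1", "object": "chat.completion.chunk", '
            '"created": 1, "model": "databricks-dbrx-instruct", "choices": '
            '[{"index": 0, "delta": {}, "finish_reason": "stop"}]}',
        ]
    )

    # Each SSE line is parsed into a GenericStreamingChunk and printed.
    for parsed in ModelResponseIterator(
        streaming_response=sample_stream, sync_stream=True
    ):
        print(parsed)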