litellm-mirror/litellm/llms/databricks/streaming_utils.py
Krish Dholakia 516c2a6a70
Litellm remove circular imports (#7232)
* fix(utils.py): initial commit to remove circular imports - moves llmproviders to utils.py

* fix(router.py): fix 'litellm.EmbeddingResponse' import from router.py

* refactor: fix litellm.ModelResponse import on pass through endpoints

* refactor(litellm_logging.py): fix circular import for custom callbacks literal

* fix(factory.py): fix circular imports inside prompt factory

* fix(cost_calculator.py): fix circular import for 'litellm.Usage'

* fix(proxy_server.py): fix potential circular import with `litellm.Router'

* fix(proxy/utils.py): fix potential circular import in `litellm.Router`

* fix: remove circular imports in 'auth_checks' and 'guardrails/'

* fix(prompt_injection_detection.py): fix router import

* fix(vertex_passthrough_logging_handler.py): fix potential circular imports in vertex pass through

* fix(anthropic_pass_through_logging_handler.py): fix potential circular imports

* fix(slack_alerting.py-+-ollama_chat.py): fix modelresponse import

* fix(base.py): fix potential circular import

* fix(handler.py): fix potential circular ref in codestral + cohere handlers

* fix(azure.py): fix potential circular imports

* fix(gpt_transformation.py): fix modelresponse import

* fix(litellm_logging.py): add logging base class - simplify typing

makes it easy for other files to type check the logging obj without introducing circular imports (a minimal sketch of this pattern follows the commit log below)

* fix(azure_ai/embed): fix potential circular import on handler.py

* fix(databricks/): fix potential circular imports in databricks/

* fix(vertex_ai/): fix potential circular imports on vertex ai embeddings

* fix(vertex_ai/image_gen): fix import

* fix(watsonx-+-bedrock): cleanup imports

* refactor(anthropic-pass-through-+-petals): cleanup imports

* refactor(huggingface/): cleanup imports

* fix(ollama-+-clarifai): cleanup circular imports

* fix(openai_like/): fix import

* fix(openai_like/): fix embedding handler

cleanup imports

* refactor(openai.py): cleanup imports

* fix(sagemaker/transformation.py): fix import

* ci(config.yml): add circular import test to ci/cd
2024-12-14 16:28:34 -08:00
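
The "logging base class" bullet above describes a standard way to break an import cycle: keep a minimal base class in a leaf module and let other modules type-annotate against it instead of importing the full logging module. A hedged sketch of that pattern, with illustrative names rather than litellm's actual ones:

# Illustrative sketch only; class and function names are made up, not litellm's API.
# Leaf module (e.g. a hypothetical logging_base.py): it imports nothing heavy, so any
# module can import it without creating a cycle.
class LoggingBase:
    def log_event(self, payload: dict) -> None:
        raise NotImplementedError


# Consumer module (e.g. a provider handler): it annotates against the base class
# instead of importing the full logging module, which would import the handler back.
def handle_request(logging_obj: LoggingBase) -> None:
    logging_obj.log_event({"event": "request_started"})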

168 lines · 6.1 KiB · Python

import json
from typing import List, Optional

import litellm
from litellm import verbose_logger
from litellm.types.llms.openai import (
    ChatCompletionDeltaChunk,
    ChatCompletionResponseMessage,
    ChatCompletionToolCallChunk,
    ChatCompletionToolCallFunctionChunk,
    ChatCompletionUsageBlock,
)
from litellm.types.utils import GenericStreamingChunk, ModelResponse, Usage


class ModelResponseIterator:
    def __init__(self, streaming_response, sync_stream: bool):
        self.streaming_response = streaming_response

    def chunk_parser(self, chunk: dict) -> GenericStreamingChunk:
        # Parse one decoded JSON chunk (OpenAI-style dict) into litellm's
        # provider-agnostic GenericStreamingChunk.
        try:
            processed_chunk = litellm.ModelResponse(**chunk, stream=True)  # type: ignore

            text = ""
            tool_use: Optional[ChatCompletionToolCallChunk] = None
            is_finished = False
            finish_reason = ""
            usage: Optional[ChatCompletionUsageBlock] = None

            if processed_chunk.choices[0].delta.content is not None:  # type: ignore
                text = processed_chunk.choices[0].delta.content  # type: ignore

            if (
                processed_chunk.choices[0].delta.tool_calls is not None  # type: ignore
                and len(processed_chunk.choices[0].delta.tool_calls) > 0  # type: ignore
                and processed_chunk.choices[0].delta.tool_calls[0].function is not None  # type: ignore
                and processed_chunk.choices[0].delta.tool_calls[0].function.arguments  # type: ignore
                is not None
            ):
                tool_use = ChatCompletionToolCallChunk(
                    id=processed_chunk.choices[0].delta.tool_calls[0].id,  # type: ignore
                    type="function",
                    function=ChatCompletionToolCallFunctionChunk(
                        name=processed_chunk.choices[0]
                        .delta.tool_calls[0]  # type: ignore
                        .function.name,
                        arguments=processed_chunk.choices[0]
                        .delta.tool_calls[0]  # type: ignore
                        .function.arguments,
                    ),
                    index=processed_chunk.choices[0].index,
                )

            if processed_chunk.choices[0].finish_reason is not None:
                is_finished = True
                finish_reason = processed_chunk.choices[0].finish_reason

            usage_chunk: Optional[Usage] = getattr(processed_chunk, "usage", None)
            if usage_chunk is not None:
                usage = ChatCompletionUsageBlock(
                    prompt_tokens=usage_chunk.prompt_tokens,
                    completion_tokens=usage_chunk.completion_tokens,
                    total_tokens=usage_chunk.total_tokens,
                )

            return GenericStreamingChunk(
                text=text,
                tool_use=tool_use,
                is_finished=is_finished,
                finish_reason=finish_reason,
                usage=usage,
                index=0,
            )
        except json.JSONDecodeError:
            raise ValueError(f"Failed to decode JSON from chunk: {chunk}")

    # Sync iterator
    def __iter__(self):
        self.response_iterator = self.streaming_response
        return self

    def __next__(self):
        if not hasattr(self, "response_iterator"):
            self.response_iterator = self.streaming_response
        try:
            chunk = self.response_iterator.__next__()
        except StopIteration:
            raise StopIteration
        except ValueError as e:
            raise RuntimeError(f"Error receiving chunk from stream: {e}")

        try:
            chunk = chunk.replace("data:", "")
            chunk = chunk.strip()
            if len(chunk) > 0:
                json_chunk = json.loads(chunk)
                return self.chunk_parser(chunk=json_chunk)
            else:
                return GenericStreamingChunk(
                    text="",
                    is_finished=False,
                    finish_reason="",
                    usage=None,
                    index=0,
                    tool_use=None,
                )
        except StopIteration:
            raise StopIteration
        except ValueError as e:
            verbose_logger.debug(
                f"Error parsing chunk: {e},\nReceived chunk: {chunk}. Defaulting to empty chunk here."
            )
            return GenericStreamingChunk(
                text="",
                is_finished=False,
                finish_reason="",
                usage=None,
                index=0,
                tool_use=None,
            )

    # Async iterator
    def __aiter__(self):
        self.async_response_iterator = self.streaming_response.__aiter__()
        return self

    async def __anext__(self):
        try:
            chunk = await self.async_response_iterator.__anext__()
        except StopAsyncIteration:
            raise StopAsyncIteration
        except ValueError as e:
            raise RuntimeError(f"Error receiving chunk from stream: {e}")
        except Exception as e:
            raise RuntimeError(f"Error receiving chunk from stream: {e}")

        try:
            # Strip the SSE "data:" prefix; "[DONE]" is the end-of-stream sentinel.
            chunk = chunk.replace("data:", "")
            chunk = chunk.strip()
            if chunk == "[DONE]":
                raise StopAsyncIteration
            if len(chunk) > 0:
                json_chunk = json.loads(chunk)
                return self.chunk_parser(chunk=json_chunk)
            else:
                return GenericStreamingChunk(
                    text="",
                    is_finished=False,
                    finish_reason="",
                    usage=None,
                    index=0,
                    tool_use=None,
                )
        except StopAsyncIteration:
            raise StopAsyncIteration
        except ValueError as e:
            verbose_logger.debug(
                f"Error parsing chunk: {e},\nReceived chunk: {chunk}. Defaulting to empty chunk here."
            )
            return GenericStreamingChunk(
                text="",
                is_finished=False,
                finish_reason="",
                usage=None,
                index=0,
                tool_use=None,
            )
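
For reference, a minimal usage sketch (not part of the file above): the fake stream below is a hand-rolled stand-in for the SSE lines a Databricks streaming response would yield, and the model name is made up.

def _fake_databricks_stream():
    # Hypothetical SSE lines; real ones come back from the Databricks API.
    yield (
        'data: {"id": "1", "created": 0, "model": "databricks-example", '
        '"choices": [{"index": 0, "delta": {"content": "Hello"}, "finish_reason": null}]}'
    )
    yield (
        'data: {"id": "1", "created": 0, "model": "databricks-example", '
        '"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]}'
    )


# Iterating yields one GenericStreamingChunk per SSE line.
for parsed in ModelResponseIterator(
    streaming_response=_fake_databricks_stream(), sync_stream=True
):
    print(parsed)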