Clarifai - Added streaming and async completion support

This commit is contained in:
mogith-pn 2024-05-03 14:03:38 +00:00
parent d770df2259
commit 723ef9963e
6 changed files with 259 additions and 53 deletions
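For context, a minimal sketch of how the two new code paths are exercised from user code (assuming a Clarifai PAT in the CLARIFAI_API_KEY environment variable; the model id mirrors the notebook example below):

import os, asyncio
from litellm import completion, acompletion

messages = [{"role": "user", "content": "Write a short poem about history."}]

# Streaming: Clarifai has no native token streaming, so litellm wraps the full
# response and yields it in its standard chunk format.
for chunk in completion(
    model="clarifai/openai.chat-completion.GPT-4",
    messages=messages,
    stream=True,
    api_key=os.getenv("CLARIFAI_API_KEY"),
):
    print(chunk)

# Async: the clarifai handler now short-circuits into an async httpx call.
print(asyncio.run(acompletion(
    model="clarifai/openai.chat-completion.GPT-4",
    messages=messages,
    api_key=os.getenv("CLARIFAI_API_KEY"),
)))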

View file

@@ -119,6 +119,42 @@
"print(f\"Claude-2.1 response : {response}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### OpenAI GPT-4 (Streaming)\n",
"Though clarifai doesn't support streaming, still you can call stream and get the response in standard StreamResponse format of liteLLM"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"ModelResponse(id='chatcmpl-40ae19af-3bf0-4eb4-99f2-33aec3ba84af', choices=[StreamingChoices(finish_reason=None, index=0, delta=Delta(content=\"In the quiet corners of time's grand hall,\\nLies the tale of rise and fall.\\nFrom ancient ruins to modern sprawl,\\nHistory, the greatest story of them all.\\n\\nEmpires have risen, empires have decayed,\\nThrough the eons, memories have stayed.\\nIn the book of time, history is laid,\\nA tapestry of events, meticulously displayed.\\n\\nThe pyramids of Egypt, standing tall,\\nThe Roman Empire's mighty sprawl.\\nFrom Alexander's conquest, to the Berlin Wall,\\nHistory, a silent witness to it all.\\n\\nIn the shadow of the past we tread,\\nWhere once kings and prophets led.\\nTheir stories in our hearts are spread,\\nEchoes of their words, in our minds are read.\\n\\nBattles fought and victories won,\\nActs of courage under the sun.\\nTales of love, of deeds done,\\nIn history's grand book, they all run.\\n\\nHeroes born, legends made,\\nIn the annals of time, they'll never fade.\\nTheir triumphs and failures all displayed,\\nIn the eternal march of history's parade.\\n\\nThe ink of the past is forever dry,\\nBut its lessons, we cannot deny.\\nIn its stories, truths lie,\\nIn its wisdom, we rely.\\n\\nHistory, a mirror to our past,\\nA guide for the future vast.\\nThrough its lens, we're ever cast,\\nIn the drama of life, forever vast.\", role='assistant', function_call=None, tool_calls=None), logprobs=None)], created=1714744515, model='https://api.clarifai.com/v2/users/openai/apps/chat-completion/models/GPT-4/outputs', object='chat.completion.chunk', system_fingerprint=None)\n",
"ModelResponse(id='chatcmpl-40ae19af-3bf0-4eb4-99f2-33aec3ba84af', choices=[StreamingChoices(finish_reason='stop', index=0, delta=Delta(content=None, role=None, function_call=None, tool_calls=None), logprobs=None)], created=1714744515, model='https://api.clarifai.com/v2/users/openai/apps/chat-completion/models/GPT-4/outputs', object='chat.completion.chunk', system_fingerprint=None)\n"
]
}
],
"source": [
"from litellm import completion\n",
"\n",
"messages = [{\"role\": \"user\",\"content\": \"\"\"Write a poem about history?\"\"\"}]\n",
"response = completion(\n",
" model=\"clarifai/openai.chat-completion.GPT-4\",\n",
" messages=messages,\n",
" stream=True,\n",
" api_key = \"c75cc032415e45368be331fdd2c06db0\")\n",
"\n",
"for chunk in response:\n",
" print(chunk)"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -143,7 +179,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.10"
"version": "3.10.13"
}
},
"nbformat": 4,

View file

@@ -3,9 +3,10 @@ import json
import requests
import time
from typing import Callable, Optional
from litellm.utils import ModelResponse, Usage, Choices, Message
from litellm.utils import ModelResponse, Usage, Choices, Message, CustomStreamWrapper
import litellm
import httpx
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler
from .prompt_templates.factory import prompt_factory, custom_prompt
@@ -85,6 +86,63 @@ def completions_to_model(payload):
        "model": {"output_info": {"params": params}},
    }

def process_response(
    model,
    prompt,
    response,
    model_response,
    api_key,
    data,
    encoding,
    logging_obj
):
    logging_obj.post_call(
        input=prompt,
        api_key=api_key,
        original_response=response.text,
        additional_args={"complete_input_dict": data},
    )
    ## RESPONSE OBJECT
    try:
        completion_response = response.json()
    except Exception:
        raise ClarifaiError(
            message=response.text, status_code=response.status_code, url=model
        )
    # print(completion_response)
    try:
        choices_list = []
        for idx, item in enumerate(completion_response["outputs"]):
            if len(item["data"]["text"]["raw"]) > 0:
                message_obj = Message(content=item["data"]["text"]["raw"])
            else:
                message_obj = Message(content=None)
            choice_obj = Choices(
                finish_reason="stop",
                index=idx + 1,  # check
                message=message_obj,
            )
            choices_list.append(choice_obj)
        model_response["choices"] = choices_list
    except Exception as e:
        raise ClarifaiError(
            message=traceback.format_exc(), status_code=response.status_code, url=model
        )
    # Calculate Usage
    prompt_tokens = len(encoding.encode(prompt))
    completion_tokens = len(
        encoding.encode(model_response["choices"][0]["message"].get("content"))
    )
    model_response["model"] = model
    model_response["usage"] = Usage(
        prompt_tokens=prompt_tokens,
        completion_tokens=completion_tokens,
        total_tokens=prompt_tokens + completion_tokens,
    )
    return model_response
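To make the parsing above concrete, here is an illustrative response body trimmed to the fields process_response actually reads (the full example payload appears later in this diff):

completion_response = {
    "outputs": [
        {"data": {"text": {"raw": "As an AI, I'm unable to provide real-time information."}}}
    ]
}
# Each entry in "outputs" becomes one Choices(...) whose message content is the raw text;
# usage is then recomputed locally by re-encoding the prompt and the completion.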
def convert_model_to_url(model: str, api_base: str):
    user_id, app_id, model_id = model.split(".")
    return f"{api_base}/users/{user_id}/apps/{app_id}/models/{model_id}/outputs"
@@ -98,6 +156,40 @@ def get_prompt_model_name(url: str):
    else:
        return "", clarifai_model_name
async def async_completion(
    model: str,
    prompt: str,
    api_base: str,
    custom_prompt_dict: dict,
    model_response: ModelResponse,
    print_verbose: Callable,
    encoding,
    api_key,
    logging_obj,
    data=None,
    optional_params=None,
    litellm_params=None,
    logger_fn=None,
    headers={}):

    async_handler = AsyncHTTPHandler(
        timeout=httpx.Timeout(timeout=600.0, connect=5.0)
    )
    response = await async_handler.post(
        api_base, headers=headers, data=json.dumps(data)
    )

    return process_response(
        model=model,
        prompt=prompt,
        response=response,
        model_response=model_response,
        api_key=api_key,
        data=data,
        encoding=encoding,
        logging_obj=logging_obj,
    )
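AsyncHTTPHandler comes from litellm's custom httpx wrapper; roughly, the call above corresponds to the following plain-httpx sketch (illustrative only, not the handler's actual implementation):

import json
import httpx

async def _post_outputs(api_base: str, headers: dict, data: dict) -> httpx.Response:
    # Same timeout budget as above: 5s to connect, 600s overall.
    timeout = httpx.Timeout(timeout=600.0, connect=5.0)
    async with httpx.AsyncClient(timeout=timeout) as client:
        return await client.post(api_base, headers=headers, content=json.dumps(data))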
def completion(
    model: str,
    messages: list,
@@ -108,6 +200,7 @@ def completion(
    api_key,
    logging_obj,
    custom_prompt_dict={},
    acompletion=False,
    optional_params=None,
    litellm_params=None,
    logger_fn=None,
@@ -158,7 +251,24 @@
            "api_base": api_base,
        },
    )
    if acompletion == True:
        return async_completion(
            model=model,
            prompt=prompt,
            api_base=api_base,
            custom_prompt_dict=custom_prompt_dict,
            model_response=model_response,
            print_verbose=print_verbose,
            encoding=encoding,
            api_key=api_key,
            logging_obj=logging_obj,
            data=data,
            optional_params=optional_params,
            litellm_params=litellm_params,
            logger_fn=logger_fn,
            headers=headers,
        )
    else:
        ## COMPLETION CALL
        response = requests.post(
            model,
@@ -166,51 +276,53 @@
            data=json.dumps(data),
        )
        # print(response.content); exit()
"""
{"status":{"code":10000,"description":"Ok","req_id":"d914cf7e097487997910650cde954a37"},"outputs":[{"id":"c2baa668174b4547bd4d2e9f8996198d","status":{"code":10000,"description":"Ok"},"created_at":"2024-02-07T10:57:52.917990493Z","model":{"id":"GPT-4","name":"GPT-4","created_at":"2023-06-08T17:40:07.964967Z","modified_at":"2023-12-04T11:39:54.587604Z","app_id":"chat-completion","model_version":{"id":"5d7a50b44aec4a01a9c492c5a5fcf387","created_at":"2023-11-09T19:57:56.961259Z","status":{"code":21100,"description":"Model is trained and ready"},"completed_at":"2023-11-09T20:00:48.933172Z","visibility":{"gettable":50},"app_id":"chat-completion","user_id":"openai","metadata":{}},"user_id":"openai","model_type_id":"text-to-text","visibility":{"gettable":50},"toolkits":[],"use_cases":[],"languages":[],"languages_full":[],"check_consents":[],"workflow_recommended":false,"image":{"url":"https://data.clarifai.com/small/users/openai/apps/chat-completion/inputs/image/34326a9914d361bb93ae8e5381689755","hosted":{"prefix":"https://data.clarifai.com","suffix":"users/openai/apps/chat-completion/inputs/image/34326a9914d361bb93ae8e5381689755","sizes":["small"],"crossorigin":"use-credentials"}}},"input":{"id":"fba1f22a332743f083ddae0a7eb443ae","data":{"text":{"raw":"what\'s the weather in SF","url":"https://samples.clarifai.com/placeholder.gif"}}},"data":{"text":{"raw":"As an AI, I\'m unable to provide real-time information or updates. Please check a reliable weather website or app for the current weather in San Francisco.","text_info":{"encoding":"UnknownTextEnc"}}}}]}
"""
        if response.status_code != 200:
            raise ClarifaiError(status_code=response.status_code, message=response.text, url=model)
if "stream" in optional_params and optional_params["stream"] == True:
return response.iter_lines()
else:
logging_obj.post_call(
input=prompt,
api_key=api_key,
original_response=response.text,
additional_args={"complete_input_dict": data},
)
## RESPONSE OBJECT
completion_response = response.json()
# print(completion_response)
try:
choices_list = []
for idx, item in enumerate(completion_response["outputs"]):
if len(item["data"]["text"]["raw"]) > 0:
message_obj = Message(content=item["data"]["text"]["raw"])
else:
message_obj = Message(content=None)
choice_obj = Choices(
finish_reason="stop",
index=idx + 1, #check
message=message_obj,
)
choices_list.append(choice_obj)
model_response["choices"] = choices_list
except Exception as e:
raise ClarifaiError(
message=traceback.format_exc(), status_code=response.status_code, url=model
)
# Calculate Usage if "stream" in optional_params and optional_params["stream"] == True:
prompt_tokens = len(encoding.encode(prompt)) completion_stream = response.iter_lines()
completion_tokens = len( stream_response = CustomStreamWrapper(
encoding.encode(model_response["choices"][0]["message"].get("content")) completion_stream=completion_stream,
model=model,
custom_llm_provider="clarifai",
logging_obj=logging_obj,
) )
model_response["model"] = model return stream_response
model_response["usage"] = Usage(
prompt_tokens=prompt_tokens, else:
completion_tokens=completion_tokens, return process_response(
total_tokens=prompt_tokens + completion_tokens, model=model,
) prompt=prompt,
return model_response response=response,
model_response=model_response,
api_key=api_key,
data=data,
encoding=encoding,
logging_obj=logging_obj)

class ModelResponseIterator:
    def __init__(self, model_response):
        self.model_response = model_response
        self.is_done = False

    # Sync iterator
    def __iter__(self):
        return self

    def __next__(self):
        if self.is_done:
            raise StopIteration
        self.is_done = True
        return self.model_response

    # Async iterator
    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.is_done:
            raise StopAsyncIteration
        self.is_done = True
        return self.model_response
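ModelResponseIterator isn't referenced elsewhere in this commit; it wraps a single, fully formed ModelResponse so it can be consumed through either the sync or the async iteration protocol. A usage sketch:

iterator = ModelResponseIterator(model_response)

# Sync: yields the one ModelResponse, then raises StopIteration.
for resp in iterator:
    print(resp)

# Async: the same class also supports `async for`:
#     async for resp in ModelResponseIterator(model_response):
#         print(resp)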

View file

@@ -1185,6 +1185,7 @@ def completion(
                print_verbose=print_verbose,
                optional_params=optional_params,
                litellm_params=litellm_params,
                acompletion=acompletion,
                logger_fn=logger_fn,
                encoding=encoding,  # for calculating input/output tokens
                api_key=clarifai_key,
@@ -1194,8 +1195,12 @@
            if "stream" in optional_params and optional_params["stream"] == True:
                # don't try to access stream object,
                ## LOGGING
                model_response = CustomStreamWrapper(model_response, model, logging_obj=logging, custom_llm_provider="replicate")
                logging.post_call(
                    input=messages,
                    api_key=api_key,
                    original_response=model_response,
                )

            if optional_params.get("stream", False) or acompletion == True:
                ## LOGGING

View file

@@ -1,6 +1,7 @@
import sys, os
import traceback
from dotenv import load_dotenv
import asyncio, logging

load_dotenv()
import os, io
@@ -10,7 +11,7 @@ sys.path.insert(
)  # Adds the parent directory to the system path
import pytest
import litellm
from litellm import embedding, completion, completion_cost, Timeout, ModelResponse
from litellm import embedding, completion, acompletion, acreate, completion_cost, Timeout, ModelResponse
from litellm import RateLimitError

# litellm.num_retries = 3
@@ -65,3 +66,28 @@ def test_completion_clarifai_mistral_large():
        pass
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")

@pytest.mark.asyncio
def test_async_completion_clarifai():
    import asyncio

    litellm.set_verbose = True

    async def test_get_response():
        user_message = "Hello, how are you?"
        messages = [{"content": user_message, "role": "user"}]
        try:
            response = await acompletion(
                model="clarifai/openai.chat-completion.GPT-4",
                messages=messages,
                timeout=10,
                api_key=os.getenv("CLARIFAI_API_KEY"),
            )
            print(f"response: {response}")
        except litellm.Timeout as e:
            pass
        except Exception as e:
            pytest.fail(f"An exception occurred: {e}")

    asyncio.run(test_get_response())
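A companion test for the new streaming path (hypothetical, not part of this commit) would mirror the notebook cell above:

def test_completion_clarifai_streaming():
    litellm.set_verbose = True
    messages = [{"role": "user", "content": "Write a short poem about history."}]
    try:
        response = completion(
            model="clarifai/openai.chat-completion.GPT-4",
            messages=messages,
            stream=True,
            api_key=os.getenv("CLARIFAI_API_KEY"),
        )
        for chunk in response:
            print(chunk)
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")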

View file

@@ -392,7 +392,6 @@ def test_completion_claude_stream():
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")

# test_completion_claude_stream()

def test_completion_claude_2_stream():
    litellm.set_verbose = True

View file

@@ -2807,6 +2807,7 @@ def client(original_function):
                )
            else:
                return result
            return result
# Prints Exactly what was passed to litellm function - don't execute any logic here - it should just print # Prints Exactly what was passed to litellm function - don't execute any logic here - it should just print
@@ -2910,6 +2911,7 @@ def client(original_function):
                        model_response_object=ModelResponse(),
                        stream=kwargs.get("stream", False),
                    )

                    if kwargs.get("stream", False) == True:
                        cached_result = CustomStreamWrapper(
                            completion_stream=cached_result,
@@ -9906,6 +9908,27 @@ class CustomStreamWrapper:
        except Exception as e:
            raise e

    def handle_clarifai_completion_chunk(self, chunk):
        try:
            if isinstance(chunk, dict):
                parsed_response = chunk
            if isinstance(chunk, (str, bytes)):
                if isinstance(chunk, bytes):
                    parsed_response = chunk.decode("utf-8")
                else:
                    parsed_response = chunk
            data_json = json.loads(parsed_response)
            text = data_json.get("outputs", "")[0].get("data", "").get("text", "").get("raw", "")
            prompt_tokens = len(
                encoding.encode(
                    data_json.get("outputs", "")[0].get("input", "").get("data", "").get("text", "").get("raw", "")
                )
            )
            completion_tokens = len(encoding.encode(text))
            return {
                "text": text,
                "is_finished": True,
            }
        except:
            traceback.print_exc()
            return ""
    def model_response_creator(self):
        model_response = ModelResponse(stream=True, model=self.model)
        if self.response_id is not None:
@@ -9949,6 +9972,11 @@
                completion_obj["content"] = response_obj["text"]
                if response_obj["is_finished"]:
                    self.received_finish_reason = response_obj["finish_reason"]
            elif (
                self.custom_llm_provider and self.custom_llm_provider == "clarifai"
            ):
                response_obj = self.handle_clarifai_completion_chunk(chunk)
                completion_obj["content"] = response_obj["text"]
            elif self.model == "replicate" or self.custom_llm_provider == "replicate":
                response_obj = self.handle_replicate_chunk(chunk)
                completion_obj["content"] = response_obj["text"]