From 723ef9963e9c9468d98a25ab5aa8ed3f2499ccac Mon Sep 17 00:00:00 2001
From: mogith-pn <143642606+mogith-pn@users.noreply.github.com>
Date: Fri, 3 May 2024 14:03:38 +0000
Subject: [PATCH] Clarifai - Added streaming and async completion support

---
 cookbook/liteLLM_clarifai_Demo.ipynb      |  38 +++-
 litellm/llms/clarifai.py                  | 206 +++++++++++++++++-----
 litellm/main.py                           |   9 +-
 litellm/tests/test_clarifai_completion.py |  28 ++-
 litellm/tests/test_streaming.py           |   3 +-
 litellm/utils.py                          |  28 +++
 6 files changed, 259 insertions(+), 53 deletions(-)

diff --git a/cookbook/liteLLM_clarifai_Demo.ipynb b/cookbook/liteLLM_clarifai_Demo.ipynb
index 4e3b4dbb0..40ef2fcf9 100644
--- a/cookbook/liteLLM_clarifai_Demo.ipynb
+++ b/cookbook/liteLLM_clarifai_Demo.ipynb
@@ -119,6 +119,42 @@
     "print(f\"Claude-2.1 response : {response}\")"
    ]
   },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### OpenAI GPT-4 (Streaming)\n",
+    "Clarifai doesn't support streaming natively, but you can still call completion with stream=True and get the response back in LiteLLM's standard streaming response format."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "ModelResponse(id='chatcmpl-40ae19af-3bf0-4eb4-99f2-33aec3ba84af', choices=[StreamingChoices(finish_reason=None, index=0, delta=Delta(content=\"In the quiet corners of time's grand hall,\\nLies the tale of rise and fall.\\nFrom ancient ruins to modern sprawl,\\nHistory, the greatest story of them all.\\n\\nEmpires have risen, empires have decayed,\\nThrough the eons, memories have stayed.\\nIn the book of time, history is laid,\\nA tapestry of events, meticulously displayed.\\n\\nThe pyramids of Egypt, standing tall,\\nThe Roman Empire's mighty sprawl.\\nFrom Alexander's conquest, to the Berlin Wall,\\nHistory, a silent witness to it all.\\n\\nIn the shadow of the past we tread,\\nWhere once kings and prophets led.\\nTheir stories in our hearts are spread,\\nEchoes of their words, in our minds are read.\\n\\nBattles fought and victories won,\\nActs of courage under the sun.\\nTales of love, of deeds done,\\nIn history's grand book, they all run.\\n\\nHeroes born, legends made,\\nIn the annals of time, they'll never fade.\\nTheir triumphs and failures all displayed,\\nIn the eternal march of history's parade.\\n\\nThe ink of the past is forever dry,\\nBut its lessons, we cannot deny.\\nIn its stories, truths lie,\\nIn its wisdom, we rely.\\n\\nHistory, a mirror to our past,\\nA guide for the future vast.\\nThrough its lens, we're ever cast,\\nIn the drama of life, forever vast.\", role='assistant', function_call=None, tool_calls=None), logprobs=None)], created=1714744515, model='https://api.clarifai.com/v2/users/openai/apps/chat-completion/models/GPT-4/outputs', object='chat.completion.chunk', system_fingerprint=None)\n",
+      "ModelResponse(id='chatcmpl-40ae19af-3bf0-4eb4-99f2-33aec3ba84af', choices=[StreamingChoices(finish_reason='stop', index=0, delta=Delta(content=None, role=None, function_call=None, tool_calls=None), logprobs=None)], created=1714744515, model='https://api.clarifai.com/v2/users/openai/apps/chat-completion/models/GPT-4/outputs', object='chat.completion.chunk', system_fingerprint=None)\n"
+     ]
+    }
+   ],
+   "source": [
+    "from litellm import completion\n",
+    "\n",
+    "messages = [{\"role\": \"user\",\"content\": \"\"\"Write a poem about history?\"\"\"}]\n",
+    "response = completion(\n",
+    " model=\"clarifai/openai.chat-completion.GPT-4\",\n",
+    " messages=messages,\n",
+    " 
stream=True,\n", + " api_key = \"c75cc032415e45368be331fdd2c06db0\")\n", + "\n", + "for chunk in response:\n", + " print(chunk)" + ] + }, { "cell_type": "code", "execution_count": null, @@ -143,7 +179,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.10" + "version": "3.10.13" } }, "nbformat": 4, diff --git a/litellm/llms/clarifai.py b/litellm/llms/clarifai.py index 2a7d77c61..e07a8d9e8 100644 --- a/litellm/llms/clarifai.py +++ b/litellm/llms/clarifai.py @@ -3,9 +3,10 @@ import json import requests import time from typing import Callable, Optional -from litellm.utils import ModelResponse, Usage, Choices, Message +from litellm.utils import ModelResponse, Usage, Choices, Message, CustomStreamWrapper import litellm import httpx +from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler from .prompt_templates.factory import prompt_factory, custom_prompt @@ -84,6 +85,63 @@ def completions_to_model(payload): "inputs": [{"data": {"text": {"raw": payload["prompt"]}}}], "model": {"output_info": {"params": params}}, } + +def process_response( + model, + prompt, + response, + model_response, + api_key, + data, + encoding, + logging_obj + ): + logging_obj.post_call( + input=prompt, + api_key=api_key, + original_response=response.text, + additional_args={"complete_input_dict": data}, + ) + ## RESPONSE OBJECT + try: + completion_response = response.json() + except Exception: + raise ClarifaiError( + message=response.text, status_code=response.status_code, url=model + ) + # print(completion_response) + try: + choices_list = [] + for idx, item in enumerate(completion_response["outputs"]): + if len(item["data"]["text"]["raw"]) > 0: + message_obj = Message(content=item["data"]["text"]["raw"]) + else: + message_obj = Message(content=None) + choice_obj = Choices( + finish_reason="stop", + index=idx + 1, #check + message=message_obj, + ) + choices_list.append(choice_obj) + model_response["choices"] = choices_list + + except Exception as e: + raise ClarifaiError( + message=traceback.format_exc(), status_code=response.status_code, url=model + ) + + # Calculate Usage + prompt_tokens = len(encoding.encode(prompt)) + completion_tokens = len( + encoding.encode(model_response["choices"][0]["message"].get("content")) + ) + model_response["model"] = model + model_response["usage"] = Usage( + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + total_tokens=prompt_tokens + completion_tokens, + ) + return model_response def convert_model_to_url(model: str, api_base: str): user_id, app_id, model_id = model.split(".") @@ -98,6 +156,40 @@ def get_prompt_model_name(url: str): else: return "", clarifai_model_name +async def async_completion( + model: str, + prompt: str, + api_base: str, + custom_prompt_dict: dict, + model_response: ModelResponse, + print_verbose: Callable, + encoding, + api_key, + logging_obj, + data=None, + optional_params=None, + litellm_params=None, + logger_fn=None, + headers={}): + + async_handler = AsyncHTTPHandler( + timeout=httpx.Timeout(timeout=600.0, connect=5.0) + ) + response = await async_handler.post( + api_base, headers=headers, data=json.dumps(data) + ) + + return process_response( + model=model, + prompt=prompt, + response=response, + model_response=model_response, + api_key=api_key, + data=data, + encoding=encoding, + logging_obj=logging_obj, + ) + def completion( model: str, messages: list, @@ -108,6 +200,7 @@ def completion( api_key, logging_obj, custom_prompt_dict={}, + acompletion=False, optional_params=None, 
litellm_params=None, logger_fn=None, @@ -158,59 +251,78 @@ def completion( "api_base": api_base, }, ) - - ## COMPLETION CALL - response = requests.post( + if acompletion==True: + return async_completion( + model=model, + prompt=prompt, + api_base=api_base, + custom_prompt_dict=custom_prompt_dict, + model_response=model_response, + print_verbose=print_verbose, + encoding=encoding, + api_key=api_key, + logging_obj=logging_obj, + data=data, + optional_params=optional_params, + litellm_params=litellm_params, + logger_fn=logger_fn, + headers=headers, + ) + else: + ## COMPLETION CALL + response = requests.post( model, headers=headers, data=json.dumps(data), ) # print(response.content); exit() - """ - {"status":{"code":10000,"description":"Ok","req_id":"d914cf7e097487997910650cde954a37"},"outputs":[{"id":"c2baa668174b4547bd4d2e9f8996198d","status":{"code":10000,"description":"Ok"},"created_at":"2024-02-07T10:57:52.917990493Z","model":{"id":"GPT-4","name":"GPT-4","created_at":"2023-06-08T17:40:07.964967Z","modified_at":"2023-12-04T11:39:54.587604Z","app_id":"chat-completion","model_version":{"id":"5d7a50b44aec4a01a9c492c5a5fcf387","created_at":"2023-11-09T19:57:56.961259Z","status":{"code":21100,"description":"Model is trained and ready"},"completed_at":"2023-11-09T20:00:48.933172Z","visibility":{"gettable":50},"app_id":"chat-completion","user_id":"openai","metadata":{}},"user_id":"openai","model_type_id":"text-to-text","visibility":{"gettable":50},"toolkits":[],"use_cases":[],"languages":[],"languages_full":[],"check_consents":[],"workflow_recommended":false,"image":{"url":"https://data.clarifai.com/small/users/openai/apps/chat-completion/inputs/image/34326a9914d361bb93ae8e5381689755","hosted":{"prefix":"https://data.clarifai.com","suffix":"users/openai/apps/chat-completion/inputs/image/34326a9914d361bb93ae8e5381689755","sizes":["small"],"crossorigin":"use-credentials"}}},"input":{"id":"fba1f22a332743f083ddae0a7eb443ae","data":{"text":{"raw":"what\'s the weather in SF","url":"https://samples.clarifai.com/placeholder.gif"}}},"data":{"text":{"raw":"As an AI, I\'m unable to provide real-time information or updates. 
Please check a reliable weather website or app for the current weather in San Francisco.","text_info":{"encoding":"UnknownTextEnc"}}}}]} - """ + if response.status_code != 200: raise ClarifaiError(status_code=response.status_code, message=response.text, url=model) + if "stream" in optional_params and optional_params["stream"] == True: - return response.iter_lines() - else: - logging_obj.post_call( - input=prompt, - api_key=api_key, - original_response=response.text, - additional_args={"complete_input_dict": data}, - ) - ## RESPONSE OBJECT - completion_response = response.json() - # print(completion_response) - try: - choices_list = [] - for idx, item in enumerate(completion_response["outputs"]): - if len(item["data"]["text"]["raw"]) > 0: - message_obj = Message(content=item["data"]["text"]["raw"]) - else: - message_obj = Message(content=None) - choice_obj = Choices( - finish_reason="stop", - index=idx + 1, #check - message=message_obj, - ) - choices_list.append(choice_obj) - model_response["choices"] = choices_list - except Exception as e: - raise ClarifaiError( - message=traceback.format_exc(), status_code=response.status_code, url=model + completion_stream = response.iter_lines() + stream_response = CustomStreamWrapper( + completion_stream=completion_stream, + model=model, + custom_llm_provider="clarifai", + logging_obj=logging_obj, ) + return stream_response + + else: + return process_response( + model=model, + prompt=prompt, + response=response, + model_response=model_response, + api_key=api_key, + data=data, + encoding=encoding, + logging_obj=logging_obj) + - # Calculate Usage - prompt_tokens = len(encoding.encode(prompt)) - completion_tokens = len( - encoding.encode(model_response["choices"][0]["message"].get("content")) - ) - model_response["model"] = model - model_response["usage"] = Usage( - prompt_tokens=prompt_tokens, - completion_tokens=completion_tokens, - total_tokens=prompt_tokens + completion_tokens, - ) - return model_response \ No newline at end of file +class ModelResponseIterator: + def __init__(self, model_response): + self.model_response = model_response + self.is_done = False + + # Sync iterator + def __iter__(self): + return self + + def __next__(self): + if self.is_done: + raise StopIteration + self.is_done = True + return self.model_response + + # Async iterator + def __aiter__(self): + return self + + async def __anext__(self): + if self.is_done: + raise StopAsyncIteration + self.is_done = True + return self.model_response \ No newline at end of file diff --git a/litellm/main.py b/litellm/main.py index 0bc802a9c..396ac1779 100644 --- a/litellm/main.py +++ b/litellm/main.py @@ -1185,6 +1185,7 @@ def completion( print_verbose=print_verbose, optional_params=optional_params, litellm_params=litellm_params, + acompletion=acompletion, logger_fn=logger_fn, encoding=encoding, # for calculating input/output tokens api_key=clarifai_key, @@ -1194,8 +1195,12 @@ def completion( if "stream" in optional_params and optional_params["stream"] == True: # don't try to access stream object, - - model_response = CustomStreamWrapper(model_response, model, logging_obj=logging, custom_llm_provider="replicate") + ## LOGGING + logging.post_call( + input=messages, + api_key=api_key, + original_response=model_response, + ) if optional_params.get("stream", False) or acompletion == True: ## LOGGING diff --git a/litellm/tests/test_clarifai_completion.py b/litellm/tests/test_clarifai_completion.py index 2c2626398..347e513bc 100644 --- a/litellm/tests/test_clarifai_completion.py +++ 
b/litellm/tests/test_clarifai_completion.py @@ -1,6 +1,7 @@ import sys, os import traceback from dotenv import load_dotenv +import asyncio, logging load_dotenv() import os, io @@ -10,7 +11,7 @@ sys.path.insert( ) # Adds the parent directory to the system path import pytest import litellm -from litellm import embedding, completion, completion_cost, Timeout, ModelResponse +from litellm import embedding, completion, acompletion, acreate, completion_cost, Timeout, ModelResponse from litellm import RateLimitError # litellm.num_retries = 3 @@ -65,3 +66,28 @@ def test_completion_clarifai_mistral_large(): pass except Exception as e: pytest.fail(f"Error occurred: {e}") + +@pytest.mark.asyncio +def test_async_completion_clarifai(): + import asyncio + + litellm.set_verbose = True + + async def test_get_response(): + user_message = "Hello, how are you?" + messages = [{"content": user_message, "role": "user"}] + try: + response = await acompletion( + model="clarifai/openai.chat-completion.GPT-4", + messages=messages, + timeout=10, + api_key=os.getenv("CLARIFAI_API_KEY"), + ) + print(f"response: {response}") + except litellm.Timeout as e: + pass + except Exception as e: + pytest.fail(f"An exception occurred: {e}") + + + asyncio.run(test_get_response()) diff --git a/litellm/tests/test_streaming.py b/litellm/tests/test_streaming.py index d0d8a720a..bb9e0e16b 100644 --- a/litellm/tests/test_streaming.py +++ b/litellm/tests/test_streaming.py @@ -391,8 +391,7 @@ def test_completion_claude_stream(): print(f"completion_response: {complete_response}") except Exception as e: pytest.fail(f"Error occurred: {e}") - - + # test_completion_claude_stream() def test_completion_claude_2_stream(): litellm.set_verbose = True diff --git a/litellm/utils.py b/litellm/utils.py index e5f7f9d11..56518f9f9 100644 --- a/litellm/utils.py +++ b/litellm/utils.py @@ -2807,6 +2807,7 @@ def client(original_function): ) else: return result + return result # Prints Exactly what was passed to litellm function - don't execute any logic here - it should just print @@ -2910,6 +2911,7 @@ def client(original_function): model_response_object=ModelResponse(), stream=kwargs.get("stream", False), ) + if kwargs.get("stream", False) == True: cached_result = CustomStreamWrapper( completion_stream=cached_result, @@ -9905,6 +9907,27 @@ class CustomStreamWrapper: return {"text": "", "is_finished": False} except Exception as e: raise e + + def handle_clarifai_completion_chunk(self, chunk): + try: + if isinstance(chunk, dict): + parsed_response = chunk + if isinstance(chunk, (str, bytes)): + if isinstance(chunk, bytes): + parsed_response = chunk.decode("utf-8") + else: + parsed_response = chunk + data_json = json.loads(parsed_response) + text = data_json.get("outputs", "")[0].get("data", "").get("text", "").get("raw","") + prompt_tokens = len(encoding.encode(data_json.get("outputs", "")[0].get("input","").get("data", "").get("text", "").get("raw",""))) + completion_tokens = len(encoding.encode(text)) + return { + "text": text, + "is_finished": True, + } + except: + traceback.print_exc() + return "" def model_response_creator(self): model_response = ModelResponse(stream=True, model=self.model) @@ -9949,6 +9972,11 @@ class CustomStreamWrapper: completion_obj["content"] = response_obj["text"] if response_obj["is_finished"]: self.received_finish_reason = response_obj["finish_reason"] + elif ( + self.custom_llm_provider and self.custom_llm_provider == "clarifai" + ): + response_obj = self.handle_clarifai_completion_chunk(chunk) + completion_obj["content"] = 
response_obj["text"]
             elif self.model == "replicate" or self.custom_llm_provider == "replicate":
                 response_obj = self.handle_replicate_chunk(chunk)
                 completion_obj["content"] = response_obj["text"]
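
Reviewer note (not part of the patch): a minimal usage sketch of the two call paths this change adds. It is illustrative only and rests on assumptions taken from the patch itself: the `clarifai/openai.chat-completion.GPT-4` model name comes from the notebook example, and the key is read from the CLARIFAI_API_KEY environment variable as the tests do.

    import os
    import asyncio
    from litellm import completion, acompletion

    messages = [{"role": "user", "content": "Write a short poem about history."}]

    # Streaming: Clarifai has no native streaming, so the CustomStreamWrapper path
    # added in this patch yields the whole completion as a single content chunk
    # followed by a final "stop" chunk.
    stream = completion(
        model="clarifai/openai.chat-completion.GPT-4",
        messages=messages,
        stream=True,
        api_key=os.getenv("CLARIFAI_API_KEY"),
    )
    for chunk in stream:
        print(chunk)

    # Async: acompletion routes into the new async_completion() handler in
    # litellm/llms/clarifai.py, which posts the request via AsyncHTTPHandler.
    async def main():
        response = await acompletion(
            model="clarifai/openai.chat-completion.GPT-4",
            messages=messages,
            api_key=os.getenv("CLARIFAI_API_KEY"),
        )
        print(response)

    asyncio.run(main())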