feat(predibase.py): support async_completion + streaming (sync + async)

finishes up pr
Krrish Dholakia 2024-05-09 17:41:27 -07:00
parent 186c0ec77b
commit d7189c21fd
6 changed files with 7196 additions and 108 deletions
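For orientation, the new sync-streaming and async-completion paths are exercised by the tests added below; here is a minimal usage sketch (not part of the diff — the model, tenant_id, and api_base values simply mirror the test files):

import os
import asyncio
import litellm
from litellm import completion

# Usage sketch, not part of this commit's diff.
# Sync streaming: routes through PredibaseChatCompletion.completion(),
# which POSTs to the .../generate_stream endpoint and wraps the raw
# line iterator in CustomStreamWrapper.
response = completion(
    model="predibase/llama-3-8b-instruct",
    tenant_id="c4768f95",
    api_base="https://serving.app.predibase.com",
    api_key=os.getenv("PREDIBASE_API_KEY"),
    messages=[{"role": "user", "content": "What is the meaning of life?"}],
    stream=True,
)
for chunk in response:
    print(chunk)

# Async (non-streaming) completion: routes through async_completion(),
# which uses the AsyncHTTPHandler created in PredibaseChatCompletion.__init__.
async def main():
    resp = await litellm.acompletion(
        model="predibase/llama-3-8b-instruct",
        tenant_id="c4768f95",
        api_base="https://serving.app.predibase.com",
        api_key=os.getenv("PREDIBASE_API_KEY"),
        messages=[{"role": "user", "content": "What is the meaning of life?"}],
    )
    print(resp)

asyncio.run(main())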


@@ -60,8 +60,8 @@ class PredibaseConfig:
adapter_id: Optional[str] = None
adapter_source: Optional[Literal["pbase", "hub", "s3"]] = None
best_of: Optional[int] = None
decoder_input_details: bool = True # on by default - get the finish reason
details: Optional[bool] = True # enables returning logprobs + best of
decoder_input_details: Optional[bool] = None
details: bool = True # enables returning logprobs + best of
max_new_tokens: int = (
256 # openai default - requests hang if max_new_tokens not given
)
@@ -124,6 +124,9 @@ class PredibaseConfig:
class PredibaseChatCompletion(BaseLLM):
def __init__(self) -> None:
self.async_handler = AsyncHTTPHandler(
timeout=httpx.Timeout(timeout=litellm.request_timeout, connect=5.0)
)
super().__init__()
def validate_environment(self, api_key: Optional[str], user_headers: dict) -> dict:
@@ -162,7 +165,7 @@ class PredibaseChatCompletion(BaseLLM):
def process_response(
self,
model: str,
response: requests.Response,
response: Union[requests.Response, httpx.Response],
model_response: ModelResponse,
stream: bool,
logging_obj: litellm.utils.Logging,
@@ -216,7 +219,7 @@ class PredibaseChatCompletion(BaseLLM):
"details"
]["finish_reason"]
sum_logprob = 0
for token in completion_response[0]["details"]["tokens"]:
for token in completion_response["details"]["tokens"]:
if token["logprob"] != None:
sum_logprob += token["logprob"]
model_response["choices"][0][
@@ -226,12 +229,12 @@ class PredibaseChatCompletion(BaseLLM):
)
if "best_of" in optional_params and optional_params["best_of"] > 1:
if (
"details" in completion_response[0]
and "best_of_sequences" in completion_response[0]["details"]
"details" in completion_response
and "best_of_sequences" in completion_response["details"]
):
choices_list = []
for idx, item in enumerate(
completion_response[0]["details"]["best_of_sequences"]
completion_response["details"]["best_of_sequences"]
):
sum_logprob = 0
for token in item["tokens"]:
@@ -305,7 +308,7 @@ class PredibaseChatCompletion(BaseLLM):
litellm_params=None,
logger_fn=None,
headers: dict = {},
):
) -> Union[ModelResponse, CustomStreamWrapper]:
headers = self.validate_environment(api_key, headers)
completion_url = ""
input_text = ""
@@ -317,7 +320,12 @@ class PredibaseChatCompletion(BaseLLM):
elif "PREDIBASE_API_BASE" in os.environ:
base_url = os.getenv("PREDIBASE_API_BASE", "")
completion_url = f"{base_url}/{tenant_id}/deployments/v2/llms/{model}/generate"
completion_url = f"{base_url}/{tenant_id}/deployments/v2/llms/{model}"
if optional_params.get("stream", False) == True:
completion_url += "/generate_stream"
else:
completion_url += "/generate"
if model in custom_prompt_dict:
# check if the model has a registered custom prompt
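For reference, with the values used in the tests the endpoint selection above resolves as in this sketch (illustrative only; the tenant, model, and base URL are the test fixtures, not defaults):

# Sketch of the endpoint selection above; values mirror the tests, not part of the diff.
base_url = "https://serving.app.predibase.com"
tenant_id = "c4768f95"
model = "llama-3-8b-instruct"
stream = True  # e.g. completion(..., stream=True)

completion_url = f"{base_url}/{tenant_id}/deployments/v2/llms/{model}"
completion_url += "/generate_stream" if stream else "/generate"
# stream=True  -> .../c4768f95/deployments/v2/llms/llama-3-8b-instruct/generate_stream
# stream=False -> .../c4768f95/deployments/v2/llms/llama-3-8b-instruct/generate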
@@ -339,12 +347,12 @@ class PredibaseChatCompletion(BaseLLM):
): # completion(top_k=3) > anthropic_config(top_k=3) <- allows for dynamic variables to be passed in
optional_params[k] = v
stream = optional_params.pop("stream", False)
data = {
"inputs": prompt,
"parameters": optional_params,
}
if optional_params.get("stream") and optional_params["stream"] == True:
data["stream"] = True
input_text = prompt
## LOGGING
logging_obj.pre_call(
@@ -360,34 +368,62 @@ class PredibaseChatCompletion(BaseLLM):
## COMPLETION CALL
if acompletion is True:
### ASYNC STREAMING
if optional_params.get("stream", False):
return self.async_streaming(logging_obj=logging_obj, api_base=completion_url, data=data, headers=headers, model_response=model_response, model=model, timeout=timeout) # type: ignore
if stream == True:
return self.async_streaming(
model=model,
messages=messages,
data=data,
api_base=completion_url,
model_response=model_response,
print_verbose=print_verbose,
encoding=encoding,
api_key=api_key,
logging_obj=logging_obj,
optional_params=optional_params,
litellm_params=litellm_params,
logger_fn=logger_fn,
headers=headers,
) # type: ignore
else:
### ASYNC COMPLETION
return self.acompletion(api_base=completion_url, data=data, headers=headers, model_response=model_response, task=task, encoding=encoding, input_text=input_text, model=model, optional_params=optional_params, timeout=timeout) # type: ignore
return self.async_completion(
model=model,
messages=messages,
data=data,
api_base=api_base,
model_response=model_response,
print_verbose=print_verbose,
encoding=encoding,
api_key=api_key,
logging_obj=logging_obj,
optional_params=optional_params,
stream=False,
litellm_params=litellm_params,
logger_fn=logger_fn,
headers=headers,
) # type: ignore
### SYNC STREAMING
if "stream" in optional_params and optional_params["stream"] == True:
if stream == True:
response = requests.post(
completion_url,
headers=headers,
data=json.dumps(data),
stream=optional_params["stream"],
stream=stream,
)
return response.iter_lines()
response = CustomStreamWrapper(
response.iter_lines(),
model,
custom_llm_provider="predibase",
logging_obj=logging_obj,
)
return response
### SYNC COMPLETION
else:
payload = json.dumps(
{
"inputs": "What is your name?",
"parameters": {"max_new_tokens": 20, "temperature": 0.1},
}
# data
)
response = requests.post(
url="https://serving.app.predibase.com/c4768f95/deployments/v2/llms/llama-3-8b-instruct/generate",
url=completion_url,
headers=headers,
data=payload,
data=json.dumps(data),
)
return self.process_response(
@@ -404,14 +440,80 @@ class PredibaseChatCompletion(BaseLLM):
encoding=encoding,
)
async def async_completion(self):
pass
async def async_completion(
self,
model: str,
messages: list,
api_base: str,
model_response: ModelResponse,
print_verbose: Callable,
encoding,
api_key,
logging_obj,
stream,
data: dict,
optional_params: dict,
litellm_params=None,
logger_fn=None,
headers={},
) -> ModelResponse:
async def async_streaming(self):
pass
response = await self.async_handler.post(
api_base, headers=headers, data=json.dumps(data)
)
return self.process_response(
model=model,
response=response,
model_response=model_response,
stream=stream,
logging_obj=logging_obj,
api_key=api_key,
data=data,
messages=messages,
print_verbose=print_verbose,
optional_params=optional_params,
encoding=encoding,
)
def streaming(self):
pass
async def async_streaming(
self,
model: str,
messages: list,
api_base: str,
model_response: ModelResponse,
print_verbose: Callable,
encoding,
api_key,
logging_obj,
data: dict,
optional_params=None,
litellm_params=None,
logger_fn=None,
headers={},
) -> CustomStreamWrapper:
data["stream"] = True
response = await self.async_handler.post(
url="https://serving.app.predibase.com/c4768f95/deployments/v2/llms/llama-3-8b-instruct/generate_stream",
headers=headers,
data=json.dumps(data),
stream=True,
)
if response.status_code != 200:
raise PredibaseError(
status_code=response.status_code, message=response.text
)
completion_stream = response.aiter_lines()
streamwrapper = CustomStreamWrapper(
completion_stream=completion_stream,
model=model,
custom_llm_provider="predibase",
logging_obj=logging_obj,
)
return streamwrapper
def embedding(self, *args, **kwargs):
pass


@@ -320,6 +320,7 @@ async def acompletion(
or custom_llm_provider == "gemini"
or custom_llm_provider == "sagemaker"
or custom_llm_provider == "anthropic"
or custom_llm_provider == "predibase"
or custom_llm_provider in litellm.openai_compatible_providers
): # currently implemented aiohttp calls for just azure, openai, hf, ollama, vertex ai soon all.
init_response = await loop.run_in_executor(None, func_with_context)
@@ -1831,12 +1832,6 @@ def completion(
and optional_params["stream"] == True
and acompletion == False
):
response = CustomStreamWrapper(
model_response,
model,
custom_llm_provider="predibase",
logging_obj=logging,
)
return response
response = model_response
elif custom_llm_provider == "ai21":

File diff suppressed because it is too large


@@ -85,20 +85,33 @@ def test_completion_azure_command_r():
pytest.fail(f"Error occurred: {e}")
@pytest.mark.skip(reason="local test")
def test_completion_predibase():
# @pytest.mark.skip(reason="local test")
@pytest.mark.parametrize("sync_mode", [True, False])
@pytest.mark.asyncio
async def test_completion_predibase(sync_mode):
try:
litellm.set_verbose = True
response = completion(
model="predibase/llama-3-8b-instruct",
tenant_id="c4768f95",
api_base="https://serving.app.predibase.com",
api_key=os.getenv("PREDIBASE_API_KEY"),
messages=[{"role": "user", "content": "What is the meaning of life?"}],
)
if sync_mode:
response = completion(
model="predibase/llama-3-8b-instruct",
tenant_id="c4768f95",
api_base="https://serving.app.predibase.com",
api_key=os.getenv("PREDIBASE_API_KEY"),
messages=[{"role": "user", "content": "What is the meaning of life?"}],
)
print(response)
print(response)
else:
response = await litellm.acompletion(
model="predibase/llama-3-8b-instruct",
tenant_id="c4768f95",
api_base="https://serving.app.predibase.com",
api_key=os.getenv("PREDIBASE_API_KEY"),
messages=[{"role": "user", "content": "What is the meaning of life?"}],
)
print(response)
except litellm.Timeout as e:
pass
except Exception as e:


@@ -5,6 +5,7 @@ import sys, os, asyncio
import traceback
import time, pytest
from pydantic import BaseModel
from typing import Tuple
sys.path.insert(
0, os.path.abspath("../..")
@@ -142,7 +143,7 @@ def validate_last_format(chunk):
), "'finish_reason' should be a string."
def streaming_format_tests(idx, chunk):
def streaming_format_tests(idx, chunk) -> Tuple[str, bool]:
extracted_chunk = ""
finished = False
print(f"chunk: {chunk}")
@@ -306,6 +307,70 @@ def test_completion_azure_stream():
# test_completion_azure_stream()
@pytest.mark.parametrize("sync_mode", [True, False])
@pytest.mark.asyncio
async def test_completion_predibase_streaming(sync_mode):
try:
litellm.set_verbose = True
if sync_mode:
response = completion(
model="predibase/llama-3-8b-instruct",
tenant_id="c4768f95",
api_base="https://serving.app.predibase.com",
api_key=os.getenv("PREDIBASE_API_KEY"),
messages=[{"role": "user", "content": "What is the meaning of life?"}],
stream=True,
)
complete_response = ""
for idx, init_chunk in enumerate(response):
chunk, finished = streaming_format_tests(idx, init_chunk)
complete_response += chunk
custom_llm_provider = init_chunk._hidden_params["custom_llm_provider"]
print(f"custom_llm_provider: {custom_llm_provider}")
assert custom_llm_provider == "predibase"
if finished:
assert isinstance(
init_chunk.choices[0], litellm.utils.StreamingChoices
)
break
if complete_response.strip() == "":
raise Exception("Empty response received")
else:
response = await litellm.acompletion(
model="predibase/llama-3-8b-instruct",
tenant_id="c4768f95",
api_base="https://serving.app.predibase.com",
api_key=os.getenv("PREDIBASE_API_KEY"),
messages=[{"role": "user", "content": "What is the meaning of life?"}],
stream=True,
)
# await response
complete_response = ""
idx = 0
async for init_chunk in response:
chunk, finished = streaming_format_tests(idx, init_chunk)
complete_response += chunk
custom_llm_provider = init_chunk._hidden_params["custom_llm_provider"]
print(f"custom_llm_provider: {custom_llm_provider}")
assert custom_llm_provider == "predibase"
idx += 1
if finished:
assert isinstance(
init_chunk.choices[0], litellm.utils.StreamingChoices
)
break
if complete_response.strip() == "":
raise Exception("Empty response received")
print(f"complete_response: {complete_response}")
except litellm.Timeout as e:
pass
except Exception as e:
pytest.fail(f"Error occurred: {e}")
def test_completion_azure_function_calling_stream():


@@ -9757,6 +9757,50 @@ class CustomStreamWrapper:
"finish_reason": finish_reason,
}
def handle_predibase_chunk(self, chunk):
try:
if type(chunk) != str:
chunk = chunk.decode(
"utf-8"
) # DO NOT REMOVE this: This is required for HF inference API + Streaming
text = ""
is_finished = False
finish_reason = ""
print_verbose(f"chunk: {chunk}")
if chunk.startswith("data:"):
data_json = json.loads(chunk[5:])
print_verbose(f"data json: {data_json}")
if "token" in data_json and "text" in data_json["token"]:
text = data_json["token"]["text"]
if data_json.get("details", False) and data_json["details"].get(
"finish_reason", False
):
is_finished = True
finish_reason = data_json["details"]["finish_reason"]
elif data_json.get(
"generated_text", False
): # if full generated text exists, then stream is complete
text = "" # don't return the final bos token
is_finished = True
finish_reason = "stop"
elif data_json.get("error", False):
raise Exception(data_json.get("error"))
return {
"text": text,
"is_finished": is_finished,
"finish_reason": finish_reason,
}
elif "error" in chunk:
raise ValueError(chunk)
return {
"text": text,
"is_finished": is_finished,
"finish_reason": finish_reason,
}
except Exception as e:
traceback.print_exc()
raise e
def handle_huggingface_chunk(self, chunk):
try:
if type(chunk) != str:
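For context on handle_predibase_chunk above: Predibase streams text-generation-inference style "data:" lines, and the branches map each line to text plus a finish state. An assumed example of inputs and the resulting return values (illustrative payloads, not taken from this diff):

# Illustrative streamed lines (assumed TGI-style format) and how the
# branches above classify them; not part of the commit.
intermediate = 'data:{"token": {"text": " life"}, "generated_text": null, "details": null}'
# -> {"text": " life", "is_finished": False, "finish_reason": ""}

final = 'data:{"token": {"text": ""}, "generated_text": "42.", "details": {"finish_reason": "stop"}}'
# -> {"text": "", "is_finished": True, "finish_reason": "stop"}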
@@ -10391,6 +10435,11 @@ class CustomStreamWrapper:
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
self.received_finish_reason = response_obj["finish_reason"]
elif self.custom_llm_provider and self.custom_llm_provider == "predibase":
response_obj = self.handle_predibase_chunk(chunk)
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
self.received_finish_reason = response_obj["finish_reason"]
elif (
self.custom_llm_provider and self.custom_llm_provider == "baseten"
): # baseten doesn't provide streaming
@@ -11008,6 +11057,7 @@ class CustomStreamWrapper:
or self.custom_llm_provider == "sagemaker"
or self.custom_llm_provider == "gemini"
or self.custom_llm_provider == "cached_response"
or self.custom_llm_provider == "predibase"
or self.custom_llm_provider in litellm.openai_compatible_endpoints
):
async for chunk in self.completion_stream:
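The hunk above adds predibase to the providers whose streams are consumed with native async iteration; at the user level, the async streaming path added in this commit corresponds to a sketch like the following (values mirror the tests; not part of the diff):

import os
import asyncio
import litellm

# Async streaming sketch: litellm.acompletion(..., stream=True) returns the
# CustomStreamWrapper built by PredibaseChatCompletion.async_streaming(),
# which is then consumed with `async for`.
async def stream_predibase():
    response = await litellm.acompletion(
        model="predibase/llama-3-8b-instruct",
        tenant_id="c4768f95",
        api_base="https://serving.app.predibase.com",
        api_key=os.getenv("PREDIBASE_API_KEY"),
        messages=[{"role": "user", "content": "What is the meaning of life?"}],
        stream=True,
    )
    async for chunk in response:
        print(chunk.choices[0].delta.content or "", end="")

asyncio.run(stream_predibase())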