forked from phoenix/litellm-mirror
fix: fix nlp cloud streaming
parent a688df79b1 · commit 6d9f7b8f9d
3 changed files with 97 additions and 14 deletions
@@ -131,14 +131,14 @@ def completion(
     logging_obj.pre_call(
         input=text,
         api_key=api_key,
-        additional_args={"complete_input_dict": data},
+        additional_args={"complete_input_dict": data, "headers": headers, "api_base": completion_url},
     )
     ## COMPLETION CALL
     response = requests.post(
         completion_url, headers=headers, data=json.dumps(data), stream=optional_params["stream"] if "stream" in optional_params else False
     )
     if "stream" in optional_params and optional_params["stream"] == True:
-        return response.iter_lines()
+        return clean_and_iterate_chunks(response)
     else:
         ## LOGGING
         logging_obj.post_call(
@@ -179,6 +179,34 @@ def completion(
     model_response.usage = usage
     return model_response

+
+# def clean_and_iterate_chunks(response):
+#     def process_chunk(chunk):
+#         print(f"received chunk: {chunk}")
+#         cleaned_chunk = chunk.decode("utf-8")
+#         # Perform further processing based on your needs
+#         return cleaned_chunk
+
+#     for line in response.iter_lines():
+#         if line:
+#             yield process_chunk(line)
+def clean_and_iterate_chunks(response):
+    buffer = b''
+
+    for chunk in response.iter_content(chunk_size=1024):
+        if not chunk:
+            break
+
+        buffer += chunk
+        while b'\x00' in buffer:
+            buffer = buffer.replace(b'\x00', b'')
+            yield buffer.decode('utf-8')
+            buffer = b''
+
+    # No more data expected, yield any remaining data in the buffer
+    if buffer:
+        yield buffer.decode('utf-8')
+
 def embedding():
     # logic for parsing in - calling - parsing out model embedding calls
     pass
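For context, a minimal sketch of what the new helper is doing (not from the commit itself): the NLP Cloud stream appears to carry null bytes, and clean_and_iterate_chunks buffers the raw bytes, strips the b'\x00' markers, and yields decoded text. FakeStreamingResponse below is a hypothetical stand-in for a streaming requests.Response, used only to exercise the helper, assuming clean_and_iterate_chunks from the hunk above is in scope.

# Hypothetical stand-in for a streaming requests.Response; only iter_content is needed.
class FakeStreamingResponse:
    def __init__(self, raw: bytes):
        self.raw = raw

    def iter_content(self, chunk_size=1024):
        # yield the raw bytes in fixed-size pieces, like a real streamed response
        for i in range(0, len(self.raw), chunk_size):
            yield self.raw[i:i + chunk_size]

fake = FakeStreamingResponse(b"how does a court case\x00")
for piece in clean_and_iterate_chunks(fake):
    print(repr(piece))  # decoded text with the null bytes stripped out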
@@ -318,7 +318,36 @@ def test_completion_deep_infra_stream():
         print(f"completion_response: {complete_response}")
     except Exception as e:
         pytest.fail(f"Error occurred: {e}")
-test_completion_deep_infra_stream()
+# test_completion_deep_infra_stream()
+
+def test_completion_nlp_cloud_stream():
+    try:
+        messages = [
+            {"role": "system", "content": "You are a helpful assistant."},
+            {
+                "role": "user",
+                "content": "how does a court case get to the Supreme Court?",
+            },
+        ]
+        print("testing nlp cloud streaming")
+        response = completion(
+            model="nlp_cloud/finetuned-llama-2-70b", messages=messages, stream=True, max_tokens=20
+        )
+
+        complete_response = ""
+        # Add any assertions here to check the response
+        for idx, chunk in enumerate(response):
+            chunk, finished = streaming_format_tests(idx, chunk)
+            complete_response += chunk
+            if finished:
+                break
+        if complete_response.strip() == "":
+            raise Exception("Empty response received")
+        print(f"completion_response: {complete_response}")
+    except Exception as e:
+        print(f"Error occurred: {e}")
+        pytest.fail(f"Error occurred: {e}")
+test_completion_nlp_cloud_stream()

 def test_completion_claude_stream_bad_key():
     try:
@@ -652,7 +681,7 @@ def hf_test_completion_tgi_stream():
         print(f"completion_response: {complete_response}")
     except Exception as e:
         pytest.fail(f"Error occurred: {e}")
-hf_test_completion_tgi_stream()
+# hf_test_completion_tgi_stream()

 # def test_completion_aleph_alpha():
 #     try:
@@ -4642,6 +4642,7 @@ class CustomStreamWrapper:
         self.sent_last_chunk = False
         self.special_tokens = ["<|assistant|>", "<|system|>", "<|user|>", "<s>", "</s>"]
         self.holding_chunk = ""
+        self.complete_response = ""
         if self.logging_obj:
             # Log the type of the received item
             self.logging_obj.post_call(str(type(completion_stream)))
@@ -4652,6 +4653,23 @@ class CustomStreamWrapper:
     def __aiter__(self):
         return self

+    def process_chunk(self, chunk: str):
+        """
+        NLP Cloud streaming returns the entire response, for each chunk. Process this, to only return the delta.
+        """
+        try:
+            chunk = chunk.strip()
+            self.complete_response = self.complete_response.strip()
+
+            if chunk.startswith(self.complete_response):
+                # Remove last_sent_chunk only if it appears at the start of the new chunk
+                chunk = chunk[len(self.complete_response):]
+
+            self.complete_response += chunk
+            return chunk
+        except Exception as e:
+            raise e
+
     def logging(self, text):
         if self.logging_obj:
             self.logging_obj.post_call(text)
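A minimal sketch of the delta logic added above (not from the commit itself): process_chunk compares each cumulative chunk against what has already been emitted and returns only the new suffix. DeltaTracker is a hypothetical stand-in carrying just the one attribute the method uses.

class DeltaTracker:
    """Hypothetical stand-in for CustomStreamWrapper, with only complete_response."""
    def __init__(self):
        self.complete_response = ""

    # same logic as the process_chunk method added in the hunk above
    def process_chunk(self, chunk: str):
        chunk = chunk.strip()
        self.complete_response = self.complete_response.strip()
        if chunk.startswith(self.complete_response):
            # drop the part of the chunk that was already returned
            chunk = chunk[len(self.complete_response):]
        self.complete_response += chunk
        return chunk

tracker = DeltaTracker()
print(repr(tracker.process_chunk("A court case")))          # 'A court case'
print(repr(tracker.process_chunk("A court case reaches")))  # ' reaches'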
@@ -4774,14 +4792,22 @@ class CustomStreamWrapper:
             raise ValueError(f"Unable to parse response. Original response: {chunk}")

     def handle_nlp_cloud_chunk(self, chunk):
-        chunk = chunk.decode("utf-8")
-        data_json = json.loads(chunk)
+        text = ""
+        is_finished = False
+        finish_reason = ""
         try:
-            text = data_json["generated_text"]
+            if "dolphin" in self.model:
+                chunk = self.process_chunk(chunk=chunk)
+            else:
+                data_json = json.loads(chunk)
+                chunk = data_json["generated_text"]
+            text = chunk
+            if "[DONE]" in text:
+                text = text.replace("[DONE]", "")
                 is_finished = True
                 finish_reason = "stop"
             return {"text": text, "is_finished": is_finished, "finish_reason": finish_reason}
-        except:
+        except Exception as e:
             raise ValueError(f"Unable to parse response. Original response: {chunk}")

     def handle_aleph_alpha_chunk(self, chunk):
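As a reading aid (not from the commit itself), the non-dolphin branch above expects each streamed chunk to be a JSON object with a generated_text field, and treats a "[DONE]" marker inside the text as the end of the stream, as the diff's check implies. The chunk value below is a hypothetical final chunk; roughly:

import json

# Hypothetical final chunk, assuming the provider embeds "[DONE]" in generated_text.
chunk = '{"generated_text": "gets to the Supreme Court.[DONE]"}'
text = json.loads(chunk)["generated_text"]
is_finished = "[DONE]" in text
if is_finished:
    text = text.replace("[DONE]", "")
print({"text": text, "is_finished": is_finished, "finish_reason": "stop" if is_finished else ""})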
@@ -5025,9 +5051,8 @@ class CustomStreamWrapper:
                     completion_obj["content"] = response_obj["text"]
                     if response_obj["is_finished"]:
                         model_response.choices[0].finish_reason = response_obj["finish_reason"]
-            elif self.model in litellm.nlp_cloud_models or self.custom_llm_provider == "nlp_cloud":
+            elif self.custom_llm_provider == "nlp_cloud":
                 try:
-
                     response_obj = self.handle_nlp_cloud_chunk(chunk)
                     completion_obj["content"] = response_obj["text"]
                     if response_obj["is_finished"]:
@@ -5119,9 +5144,9 @@ class CustomStreamWrapper:
            model_response.model = self.model
            print_verbose(f"model_response: {model_response}; completion_obj: {completion_obj}")
            print_verbose(f"model_response finish reason 3: {model_response.choices[0].finish_reason}")
-
            if len(completion_obj["content"]) > 0: # cannot set content of an OpenAI Object to be an empty string
                hold, model_response_str = self.check_special_tokens(chunk=completion_obj["content"], finish_reason=model_response.choices[0].finish_reason)
+                print_verbose(f"hold - {hold}, model_response_str - {model_response_str}")
                if hold is False:
                    completion_obj["content"] = model_response_str
                    if self.sent_first_chunk == False:
@@ -5130,6 +5155,7 @@ class CustomStreamWrapper:
                    model_response.choices[0].delta = Delta(**completion_obj)
                    # LOGGING
                    threading.Thread(target=self.logging_obj.success_handler, args=(model_response,)).start()
+                    print_verbose(f"model_response: {model_response}")
                    return model_response
                else:
                    return
@@ -5174,7 +5200,7 @@ class CustomStreamWrapper:
            chunk = next(self.completion_stream)

            print_verbose(f"chunk in __next__: {chunk}")
-            if chunk is not None:
+            if chunk is not None and chunk != b'':
                response = self.chunk_creator(chunk=chunk)
                print_verbose(f"response in __next__: {response}")
                if response is not None: