From 6d9f7b8f9d4efb1d238367ea1d4b5a8315aec8c8 Mon Sep 17 00:00:00 2001
From: Krrish Dholakia
Date: Sat, 25 Nov 2023 13:45:15 -0800
Subject: [PATCH] fix: fix nlp cloud streaming

---
 litellm/llms/nlp_cloud.py       | 32 +++++++++++++++++++++--
 litellm/tests/test_streaming.py | 33 +++++++++++++++++++++--
 litellm/utils.py                | 46 ++++++++++++++++++++++++++-------
 3 files changed, 97 insertions(+), 14 deletions(-)

diff --git a/litellm/llms/nlp_cloud.py b/litellm/llms/nlp_cloud.py
index a5f70c59b3..8d09b85ea9 100644
--- a/litellm/llms/nlp_cloud.py
+++ b/litellm/llms/nlp_cloud.py
@@ -131,14 +131,14 @@ def completion(
     logging_obj.pre_call(
         input=text,
         api_key=api_key,
-        additional_args={"complete_input_dict": data},
+        additional_args={"complete_input_dict": data, "headers": headers, "api_base": completion_url},
     )
     ## COMPLETION CALL
     response = requests.post(
         completion_url, headers=headers, data=json.dumps(data), stream=optional_params["stream"] if "stream" in optional_params else False
     )
     if "stream" in optional_params and optional_params["stream"] == True:
-        return response.iter_lines()
+        return clean_and_iterate_chunks(response)
     else:
         ## LOGGING
         logging_obj.post_call(
@@ -179,6 +179,34 @@ def completion(
     model_response.usage = usage
     return model_response
 
+
+# def clean_and_iterate_chunks(response):
+#     def process_chunk(chunk):
+#         print(f"received chunk: {chunk}")
+#         cleaned_chunk = chunk.decode("utf-8")
+#         # Perform further processing based on your needs
+#         return cleaned_chunk
+
+#     for line in response.iter_lines():
+#         if line:
+#             yield process_chunk(line)
+def clean_and_iterate_chunks(response):
+    buffer = b''
+
+    for chunk in response.iter_content(chunk_size=1024):
+        if not chunk:
+            break
+
+        buffer += chunk
+        while b'\x00' in buffer:
+            buffer = buffer.replace(b'\x00', b'')
+            yield buffer.decode('utf-8')
+            buffer = b''
+
+    # No more data expected, yield any remaining data in the buffer
+    if buffer:
+        yield buffer.decode('utf-8')
+
 def embedding():
     # logic for parsing in - calling - parsing out model embedding calls
     pass
diff --git a/litellm/tests/test_streaming.py b/litellm/tests/test_streaming.py
index 137f38e73c..e8bdeea3da 100644
--- a/litellm/tests/test_streaming.py
+++ b/litellm/tests/test_streaming.py
@@ -318,7 +318,36 @@ def test_completion_deep_infra_stream():
         print(f"completion_response: {complete_response}")
     except Exception as e:
         pytest.fail(f"Error occurred: {e}")
-test_completion_deep_infra_stream()
+# test_completion_deep_infra_stream()
+
+def test_completion_nlp_cloud_stream():
+    try:
+        messages = [
+            {"role": "system", "content": "You are a helpful assistant."},
+            {
+                "role": "user",
+                "content": "how does a court case get to the Supreme Court?",
+            },
+        ]
+        print("testing nlp cloud streaming")
+        response = completion(
+            model="nlp_cloud/finetuned-llama-2-70b", messages=messages, stream=True, max_tokens=20
+        )
+
+        complete_response = ""
+        # Add any assertions here to check the response
+        for idx, chunk in enumerate(response):
+            chunk, finished = streaming_format_tests(idx, chunk)
+            complete_response += chunk
+            if finished:
+                break
+        if complete_response.strip() == "":
+            raise Exception("Empty response received")
+        print(f"completion_response: {complete_response}")
+    except Exception as e:
+        print(f"Error occurred: {e}")
+        pytest.fail(f"Error occurred: {e}")
+test_completion_nlp_cloud_stream()
 
 def test_completion_claude_stream_bad_key():
     try:
@@ -652,7 +681,7 @@ def hf_test_completion_tgi_stream():
         print(f"completion_response: {complete_response}")
     except Exception as e:
         pytest.fail(f"Error occurred: {e}")
-hf_test_completion_tgi_stream()
+# hf_test_completion_tgi_stream()
 
 # def test_completion_aleph_alpha():
 #     try:
diff --git a/litellm/utils.py b/litellm/utils.py
index 3466e236a4..c036ac90bb 100644
--- a/litellm/utils.py
+++ b/litellm/utils.py
@@ -4642,6 +4642,7 @@ class CustomStreamWrapper:
         self.sent_last_chunk = False
         self.special_tokens = ["<|assistant|>", "<|system|>", "<|user|>", "<s>", "</s>"]
         self.holding_chunk = ""
+        self.complete_response = ""
         if self.logging_obj:
             # Log the type of the received item
             self.logging_obj.post_call(str(type(completion_stream)))
@@ -4652,6 +4653,23 @@ class CustomStreamWrapper:
     def __aiter__(self):
         return self
 
+    def process_chunk(self, chunk: str):
+        """
+        NLP Cloud streaming returns the entire response, for each chunk. Process this, to only return the delta.
+        """
+        try:
+            chunk = chunk.strip()
+            self.complete_response = self.complete_response.strip()
+
+            if chunk.startswith(self.complete_response):
+                # Remove last_sent_chunk only if it appears at the start of the new chunk
+                chunk = chunk[len(self.complete_response):]
+
+            self.complete_response += chunk
+            return chunk
+        except Exception as e:
+            raise e
+
     def logging(self, text):
         if self.logging_obj:
             self.logging_obj.post_call(text)
@@ -4774,14 +4792,22 @@ class CustomStreamWrapper:
             raise ValueError(f"Unable to parse response. Original response: {chunk}")
 
     def handle_nlp_cloud_chunk(self, chunk):
-        chunk = chunk.decode("utf-8")
-        data_json = json.loads(chunk)
+        text = ""
+        is_finished = False
+        finish_reason = ""
         try:
-            text = data_json["generated_text"]
-            is_finished = True
-            finish_reason = "stop"
+            if "dolphin" in self.model:
+                chunk = self.process_chunk(chunk=chunk)
+            else:
+                data_json = json.loads(chunk)
+                chunk = data_json["generated_text"]
+            text = chunk
+            if "[DONE]" in text:
+                text = text.replace("[DONE]", "")
+                is_finished = True
+                finish_reason = "stop"
             return {"text": text, "is_finished": is_finished, "finish_reason": finish_reason}
-        except:
+        except Exception as e:
             raise ValueError(f"Unable to parse response. Original response: {chunk}")
 
     def handle_aleph_alpha_chunk(self, chunk):
@@ -5025,9 +5051,8 @@ class CustomStreamWrapper:
                     completion_obj["content"] = response_obj["text"]
                     if response_obj["is_finished"]:
                         model_response.choices[0].finish_reason = response_obj["finish_reason"]
-            elif self.model in litellm.nlp_cloud_models or self.custom_llm_provider == "nlp_cloud":
+            elif self.custom_llm_provider == "nlp_cloud":
                 try:
-
                     response_obj = self.handle_nlp_cloud_chunk(chunk)
                     completion_obj["content"] = response_obj["text"]
                     if response_obj["is_finished"]:
@@ -5119,9 +5144,9 @@ class CustomStreamWrapper:
             model_response.model = self.model
             print_verbose(f"model_response: {model_response}; completion_obj: {completion_obj}")
             print_verbose(f"model_response finish reason 3: {model_response.choices[0].finish_reason}")
-
             if len(completion_obj["content"]) > 0: # cannot set content of an OpenAI Object to be an empty string
                 hold, model_response_str = self.check_special_tokens(chunk=completion_obj["content"], finish_reason=model_response.choices[0].finish_reason)
+                print_verbose(f"hold - {hold}, model_response_str - {model_response_str}")
                 if hold is False:
                     completion_obj["content"] = model_response_str
                     if self.sent_first_chunk == False:
@@ -5130,6 +5155,7 @@ class CustomStreamWrapper:
                     model_response.choices[0].delta = Delta(**completion_obj)
                     # LOGGING
                     threading.Thread(target=self.logging_obj.success_handler, args=(model_response,)).start()
+                    print_verbose(f"model_response: {model_response}")
                     return model_response
                 else:
                     return
@@ -5174,7 +5200,7 @@ class CustomStreamWrapper:
 
             chunk = next(self.completion_stream)
             print_verbose(f"chunk in __next__: {chunk}")
-            if chunk is not None:
+            if chunk is not None and chunk != b'':
                 response = self.chunk_creator(chunk=chunk)
                 print_verbose(f"response in __next__: {response}")
                 if response is not None: