From 6949bd19998d761003958486e38a2bd53c231d58 Mon Sep 17 00:00:00 2001
From: Derek Higgins
Date: Wed, 19 Mar 2025 17:46:37 +0000
Subject: [PATCH] fix: Call pandas.read_* in a separate thread (#1698)

These block on IO reads, which in turn blocks the server. Move them to
their own thread.

Closes: #1697

# What does this PR do?
To avoid blocking the main event loop, updates datasetio/localfs to load
data in a separate thread.

Signed-off-by: Derek Higgins
---
 .../providers/inline/datasetio/localfs/datasetio.py |  8 ++++----
 llama_stack/providers/utils/datasetio/url_utils.py  | 10 +++++++---
 2 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/llama_stack/providers/inline/datasetio/localfs/datasetio.py b/llama_stack/providers/inline/datasetio/localfs/datasetio.py
index cf4bf7fec..f489739bf 100644
--- a/llama_stack/providers/inline/datasetio/localfs/datasetio.py
+++ b/llama_stack/providers/inline/datasetio/localfs/datasetio.py
@@ -35,12 +35,12 @@ class PandasDataframeDataset:
         else:
             return self.df.iloc[idx].to_dict()
 
-    def load(self) -> None:
+    async def load(self) -> None:
         if self.df is not None:
             return
 
         if self.dataset_def.source.type == "uri":
-            self.df = get_dataframe_from_uri(self.dataset_def.source.uri)
+            self.df = await get_dataframe_from_uri(self.dataset_def.source.uri)
         elif self.dataset_def.source.type == "rows":
             self.df = pandas.DataFrame(self.dataset_def.source.rows)
         else:
@@ -95,7 +95,7 @@ class LocalFSDatasetIOImpl(DatasetIO, DatasetsProtocolPrivate):
     ) -> IterrowsResponse:
         dataset_def = self.dataset_infos[dataset_id]
         dataset_impl = PandasDataframeDataset(dataset_def)
-        dataset_impl.load()
+        await dataset_impl.load()
 
         start_index = start_index or 0
 
@@ -114,7 +114,7 @@ class LocalFSDatasetIOImpl(DatasetIO, DatasetsProtocolPrivate):
     async def append_rows(self, dataset_id: str, rows: List[Dict[str, Any]]) -> None:
         dataset_def = self.dataset_infos[dataset_id]
         dataset_impl = PandasDataframeDataset(dataset_def)
-        dataset_impl.load()
+        await dataset_impl.load()
 
         new_rows_df = pandas.DataFrame(rows)
         dataset_impl.df = pandas.concat([dataset_impl.df, new_rows_df], ignore_index=True)
diff --git a/llama_stack/providers/utils/datasetio/url_utils.py b/llama_stack/providers/utils/datasetio/url_utils.py
index 6a544ea49..386ee736d 100644
--- a/llama_stack/providers/utils/datasetio/url_utils.py
+++ b/llama_stack/providers/utils/datasetio/url_utils.py
@@ -4,6 +4,7 @@
 # This source code is licensed under the terms described in the LICENSE file in
 # the root directory of this source tree.
 
+import asyncio
 import base64
 import io
 from urllib.parse import unquote
@@ -13,12 +14,15 @@ import pandas
 from llama_stack.providers.utils.memory.vector_store import parse_data_url
 
 
-def get_dataframe_from_uri(uri: str):
+async def get_dataframe_from_uri(uri: str):
     df = None
     if uri.endswith(".csv"):
-        df = pandas.read_csv(uri)
+        # Run in a separate thread to keep the IO from blocking the event loop.
+        # This isn't ideal, since it moves more than just the IO to a new thread,
+        # but it is as close as we can easily get.
+        df = await asyncio.to_thread(pandas.read_csv, uri)
     elif uri.endswith(".xlsx"):
-        df = pandas.read_excel(uri)
+        df = await asyncio.to_thread(pandas.read_excel, uri)
     elif uri.startswith("data:"):
         parts = parse_data_url(uri)
         data = parts["data"]
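
For reference, the pattern this patch relies on is asyncio.to_thread (Python 3.9+),
which hands a blocking callable to a worker thread and awaits the result so the
event loop stays responsive. The standalone sketch below is not part of the patch;
the function name read_csv_async and the inline CSV data are illustrative only.

import asyncio
import io

import pandas


async def read_csv_async(source) -> pandas.DataFrame:
    # pandas.read_csv is synchronous and blocks on IO; asyncio.to_thread
    # runs it in a worker thread so the event loop remains free.
    return await asyncio.to_thread(pandas.read_csv, source)


async def main() -> None:
    # Inline CSV data stands in for a real file or URL in this illustration.
    csv_data = io.StringIO("id,name\n1,alpha\n2,beta\n")
    df = await read_csv_async(csv_data)
    print(df.to_dict(orient="records"))


if __name__ == "__main__":
    asyncio.run(main())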