fix: Call pandas.read_* in a separate thread (#1698)
These block on I/O reads, which in turn block the server. Move them to their own thread. Closes: #1697

# What does this PR do?
To avoid blocking the main event loop, updates datasetio/localfs to load data in a separate thread.

Signed-off-by: Derek Higgins <derekh@redhat.com>
This commit is contained in: parent 65ca85ba6b, commit 6949bd1999
2 changed files with 11 additions and 7 deletions
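The core of the change is the standard asyncio pattern for offloading blocking work. Below is a minimal, self-contained sketch of that pattern (not the repository's code; the file path and the heartbeat task are invented for illustration, and asyncio.to_thread requires Python 3.9+): the blocking pandas.read_* call runs in a worker thread while the event loop keeps servicing other coroutines.

import asyncio

import pandas


async def load_csv(uri: str) -> pandas.DataFrame:
    # pandas.read_csv blocks on file I/O and parsing; running it in a worker
    # thread keeps the event loop free for other tasks in the meantime.
    return await asyncio.to_thread(pandas.read_csv, uri)


async def heartbeat() -> None:
    # Stands in for the other requests the server should still be able to serve.
    for _ in range(3):
        await asyncio.sleep(0.05)
        print("event loop still responsive")


async def main() -> None:
    # "data.csv" is a placeholder path for this sketch.
    df, _ = await asyncio.gather(load_csv("data.csv"), heartbeat())
    print(df.head())


if __name__ == "__main__":
    asyncio.run(main())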
@@ -35,12 +35,12 @@ class PandasDataframeDataset:
         else:
             return self.df.iloc[idx].to_dict()
 
-    def load(self) -> None:
+    async def load(self) -> None:
        if self.df is not None:
             return
 
         if self.dataset_def.source.type == "uri":
-            self.df = get_dataframe_from_uri(self.dataset_def.source.uri)
+            self.df = await get_dataframe_from_uri(self.dataset_def.source.uri)
         elif self.dataset_def.source.type == "rows":
             self.df = pandas.DataFrame(self.dataset_def.source.rows)
         else:
@@ -95,7 +95,7 @@ class LocalFSDatasetIOImpl(DatasetIO, DatasetsProtocolPrivate):
     ) -> IterrowsResponse:
         dataset_def = self.dataset_infos[dataset_id]
         dataset_impl = PandasDataframeDataset(dataset_def)
-        dataset_impl.load()
+        await dataset_impl.load()
 
         start_index = start_index or 0
 
@@ -114,7 +114,7 @@ class LocalFSDatasetIOImpl(DatasetIO, DatasetsProtocolPrivate):
     async def append_rows(self, dataset_id: str, rows: List[Dict[str, Any]]) -> None:
         dataset_def = self.dataset_infos[dataset_id]
         dataset_impl = PandasDataframeDataset(dataset_def)
-        dataset_impl.load()
+        await dataset_impl.load()
 
         new_rows_df = pandas.DataFrame(rows)
         dataset_impl.df = pandas.concat([dataset_impl.df, new_rows_df], ignore_index=True)
@@ -4,6 +4,7 @@
 # This source code is licensed under the terms described in the LICENSE file in
 # the root directory of this source tree.
 
+import asyncio
 import base64
 import io
 from urllib.parse import unquote
@@ -13,12 +14,15 @@ import pandas
 from llama_stack.providers.utils.memory.vector_store import parse_data_url
 
 
-def get_dataframe_from_uri(uri: str):
+async def get_dataframe_from_uri(uri: str):
     df = None
     if uri.endswith(".csv"):
-        df = pandas.read_csv(uri)
+        # Moving to its own thread to avoid io from blocking the eventloop
+        # This isn't ideal as it moves more then just the IO to a new thread
+        # but it is as close as we can easly get
+        df = await asyncio.to_thread(pandas.read_csv, uri)
     elif uri.endswith(".xlsx"):
-        df = pandas.read_excel(uri)
+        df = await asyncio.to_thread(pandas.read_excel, uri)
     elif uri.startswith("data:"):
         parts = parse_data_url(uri)
         data = parts["data"]
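A knock-on effect of the change is visible in the first file's hunks: once get_dataframe_from_uri is a coroutine, load() must be async as well, and every call site (iterrows, append_rows) has to await it. The stand-in class below sketches that caller-side pattern; it is simplified from, not identical to, the code in the diff, and "data.csv" is a placeholder path.

import asyncio

import pandas


class DataframeDataset:
    # Stripped-down stand-in for PandasDataframeDataset in the diff above.

    def __init__(self, uri: str) -> None:
        self.uri = uri
        self.df: pandas.DataFrame | None = None

    async def load(self) -> None:
        # Idempotent, like the original: only read the source once.
        if self.df is not None:
            return
        # Offload the blocking read, mirroring get_dataframe_from_uri.
        self.df = await asyncio.to_thread(pandas.read_csv, self.uri)


async def first_row(uri: str) -> dict:
    ds = DataframeDataset(uri)
    await ds.load()  # load() is now a coroutine, so callers must await it
    return ds.df.iloc[0].to_dict()


if __name__ == "__main__":
    print(asyncio.run(first_row("data.csv")))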