From a9c5d3cd3dd5f4db0d97da2b8b4552bac946999b Mon Sep 17 00:00:00 2001
From: Sarthak Deshpande <60317842+cheesecake100201@users.noreply.github.com>
Date: Mon, 10 Mar 2025 05:29:24 +0530
Subject: [PATCH] chore: convert blocking calls in built-in tools to async
 non-blocking calls (#1509)
# What does this PR do?
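This PR converts the blocking HTTP calls made by the built-in tools
(Wolfram Alpha, Brave Search, Tavily Search, and Bing Search) into
non-blocking async calls by replacing `requests` with `httpx.AsyncClient`.
It also runs the Faiss index search via `asyncio.to_thread` and switches
the MongoDB KV store from `MongoClient` to `AsyncMongoClient`.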
[//]: # (If resolving an issue, uncomment and update the line below)
[//]: # (Closes #[issue-number])
## Test Plan
[Describe the tests you ran to verify your changes with result
summaries. *Provide clear instructions so the plan can be easily
re-executed.*]
```
pytest -s -v tool_runtime/test_builtin_tools.py --stack-config=together --text-model=meta-llama/Llama-3.1-8B-Instruct
```
The command above was used to obtain the results below.
[//]: # (## Documentation)
---------
Co-authored-by: sarthakdeshpande
---
.../providers/inline/vector_io/faiss/faiss.py | 3 ++-
.../tool_runtime/bing_search/bing_search.py | 17 +++++++++--------
.../tool_runtime/brave_search/brave_search.py | 13 +++++++++----
.../tavily_search/tavily_search.py | 14 ++++++++------
.../wolfram_alpha/wolfram_alpha.py | 12 +++++-------
.../utils/kvstore/mongodb/mongodb.py | 19 ++++++++++++-------
6 files changed, 45 insertions(+), 33 deletions(-)
diff --git a/llama_stack/providers/inline/vector_io/faiss/faiss.py b/llama_stack/providers/inline/vector_io/faiss/faiss.py
index 410d8bd8b..0c8718cb8 100644
--- a/llama_stack/providers/inline/vector_io/faiss/faiss.py
+++ b/llama_stack/providers/inline/vector_io/faiss/faiss.py
@@ -4,6 +4,7 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
+import asyncio
import base64
import io
import json
@@ -99,7 +100,7 @@ class FaissIndex(EmbeddingIndex):
await self._save_index()
async def query(self, embedding: NDArray, k: int, score_threshold: float) -> QueryChunksResponse:
- distances, indices = self.index.search(embedding.reshape(1, -1).astype(np.float32), k)
+ distances, indices = await asyncio.to_thread(self.index.search, embedding.reshape(1, -1).astype(np.float32), k)
chunks = []
scores = []
diff --git a/llama_stack/providers/remote/tool_runtime/bing_search/bing_search.py b/llama_stack/providers/remote/tool_runtime/bing_search/bing_search.py
index 826d21dd9..f494a7fbb 100644
--- a/llama_stack/providers/remote/tool_runtime/bing_search/bing_search.py
+++ b/llama_stack/providers/remote/tool_runtime/bing_search/bing_search.py
@@ -7,7 +7,7 @@
import json
from typing import Any, Dict, List, Optional
-import requests
+import httpx
from llama_stack.apis.common.content_types import URL
from llama_stack.apis.tools import (
@@ -31,7 +31,7 @@ class BingSearchToolRuntimeImpl(ToolsProtocolPrivate, ToolRuntime, NeedsRequestP
async def initialize(self):
pass
- async def register_tool(self, tool: Tool):
+ async def register_tool(self, tool: Tool) -> None:
pass
async def unregister_tool(self, tool_id: str) -> None:
@@ -77,12 +77,13 @@ class BingSearchToolRuntimeImpl(ToolsProtocolPrivate, ToolRuntime, NeedsRequestP
"q": kwargs["query"],
}
- response = requests.get(
- url=self.url,
- params=params,
- headers=headers,
- )
- response.raise_for_status()
+ async with httpx.AsyncClient() as client:
+ response = await client.get(
+ url=self.url,
+ params=params,
+ headers=headers,
+ )
+ response.raise_for_status()
return ToolInvocationResult(content=json.dumps(self._clean_response(response.json())))
diff --git a/llama_stack/providers/remote/tool_runtime/brave_search/brave_search.py b/llama_stack/providers/remote/tool_runtime/brave_search/brave_search.py
index 8ef9f5705..78b47eb56 100644
--- a/llama_stack/providers/remote/tool_runtime/brave_search/brave_search.py
+++ b/llama_stack/providers/remote/tool_runtime/brave_search/brave_search.py
@@ -6,7 +6,7 @@
from typing import Any, Dict, List, Optional
-import requests
+import httpx
from llama_stack.apis.common.content_types import URL
from llama_stack.apis.tools import (
@@ -30,7 +30,7 @@ class BraveSearchToolRuntimeImpl(ToolsProtocolPrivate, ToolRuntime, NeedsRequest
async def initialize(self):
pass
- async def register_tool(self, tool: Tool):
+ async def register_tool(self, tool: Tool) -> None:
pass
async def unregister_tool(self, tool_id: str) -> None:
@@ -74,8 +74,13 @@ class BraveSearchToolRuntimeImpl(ToolsProtocolPrivate, ToolRuntime, NeedsRequest
"Accept": "application/json",
}
payload = {"q": kwargs["query"]}
- response = requests.get(url=url, params=payload, headers=headers)
- response.raise_for_status()
+ async with httpx.AsyncClient() as client:
+ response = await client.get(
+ url=url,
+ params=payload,
+ headers=headers,
+ )
+ response.raise_for_status()
results = self._clean_brave_response(response.json())
content_items = "\n".join([str(result) for result in results])
return ToolInvocationResult(
diff --git a/llama_stack/providers/remote/tool_runtime/tavily_search/tavily_search.py b/llama_stack/providers/remote/tool_runtime/tavily_search/tavily_search.py
index 57749894a..5b23d94d3 100644
--- a/llama_stack/providers/remote/tool_runtime/tavily_search/tavily_search.py
+++ b/llama_stack/providers/remote/tool_runtime/tavily_search/tavily_search.py
@@ -7,7 +7,7 @@
import json
from typing import Any, Dict, List, Optional
-import requests
+import httpx
from llama_stack.apis.common.content_types import URL
from llama_stack.apis.tools import (
@@ -30,7 +30,7 @@ class TavilySearchToolRuntimeImpl(ToolsProtocolPrivate, ToolRuntime, NeedsReques
async def initialize(self):
pass
- async def register_tool(self, tool: Tool):
+ async def register_tool(self, tool: Tool) -> None:
pass
async def unregister_tool(self, tool_id: str) -> None:
@@ -66,10 +66,12 @@ class TavilySearchToolRuntimeImpl(ToolsProtocolPrivate, ToolRuntime, NeedsReques
async def invoke_tool(self, tool_name: str, kwargs: Dict[str, Any]) -> ToolInvocationResult:
api_key = self._get_api_key()
- response = requests.post(
- "https://api.tavily.com/search",
- json={"api_key": api_key, "query": kwargs["query"]},
- )
+ async with httpx.AsyncClient() as client:
+ response = await client.post(
+ "https://api.tavily.com/search",
+ json={"api_key": api_key, "query": kwargs["query"]},
+ )
+ response.raise_for_status()
return ToolInvocationResult(content=json.dumps(self._clean_tavily_response(response.json())))
diff --git a/llama_stack/providers/remote/tool_runtime/wolfram_alpha/wolfram_alpha.py b/llama_stack/providers/remote/tool_runtime/wolfram_alpha/wolfram_alpha.py
index 08529384a..8489fa7d8 100644
--- a/llama_stack/providers/remote/tool_runtime/wolfram_alpha/wolfram_alpha.py
+++ b/llama_stack/providers/remote/tool_runtime/wolfram_alpha/wolfram_alpha.py
@@ -7,7 +7,7 @@
import json
from typing import Any, Dict, List, Optional
-import requests
+import httpx
from llama_stack.apis.common.content_types import URL
from llama_stack.apis.tools import (
@@ -31,7 +31,7 @@ class WolframAlphaToolRuntimeImpl(ToolsProtocolPrivate, ToolRuntime, NeedsReques
async def initialize(self):
pass
- async def register_tool(self, tool: Tool):
+ async def register_tool(self, tool: Tool) -> None:
pass
async def unregister_tool(self, tool_id: str) -> None:
@@ -73,11 +73,9 @@ class WolframAlphaToolRuntimeImpl(ToolsProtocolPrivate, ToolRuntime, NeedsReques
"format": "plaintext",
"output": "json",
}
- response = requests.get(
- self.url,
- params=params,
- )
-
+ async with httpx.AsyncClient() as client:
+ response = await client.get(params=params, url=self.url)
+ response.raise_for_status()
return ToolInvocationResult(content=json.dumps(self._clean_wolfram_alpha_response(response.json())))
def _clean_wolfram_alpha_response(self, wa_response):
diff --git a/llama_stack/providers/utils/kvstore/mongodb/mongodb.py b/llama_stack/providers/utils/kvstore/mongodb/mongodb.py
index 965b4e213..c1581dc8d 100644
--- a/llama_stack/providers/utils/kvstore/mongodb/mongodb.py
+++ b/llama_stack/providers/utils/kvstore/mongodb/mongodb.py
@@ -8,9 +8,11 @@ import logging
from datetime import datetime
from typing import List, Optional
-from pymongo import MongoClient
+from pymongo import AsyncMongoClient
-from llama_stack.providers.utils.kvstore import KVStore, MongoDBKVStoreConfig
+from llama_stack.providers.utils.kvstore import KVStore
+
+from ..config import MongoDBKVStoreConfig
log = logging.getLogger(__name__)
@@ -30,7 +32,7 @@ class MongoDBKVStoreImpl(KVStore):
"password": self.config.password,
}
conn_creds = {k: v for k, v in conn_creds.items() if v is not None}
- self.conn = MongoClient(**conn_creds)
+ self.conn = AsyncMongoClient(**conn_creds)
self.collection = self.conn[self.config.db][self.config.collection_name]
except Exception as e:
log.exception("Could not connect to MongoDB database server")
@@ -44,17 +46,17 @@ class MongoDBKVStoreImpl(KVStore):
async def set(self, key: str, value: str, expiration: Optional[datetime] = None) -> None:
key = self._namespaced_key(key)
update_query = {"$set": {"value": value, "expiration": expiration}}
- self.collection.update_one({"key": key}, update_query, upsert=True)
+ await self.collection.update_one({"key": key}, update_query, upsert=True)
async def get(self, key: str) -> Optional[str]:
key = self._namespaced_key(key)
query = {"key": key}
- result = self.collection.find_one(query, {"value": 1, "_id": 0})
+ result = await self.collection.find_one(query, {"value": 1, "_id": 0})
return result["value"] if result else None
async def delete(self, key: str) -> None:
key = self._namespaced_key(key)
- self.collection.delete_one({"key": key})
+ await self.collection.delete_one({"key": key})
async def range(self, start_key: str, end_key: str) -> List[str]:
start_key = self._namespaced_key(start_key)
@@ -63,4 +65,7 @@ class MongoDBKVStoreImpl(KVStore):
"key": {"$gte": start_key, "$lt": end_key},
}
cursor = self.collection.find(query, {"value": 1, "_id": 0}).sort("key", 1)
- return [doc["value"] for doc in cursor]
+ result = []
+ async for doc in cursor:
+ result.append(doc["value"])
+ return result