diff --git a/llama_stack/providers/utils/kvstore/mongodb/mongodb.py b/llama_stack/providers/utils/kvstore/mongodb/mongodb.py index 965b4e213..bdc4e7dff 100644 --- a/llama_stack/providers/utils/kvstore/mongodb/mongodb.py +++ b/llama_stack/providers/utils/kvstore/mongodb/mongodb.py @@ -8,9 +8,10 @@ import logging from datetime import datetime from typing import List, Optional -from pymongo import MongoClient +from pymongo import AsyncMongoClient -from llama_stack.providers.utils.kvstore import KVStore, MongoDBKVStoreConfig +from llama_stack.providers.utils.kvstore import KVStore +from ..config import MongoDBKVStoreConfig log = logging.getLogger(__name__) @@ -30,7 +31,7 @@ class MongoDBKVStoreImpl(KVStore): "password": self.config.password, } conn_creds = {k: v for k, v in conn_creds.items() if v is not None} - self.conn = MongoClient(**conn_creds) + self.conn = AsyncMongoClient(**conn_creds) self.collection = self.conn[self.config.db][self.config.collection_name] except Exception as e: log.exception("Could not connect to MongoDB database server") @@ -44,17 +45,17 @@ class MongoDBKVStoreImpl(KVStore): async def set(self, key: str, value: str, expiration: Optional[datetime] = None) -> None: key = self._namespaced_key(key) update_query = {"$set": {"value": value, "expiration": expiration}} - self.collection.update_one({"key": key}, update_query, upsert=True) + await self.collection.update_one({"key": key}, update_query, upsert=True) async def get(self, key: str) -> Optional[str]: key = self._namespaced_key(key) query = {"key": key} - result = self.collection.find_one(query, {"value": 1, "_id": 0}) + result = await self.collection.find_one(query, {"value": 1, "_id": 0}) return result["value"] if result else None async def delete(self, key: str) -> None: key = self._namespaced_key(key) - self.collection.delete_one({"key": key}) + await self.collection.delete_one({"key": key}) async def range(self, start_key: str, end_key: str) -> List[str]: start_key = self._namespaced_key(start_key) @@ -63,4 +64,7 @@ class MongoDBKVStoreImpl(KVStore): "key": {"$gte": start_key, "$lt": end_key}, } cursor = self.collection.find(query, {"value": 1, "_id": 0}).sort("key", 1) - return [doc["value"] for doc in cursor] + result = [] + async for doc in cursor: + result.append(doc["value"]) + return result