From dcd8cfe0f3bc951328ee0c2165ec29e6d433759f Mon Sep 17 00:00:00 2001 From: Dinesh Yeduguru Date: Tue, 5 Nov 2024 11:42:21 -0800 Subject: [PATCH] add postgres kvstoreimpl (#374) * add postgres kvstoreimpl * make table name configurable * add validator for table name * linter fix --------- Co-authored-by: Dinesh Yeduguru --- llama_stack/providers/utils/kvstore/config.py | 19 +++- .../providers/utils/kvstore/kvstore.py | 4 +- .../utils/kvstore/postgres/__init__.py | 7 ++ .../utils/kvstore/postgres/postgres.py | 103 ++++++++++++++++++ 4 files changed, 131 insertions(+), 2 deletions(-) create mode 100644 llama_stack/providers/utils/kvstore/postgres/__init__.py create mode 100644 llama_stack/providers/utils/kvstore/postgres/postgres.py diff --git a/llama_stack/providers/utils/kvstore/config.py b/llama_stack/providers/utils/kvstore/config.py index c84212eed..b2f56b885 100644 --- a/llama_stack/providers/utils/kvstore/config.py +++ b/llama_stack/providers/utils/kvstore/config.py @@ -4,10 +4,11 @@ # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. +import re from enum import Enum from typing import Literal, Optional, Union -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, field_validator from typing_extensions import Annotated from llama_stack.distribution.utils.config_dirs import RUNTIME_BASE_DIR @@ -51,6 +52,22 @@ class PostgresKVStoreConfig(CommonConfig): db: str = "llamastack" user: str password: Optional[str] = None + table_name: str = "llamastack_kvstore" + + @field_validator("table_name") + def validate_table_name(self, v: str) -> str: + # PostgreSQL identifiers rules: + # - Must start with a letter or underscore + # - Can contain letters, numbers, and underscores + # - Maximum length is 63 bytes + pattern = r"^[a-zA-Z_][a-zA-Z0-9_]*$" + if not re.match(pattern, v): + raise ValueError( + "Invalid table name. Must start with letter or underscore and contain only letters, numbers, and underscores" + ) + if len(v) > 63: + raise ValueError("Table name must be less than 63 characters") + return v KVStoreConfig = Annotated[ diff --git a/llama_stack/providers/utils/kvstore/kvstore.py b/llama_stack/providers/utils/kvstore/kvstore.py index a3cabc206..469f400d0 100644 --- a/llama_stack/providers/utils/kvstore/kvstore.py +++ b/llama_stack/providers/utils/kvstore/kvstore.py @@ -43,7 +43,9 @@ async def kvstore_impl(config: KVStoreConfig) -> KVStore: impl = SqliteKVStoreImpl(config) elif config.type == KVStoreType.postgres.value: - raise NotImplementedError() + from .postgres import PostgresKVStoreImpl + + impl = PostgresKVStoreImpl(config) else: raise ValueError(f"Unknown kvstore type {config.type}") diff --git a/llama_stack/providers/utils/kvstore/postgres/__init__.py b/llama_stack/providers/utils/kvstore/postgres/__init__.py new file mode 100644 index 000000000..efbf6299d --- /dev/null +++ b/llama_stack/providers/utils/kvstore/postgres/__init__.py @@ -0,0 +1,7 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from .postgres import PostgresKVStoreImpl # noqa: F401 F403 diff --git a/llama_stack/providers/utils/kvstore/postgres/postgres.py b/llama_stack/providers/utils/kvstore/postgres/postgres.py new file mode 100644 index 000000000..23ceb58e4 --- /dev/null +++ b/llama_stack/providers/utils/kvstore/postgres/postgres.py @@ -0,0 +1,103 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from datetime import datetime +from typing import List, Optional + +import psycopg2 +from psycopg2.extras import DictCursor + +from ..api import KVStore +from ..config import PostgresKVStoreConfig + + +class PostgresKVStoreImpl(KVStore): + def __init__(self, config: PostgresKVStoreConfig): + self.config = config + self.conn = None + self.cursor = None + + async def initialize(self) -> None: + try: + self.conn = psycopg2.connect( + host=self.config.host, + port=self.config.port, + database=self.config.db, + user=self.config.user, + password=self.config.password, + ) + self.conn.autocommit = True + self.cursor = self.conn.cursor(cursor_factory=DictCursor) + + # Create table if it doesn't exist + self.cursor.execute( + f""" + CREATE TABLE IF NOT EXISTS {self.config.table_name} ( + key TEXT PRIMARY KEY, + value TEXT, + expiration TIMESTAMP + ) + """ + ) + except Exception as e: + import traceback + + traceback.print_exc() + raise RuntimeError("Could not connect to PostgreSQL database server") from e + + def _namespaced_key(self, key: str) -> str: + if not self.config.namespace: + return key + return f"{self.config.namespace}:{key}" + + async def set( + self, key: str, value: str, expiration: Optional[datetime] = None + ) -> None: + key = self._namespaced_key(key) + self.cursor.execute( + f""" + INSERT INTO {self.config.table_name} (key, value, expiration) + VALUES (%s, %s, %s) + ON CONFLICT (key) DO UPDATE + SET value = EXCLUDED.value, expiration = EXCLUDED.expiration + """, + (key, value, expiration), + ) + + async def get(self, key: str) -> Optional[str]: + key = self._namespaced_key(key) + self.cursor.execute( + f""" + SELECT value FROM {self.config.table_name} + WHERE key = %s + AND (expiration IS NULL OR expiration > NOW()) + """, + (key,), + ) + result = self.cursor.fetchone() + return result[0] if result else None + + async def delete(self, key: str) -> None: + key = self._namespaced_key(key) + self.cursor.execute( + f"DELETE FROM {self.config.table_name} WHERE key = %s", + (key,), + ) + + async def range(self, start_key: str, end_key: str) -> List[str]: + start_key = self._namespaced_key(start_key) + end_key = self._namespaced_key(end_key) + + self.cursor.execute( + f""" + SELECT value FROM {self.config.table_name} + WHERE key >= %s AND key < %s + AND (expiration IS NULL OR expiration > NOW()) + ORDER BY key + """, + (start_key, end_key), + ) + return [row[0] for row in self.cursor.fetchall()]