[Evals API][3/n] scoring_functions / scoring meta-reference implementations (#296)

* wip

* dataset validation

* test_scoring

* cleanup

* clean up test

* comments

* error checking

* dataset client

* test client

* datasetio client

* clean up

* basic scoring function works

* scorer wip

* equality scorer

* score batch impl

* score batch

* update scoring test

* refactor

* validate scorer input

* address comments

* add all rows scores to ScoringResult

* bugfix

* scoring function def rename
Xi Yan 2024-10-24 14:52:30 -07:00, committed by GitHub
parent e70420a06e
commit cb84034567
28 changed files with 904 additions and 51 deletions


@@ -0,0 +1,103 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

import asyncio
import os
from pathlib import Path
from typing import Optional

import fire
import httpx
from termcolor import cprint

from llama_stack.apis.datasets import *  # noqa: F403
from llama_stack.apis.datasetio import *  # noqa: F403
from llama_stack.apis.common.type_system import *  # noqa: F403
from llama_stack.apis.datasets.client import DatasetsClient
from llama_stack.providers.tests.datasetio.test_datasetio import data_url_from_file


class DatasetIOClient(DatasetIO):
    def __init__(self, base_url: str):
        self.base_url = base_url

    async def initialize(self) -> None:
        pass

    async def shutdown(self) -> None:
        pass

    async def get_rows_paginated(
        self,
        dataset_id: str,
        rows_in_page: int,
        page_token: Optional[str] = None,
        filter_condition: Optional[str] = None,
    ) -> PaginatedRowsResult:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{self.base_url}/datasetio/get_rows_paginated",
                params={
                    "dataset_id": dataset_id,
                    "rows_in_page": rows_in_page,
                    "page_token": page_token,
                    "filter_condition": filter_condition,
                },
                headers={"Content-Type": "application/json"},
                timeout=60,
            )
            response.raise_for_status()
            if not response.json():
                return

            return PaginatedRowsResult(**response.json())


async def run_main(host: str, port: int):
    client = DatasetsClient(f"http://{host}:{port}")

    # register dataset
    test_file = (
        Path(os.path.abspath(__file__)).parent.parent.parent
        / "providers/tests/datasetio/test_dataset.csv"
    )
    test_url = data_url_from_file(str(test_file))
    response = await client.register_dataset(
        DatasetDefWithProvider(
            identifier="test-dataset",
            provider_id="meta0",
            url=URL(
                uri=test_url,
            ),
            dataset_schema={
                "generated_answer": StringType(),
                "expected_answer": StringType(),
                "input_query": StringType(),
            },
        )
    )

    # list datasets
    list_dataset = await client.list_datasets()
    cprint(list_dataset, "blue")

    # datasetio client to get the rows
    datasetio_client = DatasetIOClient(f"http://{host}:{port}")
    response = await datasetio_client.get_rows_paginated(
        dataset_id="test-dataset",
        rows_in_page=4,
        page_token=None,
        filter_condition=None,
    )
    cprint(f"Returned {len(response.rows)} rows \n {response}", "green")


def main(host: str, port: int):
    asyncio.run(run_main(host, port))


if __name__ == "__main__":
    fire.Fire(main)
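
For context, get_rows_paginated is cursor-style: the page_token returned by one call is passed back to fetch the next page. A minimal paging sketch, not part of this commit, which assumes PaginatedRowsResult carries a next_page_token field alongside rows:

# Paging sketch (not in this commit); next_page_token is an assumed field name.
async def read_all_rows(client: DatasetIOClient, dataset_id: str):
    all_rows = []
    token = None
    while True:
        page = await client.get_rows_paginated(
            dataset_id=dataset_id,
            rows_in_page=100,
            page_token=token,  # None fetches the first page
            filter_condition=None,
        )
        all_rows.extend(page.rows)
        token = page.next_page_token  # assumed field name
        if not token:
            return all_rows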


@@ -29,7 +29,7 @@ class DatasetIO(Protocol):
     # keeping for aligning with inference/safety, but this is not used
     dataset_store: DatasetStore

-    @webmethod(route="/dataio/get_rows_paginated")
+    @webmethod(route="/datasetio/get_rows_paginated", method="GET")
     async def get_rows_paginated(
         self,
         dataset_id: str,


@@ -0,0 +1,116 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

import asyncio
import json
import os
from pathlib import Path
from typing import Optional

import fire
import httpx
from termcolor import cprint

from .datasets import *  # noqa: F403
from llama_stack.apis.datasets import *  # noqa: F403
from llama_stack.apis.common.type_system import *  # noqa: F403
from llama_stack.providers.tests.datasetio.test_datasetio import data_url_from_file


class DatasetsClient(Datasets):
    def __init__(self, base_url: str):
        self.base_url = base_url

    async def initialize(self) -> None:
        pass

    async def shutdown(self) -> None:
        pass

    async def register_dataset(
        self,
        dataset_def: DatasetDefWithProvider,
    ) -> None:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/datasets/register",
                json={
                    "dataset_def": json.loads(dataset_def.json()),
                },
                headers={"Content-Type": "application/json"},
                timeout=60,
            )
            response.raise_for_status()
            return

    async def get_dataset(
        self,
        dataset_identifier: str,
    ) -> Optional[DatasetDefWithProvider]:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{self.base_url}/datasets/get",
                params={
                    "dataset_identifier": dataset_identifier,
                },
                headers={"Content-Type": "application/json"},
                timeout=60,
            )
            response.raise_for_status()
            if not response.json():
                return

            return DatasetDefWithProvider(**response.json())

    async def list_datasets(self) -> List[DatasetDefWithProvider]:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{self.base_url}/datasets/list",
                headers={"Content-Type": "application/json"},
                timeout=60,
            )
            response.raise_for_status()
            if not response.json():
                return

            return [DatasetDefWithProvider(**x) for x in response.json()]


async def run_main(host: str, port: int):
    client = DatasetsClient(f"http://{host}:{port}")

    # register dataset
    test_file = (
        Path(os.path.abspath(__file__)).parent.parent.parent
        / "providers/tests/datasetio/test_dataset.csv"
    )
    test_url = data_url_from_file(str(test_file))
    response = await client.register_dataset(
        DatasetDefWithProvider(
            identifier="test-dataset",
            provider_id="meta0",
            url=URL(
                uri=test_url,
            ),
            dataset_schema={
                "generated_answer": StringType(),
                "expected_answer": StringType(),
                "input_query": StringType(),
            },
        )
    )

    # list datasets
    list_dataset = await client.list_datasets()
    cprint(list_dataset, "blue")


def main(host: str, port: int):
    asyncio.run(run_main(host, port))


if __name__ == "__main__":
    fire.Fire(main)
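
get_dataset is defined above but not exercised by run_main. A minimal sketch, not part of this commit, that round-trips a single definition using the identifier registered above:

# Sketch (not in this commit): fetch one dataset definition back.
async def show_dataset(host: str, port: int) -> None:
    client = DatasetsClient(f"http://{host}:{port}")
    dataset = await client.get_dataset(dataset_identifier="test-dataset")
    if dataset is None:
        cprint("test-dataset is not registered", "red")
    else:
        cprint(f"schema: {dataset.dataset_schema}", "green")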


@@ -20,7 +20,7 @@ class DatasetDef(BaseModel):
     identifier: str = Field(
         description="A unique name for the dataset",
     )
-    columns_schema: Dict[str, ParamType] = Field(
+    dataset_schema: Dict[str, ParamType] = Field(
         description="The schema definition for this dataset",
     )
     url: URL


@@ -0,0 +1,132 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

import asyncio
import os
from pathlib import Path

import fire
import httpx
from termcolor import cprint

from llama_stack.apis.datasets import *  # noqa: F403
from llama_stack.apis.scoring import *  # noqa: F403
from llama_stack.apis.common.type_system import *  # noqa: F403
from llama_stack.apis.datasetio.client import DatasetIOClient
from llama_stack.apis.datasets.client import DatasetsClient
from llama_stack.providers.tests.datasetio.test_datasetio import data_url_from_file


class ScoringClient(Scoring):
    def __init__(self, base_url: str):
        self.base_url = base_url

    async def initialize(self) -> None:
        pass

    async def shutdown(self) -> None:
        pass

    async def score_batch(
        self, dataset_id: str, scoring_functions: List[str]
    ) -> ScoreBatchResponse:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/scoring/score_batch",
                json={
                    "dataset_id": dataset_id,
                    "scoring_functions": scoring_functions,
                },
                headers={"Content-Type": "application/json"},
                timeout=60,
            )
            response.raise_for_status()
            if not response.json():
                return

            return ScoreBatchResponse(**response.json())

    async def score(
        self, input_rows: List[Dict[str, Any]], scoring_functions: List[str]
    ) -> ScoreResponse:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/scoring/score",
                json={
                    "input_rows": input_rows,
                    "scoring_functions": scoring_functions,
                },
                headers={"Content-Type": "application/json"},
                timeout=60,
            )
            response.raise_for_status()
            if not response.json():
                return

            return ScoreResponse(**response.json())


async def run_main(host: str, port: int):
    client = DatasetsClient(f"http://{host}:{port}")

    # register dataset
    test_file = (
        Path(os.path.abspath(__file__)).parent.parent.parent
        / "providers/tests/datasetio/test_dataset.csv"
    )
    test_url = data_url_from_file(str(test_file))
    response = await client.register_dataset(
        DatasetDefWithProvider(
            identifier="test-dataset",
            provider_id="meta0",
            url=URL(
                uri=test_url,
            ),
            dataset_schema={
                "generated_answer": StringType(),
                "expected_answer": StringType(),
                "input_query": StringType(),
            },
        )
    )

    # list datasets
    list_dataset = await client.list_datasets()
    cprint(list_dataset, "blue")

    # datasetio client to get the rows
    datasetio_client = DatasetIOClient(f"http://{host}:{port}")
    response = await datasetio_client.get_rows_paginated(
        dataset_id="test-dataset",
        rows_in_page=4,
        page_token=None,
        filter_condition=None,
    )
    cprint(f"Returned {len(response.rows)} rows \n {response}", "green")

    # scoring client to score the rows
    scoring_client = ScoringClient(f"http://{host}:{port}")
    response = await scoring_client.score(
        input_rows=response.rows,
        scoring_functions=["equality"],
    )
    cprint(f"score response={response}", "blue")

    # test scoring batch using datasetio api
    scoring_client = ScoringClient(f"http://{host}:{port}")
    response = await scoring_client.score_batch(
        dataset_id="test-dataset",
        scoring_functions=["equality"],
    )
    cprint(f"score_batch response={response}", "cyan")


def main(host: str, port: int):
    asyncio.run(run_main(host, port))


if __name__ == "__main__":
    fire.Fire(main)
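
Since ScoreBatchResponse.results is keyed by scoring function name (see the scoring API diff below), per-function metrics can be unpacked instead of printing whole objects. A short sketch, not part of this commit:

# Sketch (not in this commit): unpack per-function results from a batch run.
async def print_summary(host: str, port: int) -> None:
    scoring_client = ScoringClient(f"http://{host}:{port}")
    response = await scoring_client.score_batch(
        dataset_id="test-dataset",
        scoring_functions=["equality"],
    )
    for fn_name, result in response.results.items():
        # score_rows has one entry per input row; aggregated_results
        # holds the rolled-up metrics.
        cprint(f"{fn_name}: {len(result.score_rows)} rows scored", "green")
        cprint(f"{fn_name}: aggregated={result.aggregated_results}", "cyan")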


@@ -13,18 +13,27 @@ from llama_models.llama3.api.datatypes import *  # noqa: F403
 from llama_stack.apis.scoring_functions import *  # noqa: F403


-ScoringResult = Dict[str, Any]
+# mapping of metric to value
+ScoringResultRow = Dict[str, Any]
+
+
+@json_schema_type
+class ScoringResult(BaseModel):
+    score_rows: List[ScoringResultRow]
+    # aggregated metrics to value
+    aggregated_results: Dict[str, Any]


 @json_schema_type
 class ScoreBatchResponse(BaseModel):
-    dataset_id: str
+    dataset_id: Optional[str] = None
     results: Dict[str, ScoringResult]


 @json_schema_type
 class ScoreResponse(BaseModel):
     # each key in the dict is a scoring function name
-    results: List[Dict[str, ScoringResult]]
+    results: Dict[str, ScoringResult]


 class ScoringFunctionStore(Protocol):

@@ -37,7 +46,10 @@ class Scoring(Protocol):
     @webmethod(route="/scoring/score_batch")
     async def score_batch(
-        self, dataset_id: str, scoring_functions: List[str]
+        self,
+        dataset_id: str,
+        scoring_functions: List[str],
+        save_results_dataset: bool = False,
     ) -> ScoreBatchResponse: ...

     @webmethod(route="/scoring/score")
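
Put together, the reshaped types make a batch response look roughly like the following. All values are illustrative, and the reading that dataset_id is populated only when results are saved back to a dataset is an assumption, not something this diff states:

# Illustrative shapes for the new response types (values are made up):
row_result = ScoringResult(
    score_rows=[{"score": 1.0}, {"score": 0.0}],  # one entry per input row
    aggregated_results={"accuracy": 0.5},         # aggregated metrics to value
)
batch_response = ScoreBatchResponse(
    dataset_id="test-dataset",         # now Optional; assumed set only when
                                       # save_results_dataset is requested
    results={"equality": row_result},  # keyed by scoring function name
)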


@@ -4,20 +4,10 @@
 # This source code is licensed under the terms described in the LICENSE file in
 # the root directory of this source tree.

-from typing import (
-    Any,
-    Dict,
-    List,
-    Literal,
-    Optional,
-    Protocol,
-    runtime_checkable,
-    Union,
-)
+from typing import Any, Dict, List, Optional, Protocol, runtime_checkable

 from llama_models.schema_utils import json_schema_type, webmethod
 from pydantic import BaseModel, Field
-from typing_extensions import Annotated

 from llama_stack.apis.common.type_system import ParamType

@@ -33,45 +23,37 @@ class Parameter(BaseModel):
 # with standard metrics so they can be rolled up?
 class LLMAsJudgeContext(BaseModel):
     judge_model: str
+    prompt_template: Optional[str] = None


 @json_schema_type
-class CommonDef(BaseModel):
-    name: str
+class ScoringFunctionDef(BaseModel):
+    identifier: str
     description: Optional[str] = None
     metadata: Dict[str, Any] = Field(
         default_factory=dict,
         description="Any additional metadata for this definition",
     )
-    # Hack: same with memory_banks for union defs
-    provider_id: str = ""
-
-
-@json_schema_type
-class DeterministicFunctionDef(CommonDef):
-    type: Literal["deterministic"] = "deterministic"
     parameters: List[Parameter] = Field(
         description="List of parameters for the deterministic function",
         default_factory=list,
     )
     return_type: ParamType = Field(
         description="The return type of the deterministic function",
     )
+    context: Optional[LLMAsJudgeContext] = None
     # We can optionally add information here to support packaging of code, etc.


 @json_schema_type
-class LLMJudgeFunctionDef(CommonDef):
-    type: Literal["judge"] = "judge"
-    model: str = Field(
-        description="The LLM model to use for the judge function",
+class ScoringFunctionDefWithProvider(ScoringFunctionDef):
+    provider_id: str = Field(
+        description="ID of the provider which serves this dataset",
     )
-
-ScoringFunctionDef = Annotated[
-    Union[DeterministicFunctionDef, LLMJudgeFunctionDef], Field(discriminator="type")
-]
-ScoringFunctionDefWithProvider = ScoringFunctionDef


 @runtime_checkable
 class ScoringFunctions(Protocol):
     @webmethod(route="/scoring_functions/list", method="GET")

@@ -84,5 +66,5 @@ class ScoringFunctions(Protocol):
     @webmethod(route="/scoring_functions/register", method="POST")
     async def register_scoring_function(
-        self, function: ScoringFunctionDefWithProvider
+        self, function_def: ScoringFunctionDefWithProvider
     ) -> None: ...
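
With the discriminated union gone, one flattened model covers both deterministic and judge-style functions. A registration sketch, not part of this commit: NumberType is assumed to exist alongside StringType in the common type_system module, and the judge identifier, model id, and prompt are hypothetical.

# Sketch (not in this commit): both styles on the flattened model.
equality_def = ScoringFunctionDefWithProvider(
    identifier="equality",
    description="1.0 when generated_answer matches expected_answer, else 0.0",
    return_type=NumberType(),  # assumed type; the diff only shows ParamType
    provider_id="meta0",
)

judge_def = ScoringFunctionDefWithProvider(
    identifier="llm-as-judge",  # hypothetical identifier
    return_type=NumberType(),
    context=LLMAsJudgeContext(
        judge_model="Llama3.1-8B-Instruct",  # hypothetical model id
        prompt_template="Score this answer: {generated_answer}",  # hypothetical
    ),
    provider_id="meta0",
)

# Registered via the renamed parameter:
#   await client.register_scoring_function(function_def=judge_def)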