Xi Yan committed 2024-11-07 18:15:13 -08:00
commit 33b6d9b7b7
8 changed files with 67 additions and 304 deletions


@@ -7,14 +7,7 @@ from enum import Enum
 from llama_models.llama3.api.datatypes import *  # noqa: F403
 from .....apis.common.job_types import Job
-from .....apis.eval.eval import (
-    AppEvalTaskConfig,
-    BenchmarkEvalTaskConfig,
-    Eval,
-    EvalTaskConfig,
-    EvaluateResponse,
-    JobStatus,
-)
+from .....apis.eval.eval import Eval, EvalTaskConfig, EvaluateResponse, JobStatus
 from llama_stack.apis.common.type_system import *  # noqa: F403
 from tqdm import tqdm
@@ -28,12 +21,6 @@ from llama_stack.providers.datatypes import EvalTasksProtocolPrivate
 from .config import MetaReferenceEvalConfig

-# NOTE: this is the default eval task identifier for app eval
-# it is used to make the router work for all app evals
-# For app eval using other eval providers, the eval task identifier will be different
-DEFAULT_EVAL_TASK_IDENTIFIER = "meta-reference::app_eval"

 class ColumnName(Enum):
     input_query = "input_query"
     expected_answer = "expected_answer"
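
With the shared DEFAULT_EVAL_TASK_IDENTIFIER gone, an app eval presumably has to be registered under its own identifier like any other task. A minimal sketch of such a definition, reusing only the EvalTaskDef fields that appear elsewhere in this diff; the identifier and dataset_id below are placeholders, and the import path for EvalTaskDef is not visible in this file's shown imports:

# Hypothetical app-eval definition; identifier and dataset_id are made up.
# EvalTaskDef and its fields mirror the usage in the removed initialize() body below.
my_app_eval = EvalTaskDef(
    identifier="my-app::qa-eval",
    dataset_id="my_app_qa_dataset",
    scoring_functions=["meta-reference::regex_parser_multiple_choice_answer"],
)
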
@@ -60,30 +47,15 @@ class MetaReferenceEvalImpl(Eval, EvalTasksProtocolPrivate):
         # TODO: assume sync job, will need jobs API for async scheduling
         self.jobs = {}

         # Keep track of benchmark eval tasks that are supported by this provider
         self.eval_tasks = {}

-    async def initialize(self) -> None:
-        self.eval_tasks = {
-            # NOTE: In order to be routed to this provider, the eval task def must have
-            # a EvalTaskDef with identifier defined as DEFAULT_EVAL_TASK_IDENTIFIER
-            # for app eval where eval task benchmark_id is not pre-registered
-            DEFAULT_EVAL_TASK_IDENTIFIER: EvalTaskDef(
-                identifier=DEFAULT_EVAL_TASK_IDENTIFIER,
-                dataset_id="",
-                scoring_functions=[],
-            ),
-            "meta-reference-mmlu": EvalTaskDef(
-                identifier="meta-reference-mmlu",
-                dataset_id="llamastack_mmlu",
-                scoring_functions=[
-                    "meta-reference::regex_parser_multiple_choice_answer"
-                ],
-            ),
-        }
+    async def initialize(self) -> None: ...

     async def shutdown(self) -> None: ...

     async def register_eval_task(self, task_def: EvalTaskDef) -> None:
         self.eval_tasks[task_def.identifier] = task_def

     async def list_eval_tasks(self) -> List[EvalTaskDef]:
         return list(self.eval_tasks.values())
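
Since initialize() is now a no-op and the registry starts empty, tasks only exist once something calls register_eval_task. A rough usage sketch under that assumption; eval_impl stands in for an already-constructed MetaReferenceEvalImpl (construction not shown in this diff), and my_app_eval is the hypothetical definition sketched above:

async def seed_tasks(eval_impl, task_defs):
    await eval_impl.initialize()  # no-op after this change
    for task_def in task_defs:
        await eval_impl.register_eval_task(task_def)
    # list_eval_tasks() now returns exactly what was registered, nothing pre-seeded
    return await eval_impl.list_eval_tasks()

# e.g. asyncio.run(seed_tasks(eval_impl, [my_app_eval]))
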
@@ -110,39 +82,15 @@ class MetaReferenceEvalImpl(Eval, EvalTasksProtocolPrivate):
                 f"Dataset {dataset_id} does not have a correct input schema in {expected_schemas}"
             )

-    async def run_benchmark(
-        self,
-        benchmark_id: str,
-        benchmark_config: BenchmarkEvalTaskConfig,
-    ) -> Job:
-        eval_task_def = self.eval_tasks[benchmark_id]
-        all_rows = await self.datasetio_api.get_rows_paginated(
-            dataset_id=eval_task_def.dataset_id,
-            rows_in_page=(
-                -1
-                if benchmark_config.num_examples is None
-                else benchmark_config.num_examples
-            ),
-        )
-        res = await self.evaluate_rows(
-            input_rows=all_rows.rows,
-            scoring_functions=eval_task_def.scoring_functions,
-            task_config=benchmark_config,
-        )
-
-        # TODO: currently needs to wait for generation before returning
-        # need job scheduler queue (celery) w/ jobs api
-        job_id = str(len(self.jobs))
-        self.jobs[job_id] = res
-        return Job(job_id=job_id)
-
     async def run_eval(
         self,
-        task: EvalTaskDef,
-        task_config: AppEvalTaskConfig,
+        task_id: str,
+        task_config: EvalTaskConfig,
     ) -> Job:
-        dataset_id = task.dataset_id
+        task_def = self.eval_tasks[task_id]
+        dataset_id = task_def.dataset_id
         candidate = task_config.eval_candidate
-        scoring_functions = task.scoring_functions
+        scoring_functions = task_def.scoring_functions

         await self.validate_eval_input_dataset_schema(dataset_id=dataset_id)
         all_rows = await self.datasetio_api.get_rows_paginated(
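
run_benchmark is folded into run_eval: callers pass a registered task_id and a generic EvalTaskConfig instead of a benchmark-specific config object. A hedged before/after sketch; the construction of task_config is elided because EvalTaskConfig's fields, beyond the eval_candidate and num_examples referenced in this file, are not shown here:

async def run_mmlu(eval_impl, task_config):
    # Previously: await eval_impl.run_benchmark(benchmark_id="meta-reference-mmlu",
    #                                           benchmark_config=...)  # removed above
    # Now a single entry point serves both benchmarks and app evals, provided
    # "meta-reference-mmlu" has already been registered as an eval task.
    job = await eval_impl.run_eval(
        task_id="meta-reference-mmlu",
        task_config=task_config,  # EvalTaskConfig
    )
    return job.job_id
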
@@ -152,6 +100,7 @@ class MetaReferenceEvalImpl(Eval, EvalTasksProtocolPrivate):
             ),
         )
         res = await self.evaluate_rows(
+            task_id=task_id,
             input_rows=all_rows.rows,
             scoring_functions=scoring_functions,
             task_config=task_config,
@@ -165,10 +114,10 @@ class MetaReferenceEvalImpl(Eval, EvalTasksProtocolPrivate):
     async def evaluate_rows(
         self,
+        task_id: str,
         input_rows: List[Dict[str, Any]],
         scoring_functions: List[str],
         task_config: EvalTaskConfig,
-        eval_task_id: Optional[str] = None,
     ) -> EvaluateResponse:
         candidate = task_config.eval_candidate
         if candidate.type == "agent":
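
evaluate_rows now takes the task_id as a required leading argument instead of an optional trailing eval_task_id. A sketch of calling it directly on in-memory rows, assuming rows shaped after the ColumnName fields above and a task_config whose eval_candidate is already populated:

async def score_rows(eval_impl, task_config):
    rows = [
        {"input_query": "What is 2 + 2?", "expected_answer": "4"},
    ]
    response = await eval_impl.evaluate_rows(
        task_id="my-app::qa-eval",  # hypothetical registered task from the sketch above
        input_rows=rows,
        scoring_functions=["meta-reference::regex_parser_multiple_choice_answer"],
        task_config=task_config,
    )
    # EvaluateResponse carries generations plus per-scoring-function results
    return response.generations, response.scores
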
@@ -238,17 +187,17 @@ class MetaReferenceEvalImpl(Eval, EvalTasksProtocolPrivate):
         return EvaluateResponse(generations=generations, scores=score_response.results)

-    async def job_status(self, job_id: str, eval_task_id: str) -> Optional[JobStatus]:
+    async def job_status(self, task_id: str, job_id: str) -> Optional[JobStatus]:
         if job_id in self.jobs:
             return JobStatus.completed

         return None

-    async def job_cancel(self, job_id: str, eval_task_id: str) -> None:
+    async def job_cancel(self, task_id: str, job_id: str) -> None:
         raise NotImplementedError("Job cancel is not implemented yet")

-    async def job_result(self, job_id: str, eval_task_id: str) -> EvaluateResponse:
-        status = await self.job_status(job_id, eval_task_id)
+    async def job_result(self, task_id: str, job_id: str) -> EvaluateResponse:
+        status = await self.job_status(task_id, job_id)
         if not status or status != JobStatus.completed:
             raise ValueError(f"Job is not completed, Status: {status.value}")