generator + scorer Api for MMLU

This commit is contained in:
Xi Yan 2024-10-13 23:27:02 -07:00
parent fb565dfb06
commit a25aff290e
14 changed files with 618 additions and 131 deletions

View file

@ -5,7 +5,7 @@
# the root directory of this source tree.
from abc import ABC, abstractmethod
from typing import Dict, Generic, List, Protocol
from typing import Dict, Generic, List, Optional, Protocol
from llama_models.schema_utils import webmethod
from pydantic import BaseModel
@ -24,14 +24,14 @@ class EvaluationJobLogStream(BaseModel):
@json_schema_type
class EvalResult(BaseModel):
"""Evaluation result."""
"""Aggregated final evaluation result."""
metrics: Dict[str, str]
metrics: Dict[str, float]
@json_schema_type
class SingleEvalResult(BaseModel):
"""Single evaluation result."""
"""Single evaluation result. Contains a scorer name, and corresponding metrics from scorer."""
score_data: Dict[str, float]
@ -64,57 +64,222 @@ class EvaluationJobCreateResponse(BaseModel):
@json_schema_type
class EvaluateTaskConfig(BaseModel):
# num examples to evaluate, evaluate all if None
n_samples: Optional[int] = None
# model evaluation params
class EvaluateDatasetConfig(BaseModel):
# identifier to previously registered dataset via DatasetDef
dataset_name: str
# limit number of rows to evaluate
row_limit: Optional[int] = None
kwargs: Optional[Dict[str, Any]] = None
@json_schema_type
class EvaluatePreprocessConfig(BaseModel):
kwargs: Optional[Dict[str, Any]] = None
@json_schema_type
class EvaluateModelGenerationConfig(BaseModel):
model: str
sampling_params: SamplingParams = SamplingParams()
kwargs: Optional[Dict[str, Any]] = None
class BaseTask(ABC, Generic[TDatasetSample, TProcessedSample]):
@json_schema_type
class EvaluatePostprocessConfig(BaseModel):
kwargs: Optional[Dict[str, Any]] = None
@json_schema_type
class EvaluateJudgeScoringConfig(BaseModel): ...
@json_schema_type
class LLMJudgeConfig(BaseModel):
judge_preprocess_config: EvaluatePreprocessConfig
judge_model_generation_config: EvaluateModelGenerationConfig
judge_postprocess_config: EvaluatePostprocessConfig
judge_scoring_config: EvaluateJudgeScoringConfig
@json_schema_type
class EvaluateSingleScorerConfig(BaseModel):
scorer_name: str
llm_judge_config: Optional[LLMJudgeConfig] = None
@json_schema_type
class EvaluateScoringConfig(BaseModel):
# list of scorer (metrics) names to use
scorer_config_list: List[EvaluateSingleScorerConfig]
@json_schema_type
class EvaluateTaskConfig(BaseModel):
dataset_config: EvaluateDatasetConfig
preprocess_config: Optional[EvaluatePreprocessConfig] = None
generation_config: EvaluateModelGenerationConfig
postprocess_config: Optional[EvaluatePostprocessConfig] = None
scoring_config: EvaluateScoringConfig
class BaseGeneratorProcessor(
ABC,
Generic[
TDatasetSample,
TPreprocessedSample,
TGenerationResponseSample,
TScorerInputSample,
],
):
"""
A task represents a single evaluation benchmark, including it's dataset, preprocessing, postprocessing and scoring methods.
Base class for all evaluation tasks. Each task needs to implement the following methods:
- F1: preprocess_sample(self)
Base class for all generator processors. Each processor needs to implement the following methods:
- F1: preprocess_sample(self, dataset)
- F2: postprocess_sample(self)
- F3: score_sample(self)
"""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._name = self.__class__.__name__
def __str__(self) -> str:
return self.__class__.__name__
def preprocess(
self, dataset: BaseDataset[TDatasetSample]
) -> List[TPreprocessedSample]:
return [self.preprocess_sample(sample) for sample in dataset]
def postprocess(
self,
generation: List[TGenerationResponseSample],
dataset: BaseDataset[TDatasetSample],
) -> List[TScorerInputSample]:
return [
self.postprocess_sample(generation_sample, dataset_sample)
for generation_sample, dataset_sample in zip(generation, dataset)
]
@abstractmethod
def preprocess_sample(self, sample: TDatasetSample) -> TProcessedSample:
def preprocess_sample(self, sample: TDatasetSample) -> TPreprocessedSample:
raise NotImplementedError()
@abstractmethod
def postprocess_sample(self, sample: TProcessedSample) -> TProcessedSample:
def postprocess_sample(
self,
generation_sample: TGenerationResponseSample,
dataset_sample: TDatasetSample,
) -> TScorerInputSample:
raise NotImplementedError()
class BaseGenerator(ABC, Generic[TGenerationResponseSample]):
"""
Base class for all generators. Each generator needs to implement the following methods:
- generate(self, preprocessed_dataset)
"""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
def __str__(self) -> str:
return self.__class__.__name__
@abstractmethod
def score_sample(self, sample: TProcessedSample) -> SingleEvalResult:
def generate(
self, preprocessed_dataset: List[TPreprocessedSample]
) -> List[TGenerationResponseSample]:
raise NotImplementedError()
class BaseScorer(ABC, Generic[TScorerInputSample]):
"""
Base class for all scorers. Each scorer needs to implement the following methods:
- score_sample(self, scorer_input_sample)
"""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
def __str__(self) -> str:
return self.__class__.__name__
@abstractmethod
def score_sample(self, scorer_input_sample: TScorerInputSample) -> SingleEvalResult:
raise NotImplementedError()
@abstractmethod
def aggregate_results(self, eval_results: List[SingleEvalResult]) -> EvalResult:
raise NotImplementedError()
def preprocess(
self, dataset: BaseDataset[TProcessedSample]
) -> List[TProcessedSample]:
return [self.preprocess_sample(sample) for sample in dataset]
def score(
self, prepared_eval_dataset: List[TScorerInputSample]
) -> List[SingleEvalResult]:
return [self.score_sample(sample) for sample in prepared_eval_dataset]
def postprocess(self, generation: List[TProcessedSample]) -> List[TProcessedSample]:
return [self.postprocess_sample(sample) for sample in generation]
def score(self, postprocessed: List[TProcessedSample]) -> List[SingleEvalResult]:
return [self.score_sample(sample) for sample in postprocessed]
class BaseTask(ABC):
def __init__(
self,
generator_processor: Optional[BaseGeneratorProcessor] = None,
generator: Optional[BaseGenerator] = None,
scorer: Optional[BaseScorer] = None,
*args,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.generator_processor = generator_processor
self.generator = generator
self.scorer = scorer
@abstractmethod
def run(self, *args, **kwargs) -> EvalResult:
raise NotImplementedError()
# class BaseTask(ABC, Generic[TDatasetSample, TProcessedSample]):
# """
# A task represents a single evaluation benchmark, including it's dataset, preprocessing, postprocessing and scoring methods.
# Base class for all evaluation tasks. Each task needs to implement the following methods:
# - F1: preprocess_sample(self)
# - F2: postprocess_sample(self)
# - F3: score_sample(self)
# """
# def __init__(self, *args, **kwargs) -> None:
# super().__init__(*args, **kwargs)
# self._name = self.__class__.__name__
# @abstractmethod
# def preprocess_sample(self, sample: TDatasetSample) -> TProcessedSample:
# raise NotImplementedError()
# @abstractmethod
# def postprocess_sample(self, sample: TProcessedSample) -> TProcessedSample:
# raise NotImplementedError()
# @abstractmethod
# def score_sample(self, sample: TProcessedSample) -> SingleEvalResult:
# raise NotImplementedError()
# @abstractmethod
# def aggregate_results(self, eval_results: List[SingleEvalResult]) -> EvalResult:
# raise NotImplementedError()
# def preprocess(
# self, dataset: BaseDataset[TProcessedSample]
# ) -> List[TProcessedSample]:
# return [self.preprocess_sample(sample) for sample in dataset]
# def postprocess(self, generation: List[TProcessedSample]) -> List[TProcessedSample]:
# return [self.postprocess_sample(sample) for sample in generation]
# def score(self, postprocessed: List[TProcessedSample]) -> List[SingleEvalResult]:
# return [self.score_sample(sample) for sample in postprocessed]
class Evals(Protocol):
@webmethod(route="/evals/run")
async def run_evals(
@webmethod(route="/evals/run_eval_task")
async def run_eval_task(
self,
model: str,
task: str,
@ -122,6 +287,13 @@ class Evals(Protocol):
eval_task_config: Optional[EvaluateTaskConfig] = None,
) -> EvaluateResponse: ...
@webmethod(route="/evals/run_scorer")
async def run_scorer(
self,
dataset_config: EvaluateDatasetConfig,
eval_scoring_config: EvaluateScoringConfig,
) -> EvaluateResponse: ...
# @webmethod(route="/evals/jobs")
# def get_evaluation_jobs(self) -> List[EvaluationJob]: ...