Merge pull request #4956 from BerriAI/litellm_add_finetuning_Endpoints

[Feat] Add `litellm.create_fine_tuning_job()`, `litellm.list_fine_tuning_jobs()`, `litellm.cancel_fine_tuning_job()` fine-tuning endpoints
This commit is contained in:
Ishaan Jaff 2024-07-30 07:47:29 -07:00 committed by GitHub
commit 36dca6bcce
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 791 additions and 1 deletions

View file

@ -906,6 +906,7 @@ from .proxy.proxy_cli import run_server
from .router import Router from .router import Router
from .assistants.main import * from .assistants.main import *
from .batches.main import * from .batches.main import *
from .fine_tuning.main import *
from .files.main import * from .files.main import *
from .scheduler import * from .scheduler import *
from .cost_calculator import response_cost_calculator, cost_per_token from .cost_calculator import response_cost_calculator, cost_per_token

429
litellm/fine_tuning/main.py Normal file
View file

@ -0,0 +1,429 @@
"""
Main File for Fine Tuning API implementation
https://platform.openai.com/docs/api-reference/fine-tuning
- fine_tuning.jobs.create()
- fine_tuning.jobs.list()
- fine_tuning.jobs.cancel()
"""
import asyncio
import contextvars
import os
from functools import partial
from typing import Any, Coroutine, Dict, Literal, Optional, Union
import httpx
import litellm
from litellm.llms.openai_fine_tuning.openai import (
FineTuningJob,
FineTuningJobCreate,
OpenAIFineTuningAPI,
)
from litellm.types.llms.openai import Hyperparameters
from litellm.types.router import *
from litellm.utils import supports_httpx_timeout
####### MODULE-LEVEL HANDLER ####################
# Single OpenAIFineTuningAPI instance that the create / cancel / list
# functions in this module delegate to.
openai_fine_tuning_instance = OpenAIFineTuningAPI()
#################################################
async def acreate_fine_tuning_job(
    model: str,
    training_file: str,
    hyperparameters: Optional[Hyperparameters] = {},  # type: ignore
    suffix: Optional[str] = None,
    validation_file: Optional[str] = None,
    integrations: Optional[List[str]] = None,
    seed: Optional[int] = None,
    custom_llm_provider: Literal["openai"] = "openai",
    extra_headers: Optional[Dict[str, str]] = None,
    extra_body: Optional[Dict[str, str]] = None,
    **kwargs,
) -> FineTuningJob:
    """
    Async: Create a fine-tuning job.

    Thin async wrapper around `create_fine_tuning_job`: the sync entry point is
    invoked in the event loop's default executor, and if it hands back a
    coroutine that coroutine is awaited here.

    Args:
        model: Name of the model to fine-tune.
        training_file: ID of the uploaded file that contains training data.
        hyperparameters: Hyperparameters for the job. The `{}` default is only
            forwarded, never mutated, by this function.
        suffix: Optional suffix for the fine-tuned model name.
        validation_file: Optional ID of an uploaded validation file.
        integrations: Optional list of integrations to enable.
        seed: Optional seed for reproducibility.
        custom_llm_provider: Provider to use; only "openai" is supported.
        extra_headers: Forwarded unchanged to `create_fine_tuning_job`.
        extra_body: Forwarded unchanged to `create_fine_tuning_job`.
        **kwargs: Forwarded unchanged to `create_fine_tuning_job`.

    Returns:
        The created fine-tuning job.

    Raises:
        Whatever `create_fine_tuning_job` (or the awaited coroutine) raises.
    """
    # Internal flag: tells the sync entry point to return a coroutine instead
    # of performing the request itself.
    kwargs["acreate_fine_tuning_job"] = True

    # Bind by keyword so a future change to the parameter order of
    # `create_fine_tuning_job` cannot silently shuffle arguments.
    func = partial(
        create_fine_tuning_job,
        model=model,
        training_file=training_file,
        hyperparameters=hyperparameters,
        suffix=suffix,
        validation_file=validation_file,
        integrations=integrations,
        seed=seed,
        custom_llm_provider=custom_llm_provider,
        extra_headers=extra_headers,
        extra_body=extra_body,
        **kwargs,
    )

    # Run inside a copy of the caller's context so contextvars stay visible
    # to the function when it executes in the executor.
    ctx = contextvars.copy_context()
    loop = asyncio.get_running_loop()
    init_response = await loop.run_in_executor(None, partial(ctx.run, func))

    if asyncio.iscoroutine(init_response):
        return await init_response
    return init_response  # type: ignore
def create_fine_tuning_job(
    model: str,
    training_file: str,
    hyperparameters: Optional[Hyperparameters] = {},  # type: ignore
    suffix: Optional[str] = None,
    validation_file: Optional[str] = None,
    integrations: Optional[List[str]] = None,
    seed: Optional[int] = None,
    custom_llm_provider: Literal["openai"] = "openai",
    extra_headers: Optional[Dict[str, str]] = None,
    extra_body: Optional[Dict[str, str]] = None,
    **kwargs,
) -> Union[FineTuningJob, Coroutine[Any, Any, FineTuningJob]]:
    """
    Create a fine-tuning job, which begins the process of creating a new model
    from a given dataset.

    Args:
        model: Name of the model to fine-tune.
        training_file: ID of the uploaded file that contains training data.
        hyperparameters: Hyperparameters for the job. The `{}` default is only
            forwarded, never mutated, by this function.
        suffix: Optional suffix for the fine-tuned model name.
        validation_file: Optional ID of an uploaded validation file.
        integrations: Optional list of integrations to enable.
        seed: Optional seed for reproducibility.
        custom_llm_provider: Provider to use; only "openai" is supported.
        extra_headers: Accepted but not referenced by this function.
        extra_body: Accepted but not referenced by this function.
        **kwargs: Parsed as `GenericLiteLLMParams` (api_base, api_key,
            organization, timeout, max_retries are read). `request_timeout`
            is also honoured as a timeout fallback.

    Returns:
        The job object from the OpenAI handler, or - when invoked through
        `acreate_fine_tuning_job` - a coroutine resolving to it.

    Raises:
        litellm.exceptions.BadRequestError: `custom_llm_provider` is not
            "openai".
    """
    # Internal flag injected by `acreate_fine_tuning_job`; strip it before the
    # remaining kwargs are interpreted as LiteLLM params.
    _is_async = kwargs.pop("acreate_fine_tuning_job", False) is True

    optional_params = GenericLiteLLMParams(**kwargs)

    if custom_llm_provider != "openai":
        raise litellm.exceptions.BadRequestError(
            message="LiteLLM doesn't support {} for 'create_fine_tuning_job'. Only 'openai' is supported.".format(
                custom_llm_provider
            ),
            model="n/a",
            llm_provider=custom_llm_provider,
            # Synthetic response/request pair attached to the error.
            response=httpx.Response(
                status_code=400,
                content="Unsupported provider",
                request=httpx.Request(method="create_thread", url="https://github.com/BerriAI/litellm"),  # type: ignore
            ),
        )

    # Resolution order for each setting: explicit kwarg -> litellm module
    # attribute -> environment variable -> built-in default.
    api_base = (
        optional_params.api_base
        or litellm.api_base
        or os.getenv("OPENAI_API_BASE")
        or "https://api.openai.com/v1"
    )
    organization = (
        optional_params.organization
        or litellm.organization
        or os.getenv("OPENAI_ORGANIZATION", None)
        or None
    )
    api_key = (
        optional_params.api_key
        or litellm.api_key
        or litellm.openai_key
        or os.getenv("OPENAI_API_KEY")
    )

    ### TIMEOUT LOGIC ###
    # The `or 600` chain guarantees a truthy value, i.e. a 10 minute default.
    timeout = optional_params.timeout or kwargs.get("request_timeout", 600) or 600
    if isinstance(timeout, httpx.Timeout):
        if not supports_httpx_timeout(custom_llm_provider):
            # Provider can't take an httpx.Timeout: collapse to its read timeout.
            timeout = timeout.read or 600
    else:
        timeout = float(timeout)  # type: ignore

    create_fine_tuning_job_data = FineTuningJobCreate(
        model=model,
        training_file=training_file,
        hyperparameters=hyperparameters,
        suffix=suffix,
        validation_file=validation_file,
        integrations=integrations,
        seed=seed,
    )

    return openai_fine_tuning_instance.create_fine_tuning_job(
        api_base=api_base,
        api_key=api_key,
        organization=organization,
        create_fine_tuning_job_data=create_fine_tuning_job_data,
        timeout=timeout,
        max_retries=optional_params.max_retries,
        _is_async=_is_async,
    )
async def acancel_fine_tuning_job(
    fine_tuning_job_id: str,
    custom_llm_provider: Literal["openai"] = "openai",
    extra_headers: Optional[Dict[str, str]] = None,
    extra_body: Optional[Dict[str, str]] = None,
    **kwargs,
) -> FineTuningJob:
    """
    Async: Immediately cancel a fine-tune job.

    Thin async wrapper around `cancel_fine_tuning_job`: the sync entry point is
    invoked in the event loop's default executor, and if it hands back a
    coroutine that coroutine is awaited here.

    Args:
        fine_tuning_job_id: ID of the fine-tuning job to cancel.
        custom_llm_provider: Provider to use; only "openai" is supported.
        extra_headers: Forwarded unchanged to `cancel_fine_tuning_job`.
        extra_body: Forwarded unchanged to `cancel_fine_tuning_job`.
        **kwargs: Forwarded unchanged to `cancel_fine_tuning_job`.

    Returns:
        The fine-tuning job object returned by the cancel call.

    Raises:
        Whatever `cancel_fine_tuning_job` (or the awaited coroutine) raises.
    """
    # Internal flag: tells the sync entry point to return a coroutine instead
    # of performing the request itself.
    kwargs["acancel_fine_tuning_job"] = True

    # Bind by keyword so the call does not depend on parameter order.
    func = partial(
        cancel_fine_tuning_job,
        fine_tuning_job_id=fine_tuning_job_id,
        custom_llm_provider=custom_llm_provider,
        extra_headers=extra_headers,
        extra_body=extra_body,
        **kwargs,
    )

    # Run inside a copy of the caller's context so contextvars stay visible
    # to the function when it executes in the executor.
    ctx = contextvars.copy_context()
    loop = asyncio.get_running_loop()
    init_response = await loop.run_in_executor(None, partial(ctx.run, func))

    if asyncio.iscoroutine(init_response):
        return await init_response
    return init_response  # type: ignore
def cancel_fine_tuning_job(
    fine_tuning_job_id: str,
    custom_llm_provider: Literal["openai"] = "openai",
    extra_headers: Optional[Dict[str, str]] = None,
    extra_body: Optional[Dict[str, str]] = None,
    **kwargs,
) -> Union[FineTuningJob, Coroutine[Any, Any, FineTuningJob]]:
    """
    Immediately cancel a fine-tune job.

    Args:
        fine_tuning_job_id: ID of the fine-tuning job to cancel.
        custom_llm_provider: Provider to use; only "openai" is supported.
        extra_headers: Accepted but not referenced by this function.
        extra_body: Accepted but not referenced by this function.
        **kwargs: Parsed as `GenericLiteLLMParams` (api_base, api_key,
            organization, timeout, max_retries are read). `request_timeout`
            is also honoured as a timeout fallback.

    Returns:
        The job object from the OpenAI handler, or - when invoked through
        `acancel_fine_tuning_job` - a coroutine resolving to it.

    Raises:
        litellm.exceptions.BadRequestError: `custom_llm_provider` is not
            "openai".
    """
    # Internal flag injected by `acancel_fine_tuning_job`; strip it before the
    # remaining kwargs are interpreted as LiteLLM params.
    _is_async = kwargs.pop("acancel_fine_tuning_job", False) is True

    optional_params = GenericLiteLLMParams(**kwargs)

    if custom_llm_provider != "openai":
        raise litellm.exceptions.BadRequestError(
            message="LiteLLM doesn't support {} for 'cancel_fine_tuning_job'. Only 'openai' is supported.".format(
                custom_llm_provider
            ),
            model="n/a",
            llm_provider=custom_llm_provider,
            # Synthetic response/request pair attached to the error.
            response=httpx.Response(
                status_code=400,
                content="Unsupported provider",
                request=httpx.Request(method="create_thread", url="https://github.com/BerriAI/litellm"),  # type: ignore
            ),
        )

    # Resolution order for each setting: explicit kwarg -> litellm module
    # attribute -> environment variable -> built-in default.
    api_base = (
        optional_params.api_base
        or litellm.api_base
        or os.getenv("OPENAI_API_BASE")
        or "https://api.openai.com/v1"
    )
    organization = (
        optional_params.organization
        or litellm.organization
        or os.getenv("OPENAI_ORGANIZATION", None)
        or None
    )
    api_key = (
        optional_params.api_key
        or litellm.api_key
        or litellm.openai_key
        or os.getenv("OPENAI_API_KEY")
    )

    ### TIMEOUT LOGIC ###
    # The `or 600` chain guarantees a truthy value, i.e. a 10 minute default.
    timeout = optional_params.timeout or kwargs.get("request_timeout", 600) or 600
    if isinstance(timeout, httpx.Timeout):
        if not supports_httpx_timeout(custom_llm_provider):
            # Provider can't take an httpx.Timeout: collapse to its read timeout.
            timeout = timeout.read or 600
    else:
        timeout = float(timeout)  # type: ignore

    return openai_fine_tuning_instance.cancel_fine_tuning_job(
        api_base=api_base,
        api_key=api_key,
        organization=organization,
        fine_tuning_job_id=fine_tuning_job_id,
        timeout=timeout,
        max_retries=optional_params.max_retries,
        _is_async=_is_async,
    )
async def alist_fine_tuning_jobs(
    after: Optional[str] = None,
    limit: Optional[int] = None,
    custom_llm_provider: Literal["openai"] = "openai",
    extra_headers: Optional[Dict[str, str]] = None,
    extra_body: Optional[Dict[str, str]] = None,
    **kwargs,
) -> Any:
    """
    Async: List your organization's fine-tuning jobs.

    Thin async wrapper around `list_fine_tuning_jobs`: the sync entry point is
    invoked in the event loop's default executor, and if it hands back a
    coroutine that coroutine is awaited here.

    Args:
        after: Identifier for the last job from the previous pagination request.
        limit: Number of fine-tuning jobs to retrieve.
        custom_llm_provider: Provider to use; only "openai" is supported.
        extra_headers: Forwarded unchanged to `list_fine_tuning_jobs`.
        extra_body: Forwarded unchanged to `list_fine_tuning_jobs`.
        **kwargs: Forwarded unchanged to `list_fine_tuning_jobs`.

    Returns:
        Whatever the awaited list call yields - a page of jobs rather than a
        single `FineTuningJob`, hence the `Any` annotation.

    Raises:
        Whatever `list_fine_tuning_jobs` (or the awaited coroutine) raises.
    """
    # Internal flag: tells the sync entry point to return a coroutine instead
    # of performing the request itself.
    kwargs["alist_fine_tuning_jobs"] = True

    # Bind by keyword so the call does not depend on parameter order.
    func = partial(
        list_fine_tuning_jobs,
        after=after,
        limit=limit,
        custom_llm_provider=custom_llm_provider,
        extra_headers=extra_headers,
        extra_body=extra_body,
        **kwargs,
    )

    # Run inside a copy of the caller's context so contextvars stay visible
    # to the function when it executes in the executor.
    ctx = contextvars.copy_context()
    loop = asyncio.get_running_loop()
    init_response = await loop.run_in_executor(None, partial(ctx.run, func))

    if asyncio.iscoroutine(init_response):
        return await init_response
    return init_response  # type: ignore
def list_fine_tuning_jobs(
    after: Optional[str] = None,
    limit: Optional[int] = None,
    custom_llm_provider: Literal["openai"] = "openai",
    extra_headers: Optional[Dict[str, str]] = None,
    extra_body: Optional[Dict[str, str]] = None,
    **kwargs,
):
    """
    List your organization's fine-tuning jobs.

    Args:
        after: Identifier for the last job from the previous pagination request.
        limit: Number of fine-tuning jobs to retrieve. Defaults to 20.
        custom_llm_provider: Provider to use; only "openai" is supported.
        extra_headers: Accepted but not referenced by this function.
        extra_body: Accepted but not referenced by this function.
        **kwargs: Parsed as `GenericLiteLLMParams` (api_base, api_key,
            organization, timeout, max_retries are read). `request_timeout`
            is also honoured as a timeout fallback.

    Returns:
        The result of the OpenAI handler's `list_fine_tuning_jobs`, or - when
        invoked through `alist_fine_tuning_jobs` - a coroutine resolving to it.

    Raises:
        litellm.exceptions.BadRequestError: `custom_llm_provider` is not
            "openai".
    """
    # Internal flag injected by `alist_fine_tuning_jobs`; strip it before the
    # remaining kwargs are interpreted as LiteLLM params.
    _is_async = kwargs.pop("alist_fine_tuning_jobs", False) is True

    optional_params = GenericLiteLLMParams(**kwargs)

    if custom_llm_provider != "openai":
        raise litellm.exceptions.BadRequestError(
            message="LiteLLM doesn't support {} for 'list_fine_tuning_jobs'. Only 'openai' is supported.".format(
                custom_llm_provider
            ),
            model="n/a",
            llm_provider=custom_llm_provider,
            # Synthetic response/request pair attached to the error.
            response=httpx.Response(
                status_code=400,
                content="Unsupported provider",
                request=httpx.Request(method="create_thread", url="https://github.com/BerriAI/litellm"),  # type: ignore
            ),
        )

    # Resolution order for each setting: explicit kwarg -> litellm module
    # attribute -> environment variable -> built-in default.
    api_base = (
        optional_params.api_base
        or litellm.api_base
        or os.getenv("OPENAI_API_BASE")
        or "https://api.openai.com/v1"
    )
    organization = (
        optional_params.organization
        or litellm.organization
        or os.getenv("OPENAI_ORGANIZATION", None)
        or None
    )
    api_key = (
        optional_params.api_key
        or litellm.api_key
        or litellm.openai_key
        or os.getenv("OPENAI_API_KEY")
    )

    ### TIMEOUT LOGIC ###
    # The `or 600` chain guarantees a truthy value, i.e. a 10 minute default.
    timeout = optional_params.timeout or kwargs.get("request_timeout", 600) or 600
    if isinstance(timeout, httpx.Timeout):
        if not supports_httpx_timeout(custom_llm_provider):
            # Provider can't take an httpx.Timeout: collapse to its read timeout.
            timeout = timeout.read or 600
    else:
        timeout = float(timeout)  # type: ignore

    return openai_fine_tuning_instance.list_fine_tuning_jobs(
        api_base=api_base,
        api_key=api_key,
        organization=organization,
        after=after,
        limit=limit,
        timeout=timeout,
        max_retries=optional_params.max_retries,
        _is_async=_is_async,
    )

View file

@ -0,0 +1,199 @@
from typing import Any, Coroutine, Optional, Union
import httpx
from openai import AsyncOpenAI, OpenAI
from openai.pagination import AsyncCursorPage
from openai.types.fine_tuning import FineTuningJob
from litellm._logging import verbose_logger
from litellm.llms.base import BaseLLM
from litellm.types.llms.openai import FineTuningJobCreate
class OpenAIFineTuningAPI(BaseLLM):
    """
    OpenAI fine-tuning job operations: create, cancel and list.

    Each public operation has a synchronous path; when called with
    `_is_async=True` it instead returns the coroutine of its `a*` counterpart
    for the caller to await.
    """

    def __init__(self) -> None:
        super().__init__()

    def get_openai_client(
        self,
        api_key: Optional[str],
        api_base: Optional[str],
        timeout: Union[float, httpx.Timeout],
        max_retries: Optional[int],
        organization: Optional[str],
        client: Optional[Union[OpenAI, AsyncOpenAI]] = None,
        _is_async: bool = False,
    ) -> Optional[Union[OpenAI, AsyncOpenAI]]:
        """
        Return `client` if one was supplied, otherwise build a new one.

        Only settings that are not None are passed to the SDK constructor;
        `api_base` is passed under the SDK's `base_url` name. An `AsyncOpenAI`
        is built when `_is_async` is True, an `OpenAI` otherwise.
        """
        if client is not None:
            return client

        candidate_args = {
            "api_key": api_key,
            "base_url": api_base,
            "timeout": timeout,
            "max_retries": max_retries,
            "organization": organization,
        }
        client_args = {
            name: value for name, value in candidate_args.items() if value is not None
        }
        if _is_async is True:
            return AsyncOpenAI(**client_args)
        return OpenAI(**client_args)  # type: ignore

    def _get_validated_client(
        self,
        _is_async: bool,
        api_key: Optional[str],
        api_base: Optional[str],
        timeout: Union[float, httpx.Timeout],
        max_retries: Optional[int],
        organization: Optional[str],
        client: Optional[Union[OpenAI, AsyncOpenAI]],
    ) -> Union[OpenAI, AsyncOpenAI]:
        """
        Resolve the client for a request and check it fits the call mode.

        Raises:
            ValueError: no client could be resolved, or `_is_async` is True
                but the client is not an `AsyncOpenAI`.
        """
        openai_client = self.get_openai_client(
            api_key=api_key,
            api_base=api_base,
            timeout=timeout,
            max_retries=max_retries,
            organization=organization,
            client=client,
            _is_async=_is_async,
        )
        if openai_client is None:
            raise ValueError(
                "OpenAI client is not initialized. Make sure api_key is passed or OPENAI_API_KEY is set in the environment."
            )
        if _is_async is True and not isinstance(openai_client, AsyncOpenAI):
            raise ValueError(
                "OpenAI client is not an instance of AsyncOpenAI. Make sure you passed an AsyncOpenAI client."
            )
        return openai_client

    async def acreate_fine_tuning_job(
        self,
        create_fine_tuning_job_data: FineTuningJobCreate,
        openai_client: AsyncOpenAI,
    ) -> FineTuningJob:
        """Create a fine-tuning job using the given async client."""
        response = await openai_client.fine_tuning.jobs.create(
            **create_fine_tuning_job_data  # type: ignore
        )
        return response

    def create_fine_tuning_job(
        self,
        _is_async: bool,
        create_fine_tuning_job_data: FineTuningJobCreate,
        api_key: Optional[str],
        api_base: Optional[str],
        timeout: Union[float, httpx.Timeout],
        max_retries: Optional[int],
        organization: Optional[str],
        client: Optional[Union[OpenAI, AsyncOpenAI]] = None,
    ) -> Union[FineTuningJob, Union[Coroutine[Any, Any, FineTuningJob]]]:
        """
        Create a fine-tuning job.

        Returns the job on the sync path, or the un-awaited
        `acreate_fine_tuning_job` coroutine when `_is_async` is True.

        Raises:
            ValueError: see `_get_validated_client`.
        """
        openai_client = self._get_validated_client(
            _is_async=_is_async,
            api_key=api_key,
            api_base=api_base,
            timeout=timeout,
            max_retries=max_retries,
            organization=organization,
            client=client,
        )
        if _is_async is True:
            return self.acreate_fine_tuning_job(  # type: ignore
                create_fine_tuning_job_data=create_fine_tuning_job_data,
                openai_client=openai_client,  # type: ignore
            )
        verbose_logger.debug(
            "creating fine tuning job, args= %s", create_fine_tuning_job_data
        )
        return openai_client.fine_tuning.jobs.create(**create_fine_tuning_job_data)  # type: ignore

    async def acancel_fine_tuning_job(
        self,
        fine_tuning_job_id: str,
        openai_client: AsyncOpenAI,
    ) -> FineTuningJob:
        """Cancel the given fine-tuning job using the given async client."""
        response = await openai_client.fine_tuning.jobs.cancel(
            fine_tuning_job_id=fine_tuning_job_id
        )
        return response

    def cancel_fine_tuning_job(
        self,
        _is_async: bool,
        fine_tuning_job_id: str,
        api_key: Optional[str],
        api_base: Optional[str],
        timeout: Union[float, httpx.Timeout],
        max_retries: Optional[int],
        organization: Optional[str],
        client: Optional[Union[OpenAI, AsyncOpenAI]] = None,
    ) -> Union[FineTuningJob, Coroutine[Any, Any, FineTuningJob]]:
        """
        Cancel a fine-tuning job.

        Returns the job on the sync path, or the un-awaited
        `acancel_fine_tuning_job` coroutine when `_is_async` is True.

        Raises:
            ValueError: see `_get_validated_client`.
        """
        openai_client = self._get_validated_client(
            _is_async=_is_async,
            api_key=api_key,
            api_base=api_base,
            timeout=timeout,
            max_retries=max_retries,
            organization=organization,
            client=client,
        )
        if _is_async is True:
            return self.acancel_fine_tuning_job(  # type: ignore
                fine_tuning_job_id=fine_tuning_job_id,
                openai_client=openai_client,  # type: ignore
            )
        verbose_logger.debug("canceling fine tuning job, args= %s", fine_tuning_job_id)
        return openai_client.fine_tuning.jobs.cancel(  # type: ignore
            fine_tuning_job_id=fine_tuning_job_id
        )

    async def alist_fine_tuning_jobs(
        self,
        openai_client: AsyncOpenAI,
        after: Optional[str] = None,
        limit: Optional[int] = None,
    ):
        """List fine-tuning jobs using the given async client."""
        response = await openai_client.fine_tuning.jobs.list(after=after, limit=limit)  # type: ignore
        return response

    def list_fine_tuning_jobs(
        self,
        _is_async: bool,
        api_key: Optional[str],
        api_base: Optional[str],
        timeout: Union[float, httpx.Timeout],
        max_retries: Optional[int],
        organization: Optional[str],
        client: Optional[Union[OpenAI, AsyncOpenAI]] = None,
        after: Optional[str] = None,
        limit: Optional[int] = None,
    ):
        """
        List fine-tuning jobs.

        Returns the SDK's list result on the sync path, or the un-awaited
        `alist_fine_tuning_jobs` coroutine when `_is_async` is True.

        Raises:
            ValueError: see `_get_validated_client`.
        """
        openai_client = self._get_validated_client(
            _is_async=_is_async,
            api_key=api_key,
            api_base=api_base,
            timeout=timeout,
            max_retries=max_retries,
            organization=organization,
            client=client,
        )
        if _is_async is True:
            return self.alist_fine_tuning_jobs(  # type: ignore
                after=after,
                limit=limit,
                openai_client=openai_client,  # type: ignore
            )
        verbose_logger.debug("list fine tuning job, after= %s, limit= %s", after, limit)
        return openai_client.fine_tuning.jobs.list(after=after, limit=limit)  # type: ignore

View file

@ -0,0 +1,114 @@
import os
import sys
import traceback
import pytest
sys.path.insert(
0, os.path.abspath("../..")
) # Adds the parent directory to the system path
from openai import APITimeoutError as Timeout
import litellm
litellm.num_retries = 0
import logging
from litellm import create_fine_tuning_job
from litellm._logging import verbose_logger
def test_create_fine_tune_job():
    """
    End-to-end check of the sync fine-tuning entry points.

    Flow: upload training file -> create job -> list jobs -> delete file ->
    cancel job. No mocking is set up in this test, so it calls the litellm
    functions as-is.
    """
    verbose_logger.setLevel(logging.DEBUG)
    file_name = "openai_batch_completions.jsonl"
    _current_dir = os.path.dirname(os.path.abspath(__file__))
    file_path = os.path.join(_current_dir, file_name)

    # Context manager so the file handle is closed once the upload returns.
    with open(file_path, "rb") as training_data:
        file_obj = litellm.create_file(
            file=training_data,
            purpose="fine-tune",
            custom_llm_provider="openai",
        )
    print("Response from creating file=", file_obj)

    create_fine_tuning_response = litellm.create_fine_tuning_job(
        model="gpt-3.5-turbo-0125",
        training_file=file_obj.id,
    )
    print("response from litellm.create_fine_tuning_job=", create_fine_tuning_response)
    assert create_fine_tuning_response.id is not None
    assert create_fine_tuning_response.model == "gpt-3.5-turbo-0125"

    # list fine tuning jobs
    print("listing ft jobs")
    ft_jobs = litellm.list_fine_tuning_jobs(limit=2)
    print("response from litellm.list_fine_tuning_jobs=", ft_jobs)
    # Inspect the returned page's items directly rather than iterating the
    # page object itself.
    assert len(ft_jobs.data) > 0

    # delete file
    litellm.file_delete(
        file_id=file_obj.id,
    )

    # cancel ft job
    response = litellm.cancel_fine_tuning_job(
        fine_tuning_job_id=create_fine_tuning_response.id,
    )
    print("response from litellm.cancel_fine_tuning_job=", response)
    assert response.status == "cancelled"
    assert response.id == create_fine_tuning_response.id
@pytest.mark.asyncio
async def test_create_fine_tune_jobs_async():
    """
    End-to-end check of the async fine-tuning entry points.

    Flow: upload training file -> create job -> list jobs -> delete file ->
    cancel job. No mocking is set up in this test, so it calls the litellm
    functions as-is.
    """
    verbose_logger.setLevel(logging.DEBUG)
    file_name = "openai_batch_completions.jsonl"
    _current_dir = os.path.dirname(os.path.abspath(__file__))
    file_path = os.path.join(_current_dir, file_name)

    # Context manager so the file handle is closed once the upload returns.
    with open(file_path, "rb") as training_data:
        file_obj = await litellm.acreate_file(
            file=training_data,
            purpose="fine-tune",
            custom_llm_provider="openai",
        )
    print("Response from creating file=", file_obj)

    create_fine_tuning_response = await litellm.acreate_fine_tuning_job(
        model="gpt-3.5-turbo-0125",
        training_file=file_obj.id,
    )
    print("response from litellm.create_fine_tuning_job=", create_fine_tuning_response)
    assert create_fine_tuning_response.id is not None
    assert create_fine_tuning_response.model == "gpt-3.5-turbo-0125"

    # list fine tuning jobs
    print("listing ft jobs")
    ft_jobs = await litellm.alist_fine_tuning_jobs(limit=2)
    print("response from litellm.list_fine_tuning_jobs=", ft_jobs)
    # Check the page's items. `list(ft_jobs)` on the async page object would
    # not iterate jobs, which made the previous assertion pass vacuously.
    assert len(ft_jobs.data) > 0

    # delete file
    await litellm.afile_delete(
        file_id=file_obj.id,
    )

    # cancel ft job
    response = await litellm.acancel_fine_tuning_job(
        fine_tuning_job_id=create_fine_tuning_response.id,
    )
    print("response from litellm.cancel_fine_tuning_job=", response)
    assert response.status == "cancelled"
    assert response.id == create_fine_tuning_response.id

View file

@ -30,7 +30,7 @@ from openai.types.beta.thread_create_params import (
from openai.types.beta.threads.message import Message as OpenAIMessage from openai.types.beta.threads.message import Message as OpenAIMessage
from openai.types.beta.threads.message_content import MessageContent from openai.types.beta.threads.message_content import MessageContent
from openai.types.beta.threads.run import Run from openai.types.beta.threads.run import Run
from pydantic import BaseModel from pydantic import BaseModel, Field
from typing_extensions import Dict, Required, override from typing_extensions import Dict, Required, override
FileContent = Union[IO[bytes], bytes, PathLike] FileContent = Union[IO[bytes], bytes, PathLike]
@ -455,3 +455,50 @@ class ChatCompletionUsageBlock(TypedDict):
prompt_tokens: int prompt_tokens: int
completion_tokens: int completion_tokens: int
total_tokens: int total_tokens: int
class Hyperparameters(TypedDict, total=False):
    """
    Hyperparameters for a fine-tuning job.

    Declared with `total=False` so that any subset of keys - including the
    empty dict used as a default by callers - is a valid value. Each setting
    accepts either a number or a string (the example request in this file
    uses "auto" for `batch_size`).
    """

    batch_size: Optional[Union[str, int]]  # Number of examples in each batch.
    learning_rate_multiplier: Optional[
        Union[str, float]
    ]  # Scaling factor for the learning rate.
    n_epochs: Optional[Union[str, int]]  # Number of epochs to train the model for.
class FineTuningJobCreate(TypedDict):
    """
    Request payload for creating a fine-tuning job.

    Example request:
    ```
    {
        "model": "gpt-3.5-turbo",
        "training_file": "file-abc123",
        "hyperparameters": {
            "batch_size": "auto",
            "learning_rate_multiplier": 0.1,
            "n_epochs": 3
        },
        "suffix": "custom-model-name",
        "validation_file": "file-xyz789",
        "integrations": ["slack"],
        "seed": 42
    }
    ```
    """

    # The name of the model to fine-tune.
    model: str
    # The ID of an uploaded file that contains training data.
    training_file: str
    # The hyperparameters used for the fine-tuning job.
    hyperparameters: Optional[Hyperparameters]
    # A string of up to 18 characters that will be added to your fine-tuned
    # model name.
    suffix: Optional[str]
    # The ID of an uploaded file that contains validation data.
    validation_file: Optional[str]
    # A list of integrations to enable for your fine-tuning job.
    integrations: Optional[List[str]]
    # The seed controls the reproducibility of the job.
    seed: Optional[int]