litellm/litellm/llms/fine_tuning_apis/vertex_ai.py
Ishaan Jaff 920f4c9f82
(fix) add linting check to ban creating AsyncHTTPHandler during LLM calling (#6855)
* fix triton

* fix TEXT_COMPLETION_CODESTRAL

* fix REPLICATE

* fix CLARIFAI

* fix HUGGINGFACE

* add test_no_async_http_handler_usage

* fix PREDIBASE

* fix anthropic use get_async_httpx_client

* fix vertex fine tuning

* fix dbricks get_async_httpx_client

* fix get_async_httpx_client vertex

* fix get_async_httpx_client

* fix get_async_httpx_client

* fix make_async_azure_httpx_request

* fix check_for_async_http_handler

* test: cleanup mistral model

* add check for AsyncClient

* fix check_for_async_http_handler

* fix get_async_httpx_client

* fix tests using in_memory_llm_clients_cache

* fix langfuse import

* fix import

---------

Co-authored-by: Krrish Dholakia <krrishdholakia@gmail.com>
2024-11-21 19:03:02 -08:00

326 lines
12 KiB
Python

import traceback
from datetime import datetime
from typing import Any, Coroutine, Literal, Optional, Union
import httpx
from openai.types.fine_tuning.fine_tuning_job import FineTuningJob, Hyperparameters
import litellm
from litellm._logging import verbose_logger
from litellm.llms.base import BaseLLM
from litellm.llms.custom_httpx.http_handler import (
AsyncHTTPHandler,
HTTPHandler,
get_async_httpx_client,
)
from litellm.llms.vertex_ai_and_google_ai_studio.gemini.vertex_and_google_ai_studio_gemini import (
VertexLLM,
)
from litellm.types.llms.openai import FineTuningJobCreate
from litellm.types.llms.vertex_ai import (
FineTuneJobCreate,
FineTunesupervisedTuningSpec,
ResponseTuningJob,
)
class VertexFineTuningAPI(VertexLLM):
    """
    Vertex AI methods for creating supervised fine tuning (tuning) jobs, plus a
    generic authenticated POST pass-through to a fixed set of Vertex AI routes.
    """

    def __init__(self) -> None:
        super().__init__()
        # The async client is obtained through get_async_httpx_client instead of
        # constructing an AsyncHTTPHandler directly inside the request path.
        self.async_handler = get_async_httpx_client(
            llm_provider=litellm.LlmProviders.VERTEX_AI,
            params={"timeout": 600.0},
        )

    def convert_response_created_at(self, response: ResponseTuningJob) -> int:
        """
        Convert the ``createTime`` field of a Vertex tuning job response into a
        Unix timestamp (whole seconds since epoch).

        Returns 0 when ``createTime`` is missing, empty, or cannot be parsed.
        """
        try:
            create_time_str = response.get("createTime", "") or ""
            # datetime.fromisoformat does not accept a trailing "Z" on every
            # Python version, so rewrite it as an explicit UTC offset first.
            create_time_datetime = datetime.fromisoformat(
                create_time_str.replace("Z", "+00:00")
            )
            return int(create_time_datetime.timestamp())
        except Exception:
            # Deliberate best effort: an unparsable timestamp must not fail the
            # whole response conversion.
            return 0

    def convert_vertex_response_to_open_ai_response(
        self, response: ResponseTuningJob
    ) -> FineTuningJob:
        """
        Map a Vertex tuning job response onto the OpenAI ``FineTuningJob`` shape.

        Only ``name``, ``createTime``, ``state``, ``baseModel``,
        ``tunedModelDisplayName`` and ``supervisedTuningSpec.trainingDatasetUri``
        are read from the Vertex response; every other OpenAI field is filled
        with a fixed placeholder value.

        Raises:
            KeyError: if ``state``, ``name``, ``baseModel`` or
                ``tunedModelDisplayName`` is absent from ``response``.
        """
        state = response["state"]
        # Any state that is not explicitly mapped below is reported as "queued".
        status: Literal[
            "validating_files", "queued", "running", "succeeded", "failed", "cancelled"
        ] = "queued"
        if state == "JOB_STATE_SUCCEEDED":
            status = "succeeded"
        elif state == "JOB_STATE_FAILED":
            status = "failed"
        elif state == "JOB_STATE_CANCELLED":
            status = "cancelled"
        elif state == "JOB_STATE_RUNNING":
            status = "running"

        created_at = self.convert_response_created_at(response)

        training_uri = ""
        if "supervisedTuningSpec" in response and response["supervisedTuningSpec"]:
            training_uri = response["supervisedTuningSpec"]["trainingDatasetUri"] or ""

        return FineTuningJob(
            id=response["name"] or "",
            created_at=created_at,
            fine_tuned_model=response["tunedModelDisplayName"],
            finished_at=None,
            hyperparameters=Hyperparameters(
                n_epochs=0,
            ),
            model=response["baseModel"] or "",
            object="fine_tuning.job",
            organization_id="",
            result_files=[],
            seed=0,
            status=status,
            trained_tokens=None,
            training_file=training_uri,
            validation_file=None,
            estimated_finish=None,
            integrations=[],
        )

    def convert_openai_request_to_vertex(
        self, create_fine_tuning_job_data: FineTuningJobCreate, **kwargs
    ) -> FineTuneJobCreate:
        """
        Convert a fine tuning request from OpenAI format to Vertex format.

        https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/tuning

        ``n_epochs`` and ``learning_rate_multiplier`` are copied onto the
        supervised tuning spec only when they are truthy. ``adapter_size`` is
        always set, from ``kwargs`` (``None`` when the caller did not pass it).
        """
        hyperparameters = create_fine_tuning_job_data.hyperparameters
        supervised_tuning_spec = FineTunesupervisedTuningSpec(
            training_dataset_uri=create_fine_tuning_job_data.training_file,
            validation_dataset=create_fine_tuning_job_data.validation_file,
        )

        if hyperparameters:
            if hyperparameters.n_epochs:
                supervised_tuning_spec["epoch_count"] = int(hyperparameters.n_epochs)
            if hyperparameters.learning_rate_multiplier:
                supervised_tuning_spec["learning_rate_multiplier"] = float(
                    hyperparameters.learning_rate_multiplier
                )

        supervised_tuning_spec["adapter_size"] = kwargs.get("adapter_size")

        return FineTuneJobCreate(
            baseModel=create_fine_tuning_job_data.model,
            supervisedTuningSpec=supervised_tuning_spec,
            tunedModelDisplayName=create_fine_tuning_job_data.suffix,
        )

    def _parse_create_tuning_job_response(
        self, response: httpx.Response
    ) -> FineTuningJob:
        """
        Validate the HTTP response of a create-tuning-job call and convert it to
        an OpenAI ``FineTuningJob``. Shared by the sync and async create paths.

        Raises:
            Exception: if the response status code is not 200.
        """
        if response.status_code != 200:
            raise Exception(
                f"Error creating fine tuning job. Status code: {response.status_code}. Response: {response.text}"
            )

        response_json = response.json()
        verbose_logger.debug(
            "got response from creating fine tuning job: %s", response_json
        )

        vertex_response = ResponseTuningJob(  # type: ignore
            **response_json,
        )
        verbose_logger.debug("vertex_response %s", vertex_response)

        return self.convert_vertex_response_to_open_ai_response(vertex_response)

    async def acreate_fine_tuning_job(
        self,
        fine_tuning_url: str,
        headers: dict,
        request_data: FineTuneJobCreate,
    ):
        """
        Asynchronously POST ``request_data`` to ``fine_tuning_url`` and return
        the created job as an OpenAI ``FineTuningJob``.

        Any failure is logged (message and traceback) and then re-raised.

        Raises:
            ValueError: if ``self.async_handler`` is ``None``.
            Exception: if Vertex responds with a non-200 status code.
        """
        try:
            verbose_logger.debug(
                "about to create fine tuning job: %s, request_data: %s",
                fine_tuning_url,
                request_data,
            )
            if self.async_handler is None:
                raise ValueError(
                    "VertexAI Fine Tuning - async_handler is not initialized"
                )
            response = await self.async_handler.post(
                headers=headers,
                url=fine_tuning_url,
                json=request_data,  # type: ignore
            )
            return self._parse_create_tuning_job_response(response)
        except Exception as e:
            verbose_logger.error("asyncerror creating fine tuning job %s", e)
            verbose_logger.error(traceback.format_exc())
            # Bare raise re-raises the active exception without adding this
            # frame a second time to its traceback.
            raise

    def create_fine_tuning_job(
        self,
        _is_async: bool,
        create_fine_tuning_job_data: FineTuningJobCreate,
        vertex_project: Optional[str],
        vertex_location: Optional[str],
        vertex_credentials: Optional[str],
        api_base: Optional[str],
        timeout: Union[float, httpx.Timeout],
        **kwargs,
    ):
        """
        Create a Vertex AI tuning job from an OpenAI-format request.

        Authentication and request conversion always run synchronously. When
        ``_is_async`` is True the un-awaited coroutine from
        ``acreate_fine_tuning_job`` is returned; otherwise the request is sent
        with a sync ``HTTPHandler`` and the converted ``FineTuningJob`` is
        returned.

        NOTE(review): ``timeout`` is accepted but not used; both the sync and the
        async client use a fixed 600 second timeout. Confirm whether callers
        expect it to be honoured.

        ``**kwargs`` is forwarded to ``convert_openai_request_to_vertex``
        (which reads ``adapter_size`` from it).

        Raises:
            Exception: (sync path) if Vertex responds with a non-200 status code.
        """
        verbose_logger.debug(
            "creating fine tuning job, args= %s", create_fine_tuning_job_data
        )

        _auth_header, vertex_project = self._ensure_access_token(
            credentials=vertex_credentials,
            project_id=vertex_project,
            custom_llm_provider="vertex_ai_beta",
        )

        # Only the token is needed here; the URL returned alongside it is
        # discarded because the tuningJobs URL is built explicitly below.
        auth_header, _ = self._get_token_and_url(
            model="",
            auth_header=_auth_header,
            gemini_api_key=None,
            vertex_credentials=vertex_credentials,
            vertex_project=vertex_project,
            vertex_location=vertex_location,
            stream=False,
            custom_llm_provider="vertex_ai_beta",
            api_base=api_base,
        )

        headers = {
            "Authorization": f"Bearer {auth_header}",
            "Content-Type": "application/json",
        }

        fine_tune_job = self.convert_openai_request_to_vertex(
            create_fine_tuning_job_data=create_fine_tuning_job_data, **kwargs
        )

        fine_tuning_url = f"https://{vertex_location}-aiplatform.googleapis.com/v1/projects/{vertex_project}/locations/{vertex_location}/tuningJobs"

        if _is_async is True:
            return self.acreate_fine_tuning_job(  # type: ignore
                fine_tuning_url=fine_tuning_url,
                headers=headers,
                request_data=fine_tune_job,
            )

        sync_handler = HTTPHandler(timeout=httpx.Timeout(timeout=600.0, connect=5.0))

        verbose_logger.debug(
            "about to create fine tuning job: %s, request_data: %s",
            fine_tuning_url,
            fine_tune_job,
        )
        response = sync_handler.post(
            headers=headers,
            url=fine_tuning_url,
            json=fine_tune_job,  # type: ignore
        )
        return self._parse_create_tuning_job_response(response)

    async def pass_through_vertex_ai_POST_request(
        self,
        request_data: dict,
        vertex_project: str,
        vertex_location: str,
        vertex_credentials: str,
        request_route: str,
    ):
        """
        Send an authenticated POST to a supported Vertex AI route and return the
        decoded JSON response body.

        Supported ``request_route`` values, checked in this order:
        - exactly ``/tuningJobs``
        - contains ``/tuningJobs/`` and ``cancel``
        - contains ``generateContent``, ``predict``, ``/batchPredictionJobs``
          or ``countTokens``
        - contains ``cachedContents`` (sent to the ``v1beta1`` API; a short
          ``model`` value in ``request_data`` is expanded in place to its full
          ``projects/.../publishers/google/models/...`` resource name)

        Raises:
            ValueError: if ``request_route`` is not supported, or if
                ``self.async_handler`` is ``None``.
            Exception: if Vertex responds with a non-200 status code.
        """
        _auth_header, vertex_project = await self._ensure_access_token_async(
            credentials=vertex_credentials,
            project_id=vertex_project,
            custom_llm_provider="vertex_ai_beta",
        )
        auth_header, _ = self._get_token_and_url(
            model="",
            auth_header=_auth_header,
            gemini_api_key=None,
            vertex_credentials=vertex_credentials,
            vertex_project=vertex_project,
            vertex_location=vertex_location,
            stream=False,
            custom_llm_provider="vertex_ai_beta",
            api_base="",
        )

        headers = {
            "Authorization": f"Bearer {auth_header}",
            "Content-Type": "application/json",
        }

        host = f"https://{vertex_location}-aiplatform.googleapis.com"
        location_path = f"projects/{vertex_project}/locations/{vertex_location}"
        v1_base = f"{host}/v1/{location_path}"

        if request_route == "/tuningJobs":
            url = f"{v1_base}/tuningJobs"
        elif "/tuningJobs/" in request_route and "cancel" in request_route:
            # A route such as "/tuningJobs/{id}:cancel" already carries the
            # "/tuningJobs" segment; prefixing it again would produce
            # ".../tuningJobs/tuningJobs/{id}:cancel". Only add the prefix when
            # the route does not already start with it.
            if request_route.startswith("/tuningJobs/"):
                url = f"{v1_base}{request_route}"
            else:
                url = f"{v1_base}/tuningJobs{request_route}"
        elif (
            "generateContent" in request_route
            or "predict" in request_route
            or "/batchPredictionJobs" in request_route
            or "countTokens" in request_route
        ):
            url = f"{v1_base}{request_route}"
        elif "cachedContents" in request_route:
            _model = request_data.get("model")
            if _model is not None and "/publishers/google/models/" not in _model:
                request_data["model"] = (
                    f"projects/{vertex_project}/locations/{vertex_location}/publishers/google/models/{_model}"
                )
            url = f"{host}/v1beta1/{location_path}{request_route}"
        else:
            raise ValueError(f"Unsupported Vertex AI request route: {request_route}")

        if self.async_handler is None:
            raise ValueError("VertexAI Fine Tuning - async_handler is not initialized")

        response = await self.async_handler.post(
            headers=headers,
            url=url,
            json=request_data,  # type: ignore
        )

        if response.status_code != 200:
            # This method serves many routes, so name the route that failed
            # rather than always reporting a fine tuning job creation error.
            raise Exception(
                f"Error calling Vertex AI route {request_route}. Status code: {response.status_code}. Response: {response.text}"
            )

        return response.json()