mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-25 10:44:24 +00:00
71 lines
No EOL
2.5 KiB
Python
71 lines
No EOL
2.5 KiB
Python
from dotenv import load_dotenv
|
|
load_dotenv()
|
|
import json, subprocess
|
|
import psutil # Import the psutil library
|
|
import atexit
|
|
try:
|
|
### OPTIONAL DEPENDENCIES ### - pip install redis and celery only when a user opts into using the async endpoints which require both
|
|
from celery import Celery
|
|
import redis
|
|
except:
|
|
import sys
|
|
|
|
subprocess.check_call(
|
|
[
|
|
sys.executable,
|
|
"-m",
|
|
"pip",
|
|
"install",
|
|
"redis",
|
|
"celery"
|
|
]
|
|
)
|
|
|
|
import time
|
|
import sys, os
|
|
sys.path.insert(
|
|
0, os.path.abspath("../../..")
|
|
) # Adds the parent directory to the system path - for litellm local dev
|
|
import litellm
|
|
|
|
# Redis connection setup
|
|
pool = redis.ConnectionPool(host=os.getenv("REDIS_HOST"), port=os.getenv("REDIS_PORT"), password=os.getenv("REDIS_PASSWORD"), db=0, max_connections=5)
|
|
redis_client = redis.Redis(connection_pool=pool)
|
|
|
|
# Celery setup
|
|
celery_app = Celery('tasks', broker=f"redis://default:{os.getenv('REDIS_PASSWORD')}@{os.getenv('REDIS_HOST')}:{os.getenv('REDIS_PORT')}", backend=f"redis://default:{os.getenv('REDIS_PASSWORD')}@{os.getenv('REDIS_HOST')}:{os.getenv('REDIS_PORT')}")
|
|
celery_app.conf.update(
|
|
broker_pool_limit = None,
|
|
broker_transport_options = {'connection_pool': pool},
|
|
result_backend_transport_options = {'connection_pool': pool},
|
|
)
|
|
|
|
|
|
# Celery task
|
|
@celery_app.task(name='process_job', max_retries=3)
|
|
def process_job(*args, **kwargs):
|
|
try:
|
|
llm_router: litellm.Router = litellm.Router(model_list=kwargs.pop("llm_model_list"))
|
|
response = llm_router.completion(*args, **kwargs) # type: ignore
|
|
if isinstance(response, litellm.ModelResponse):
|
|
response = response.model_dump_json()
|
|
return json.loads(response)
|
|
return str(response)
|
|
except Exception as e:
|
|
raise e
|
|
|
|
# Ensure Celery workers are terminated when the script exits
|
|
def cleanup():
|
|
try:
|
|
# Get a list of all running processes
|
|
for process in psutil.process_iter(attrs=['pid', 'name']):
|
|
# Check if the process is a Celery worker process
|
|
if process.info['name'] == 'celery':
|
|
print(f"Terminating Celery worker with PID {process.info['pid']}")
|
|
# Terminate the Celery worker process
|
|
psutil.Process(process.info['pid']).terminate()
|
|
except Exception as e:
|
|
print(f"Error during cleanup: {e}")
|
|
|
|
# Register the cleanup function to run when the script exits
|
|
atexit.register(cleanup) |