refactor(proxy_server.py): refactoring background rq worker

This commit is contained in:
Krrish Dholakia 2023-11-21 13:47:00 -08:00
parent 760a465bd2
commit 68c955409d
4 changed files with 52 additions and 43 deletions

View file

@ -1,32 +0,0 @@
import os
import subprocess
import sys
import multiprocessing
from dotenv import load_dotenv
load_dotenv()
def run_rq_worker(redis_url):
command = ["rq", "worker", "--url", redis_url]
subprocess.run(command)
def start_rq_worker_in_background():
# Set OBJC_DISABLE_INITIALIZE_FORK_SAFETY to YES
os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"
# Check if required environment variables are set
required_vars = ["REDIS_USERNAME", "REDIS_PASSWORD", "REDIS_HOST", "REDIS_PORT"]
missing_vars = [var for var in required_vars if var not in os.environ]
if missing_vars:
print(f"Error: Redis environment variables not set. Please set {', '.join(missing_vars)}.")
sys.exit(1)
# Construct Redis URL
REDIS_URL = f"redis://{os.environ['REDIS_USERNAME']}:{os.environ['REDIS_PASSWORD']}@{os.environ['REDIS_HOST']}:{os.environ['REDIS_PORT']}"
# Run rq worker in a separate process
worker_process = multiprocessing.Process(target=run_rq_worker, args=(REDIS_URL,))
worker_process.start()
if __name__ == "__main__":
start_rq_worker_in_background()

View file

@ -0,0 +1,30 @@
import sys, os
from rq import Worker, Queue, Connection
from redis import Redis
from dotenv import load_dotenv
load_dotenv()
# Add the path to the local folder to sys.path
sys.path.insert(
0, os.path.abspath("../../..")
) # Adds the parent directory to the system path - for litellm local dev
# # Import your local module
# import litellm
# from litellm import litellm_queue_completion
# Set up RQ connection
redis_conn = Redis(host=os.getenv("REDIS_HOST"), port=os.getenv("REDIS_PORT"), password=os.getenv("REDIS_PASSWORD"))
print(redis_conn.ping()) # Should print True if connected successfully
# Create a worker and add the queue
try:
queue = Queue(connection=redis_conn)
worker = Worker([queue], connection=redis_conn)
except Exception as e:
print(f"Error setting up worker: {e}")
exit()
# Run the worker
if __name__ == '__main__':
with Connection(redis_conn):
worker.work()