forked from phoenix/litellm-mirror
Revert "Merge pull request #2593 from BerriAI/litellm_reset_budget_fix"
This reverts commitafd363129f
, reversing changes made toc94bc94ad5
.
This commit is contained in:
parent
3a866dd349
commit
2dfdc8dd69
4 changed files with 42 additions and 40 deletions
|
@ -53,6 +53,11 @@ RUN pip install *.whl /wheels/* --no-index --find-links=/wheels/ && rm -f *.whl
|
||||||
# install semantic-cache [Experimental]- we need this here and not in requirements.txt because redisvl pins to pydantic 1.0
|
# install semantic-cache [Experimental]- we need this here and not in requirements.txt because redisvl pins to pydantic 1.0
|
||||||
RUN pip install redisvl==0.0.7 --no-deps
|
RUN pip install redisvl==0.0.7 --no-deps
|
||||||
|
|
||||||
|
# ensure pyjwt is used, not jwt
|
||||||
|
RUN pip uninstall jwt -y
|
||||||
|
RUN pip uninstall PyJWT -y
|
||||||
|
RUN pip install PyJWT --no-cache-dir
|
||||||
|
|
||||||
# Build Admin UI
|
# Build Admin UI
|
||||||
RUN chmod +x build_admin_ui.sh && ./build_admin_ui.sh
|
RUN chmod +x build_admin_ui.sh && ./build_admin_ui.sh
|
||||||
|
|
||||||
|
|
|
@ -2661,12 +2661,6 @@ async def startup_event():
|
||||||
### START BUDGET SCHEDULER ###
|
### START BUDGET SCHEDULER ###
|
||||||
if prisma_client is not None:
|
if prisma_client is not None:
|
||||||
scheduler = AsyncIOScheduler()
|
scheduler = AsyncIOScheduler()
|
||||||
verbose_proxy_logger.debug(
|
|
||||||
f"proxy_budget_rescheduler_max_time: {proxy_budget_rescheduler_max_time}"
|
|
||||||
)
|
|
||||||
verbose_proxy_logger.debug(
|
|
||||||
f"proxy_budget_rescheduler_min_time: {proxy_budget_rescheduler_min_time}"
|
|
||||||
)
|
|
||||||
interval = random.randint(
|
interval = random.randint(
|
||||||
proxy_budget_rescheduler_min_time, proxy_budget_rescheduler_max_time
|
proxy_budget_rescheduler_min_time, proxy_budget_rescheduler_max_time
|
||||||
) # random interval, so multiple workers avoid resetting budget at the same time
|
) # random interval, so multiple workers avoid resetting budget at the same time
|
||||||
|
@ -4999,7 +4993,6 @@ async def user_info(
|
||||||
```
|
```
|
||||||
"""
|
"""
|
||||||
global prisma_client
|
global prisma_client
|
||||||
verbose_proxy_logger.debug(f"Received `/user/info` call for user_id={user_id}")
|
|
||||||
try:
|
try:
|
||||||
if prisma_client is None:
|
if prisma_client is None:
|
||||||
raise Exception(
|
raise Exception(
|
||||||
|
@ -5007,9 +5000,7 @@ async def user_info(
|
||||||
)
|
)
|
||||||
## GET USER ROW ##
|
## GET USER ROW ##
|
||||||
if user_id is not None:
|
if user_id is not None:
|
||||||
verbose_proxy_logger.debug(f"Making get_data call for user_id={user_id}")
|
|
||||||
user_info = await prisma_client.get_data(user_id=user_id)
|
user_info = await prisma_client.get_data(user_id=user_id)
|
||||||
verbose_proxy_logger.debug(f"Received get_data for user_id={user_id}")
|
|
||||||
elif view_all == True:
|
elif view_all == True:
|
||||||
if page is None:
|
if page is None:
|
||||||
page = 0
|
page = 0
|
||||||
|
@ -5090,7 +5081,6 @@ async def user_info(
|
||||||
key = key.dict()
|
key = key.dict()
|
||||||
key.pop("token", None)
|
key.pop("token", None)
|
||||||
|
|
||||||
verbose_proxy_logger.debug(f"RETURNING RESPONSE FOR /USER/INFO call ")
|
|
||||||
response_data = {
|
response_data = {
|
||||||
"user_id": user_id,
|
"user_id": user_id,
|
||||||
"user_info": user_info,
|
"user_info": user_info,
|
||||||
|
|
|
@ -1820,22 +1820,35 @@ async def reset_budget(prisma_client: PrismaClient):
|
||||||
if prisma_client is not None:
|
if prisma_client is not None:
|
||||||
### RESET KEY BUDGET ###
|
### RESET KEY BUDGET ###
|
||||||
now = datetime.utcnow()
|
now = datetime.utcnow()
|
||||||
asyncio.create_task(
|
keys_to_reset = await prisma_client.get_data(
|
||||||
prisma_client.db.litellm_verificationtoken.update_many(
|
table_name="key", query_type="find_all", expires=now, reset_at=now
|
||||||
where={"budget_reset_at": {"lt": now}},
|
|
||||||
data={"spend": 0, "budget_reset_at": now},
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
### RESET USER BUDGET ###
|
if keys_to_reset is not None and len(keys_to_reset) > 0:
|
||||||
try:
|
for key in keys_to_reset:
|
||||||
await prisma_client.db.litellm_usertable.update_many(
|
key.spend = 0.0
|
||||||
where={"budget_reset_at": {"lt": now}},
|
duration_s = _duration_in_seconds(duration=key.budget_duration)
|
||||||
data={"spend": 0, "budget_reset_at": now},
|
key.budget_reset_at = now + timedelta(seconds=duration_s)
|
||||||
|
|
||||||
|
await prisma_client.update_data(
|
||||||
|
query_type="update_many", data_list=keys_to_reset, table_name="key"
|
||||||
|
)
|
||||||
|
|
||||||
|
### RESET USER BUDGET ###
|
||||||
|
now = datetime.utcnow()
|
||||||
|
users_to_reset = await prisma_client.get_data(
|
||||||
|
table_name="user", query_type="find_all", reset_at=now
|
||||||
|
)
|
||||||
|
|
||||||
|
if users_to_reset is not None and len(users_to_reset) > 0:
|
||||||
|
for user in users_to_reset:
|
||||||
|
user.spend = 0.0
|
||||||
|
duration_s = _duration_in_seconds(duration=user.budget_duration)
|
||||||
|
user.budget_reset_at = now + timedelta(seconds=duration_s)
|
||||||
|
|
||||||
|
await prisma_client.update_data(
|
||||||
|
query_type="update_many", data_list=users_to_reset, table_name="user"
|
||||||
)
|
)
|
||||||
except Exception as e:
|
|
||||||
verbose_proxy_logger.debug(f"An exception occurs - {str(e)}")
|
|
||||||
raise e
|
|
||||||
|
|
||||||
|
|
||||||
async def update_spend(
|
async def update_spend(
|
||||||
|
|
|
@ -105,7 +105,6 @@ async def test_user_update():
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skip(reason="Flaky test on circle ci ci/cd.")
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_users_budgets_reset():
|
async def test_users_budgets_reset():
|
||||||
"""
|
"""
|
||||||
|
@ -115,16 +114,6 @@ async def test_users_budgets_reset():
|
||||||
- Check if value updated
|
- Check if value updated
|
||||||
"""
|
"""
|
||||||
get_user = f"krrish_{time.time()}@berri.ai"
|
get_user = f"krrish_{time.time()}@berri.ai"
|
||||||
|
|
||||||
async def retry_request(func, *args, _max_attempts=5, **kwargs):
|
|
||||||
for attempt in range(_max_attempts):
|
|
||||||
try:
|
|
||||||
return await func(*args, **kwargs)
|
|
||||||
except aiohttp.client_exceptions.ClientOSError as e:
|
|
||||||
if attempt + 1 == _max_attempts:
|
|
||||||
raise # re-raise the last ClientOSError if all attempts failed
|
|
||||||
print(f"Attempt {attempt+1} failed, retrying...")
|
|
||||||
|
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
key_gen = await new_user(
|
key_gen = await new_user(
|
||||||
session, 0, user_id=get_user, budget=10, budget_duration="5s"
|
session, 0, user_id=get_user, budget=10, budget_duration="5s"
|
||||||
|
@ -136,12 +125,17 @@ async def test_users_budgets_reset():
|
||||||
reset_at_init_value = user_info["user_info"]["budget_reset_at"]
|
reset_at_init_value = user_info["user_info"]["budget_reset_at"]
|
||||||
i = 0
|
i = 0
|
||||||
reset_at_new_value = None
|
reset_at_new_value = None
|
||||||
await asyncio.sleep(70)
|
while i < 3:
|
||||||
user_info = await retry_request(
|
await asyncio.sleep(70)
|
||||||
get_user_info, session=session, get_user=get_user, call_user=key
|
user_info = await get_user_info(
|
||||||
)
|
session=session, get_user=get_user, call_user=key
|
||||||
reset_at_new_value = user_info["user_info"]["budget_reset_at"]
|
)
|
||||||
|
reset_at_new_value = user_info["user_info"]["budget_reset_at"]
|
||||||
|
try:
|
||||||
|
assert reset_at_init_value != reset_at_new_value
|
||||||
|
break
|
||||||
|
except:
|
||||||
|
i + 1
|
||||||
assert reset_at_init_value != reset_at_new_value
|
assert reset_at_init_value != reset_at_new_value
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue