forked from phoenix/litellm-mirror
fix(proxy_server.py): enabling user auth via ui
https://github.com/BerriAI/litellm/issues/1231
This commit is contained in:
parent
ca40a88987
commit
a41e56a730
6 changed files with 233 additions and 128 deletions
|
@ -207,7 +207,6 @@ async def user_api_key_auth(
|
|||
return UserAPIKeyAuth()
|
||||
|
||||
route: str = request.url.path
|
||||
print(f"route: {route}")
|
||||
if route == "/user/auth":
|
||||
if general_settings.get("allow_user_auth", False) == True:
|
||||
return UserAPIKeyAuth()
|
||||
|
@ -226,8 +225,10 @@ async def user_api_key_auth(
|
|||
return UserAPIKeyAuth(api_key=master_key)
|
||||
|
||||
if (
|
||||
route.startswith("/key/") or route.startswith("/user/")
|
||||
) and not is_master_key_valid:
|
||||
(route.startswith("/key/") or route.startswith("/user/"))
|
||||
and not is_master_key_valid
|
||||
and general_settings.get("allow_user_auth", False) != True
|
||||
):
|
||||
raise Exception(
|
||||
f"If master key is set, only master key can be used to generate, delete, update or get info for new keys/users"
|
||||
)
|
||||
|
@ -1733,6 +1734,7 @@ async def user_auth(request: Request):
|
|||
|
||||
data = await request.json() # type: ignore
|
||||
user_email = data["user_email"]
|
||||
page_params = data["page"]
|
||||
if user_email is None:
|
||||
raise HTTPException(status_code=400, detail="User email is none")
|
||||
|
||||
|
@ -1752,7 +1754,6 @@ async def user_auth(request: Request):
|
|||
response = await prisma_client.get_generic_data(
|
||||
key="user_email", value=user_email, db="users"
|
||||
)
|
||||
print(f"response: {response}")
|
||||
### if so - generate a 24 hr key with that user id
|
||||
if response is not None:
|
||||
user_id = response.user_id
|
||||
|
@ -1772,11 +1773,10 @@ async def user_auth(request: Request):
|
|||
"from": f"LiteLLM Proxy <{os.getenv('RESEND_API_EMAIL')}>",
|
||||
"to": [user_email],
|
||||
"subject": "Your Magic Link",
|
||||
"html": f"<strong> Follow this link, to login:\n\n{base_url}user/?token={response['token']}&user_id={response['user_id']}</strong>",
|
||||
"html": f"<strong> Follow this link, to login:\n\n{base_url}user/?token={response['token']}&user_id={response['user_id']}&page={page_params}</strong>",
|
||||
}
|
||||
|
||||
email = resend.Emails.send(params)
|
||||
print(email)
|
||||
return "Email sent!"
|
||||
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ def update_config(proxy_url, allowed_email_subdomain, admin_emails):
|
|||
st.session_state["admin_emails"] = admin_emails
|
||||
st.session_state[
|
||||
"user_auth_url"
|
||||
] = f"{your_base_url}/?page={encode_config(proxy_url=proxy_url, allowed_email_subdomain=allowed_email_subdomain, admin_emails=admin_emails)}"
|
||||
] = f"{your_base_url}/user?page={encode_config(proxy_url=proxy_url, allowed_email_subdomain=allowed_email_subdomain, admin_emails=admin_emails)}"
|
||||
st.session_state["is_admin"] = True
|
||||
|
||||
|
||||
|
@ -144,3 +144,6 @@ def admin_page(is_admin="NOT_GIVEN"):
|
|||
st.success("Form submitted successfully!")
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
|
||||
admin_page()
|
||||
|
|
161
ui/app.py
161
ui/app.py
|
@ -7,81 +7,92 @@ load_dotenv()
|
|||
import streamlit as st
|
||||
import base64, binascii, os, json
|
||||
from admin import admin_page
|
||||
from auth import auth_page, verify_with_otp
|
||||
from .pages.auth import auth_page, verify_with_otp
|
||||
import urllib.parse
|
||||
|
||||
|
||||
# Parse the query params in the URL
|
||||
def get_query_params():
|
||||
# Get the query params from Streamlit's `server.request` function
|
||||
# This functionality is not officially documented and could change in the future versions of Streamlit
|
||||
query_params = st.experimental_get_query_params()
|
||||
return query_params
|
||||
|
||||
|
||||
def is_base64(sb):
|
||||
try:
|
||||
if isinstance(sb, str):
|
||||
# Try to encode it to bytes if it's a unicode string
|
||||
sb_bytes = sb.encode("ascii")
|
||||
elif isinstance(sb, bytes):
|
||||
sb_bytes = sb
|
||||
else:
|
||||
# If it is not a byte or a string, it is not base64
|
||||
return False
|
||||
|
||||
# Check if decoding is successful.
|
||||
decoded_params = base64.urlsafe_b64decode(sb_bytes)
|
||||
|
||||
# If the decode was successful, the input is likely base64
|
||||
return True, decoded_params
|
||||
except (binascii.Error, ValueError):
|
||||
# If an error occurs, return False, as the input is not base64
|
||||
return False
|
||||
|
||||
|
||||
# Check the URL path and route to the correct page based on the path
|
||||
query_params = get_query_params()
|
||||
print(f"query_params: {query_params}")
|
||||
page_param = query_params.get("page", [None])[0]
|
||||
token_hash = query_params.get("token_hash", [None])[0]
|
||||
decoded_token = None
|
||||
if token_hash is not None:
|
||||
print(f"token_hash: {token_hash}")
|
||||
decoded_token = verify_with_otp(token=token_hash)
|
||||
print(f"decoded_token: {decoded_token}")
|
||||
if page_param is not None:
|
||||
try:
|
||||
print(f"page_param: {page_param}")
|
||||
# Try to decode the page_param from base64
|
||||
is_valid, decoded_params = is_base64(page_param)
|
||||
print(f"is_valid: {is_valid}; decoded_params: {decoded_params}")
|
||||
if is_valid:
|
||||
if decoded_token is None:
|
||||
auth_page(page_param=page_param)
|
||||
else:
|
||||
# Convert the bytes to a string
|
||||
params_str = decoded_params.decode("utf-8")
|
||||
|
||||
# Parse the parameters
|
||||
params = urllib.parse.parse_qs(params_str)
|
||||
|
||||
# Extract the value of admin_emails
|
||||
admin_emails = params.get("admin_emails", [""])[0].split(",")
|
||||
|
||||
print(admin_emails)
|
||||
print(vars(decoded_token.user))
|
||||
if decoded_token.user.email in admin_emails:
|
||||
# admin ui
|
||||
admin_page(is_admin=True)
|
||||
else:
|
||||
# user ui
|
||||
st.write(
|
||||
f"email: {decoded_token.user.email}; admin_emails: {admin_emails}"
|
||||
)
|
||||
else:
|
||||
st.error("Unknown page")
|
||||
except Exception as e:
|
||||
st.error("Failed to decode the page parameter. Error: " + str(e))
|
||||
"""
|
||||
if user:
|
||||
if token + user_id:
|
||||
user_page(user)
|
||||
else:
|
||||
auth_page()
|
||||
else:
|
||||
admin_page(is_admin=False)
|
||||
# Fallback to default admin page
|
||||
admin_page()
|
||||
"""
|
||||
|
||||
|
||||
# # Parse the query params in the URL
|
||||
# def get_query_params():
|
||||
# # Get the query params from Streamlit's `server.request` function
|
||||
# # This functionality is not officially documented and could change in the future versions of Streamlit
|
||||
# query_params = st.experimental_get_query_params()
|
||||
# return query_params
|
||||
|
||||
|
||||
# def is_base64(sb):
|
||||
# try:
|
||||
# if isinstance(sb, str):
|
||||
# # Try to encode it to bytes if it's a unicode string
|
||||
# sb_bytes = sb.encode("ascii")
|
||||
# elif isinstance(sb, bytes):
|
||||
# sb_bytes = sb
|
||||
# else:
|
||||
# # If it is not a byte or a string, it is not base64
|
||||
# return False
|
||||
|
||||
# # Check if decoding is successful.
|
||||
# decoded_params = base64.urlsafe_b64decode(sb_bytes)
|
||||
|
||||
# # If the decode was successful, the input is likely base64
|
||||
# return True, decoded_params
|
||||
# except (binascii.Error, ValueError):
|
||||
# # If an error occurs, return False, as the input is not base64
|
||||
# return False
|
||||
|
||||
|
||||
# # # Check the URL path and route to the correct page based on the path
|
||||
# # query_params = get_query_params()
|
||||
# # print(f"query_params: {query_params}")
|
||||
# # page_param = query_params.get("page", [None])[0]
|
||||
# # token_hash = query_params.get("token_hash", [None])[0]
|
||||
# # decoded_token = None
|
||||
# # if token_hash is not None:
|
||||
# # print(f"token_hash: {token_hash}")
|
||||
# # decoded_token = verify_with_otp(token=token_hash)
|
||||
# # print(f"decoded_token: {decoded_token}")
|
||||
# # if page_param is not None:
|
||||
# # try:
|
||||
# # print(f"page_param: {page_param}")
|
||||
# # # Try to decode the page_param from base64
|
||||
# # is_valid, decoded_params = is_base64(page_param)
|
||||
# # print(f"is_valid: {is_valid}; decoded_params: {decoded_params}")
|
||||
# # if is_valid:
|
||||
# # if decoded_token is None:
|
||||
# # auth_page(page_param=page_param)
|
||||
# # else:
|
||||
# # # Convert the bytes to a string
|
||||
# # params_str = decoded_params.decode("utf-8")
|
||||
|
||||
# # # Parse the parameters
|
||||
# # params = urllib.parse.parse_qs(params_str)
|
||||
|
||||
# # # Extract the value of admin_emails
|
||||
# # admin_emails = params.get("admin_emails", [""])[0].split(",")
|
||||
|
||||
# # print(admin_emails)
|
||||
# # print(vars(decoded_token.user))
|
||||
# # if decoded_token.user.email in admin_emails:
|
||||
# # # admin ui
|
||||
# # admin_page(is_admin=True)
|
||||
# # else:
|
||||
# # # user ui
|
||||
# # st.write(
|
||||
# # f"email: {decoded_token.user.email}; admin_emails: {admin_emails}"
|
||||
# # )
|
||||
# # else:
|
||||
# # st.error("Unknown page")
|
||||
# # except Exception as e:
|
||||
# # st.error("Failed to decode the page parameter. Error: " + str(e))
|
||||
# # else:
|
||||
# # admin_page(is_admin=False)
|
||||
|
|
45
ui/auth.py
45
ui/auth.py
|
@ -1,45 +0,0 @@
|
|||
"""
|
||||
Auth in user, to proxy ui.
|
||||
|
||||
Uses supabase passwordless auth: https://supabase.com/docs/reference/python/auth-signinwithotp
|
||||
|
||||
Remember to set your redirect url to 8501 (streamlit default).
|
||||
"""
|
||||
import streamlit as st
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
import os
|
||||
from supabase import create_client, Client
|
||||
|
||||
# Set up Supabase client
|
||||
url: str = os.environ.get("SUPABASE_URL")
|
||||
key: str = os.environ.get("SUPABASE_KEY")
|
||||
supabase: Client = create_client(url, key)
|
||||
|
||||
|
||||
def sign_in_with_otp(email: str, page_param: str):
|
||||
print(f"received page param: {page_param}")
|
||||
data = supabase.auth.sign_in_with_otp(
|
||||
{"email": email, "options": {"data": {"page_param": page_param}}}
|
||||
)
|
||||
print(f"data: {data}")
|
||||
# Redirect to Supabase UI with the return data
|
||||
st.write(f"Please check your email for a login link!")
|
||||
|
||||
|
||||
def verify_with_otp(token: str):
|
||||
res = supabase.auth.verify_otp({"token_hash": token, "type": "email"})
|
||||
return res
|
||||
|
||||
|
||||
# Create the Streamlit app
|
||||
def auth_page(page_param: str):
|
||||
st.title("User Authentication")
|
||||
|
||||
# User email input
|
||||
email = st.text_input("Enter your email")
|
||||
|
||||
# Sign in button
|
||||
if st.button("Sign In"):
|
||||
sign_in_with_otp(email, page_param=page_param)
|
137
ui/pages/user.py
Normal file
137
ui/pages/user.py
Normal file
|
@ -0,0 +1,137 @@
|
|||
"""
|
||||
Auth in user, to proxy ui.
|
||||
|
||||
Uses supabase passwordless auth: https://supabase.com/docs/reference/python/auth-signinwithotp
|
||||
|
||||
Remember to set your redirect url to 8501 (streamlit default).
|
||||
"""
|
||||
import streamlit as st
|
||||
from dotenv import load_dotenv
|
||||
import requests, base64, binascii
|
||||
|
||||
load_dotenv()
|
||||
import os
|
||||
|
||||
|
||||
def is_base64(sb):
|
||||
try:
|
||||
if isinstance(sb, str):
|
||||
# Try to encode it to bytes if it's a unicode string
|
||||
sb_bytes = sb.encode("ascii")
|
||||
elif isinstance(sb, bytes):
|
||||
sb_bytes = sb
|
||||
else:
|
||||
# If it is not a byte or a string, it is not base64
|
||||
return False
|
||||
|
||||
# Check if decoding is successful.
|
||||
decoded_params = base64.urlsafe_b64decode(sb_bytes)
|
||||
params_str = decoded_params.decode("utf-8")
|
||||
param_dict = {}
|
||||
# split on the &
|
||||
params = params_str.split("&")
|
||||
# split on the =
|
||||
for param in params:
|
||||
split_val = param.split("=")
|
||||
param_dict[split_val[0].strip()] = split_val[1].strip()
|
||||
# If the decode was successful, the input is likely base64
|
||||
return True, param_dict
|
||||
except (binascii.Error, ValueError):
|
||||
# If an error occurs, return False, as the input is not base64
|
||||
return False
|
||||
|
||||
|
||||
def sign_in_with_otp(email: str, page_param: str):
|
||||
print(f"received page param: {page_param}")
|
||||
b64_flag, decoded_params = is_base64(sb=page_param)
|
||||
print(f"b64_flag: {b64_flag}")
|
||||
st.write(f"Decoded params: {decoded_params}")
|
||||
# requests.post()
|
||||
# data = supabase.auth.sign_in_with_otp(
|
||||
# {"email": email, "options": {"data": {"page_param": page_param}}}
|
||||
# )
|
||||
# print(f"data: {data}")
|
||||
# # Redirect to Supabase UI with the return data
|
||||
# st.write(f"Please check your email for a login link!")
|
||||
|
||||
|
||||
# def verify_with_otp(token: str):
|
||||
# res = supabase.auth.verify_otp({"token_hash": token, "type": "email"})
|
||||
# return res
|
||||
|
||||
|
||||
# Create the Streamlit app
|
||||
def auth_page(page_param: str):
|
||||
st.title("User Authentication")
|
||||
|
||||
# User email input
|
||||
email = st.text_input("Enter your email")
|
||||
|
||||
# Sign in button
|
||||
if st.button("Sign In"):
|
||||
b64_flag, decoded_params = is_base64(sb=page_param)
|
||||
# Define the endpoint you want to make a POST request to
|
||||
if decoded_params["proxy_url"].endswith("/"):
|
||||
post_endpoint = f"{decoded_params['proxy_url']}user/auth"
|
||||
else:
|
||||
post_endpoint = f"{decoded_params['proxy_url']}/user/auth"
|
||||
|
||||
try:
|
||||
assert email.split("@")[1] in decoded_params["accepted_email_subdomain"]
|
||||
except:
|
||||
raise Exception(
|
||||
f"Only emails from {decoded_params['accepted_email_subdomain']} are allowed"
|
||||
)
|
||||
response = requests.post(
|
||||
post_endpoint, json={"user_email": email, "page": page_param}
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
# Success!
|
||||
st.success(f"Email sent successfully!")
|
||||
|
||||
|
||||
def user_page(page_param: str, user_id: str, token: str):
|
||||
st.title("User Configuration")
|
||||
|
||||
# When the button is clicked
|
||||
if st.button("Create Key"):
|
||||
b64_flag, decoded_params = is_base64(sb=page_param)
|
||||
# Define the endpoint you want to make a POST request to
|
||||
if decoded_params["proxy_url"].endswith("/"):
|
||||
post_endpoint = f"{decoded_params['proxy_url']}key/generate"
|
||||
else:
|
||||
post_endpoint = f"{decoded_params['proxy_url']}/key/generate"
|
||||
# Make a POST request to the endpoint
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
response = requests.post(
|
||||
post_endpoint, json={"duration": "1hr", "user_id": user_id}, headers=headers
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
# Success! You can handle the JSON response if you're expecting one
|
||||
st.success("Key created successfully!")
|
||||
response_data = response.json()
|
||||
st.success(f"API Key: {response_data['key']}")
|
||||
|
||||
|
||||
def router():
|
||||
query_params = st.experimental_get_query_params()
|
||||
page_param = query_params.get("page", None)[0]
|
||||
if (
|
||||
query_params.get("token", None) is not None
|
||||
and query_params.get("user_id", None) is not None
|
||||
):
|
||||
# render user page
|
||||
user_page(
|
||||
page_param=page_param,
|
||||
user_id=query_params.get("user_id")[0],
|
||||
token=query_params.get("token")[0],
|
||||
)
|
||||
elif page_param is not None:
|
||||
auth_page(page_param=page_param)
|
||||
else:
|
||||
st.write("Please setup proxy")
|
||||
|
||||
|
||||
router()
|
|
@ -1,4 +1,3 @@
|
|||
streamlit
|
||||
streamlit_url_fragment
|
||||
python-dotenv
|
||||
supabase
|
Loading…
Add table
Add a link
Reference in a new issue