forked from phoenix/litellm-mirror
build(ui): separating admin + user ui
This commit is contained in:
parent
d85c19394f
commit
e0ea2aa147
4 changed files with 71 additions and 28 deletions
30
ui/admin.py
30
ui/admin.py
|
@ -12,28 +12,26 @@ your_base_url = os.getenv("BASE_URL") # Example base URL
|
|||
|
||||
|
||||
# Function to encode the configuration
|
||||
def encode_config(proxy_url, allowed_email_subdomain):
|
||||
combined_string = (
|
||||
f"proxy_url={proxy_url}&accepted_email_subdomain={allowed_email_subdomain}"
|
||||
)
|
||||
def encode_config(proxy_url, allowed_email_subdomain, admin_emails):
|
||||
combined_string = f"proxy_url={proxy_url}&accepted_email_subdomain={allowed_email_subdomain}&admin_emails={admin_emails}"
|
||||
return base64.b64encode(combined_string.encode("utf-8")).decode("utf-8")
|
||||
|
||||
|
||||
# Simple function to update config values
|
||||
def update_config(proxy_url, allowed_email_subdomain):
|
||||
def update_config(proxy_url, allowed_email_subdomain, admin_emails):
|
||||
st.session_state["proxy_url"] = proxy_url
|
||||
st.session_state["allowed_email_subdomain"] = allowed_email_subdomain
|
||||
st.session_state["admin_emails"] = admin_emails
|
||||
st.session_state[
|
||||
"user_auth_url"
|
||||
] = f"{your_base_url}/?page={encode_config(proxy_url=proxy_url, allowed_email_subdomain=allowed_email_subdomain)}"
|
||||
] = f"{your_base_url}/?page={encode_config(proxy_url=proxy_url, allowed_email_subdomain=allowed_email_subdomain, admin_emails=admin_emails)}"
|
||||
|
||||
|
||||
def admin_page():
|
||||
# Display the form for the admin to set the proxy URL and allowed email subdomain
|
||||
st.header("Admin Configuration")
|
||||
def proxy_setup():
|
||||
# Create a configuration placeholder
|
||||
st.session_state.setdefault("proxy_url", "http://example.com")
|
||||
st.session_state.setdefault("allowed_email_subdomain", "example.com")
|
||||
st.session_state.setdefault("admin_emails", "admin@example.com")
|
||||
st.session_state.setdefault("user_auth_url", "NOT_GIVEN")
|
||||
|
||||
with st.form("config_form", clear_on_submit=False):
|
||||
|
@ -41,11 +39,17 @@ def admin_page():
|
|||
allowed_email_subdomain = st.text_input(
|
||||
"Set Allowed Email Subdomain", st.session_state["allowed_email_subdomain"]
|
||||
)
|
||||
admin_emails = st.text_input(
|
||||
"Allowed Admin Emails (add ',' to separate multiple emails)",
|
||||
st.session_state["admin_emails"],
|
||||
)
|
||||
submitted = st.form_submit_button("Save")
|
||||
|
||||
if submitted:
|
||||
update_config(
|
||||
proxy_url=proxy_url, allowed_email_subdomain=allowed_email_subdomain
|
||||
proxy_url=proxy_url,
|
||||
allowed_email_subdomain=allowed_email_subdomain,
|
||||
admin_emails=admin_emails,
|
||||
)
|
||||
|
||||
# Display the current configuration
|
||||
|
@ -54,3 +58,9 @@ def admin_page():
|
|||
f"Current Allowed Email Subdomain: {st.session_state['allowed_email_subdomain']}"
|
||||
)
|
||||
st.write(f"Current User Auth URL: {st.session_state['user_auth_url']}")
|
||||
|
||||
|
||||
def admin_page(is_admin="NOT_GIVEN"):
|
||||
# Display the form for the admin to set the proxy URL and allowed email subdomain
|
||||
st.header("Admin Configuration")
|
||||
proxy_setup()
|
||||
|
|
50
ui/app.py
50
ui/app.py
|
@ -7,8 +7,8 @@ load_dotenv()
|
|||
import streamlit as st
|
||||
import base64, binascii, os
|
||||
from admin import admin_page
|
||||
from auth import auth_page
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
from auth import auth_page, verify_with_otp
|
||||
import urllib.parse
|
||||
|
||||
|
||||
# Parse the query params in the URL
|
||||
|
@ -31,11 +31,10 @@ def is_base64(sb):
|
|||
return False
|
||||
|
||||
# Check if decoding is successful.
|
||||
# The result of the decode is not required, so it is ignored.
|
||||
_ = base64.urlsafe_b64decode(sb_bytes)
|
||||
decoded_params = base64.urlsafe_b64decode(sb_bytes)
|
||||
|
||||
# If the decode was successful, the input is likely base64
|
||||
return True
|
||||
return True, decoded_params
|
||||
except (binascii.Error, ValueError):
|
||||
# If an error occurs, return False, as the input is not base64
|
||||
return False
|
||||
|
@ -43,17 +42,46 @@ def is_base64(sb):
|
|||
|
||||
# Check the URL path and route to the correct page based on the path
|
||||
query_params = get_query_params()
|
||||
print(f"query_params: {query_params}")
|
||||
page_param = query_params.get("page", [None])[0]
|
||||
|
||||
# Route to the appropriate page based on the URL query param
|
||||
if page_param:
|
||||
token_hash = query_params.get("token_hash", [None])[0]
|
||||
decoded_token = None
|
||||
if token_hash is not None:
|
||||
print(f"token_hash: {token_hash}")
|
||||
decoded_token = verify_with_otp(token=token_hash)
|
||||
print(f"decoded_token: {decoded_token}")
|
||||
if page_param is not None:
|
||||
try:
|
||||
print(f"page_param: {page_param}")
|
||||
# Try to decode the page_param from base64
|
||||
if is_base64(page_param):
|
||||
auth_page(redirect_url=f"{os.getenv('BASE_URL')}/{page_param}")
|
||||
is_valid, decoded_params = is_base64(page_param)
|
||||
print(f"is_valid: {is_valid}; decoded_params: {decoded_params}")
|
||||
if is_valid:
|
||||
if decoded_token is None:
|
||||
auth_page(page_param=page_param)
|
||||
else:
|
||||
# Convert the bytes to a string
|
||||
params_str = decoded_params.decode("utf-8")
|
||||
|
||||
# Parse the parameters
|
||||
params = urllib.parse.parse_qs(params_str)
|
||||
|
||||
# Extract the value of admin_emails
|
||||
admin_emails = params.get("admin_emails", [""])[0].split(",")
|
||||
|
||||
print(admin_emails)
|
||||
print(vars(decoded_token.user))
|
||||
if decoded_token.user.email in admin_emails:
|
||||
# admin ui
|
||||
admin_page(is_admin=True)
|
||||
else:
|
||||
# user ui
|
||||
st.write(
|
||||
f"email: {decoded_token.user.email}; admin_emails: {admin_emails}"
|
||||
)
|
||||
else:
|
||||
st.error("Unknown page")
|
||||
except Exception as e:
|
||||
st.error("Failed to decode the page parameter. Error: " + str(e))
|
||||
else:
|
||||
admin_page()
|
||||
admin_page(is_admin=False)
|
||||
|
|
17
ui/auth.py
17
ui/auth.py
|
@ -5,9 +5,6 @@ Uses supabase passwordless auth: https://supabase.com/docs/reference/python/auth
|
|||
|
||||
Remember to set your redirect url to 8501 (streamlit default).
|
||||
"""
|
||||
import logging
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
import streamlit as st
|
||||
from dotenv import load_dotenv
|
||||
|
||||
|
@ -21,17 +18,23 @@ key: str = os.environ.get("SUPABASE_KEY")
|
|||
supabase: Client = create_client(url, key)
|
||||
|
||||
|
||||
def sign_in_with_otp(email: str, redirect_url: str):
|
||||
def sign_in_with_otp(email: str, page_param: str):
|
||||
print(f"received page param: {page_param}")
|
||||
data = supabase.auth.sign_in_with_otp(
|
||||
{"email": email, "options": {"email_redirect_to": redirect_url}}
|
||||
{"email": email, "options": {"data": {"page_param": page_param}}}
|
||||
)
|
||||
print(f"data: {data}")
|
||||
# Redirect to Supabase UI with the return data
|
||||
st.write(f"Please check your email for a login link!")
|
||||
|
||||
|
||||
def verify_with_otp(token: str):
|
||||
res = supabase.auth.verify_otp({"token_hash": token, "type": "email"})
|
||||
return res
|
||||
|
||||
|
||||
# Create the Streamlit app
|
||||
def auth_page(redirect_url: str):
|
||||
def auth_page(page_param: str):
|
||||
st.title("User Authentication")
|
||||
|
||||
# User email input
|
||||
|
@ -39,4 +42,4 @@ def auth_page(redirect_url: str):
|
|||
|
||||
# Sign in button
|
||||
if st.button("Sign In"):
|
||||
sign_in_with_otp(email, redirect_url=redirect_url)
|
||||
sign_in_with_otp(email, page_param=page_param)
|
||||
|
|
|
@ -1,2 +1,4 @@
|
|||
streamlit
|
||||
streamlit_url_fragment
|
||||
python-dotenv
|
||||
supabase
|
Loading…
Add table
Add a link
Reference in a new issue