diff --git a/litellm/proxy/proxy_server.py b/litellm/proxy/proxy_server.py
index b524e08e8..d7a9b1024 100644
--- a/litellm/proxy/proxy_server.py
+++ b/litellm/proxy/proxy_server.py
@@ -207,7 +207,6 @@ async def user_api_key_auth(
return UserAPIKeyAuth()
route: str = request.url.path
- print(f"route: {route}")
if route == "/user/auth":
if general_settings.get("allow_user_auth", False) == True:
return UserAPIKeyAuth()
@@ -226,8 +225,10 @@ async def user_api_key_auth(
return UserAPIKeyAuth(api_key=master_key)
if (
- route.startswith("/key/") or route.startswith("/user/")
- ) and not is_master_key_valid:
+ (route.startswith("/key/") or route.startswith("/user/"))
+ and not is_master_key_valid
+ and general_settings.get("allow_user_auth", False) != True
+ ):
raise Exception(
f"If master key is set, only master key can be used to generate, delete, update or get info for new keys/users"
)
@@ -1733,6 +1734,7 @@ async def user_auth(request: Request):
data = await request.json() # type: ignore
user_email = data["user_email"]
+ page_params = data["page"]
if user_email is None:
raise HTTPException(status_code=400, detail="User email is none")
@@ -1752,7 +1754,6 @@ async def user_auth(request: Request):
response = await prisma_client.get_generic_data(
key="user_email", value=user_email, db="users"
)
- print(f"response: {response}")
### if so - generate a 24 hr key with that user id
if response is not None:
user_id = response.user_id
@@ -1772,11 +1773,10 @@ async def user_auth(request: Request):
"from": f"LiteLLM Proxy <{os.getenv('RESEND_API_EMAIL')}>",
"to": [user_email],
"subject": "Your Magic Link",
- "html": f" Follow this link, to login:\n\n{base_url}user/?token={response['token']}&user_id={response['user_id']}",
+ "html": f" Follow this link, to login:\n\n{base_url}user/?token={response['token']}&user_id={response['user_id']}&page={page_params}",
}
email = resend.Emails.send(params)
- print(email)
return "Email sent!"
diff --git a/ui/admin.py b/ui/admin.py
index 63152b676..196091d2b 100644
--- a/ui/admin.py
+++ b/ui/admin.py
@@ -24,7 +24,7 @@ def update_config(proxy_url, allowed_email_subdomain, admin_emails):
st.session_state["admin_emails"] = admin_emails
st.session_state[
"user_auth_url"
- ] = f"{your_base_url}/?page={encode_config(proxy_url=proxy_url, allowed_email_subdomain=allowed_email_subdomain, admin_emails=admin_emails)}"
+ ] = f"{your_base_url}/user?page={encode_config(proxy_url=proxy_url, allowed_email_subdomain=allowed_email_subdomain, admin_emails=admin_emails)}"
st.session_state["is_admin"] = True
@@ -144,3 +144,6 @@ def admin_page(is_admin="NOT_GIVEN"):
st.success("Form submitted successfully!")
except Exception as e:
raise e
+
+
+admin_page()
diff --git a/ui/app.py b/ui/app.py
index eb82fafca..bbe403d47 100644
--- a/ui/app.py
+++ b/ui/app.py
@@ -7,81 +7,92 @@ load_dotenv()
import streamlit as st
import base64, binascii, os, json
from admin import admin_page
-from auth import auth_page, verify_with_otp
+from .pages.auth import auth_page, verify_with_otp
import urllib.parse
-
-# Parse the query params in the URL
-def get_query_params():
- # Get the query params from Streamlit's `server.request` function
- # This functionality is not officially documented and could change in the future versions of Streamlit
- query_params = st.experimental_get_query_params()
- return query_params
-
-
-def is_base64(sb):
- try:
- if isinstance(sb, str):
- # Try to encode it to bytes if it's a unicode string
- sb_bytes = sb.encode("ascii")
- elif isinstance(sb, bytes):
- sb_bytes = sb
- else:
- # If it is not a byte or a string, it is not base64
- return False
-
- # Check if decoding is successful.
- decoded_params = base64.urlsafe_b64decode(sb_bytes)
-
- # If the decode was successful, the input is likely base64
- return True, decoded_params
- except (binascii.Error, ValueError):
- # If an error occurs, return False, as the input is not base64
- return False
-
-
-# Check the URL path and route to the correct page based on the path
-query_params = get_query_params()
-print(f"query_params: {query_params}")
-page_param = query_params.get("page", [None])[0]
-token_hash = query_params.get("token_hash", [None])[0]
-decoded_token = None
-if token_hash is not None:
- print(f"token_hash: {token_hash}")
- decoded_token = verify_with_otp(token=token_hash)
- print(f"decoded_token: {decoded_token}")
-if page_param is not None:
- try:
- print(f"page_param: {page_param}")
- # Try to decode the page_param from base64
- is_valid, decoded_params = is_base64(page_param)
- print(f"is_valid: {is_valid}; decoded_params: {decoded_params}")
- if is_valid:
- if decoded_token is None:
- auth_page(page_param=page_param)
- else:
- # Convert the bytes to a string
- params_str = decoded_params.decode("utf-8")
-
- # Parse the parameters
- params = urllib.parse.parse_qs(params_str)
-
- # Extract the value of admin_emails
- admin_emails = params.get("admin_emails", [""])[0].split(",")
-
- print(admin_emails)
- print(vars(decoded_token.user))
- if decoded_token.user.email in admin_emails:
- # admin ui
- admin_page(is_admin=True)
- else:
- # user ui
- st.write(
- f"email: {decoded_token.user.email}; admin_emails: {admin_emails}"
- )
- else:
- st.error("Unknown page")
- except Exception as e:
- st.error("Failed to decode the page parameter. Error: " + str(e))
+"""
+if user:
+ if token + user_id:
+ user_page(user)
+ else:
+ auth_page()
else:
- admin_page(is_admin=False)
+ # Fallback to default admin page
+ admin_page()
+"""
+
+
+# # Parse the query params in the URL
+# def get_query_params():
+# # Get the query params from Streamlit's `server.request` function
+# # This functionality is not officially documented and could change in the future versions of Streamlit
+# query_params = st.experimental_get_query_params()
+# return query_params
+
+
+# def is_base64(sb):
+# try:
+# if isinstance(sb, str):
+# # Try to encode it to bytes if it's a unicode string
+# sb_bytes = sb.encode("ascii")
+# elif isinstance(sb, bytes):
+# sb_bytes = sb
+# else:
+# # If it is not a byte or a string, it is not base64
+# return False
+
+# # Check if decoding is successful.
+# decoded_params = base64.urlsafe_b64decode(sb_bytes)
+
+# # If the decode was successful, the input is likely base64
+# return True, decoded_params
+# except (binascii.Error, ValueError):
+# # If an error occurs, return False, as the input is not base64
+# return False
+
+
+# # # Check the URL path and route to the correct page based on the path
+# # query_params = get_query_params()
+# # print(f"query_params: {query_params}")
+# # page_param = query_params.get("page", [None])[0]
+# # token_hash = query_params.get("token_hash", [None])[0]
+# # decoded_token = None
+# # if token_hash is not None:
+# # print(f"token_hash: {token_hash}")
+# # decoded_token = verify_with_otp(token=token_hash)
+# # print(f"decoded_token: {decoded_token}")
+# # if page_param is not None:
+# # try:
+# # print(f"page_param: {page_param}")
+# # # Try to decode the page_param from base64
+# # is_valid, decoded_params = is_base64(page_param)
+# # print(f"is_valid: {is_valid}; decoded_params: {decoded_params}")
+# # if is_valid:
+# # if decoded_token is None:
+# # auth_page(page_param=page_param)
+# # else:
+# # # Convert the bytes to a string
+# # params_str = decoded_params.decode("utf-8")
+
+# # # Parse the parameters
+# # params = urllib.parse.parse_qs(params_str)
+
+# # # Extract the value of admin_emails
+# # admin_emails = params.get("admin_emails", [""])[0].split(",")
+
+# # print(admin_emails)
+# # print(vars(decoded_token.user))
+# # if decoded_token.user.email in admin_emails:
+# # # admin ui
+# # admin_page(is_admin=True)
+# # else:
+# # # user ui
+# # st.write(
+# # f"email: {decoded_token.user.email}; admin_emails: {admin_emails}"
+# # )
+# # else:
+# # st.error("Unknown page")
+# # except Exception as e:
+# # st.error("Failed to decode the page parameter. Error: " + str(e))
+# # else:
+# # admin_page(is_admin=False)
diff --git a/ui/auth.py b/ui/auth.py
deleted file mode 100644
index 2322de3e6..000000000
--- a/ui/auth.py
+++ /dev/null
@@ -1,45 +0,0 @@
-"""
-Auth in user, to proxy ui.
-
-Uses supabase passwordless auth: https://supabase.com/docs/reference/python/auth-signinwithotp
-
-Remember to set your redirect url to 8501 (streamlit default).
-"""
-import streamlit as st
-from dotenv import load_dotenv
-
-load_dotenv()
-import os
-from supabase import create_client, Client
-
-# Set up Supabase client
-url: str = os.environ.get("SUPABASE_URL")
-key: str = os.environ.get("SUPABASE_KEY")
-supabase: Client = create_client(url, key)
-
-
-def sign_in_with_otp(email: str, page_param: str):
- print(f"received page param: {page_param}")
- data = supabase.auth.sign_in_with_otp(
- {"email": email, "options": {"data": {"page_param": page_param}}}
- )
- print(f"data: {data}")
- # Redirect to Supabase UI with the return data
- st.write(f"Please check your email for a login link!")
-
-
-def verify_with_otp(token: str):
- res = supabase.auth.verify_otp({"token_hash": token, "type": "email"})
- return res
-
-
-# Create the Streamlit app
-def auth_page(page_param: str):
- st.title("User Authentication")
-
- # User email input
- email = st.text_input("Enter your email")
-
- # Sign in button
- if st.button("Sign In"):
- sign_in_with_otp(email, page_param=page_param)
diff --git a/ui/pages/user.py b/ui/pages/user.py
new file mode 100644
index 000000000..8bc65ca02
--- /dev/null
+++ b/ui/pages/user.py
@@ -0,0 +1,137 @@
+"""
+Auth in user, to proxy ui.
+
+Uses supabase passwordless auth: https://supabase.com/docs/reference/python/auth-signinwithotp
+
+Remember to set your redirect url to 8501 (streamlit default).
+"""
+import streamlit as st
+from dotenv import load_dotenv
+import requests, base64, binascii
+
+load_dotenv()
+import os
+
+
+def is_base64(sb):
+ try:
+ if isinstance(sb, str):
+ # Try to encode it to bytes if it's a unicode string
+ sb_bytes = sb.encode("ascii")
+ elif isinstance(sb, bytes):
+ sb_bytes = sb
+ else:
+ # If it is not a byte or a string, it is not base64
+ return False
+
+ # Check if decoding is successful.
+ decoded_params = base64.urlsafe_b64decode(sb_bytes)
+ params_str = decoded_params.decode("utf-8")
+ param_dict = {}
+ # split on the &
+ params = params_str.split("&")
+ # split on the =
+ for param in params:
+ split_val = param.split("=")
+ param_dict[split_val[0].strip()] = split_val[1].strip()
+ # If the decode was successful, the input is likely base64
+ return True, param_dict
+ except (binascii.Error, ValueError):
+ # If an error occurs, return False, as the input is not base64
+ return False
+
+
+def sign_in_with_otp(email: str, page_param: str):
+ print(f"received page param: {page_param}")
+ b64_flag, decoded_params = is_base64(sb=page_param)
+ print(f"b64_flag: {b64_flag}")
+ st.write(f"Decoded params: {decoded_params}")
+ # requests.post()
+ # data = supabase.auth.sign_in_with_otp(
+ # {"email": email, "options": {"data": {"page_param": page_param}}}
+ # )
+ # print(f"data: {data}")
+ # # Redirect to Supabase UI with the return data
+ # st.write(f"Please check your email for a login link!")
+
+
+# def verify_with_otp(token: str):
+# res = supabase.auth.verify_otp({"token_hash": token, "type": "email"})
+# return res
+
+
+# Create the Streamlit app
+def auth_page(page_param: str):
+ st.title("User Authentication")
+
+ # User email input
+ email = st.text_input("Enter your email")
+
+ # Sign in button
+ if st.button("Sign In"):
+ b64_flag, decoded_params = is_base64(sb=page_param)
+ # Define the endpoint you want to make a POST request to
+ if decoded_params["proxy_url"].endswith("/"):
+ post_endpoint = f"{decoded_params['proxy_url']}user/auth"
+ else:
+ post_endpoint = f"{decoded_params['proxy_url']}/user/auth"
+
+ try:
+ assert email.split("@")[1] in decoded_params["accepted_email_subdomain"]
+ except:
+ raise Exception(
+ f"Only emails from {decoded_params['accepted_email_subdomain']} are allowed"
+ )
+ response = requests.post(
+ post_endpoint, json={"user_email": email, "page": page_param}
+ )
+
+ if response.status_code == 200:
+ # Success!
+ st.success(f"Email sent successfully!")
+
+
+def user_page(page_param: str, user_id: str, token: str):
+ st.title("User Configuration")
+
+ # When the button is clicked
+ if st.button("Create Key"):
+ b64_flag, decoded_params = is_base64(sb=page_param)
+ # Define the endpoint you want to make a POST request to
+ if decoded_params["proxy_url"].endswith("/"):
+ post_endpoint = f"{decoded_params['proxy_url']}key/generate"
+ else:
+ post_endpoint = f"{decoded_params['proxy_url']}/key/generate"
+ # Make a POST request to the endpoint
+ headers = {"Authorization": f"Bearer {token}"}
+ response = requests.post(
+ post_endpoint, json={"duration": "1hr", "user_id": user_id}, headers=headers
+ )
+
+ if response.status_code == 200:
+ # Success! You can handle the JSON response if you're expecting one
+ st.success("Key created successfully!")
+ response_data = response.json()
+ st.success(f"API Key: {response_data['key']}")
+
+
+def router():
+ query_params = st.experimental_get_query_params()
+ page_param = query_params.get("page", None)[0]
+ if (
+ query_params.get("token", None) is not None
+ and query_params.get("user_id", None) is not None
+ ):
+ # render user page
+ user_page(
+ page_param=page_param,
+ user_id=query_params.get("user_id")[0],
+ token=query_params.get("token")[0],
+ )
+ elif page_param is not None:
+ auth_page(page_param=page_param)
+ else:
+ st.write("Please setup proxy")
+
+
+router()
diff --git a/ui/requirements.txt b/ui/requirements.txt
index 2cbb20962..bb61bdd15 100644
--- a/ui/requirements.txt
+++ b/ui/requirements.txt
@@ -1,4 +1,3 @@
streamlit
-streamlit_url_fragment
python-dotenv
supabase
\ No newline at end of file