mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-25 18:54:30 +00:00
* track LiteLLM_OrganizationMembership * add add_internal_user_to_organization * add org membership to schema * read organization membership when reading user info in auth checks * add check for valid organization_id * add test for test_create_new_user_in_organization * test test_create_new_user_in_organization * add new ADMIN role * add test for org admins creating teams * add test for test_org_admin_create_user_permissions * test_org_admin_create_user_team_wrong_org_permissions * test_org_admin_create_user_team_wrong_org_permissions * fix organization_role_based_access_check * fix getting user members * fix TeamBase * fix types used for use role * fix type checks * sync prisma schema * docs - organization admins * fix use organization_endpoints for /organization management * add types for org member endpoints * fix role name for org admin * add type for member add response * add organization/member_add * add error handling for adding members to an org * add nice doc string for oranization/member_add * fix test_create_new_user_in_organization * linting fix * use simple route changes * fix types * add organization member roles * add org admin auth checks * add auth checks for orgs * test for creating teams as org admin * simplify org id usage * fix typo * test test_org_admin_create_user_team_wrong_org_permissions * fix type check issue * code quality fix * fix schema.prisma
161 lines
6.3 KiB
Python
161 lines
6.3 KiB
Python
"""
|
|
Auth Checks for Organizations
|
|
"""
|
|
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
from fastapi import status
|
|
|
|
from litellm.proxy._types import *
|
|
|
|
|
|
def organization_role_based_access_check(
    request_body: dict,
    user_object: Optional[LiteLLM_UserTable],
    route: str,
) -> None:
    """
    Enforce organization-level role based access control for a proxy route.

    Checks run in this order:

    1. No user object -> nothing to check, return.
    2. `/organization/new` may only be called by a Proxy Admin.
    3. Proxy Admins pass every remaining check.
    4. If `route` is in `LiteLLMRoutes.org_admin_only_routes`, the caller must
       be an Org Admin of the `organization_id` passed in the request body.
    5. Otherwise, for `/team/new`, a caller who belongs to at least one
       organization must pass an `organization_id` and be an Org Admin of it.
       Callers with no organization memberships are not restricted here.

    Args:
        request_body: Request body; only the `organization_id` key is read.
        user_object: The calling user, or None when no user is known.
        route: The route being called.

    Raises:
        ProxyException: auth error carrying an HTTP 401 code when a check fails.
    """
    if user_object is None:
        return

    passed_organization_id: Optional[str] = request_body.get("organization_id", None)
    is_proxy_admin = user_object.user_role == LitellmUserRoles.PROXY_ADMIN.value

    if route == "/organization/new" and not is_proxy_admin:
        raise _org_auth_error(
            message=f"Only proxy admins can create new organizations. You are {user_object.user_role}",
            param="user_role",
        )

    if is_proxy_admin:
        return

    # Checks if route is an Org Admin Only Route
    if route in LiteLLMRoutes.org_admin_only_routes.value:
        if user_object.organization_memberships is None:
            raise _org_auth_error(
                message=f"Tried to access route={route} but you are not a member of any organization. Please contact the proxy admin to request access.",
                param="organization_id",
            )

        if passed_organization_id is None:
            raise _org_auth_error(
                message="Passed organization_id is None, please pass an organization_id in your request",
                param="organization_id",
            )

        # Only the role mapping is needed here; the id list is not used.
        _, role_by_organization = get_user_organization_info(user_object)
        user_role: Optional[LitellmUserRoles] = role_by_organization.get(
            passed_organization_id
        )
        if user_role is None:
            raise _org_auth_error(
                message=f"You do not have a role within the selected organization. Passed organization_id: {passed_organization_id}. Please contact the organization admin to request access.",
                param="organization_id",
            )

        if user_role != LitellmUserRoles.ORG_ADMIN.value:
            raise _org_auth_error(
                message=f"You do not have the required role to perform {route} in Organization {passed_organization_id}. Your role is {user_role} in Organization {passed_organization_id}",
                param="user_role",
            )
    elif route == "/team/new":
        # Users without any organization membership are not checked here.
        if not user_object.organization_memberships:
            return

        # A user who belongs to an organization must say which organization
        # the new team is for, and must be an admin of that organization.
        user_organizations, role_by_organization = get_user_organization_info(
            user_object
        )
        if passed_organization_id is None:
            raise _org_auth_error(
                message=f"Passed organization_id is None, please specify the organization_id in your request. You are part of multiple organizations: {user_organizations}",
                param="organization_id",
            )

        role_in_passed_org = role_by_organization.get(passed_organization_id)
        if role_in_passed_org != LitellmUserRoles.ORG_ADMIN.value:
            raise _org_auth_error(
                message=f"You do not have the required role to call {route}. Your role is {role_in_passed_org} in Organization {passed_organization_id}",
                param="user_role",
            )


def _org_auth_error(message: str, param: str) -> ProxyException:
    """Build the HTTP 401 auth-error ProxyException shared by every organization check."""
    return ProxyException(
        message=message,
        type=ProxyErrorTypes.auth_error.value,
        param=param,
        code=status.HTTP_401_UNAUTHORIZED,
    )
|
|
|
|
|
|
def get_user_organization_info(
    user_object: LiteLLM_UserTable,
) -> Tuple[List[str], Dict[str, Optional[LitellmUserRoles]]]:
    """
    Helper function to extract user organization information.

    Memberships whose `organization_id` is None are skipped. If the same
    organization id appears in more than one membership, it is listed once per
    membership and the role of the last such membership is kept in the mapping.

    Args:
        user_object (LiteLLM_UserTable): The user object containing organization memberships.

    Returns:
        Tuple[List[str], Dict[str, Optional[LitellmUserRoles]]]: A tuple containing:
            - List of organization IDs the user is a member of
            - Dictionary mapping organization IDs to user roles

        Both are empty when `user_object.organization_memberships` is None or empty.
    """
    organization_ids: List[str] = []
    role_by_organization: Dict[str, Optional[LitellmUserRoles]] = {}

    # `or []` treats a missing (None) membership list like an empty one.
    for membership in user_object.organization_memberships or []:
        organization_id = membership.organization_id
        if organization_id is None:
            continue
        organization_ids.append(organization_id)
        role_by_organization[organization_id] = membership.user_role  # type: ignore

    return organization_ids, role_by_organization
|
|
|
|
|
|
def _user_is_org_admin(
    request_data: dict,
    user_object: Optional[LiteLLM_UserTable] = None,
) -> bool:
    """
    Helper function to check if user is an org admin for the passed organization_id

    Args:
        request_data: Request payload; only the `organization_id` key is read.
        user_object: The calling user, or None.

    Returns:
        True only when `request_data` carries an `organization_id`, the user has
        a membership for that organization, and that membership's role equals
        `LitellmUserRoles.ORG_ADMIN`. False in every other case, including a
        missing user or a user without organization memberships.
    """
    # Read the requested organization once instead of on every loop iteration.
    organization_id = request_data.get("organization_id", None)
    if organization_id is None:
        return False

    if user_object is None or user_object.organization_memberships is None:
        return False

    return any(
        membership.organization_id == organization_id
        and membership.user_role == LitellmUserRoles.ORG_ADMIN.value
        for membership in user_object.organization_memberships
    )
|