test fix post call rules (#9826)

This commit is contained in:
Ishaan Jaff 2025-04-08 13:55:37 -07:00 committed by GitHub
parent e6403b717c
commit 441c7275ed
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 104 additions and 0 deletions

View file

@ -14,6 +14,9 @@ def get_instance_fn(value: str, config_file_path: Optional[str] = None) -> Any:
module_name = ".".join(parts[:-1])
instance_name = parts[-1]
# Security: Check if the module name contains any dangerous modules that can execute arbitrary code
security_checks(module_name=module_name)
# If config_file_path is provided, use it to determine the module spec and load the module
if config_file_path is not None:
directory = os.path.dirname(config_file_path)
@ -47,6 +50,35 @@ def get_instance_fn(value: str, config_file_path: Optional[str] = None) -> Any:
raise e
def security_checks(
module_name: str,
):
"""
This function checks if the module name contains any dangerous modules that can execute arbitrary code.
Reference: https://huntr.com/bounties/1d98bebb-6cf4-46c9-87c3-d3b1972973b5
"""
DANGEROUS_MODULES = [
"os",
"sys",
"subprocess",
"shutil",
"socket",
"multiprocessing",
"threading",
"ctypes",
"pickle",
"marshal",
"builtins",
"__builtin__",
]
# Security: Check if the module name contains any dangerous modules
if any(dangerous in module_name.lower() for dangerous in DANGEROUS_MODULES):
raise ImportError(
f"Importing from module {module_name} is not allowed for security reasons"
)
def validate_custom_validate_return_type(
fn: Optional[Callable[..., Any]]
) -> Optional[Callable[..., Literal[True]]]:

View file

@ -0,0 +1,72 @@
import json
import os
import sys
import pytest
from fastapi.testclient import TestClient
from litellm.proxy.types_utils.utils import security_checks
sys.path.insert(
0, os.path.abspath("../../..")
) # Adds the parent directory to the system path
def test_security_checks_blocks_dangerous_modules():
"""
Resolves: https://huntr.com/bounties/1d98bebb-6cf4-46c9-87c3-d3b1972973b5
This test checks if the security_checks function correctly blocks the import of dangerous modules.
"""
dangerous_module = "/usr/lib/python3/os.system"
with pytest.raises(ImportError) as exc_info:
security_checks(dangerous_module)
assert "not allowed for security reasons" in str(exc_info.value)
assert dangerous_module in str(exc_info.value)
def test_security_checks_various_dangerous_modules():
dangerous_modules = [
"subprocess.run",
"socket.socket",
"pickle.loads",
"marshal.loads",
"ctypes.CDLL",
"builtins.eval",
"__builtin__.exec",
"shutil.rmtree",
"multiprocessing.Process",
"threading.Thread",
]
for module in dangerous_modules:
with pytest.raises(ImportError) as exc_info:
security_checks(module)
assert "not allowed for security reasons" in str(exc_info.value)
assert module in str(exc_info.value)
def test_security_checks_case_insensitive():
# Test that the check is case-insensitive
variations = ["OS.system", "os.System", "Os.SyStEm", "SUBPROCESS.run"]
for module in variations:
with pytest.raises(ImportError) as exc_info:
security_checks(module)
assert "not allowed for security reasons" in str(exc_info.value)
def test_security_checks_nested_paths():
# Test nested paths that contain dangerous modules
nested_paths = [
"some/path/to/os/system",
"myproject/utils/subprocess_wrapper",
"lib/helpers/socket_utils",
"../../../system/os.py",
]
for path in nested_paths:
with pytest.raises(ImportError) as exc_info:
security_checks(path)
assert "not allowed for security reasons" in str(exc_info.value)