mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-26 03:04:13 +00:00
fix only use crypto imports when needed
This commit is contained in:
parent
4c99010eee
commit
4abb83b12d
1 changed files with 19 additions and 12 deletions
|
@ -5,9 +5,6 @@ import json
|
||||||
import os
|
import os
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
from cryptography.hazmat.primitives import hashes, serialization
|
|
||||||
from cryptography.hazmat.primitives.asymmetric import padding, rsa
|
|
||||||
|
|
||||||
from litellm._logging import verbose_proxy_logger
|
from litellm._logging import verbose_proxy_logger
|
||||||
from litellm.llms.custom_httpx.http_handler import HTTPHandler
|
from litellm.llms.custom_httpx.http_handler import HTTPHandler
|
||||||
|
|
||||||
|
@ -27,16 +24,22 @@ class LicenseCheck:
|
||||||
self.read_public_key()
|
self.read_public_key()
|
||||||
|
|
||||||
def read_public_key(self):
|
def read_public_key(self):
|
||||||
# current dir
|
try:
|
||||||
current_dir = os.path.dirname(os.path.realpath(__file__))
|
from cryptography.hazmat.primitives import hashes, serialization
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import padding, rsa
|
||||||
|
|
||||||
# check if public_key.pem exists
|
# current dir
|
||||||
_path_to_public_key = os.path.join(current_dir, "public_key.pem")
|
current_dir = os.path.dirname(os.path.realpath(__file__))
|
||||||
if os.path.exists(_path_to_public_key):
|
|
||||||
with open(_path_to_public_key, "rb") as key_file:
|
# check if public_key.pem exists
|
||||||
self.public_key = serialization.load_pem_public_key(key_file.read())
|
_path_to_public_key = os.path.join(current_dir, "public_key.pem")
|
||||||
else:
|
if os.path.exists(_path_to_public_key):
|
||||||
self.public_key = None
|
with open(_path_to_public_key, "rb") as key_file:
|
||||||
|
self.public_key = serialization.load_pem_public_key(key_file.read())
|
||||||
|
else:
|
||||||
|
self.public_key = None
|
||||||
|
except Exception as e:
|
||||||
|
verbose_proxy_logger.error(f"Error reading public key: {str(e)}")
|
||||||
|
|
||||||
def _verify(self, license_str: str) -> bool:
|
def _verify(self, license_str: str) -> bool:
|
||||||
url = "{}/verify_license/{}".format(self.base_url, license_str)
|
url = "{}/verify_license/{}".format(self.base_url, license_str)
|
||||||
|
@ -76,6 +79,9 @@ class LicenseCheck:
|
||||||
|
|
||||||
def verify_license_without_api_request(self, public_key, license_key):
|
def verify_license_without_api_request(self, public_key, license_key):
|
||||||
try:
|
try:
|
||||||
|
from cryptography.hazmat.primitives import hashes, serialization
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import padding, rsa
|
||||||
|
|
||||||
# Decode the license key
|
# Decode the license key
|
||||||
decoded = base64.b64decode(license_key)
|
decoded = base64.b64decode(license_key)
|
||||||
message, signature = decoded.split(b".", 1)
|
message, signature = decoded.split(b".", 1)
|
||||||
|
@ -107,4 +113,5 @@ class LicenseCheck:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
verbose_proxy_logger.error(str(e))
|
||||||
return False
|
return False
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue