From f9640d8a58d9d1e5fce9c685078eb715386de5bd Mon Sep 17 00:00:00 2001 From: Krrish Dholakia Date: Mon, 19 Aug 2024 17:58:39 -0700 Subject: [PATCH] feat(langfuse_endpoints.py): support team based logging for langfuse pass-through endpoints --- litellm/proxy/litellm_pre_call_utils.py | 66 +++++++++++-------- .../vertex_ai_endpoints/langfuse_endpoints.py | 36 ++++++++-- 2 files changed, 69 insertions(+), 33 deletions(-) diff --git a/litellm/proxy/litellm_pre_call_utils.py b/litellm/proxy/litellm_pre_call_utils.py index dd39efd6b..85211f943 100644 --- a/litellm/proxy/litellm_pre_call_utils.py +++ b/litellm/proxy/litellm_pre_call_utils.py @@ -100,6 +100,40 @@ def convert_key_logging_metadata_to_callback( return team_callback_settings_obj +def _get_dynamic_logging_metadata( + user_api_key_dict: UserAPIKeyAuth, +) -> Optional[TeamCallbackMetadata]: + callback_settings_obj: Optional[TeamCallbackMetadata] = None + if user_api_key_dict.team_metadata is not None: + team_metadata = user_api_key_dict.team_metadata + if "callback_settings" in team_metadata: + callback_settings = team_metadata.get("callback_settings", None) or {} + callback_settings_obj = TeamCallbackMetadata(**callback_settings) + verbose_proxy_logger.debug( + "Team callback settings activated: %s", callback_settings_obj + ) + """ + callback_settings = { + { + 'callback_vars': {'langfuse_public_key': 'pk', 'langfuse_secret_key': 'sk_'}, + 'failure_callback': [], + 'success_callback': ['langfuse', 'langfuse'] + } + } + """ + elif ( + user_api_key_dict.metadata is not None + and "logging" in user_api_key_dict.metadata + ): + for item in user_api_key_dict.metadata["logging"]: + callback_settings_obj = convert_key_logging_metadata_to_callback( + data=AddTeamCallback(**item), + team_callback_settings_obj=callback_settings_obj, + ) + + return callback_settings_obj + + async def add_litellm_data_to_request( data: dict, request: Request, @@ -270,35 +304,9 @@ async def add_litellm_data_to_request( } # add the team-specific configs to the completion call # Team Callbacks controls - callback_settings_obj: Optional[TeamCallbackMetadata] = None - if user_api_key_dict.team_metadata is not None: - team_metadata = user_api_key_dict.team_metadata - if "callback_settings" in team_metadata: - callback_settings = team_metadata.get("callback_settings", None) or {} - callback_settings_obj = TeamCallbackMetadata(**callback_settings) - verbose_proxy_logger.debug( - "Team callback settings activated: %s", callback_settings_obj - ) - """ - callback_settings = { - { - 'callback_vars': {'langfuse_public_key': 'pk', 'langfuse_secret_key': 'sk_'}, - 'failure_callback': [], - 'success_callback': ['langfuse', 'langfuse'] - } - } - """ - elif ( - user_api_key_dict.metadata is not None - and "logging" in user_api_key_dict.metadata - ): - for item in user_api_key_dict.metadata["logging"]: - - callback_settings_obj = convert_key_logging_metadata_to_callback( - data=AddTeamCallback(**item), - team_callback_settings_obj=callback_settings_obj, - ) - + callback_settings_obj = _get_dynamic_logging_metadata( + user_api_key_dict=user_api_key_dict + ) if callback_settings_obj is not None: data["success_callback"] = callback_settings_obj.success_callback data["failure_callback"] = callback_settings_obj.failure_callback diff --git a/litellm/proxy/vertex_ai_endpoints/langfuse_endpoints.py b/litellm/proxy/vertex_ai_endpoints/langfuse_endpoints.py index a19a8b127..4626cd667 100644 --- a/litellm/proxy/vertex_ai_endpoints/langfuse_endpoints.py +++ b/litellm/proxy/vertex_ai_endpoints/langfuse_endpoints.py @@ -39,6 +39,7 @@ from litellm.batches.main import FileObject from litellm.fine_tuning.main import vertex_fine_tuning_apis_instance from litellm.proxy._types import * from litellm.proxy.auth.user_api_key_auth import user_api_key_auth +from litellm.proxy.litellm_pre_call_utils import _get_dynamic_logging_metadata from litellm.proxy.pass_through_endpoints.pass_through_endpoints import ( create_pass_through_route, ) @@ -71,13 +72,36 @@ async def langfuse_proxy_route( decoded_bytes = base64.b64decode(api_key) decoded_str = decoded_bytes.decode("utf-8") - api_key = decoded_str.split(":")[1] + api_key = decoded_str.split(":")[1] # assume api key is passed in as secret key user_api_key_dict = await user_api_key_auth( request=request, api_key="Bearer {}".format(api_key) ) - base_target_url = os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com") + callback_settings_obj: Optional[TeamCallbackMetadata] = ( + _get_dynamic_logging_metadata(user_api_key_dict=user_api_key_dict) + ) + + dynamic_langfuse_public_key: Optional[str] = None + dynamic_langfuse_secret_key: Optional[str] = None + dynamic_langfuse_host: Optional[str] = None + if ( + callback_settings_obj is not None + and callback_settings_obj.callback_vars is not None + ): + for k, v in callback_settings_obj.callback_vars.items(): + if k == "langfuse_public_key": + dynamic_langfuse_public_key = v + elif k == "langfuse_secret_key": + dynamic_langfuse_secret_key = v + elif k == "langfuse_host": + dynamic_langfuse_host = v + + base_target_url: str = ( + dynamic_langfuse_host + or os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com") + or "https://cloud.langfuse.com" + ) if not ( base_target_url.startswith("http://") or base_target_url.startswith("https://") ): @@ -95,8 +119,12 @@ async def langfuse_proxy_route( updated_url = base_url.copy_with(path=encoded_endpoint) # Add or update query parameters - langfuse_public_key = litellm.utils.get_secret(secret_name="LANGFUSE_PUBLIC_KEY") - langfuse_secret_key = litellm.utils.get_secret(secret_name="LANGFUSE_SECRET_KEY") + langfuse_public_key = dynamic_langfuse_public_key or litellm.utils.get_secret( + secret_name="LANGFUSE_PUBLIC_KEY" + ) + langfuse_secret_key = dynamic_langfuse_secret_key or litellm.utils.get_secret( + secret_name="LANGFUSE_SECRET_KEY" + ) langfuse_combined_key = "Basic " + b64encode( f"{langfuse_public_key}:{langfuse_secret_key}".encode("utf-8")