""" SMD Researcher MCP Server This script implements an MCP server for SMD Researcher, allowing AI assistants to conduct research and generate reports via the MCP protocol. """ import os import logging import aiohttp import asyncio import requests import time from dotenv import load_dotenv from mcp.server.fastmcp import FastMCP # Load environment variables load_dotenv() logging.basicConfig( level=logging.INFO, format='[%(asctime)s][%(levelname)s] - %(message)s', ) logger = logging.getLogger(__name__) # Initialize FastMCP server mcp = FastMCP("SMD Researcher", host="0.0.0.0", port=8000, timeout_keep_alive=720) async def summarize_to_words(text: str, title: str, target_word_count: int = 1000) -> str: url = f"https://maas.ai-2.kvant.cloud/engines/{os.getenv('SWISSDOX_SUMMARIZING_MODEL', '')}/chat/completions" headers = { "x-litellm-api-key": f"{os.getenv('SWISSDOX_SUMMARIZING_MODEL_APIKEY', '')}", "Content-type": "application/json", } payload = { "model": os.getenv('SWISSDOX_SUMMARIZING_MODEL', ''), "messages": [ { "role": "text summarizer", "content": f"You are summarizing the user input to a maximum of {target_word_count}" }, { "role": "user", "content": f"{title} - {text}" } ] } async with aiohttp.ClientSession() as session: async with session.post(url, headers=headers, json=payload) as response: if response.status == 200: data = await response.json() return data.get("choices")[0].get("message").get("content") else: return await response.text() async def smd_detail_article(article_id): logger.info("Starting smd_detail_article function.") start_time = time.perf_counter() url = f"https://api.swissdox.ch/api/documents/{article_id}" headers = { "Authorization": f"Bearer {os.getenv('SWISSDOX_BEARER_TOKEN', '')}", "Content-type": "application/json", } payload = {"filters": [], "pagination": {"pageSize": 1, "currentPage": 1}} async with aiohttp.ClientSession() as session: async with session.post(url, headers=headers, json=payload) as response: if response.status == 200: data = await response.json() summarized_content = await summarize_to_words(title=data.get("title"), text=data.get("text"), target_word_count=10000) execution_time = time.perf_counter() - start_time logger.info(f"smd_detail_article executed in {execution_time:.2f} seconds.") logger.info(f"smd_article_summarization {summarized_content}") return { "message": summarized_content, "article_id": article_id } else: return { "message": await response.text(), "article_id": article_id } @mcp.tool() async def smd_research(search_query: str = "Bundesrat", date_from: str = "2024-05-30T22:00:00.000Z", date_to: str = "2025-05-31T21:59:59.999Z") -> dict: """ Execute a deep search on a given query using SMD Researcher. Use this tool when you need research on a topic. Args: search_query: The SMD search query, there are Logical Operators available (AND, OR, NOT) and for a excact match use "+" before the word. For excluding use "-" before the word. For queries with multiple words use quotes. Formulate the Query in German. Enrich the query with the relevant context keywords of the topic. date_from: The date to start research from, in the format YYYY-MM-DDTHH:MM:SS.SSSZ date_to: The date to end research at, in the format YYYY-MM-DDTHH:MM:SS.SSSZ Returns: String containing research status, ID, and the actual research context """ query = { "sort": { "field": "score", "direction": "desc" }, "filters": [ { "field": "datetime", "value": [ os.getenv('SWISSDOX_DATEFROM', '2020-12-31T23:00:00.000Z'), os.getenv('SWISSDOX_DATETO', '2023-12-31T22:59:00.000Z') ] }, { "field": "newspaper", "value": [ os.getenv('SWISSDOX_NEWSPAPER', 'NZZ') ] }, { "field": "query_text", "value": [search_query] } ], "exact": False, "pagination": { "pageSize": os.getenv('SWISSDOX_PAGESIZE', '10'), "currentPage": 1 }, "onlyResults": False } url = "https://api.swissdox.ch/api/documents/search" headers = { "authorization": f"Bearer {os.getenv('SWISSDOX_BEARER_TOKEN', '')}", "content-type": "application/json", } response = requests.post(url, headers=headers, json=query) if response.status_code == 200: result = response.json() articles = result.get("data", []) facets = result.get("facets", []) tasks = [] for article in articles: article_id = article.get("id") if article_id: tasks.append(smd_detail_article(article_id)) detailed_articles = await asyncio.gather(*tasks) logger.info(f"detailed_articles {detailed_articles}") return { "related_persons": facets.get("persons", []), "related_organizations": facets.get("persons", []), "detailed_articles": detailed_articles } else: return { "message": response.text } def run_server(): """Run the MCP server using FastMCP's built-in event loop handling.""" # Add startup message logger.info("Starting GPT Researcher MCP Server...") print("🚀 GPT Researcher MCP Server starting... Check researcher_mcp_server.log for details") # Let FastMCP handle the event loop try: mcp.run("sse") # Note: If we reach here, the server has stopped logger.info("MCP Server has stopped") except Exception as e: logger.error(f"Error running MCP server: {str(e)}") print(f"❌ MCP Server error: {str(e)}") return print("✅ MCP Server stopped") if __name__ == "__main__": # Use the non-async approach to avoid asyncio nesting issues run_server()