#!/usr/bin/env python3 # -*- coding: UTF-8 -*- import os import sys import logging import json import toml import traceback from flask import Flask, request, jsonify import openai class MCPServer: def __init__(self): # 初始化应用 self.app = Flask(__name__) # 加载配置 self.config = self.load_config() # 配置日志 self.setup_logging() # 配置OpenAI客户端 self.setup_openai() # 注册路由 self.register_routes() def load_config(self): """加载配置文件""" config_path = os.path.join(os.path.dirname(__file__), "config.toml") if not os.path.exists(config_path): print(f"错误:配置文件不存在: {config_path}") sys.exit(1) with open(config_path, "r", encoding="utf-8") as f: return toml.load(f) def setup_logging(self): """配置日志系统""" log_config = self.config.get("logging", {}) log_level = getattr(logging, log_config.get("level", "info").upper()) log_format = log_config.get("format", "text") log_file = log_config.get("file_path", "/var/log/mcp_server.log") # 确保日志目录存在 log_dir = os.path.dirname(log_file) if log_dir and not os.path.exists(log_dir): os.makedirs(log_dir, exist_ok=True) # 配置日志格式 if log_format == "json": formatter = logging.Formatter( '{"time":"%(asctime)s", "level":"%(levelname)s", "message":"%(message)s"}' ) else: formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # 配置文件日志 file_handler = logging.FileHandler(log_file, encoding="utf-8") file_handler.setFormatter(formatter) # 配置控制台日志 console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) # 获取根logger并配置 logger = logging.getLogger() logger.setLevel(log_level) logger.addHandler(file_handler) logger.addHandler(console_handler) self.logger = logging.getLogger("MCPServer") def setup_openai(self): """配置OpenAI客户端""" openai_config = self.config.get("openai", {}) openai.api_key = openai_config.get("api_key") # 如果配置了自定义API基础URL if "base_url" in openai_config: openai.api_base = openai_config["base_url"] self.openai_config = openai_config def register_routes(self): """注册API路由""" @self.app.route("/mcp/v1/submit", methods=["POST"]) def submit_data(): """接收并处理MCP提交的数据""" try: # 获取请求数据 data = request.get_json() if not data: self.logger.warning("接收到无效的JSON数据") return jsonify({"error": "无效的JSON数据"}), 400 self.logger.info(f"接收到MCP提交请求,类型: {data.get('type')}") # 处理提交的数据 result = self.process_submission(data) return jsonify({"status": "success", "result": result}) except Exception as e: self.logger.error(f"处理提交请求时出错: {str(e)}") self.logger.debug(traceback.format_exc()) return jsonify({"error": str(e)}), 500 @self.app.route("/health", methods=["GET"]) def health_check(): """健康检查接口""" return jsonify({"status": "healthy"}), 200 def process_submission(self, data): """处理提交的数据,调用OpenAI API进行分析""" submission_type = data.get("type", "") content = data.get("content", "") timestamp = data.get("timestamp", "") # 根据数据类型生成不同的提示词 if submission_type == "disk_inspection_report": prompt = self.generate_disk_report_prompt(content) else: prompt = f"请分析以下数据(类型:{submission_type}):\n{content}" # 调用OpenAI API response = self.call_openai_api(prompt) return { "analysis": response, "processed_at": self.get_current_timestamp(), "original_type": submission_type } def generate_disk_report_prompt(self, report_content): """为硬盘巡检报告生成特定的提示词""" return f"""你是一位专业的系统管理员,请分析以下硬盘巡检报告: {report_content} 请提供详细的分析结果,包括: 1. 健康状态总结 2. 异常项分析(如果有) 3. 潜在风险评估 4. 维护建议 5. 需要特别关注的问题(如果有)""" def call_openai_api(self, prompt): """调用OpenAI API""" try: # 调用OpenAI ChatCompletion API response = openai.ChatCompletion.create( model=self.openai_config.get("model", "gpt-3.5-turbo"), messages=[ {"role": "system", "content": "你是一个专业的AI助手,帮助用户分析和处理各种数据。"}, {"role": "user", "content": prompt} ], max_tokens=self.openai_config.get("max_tokens", 2048), temperature=self.openai_config.get("temperature", 0.7) ) # 提取回复内容 return response["choices"][0]["message"]["content"].strip() except Exception as e: self.logger.error(f"调用OpenAI API时出错: {str(e)}") raise def get_current_timestamp(self): """获取当前时间戳""" import datetime return datetime.datetime.now().isoformat() def run(self): """启动服务器""" server_config = self.config.get("server", {}) host = server_config.get("host", "0.0.0.0") port = server_config.get("port", 8080) debug = server_config.get("debug", False) self.logger.info(f"MCP服务器启动成功,监听地址: {host}:{port}") self.logger.info(f"OpenAI API配置: 模型={self.openai_config.get('model')}") # 启动Flask服务器 self.app.run(host=host, port=port, debug=debug) if __name__ == "__main__": # 创建并运行MCP服务器 server = MCPServer() server.run()