Files
deploy.stack/mcp_server-py/mcp_server.py
cnphpbb 79cdb45e20 feat: 添加MCP服务器初始实现及相关文件
添加MCP服务器的初始实现,包括:
- 主程序文件mcp_server.py
- 配置文件config.toml
- 依赖文件requirements.txt
- 安装指南INSTALL.md
- 使用说明README.md

服务器功能包括:
- 接收客户端请求
- 调用OpenAI API进行分析
- 提供健康检查接口
- 支持与disk_inspection.py脚本对接
2025-09-10 22:15:10 +08:00

188 lines
6.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import os
import sys
import logging
import json
import toml
import traceback
from flask import Flask, request, jsonify
import openai
class MCPServer:
def __init__(self):
# 初始化应用
self.app = Flask(__name__)
# 加载配置
self.config = self.load_config()
# 配置日志
self.setup_logging()
# 配置OpenAI客户端
self.setup_openai()
# 注册路由
self.register_routes()
def load_config(self):
"""加载配置文件"""
config_path = os.path.join(os.path.dirname(__file__), "config.toml")
if not os.path.exists(config_path):
print(f"错误:配置文件不存在: {config_path}")
sys.exit(1)
with open(config_path, "r", encoding="utf-8") as f:
return toml.load(f)
def setup_logging(self):
"""配置日志系统"""
log_config = self.config.get("logging", {})
log_level = getattr(logging, log_config.get("level", "info").upper())
log_format = log_config.get("format", "text")
log_file = log_config.get("file_path", "/var/log/mcp_server.log")
# 确保日志目录存在
log_dir = os.path.dirname(log_file)
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir, exist_ok=True)
# 配置日志格式
if log_format == "json":
formatter = logging.Formatter(
'{"time":"%(asctime)s", "level":"%(levelname)s", "message":"%(message)s"}'
)
else:
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 配置文件日志
file_handler = logging.FileHandler(log_file, encoding="utf-8")
file_handler.setFormatter(formatter)
# 配置控制台日志
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
# 获取根logger并配置
logger = logging.getLogger()
logger.setLevel(log_level)
logger.addHandler(file_handler)
logger.addHandler(console_handler)
self.logger = logging.getLogger("MCPServer")
def setup_openai(self):
"""配置OpenAI客户端"""
openai_config = self.config.get("openai", {})
openai.api_key = openai_config.get("api_key")
# 如果配置了自定义API基础URL
if "base_url" in openai_config:
openai.api_base = openai_config["base_url"]
self.openai_config = openai_config
def register_routes(self):
"""注册API路由"""
@self.app.route("/mcp/v1/submit", methods=["POST"])
def submit_data():
"""接收并处理MCP提交的数据"""
try:
# 获取请求数据
data = request.get_json()
if not data:
self.logger.warning("接收到无效的JSON数据")
return jsonify({"error": "无效的JSON数据"}), 400
self.logger.info(f"接收到MCP提交请求类型: {data.get('type')}")
# 处理提交的数据
result = self.process_submission(data)
return jsonify({"status": "success", "result": result})
except Exception as e:
self.logger.error(f"处理提交请求时出错: {str(e)}")
self.logger.debug(traceback.format_exc())
return jsonify({"error": str(e)}), 500
@self.app.route("/health", methods=["GET"])
def health_check():
"""健康检查接口"""
return jsonify({"status": "healthy"}), 200
def process_submission(self, data):
"""处理提交的数据调用OpenAI API进行分析"""
submission_type = data.get("type", "")
content = data.get("content", "")
timestamp = data.get("timestamp", "")
# 根据数据类型生成不同的提示词
if submission_type == "disk_inspection_report":
prompt = self.generate_disk_report_prompt(content)
else:
prompt = f"请分析以下数据(类型:{submission_type}\n{content}"
# 调用OpenAI API
response = self.call_openai_api(prompt)
return {
"analysis": response,
"processed_at": self.get_current_timestamp(),
"original_type": submission_type
}
def generate_disk_report_prompt(self, report_content):
"""为硬盘巡检报告生成特定的提示词"""
return f"""你是一位专业的系统管理员,请分析以下硬盘巡检报告:
{report_content}
请提供详细的分析结果,包括:
1. 健康状态总结
2. 异常项分析(如果有)
3. 潜在风险评估
4. 维护建议
5. 需要特别关注的问题(如果有)"""
def call_openai_api(self, prompt):
"""调用OpenAI API"""
try:
# 调用OpenAI ChatCompletion API
response = openai.ChatCompletion.create(
model=self.openai_config.get("model", "gpt-3.5-turbo"),
messages=[
{"role": "system", "content": "你是一个专业的AI助手帮助用户分析和处理各种数据。"},
{"role": "user", "content": prompt}
],
max_tokens=self.openai_config.get("max_tokens", 2048),
temperature=self.openai_config.get("temperature", 0.7)
)
# 提取回复内容
return response["choices"][0]["message"]["content"].strip()
except Exception as e:
self.logger.error(f"调用OpenAI API时出错: {str(e)}")
raise
def get_current_timestamp(self):
"""获取当前时间戳"""
import datetime
return datetime.datetime.now().isoformat()
def run(self):
"""启动服务器"""
server_config = self.config.get("server", {})
host = server_config.get("host", "0.0.0.0")
port = server_config.get("port", 8080)
debug = server_config.get("debug", False)
self.logger.info(f"MCP服务器启动成功监听地址: {host}:{port}")
self.logger.info(f"OpenAI API配置: 模型={self.openai_config.get('model')}")
# 启动Flask服务器
self.app.run(host=host, port=port, debug=debug)
if __name__ == "__main__":
# 创建并运行MCP服务器
server = MCPServer()
server.run()