forked from DevOps/deploy.stack
Add the initial implementation of the MCP server, including:
- main program mcp_server.py
- configuration file config.toml
- dependency list requirements.txt
- installation guide INSTALL.md
- usage notes README.md

Server functionality:
- accept client requests
- call the OpenAI API for analysis
- provide a health check endpoint
- integrate with the disk_inspection.py script
188 lines
6.6 KiB
Python
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

import os
import sys
import logging
import json
import toml
import traceback
from flask import Flask, request, jsonify
import openai


class MCPServer:
    def __init__(self):
        # Initialize the Flask application
        self.app = Flask(__name__)
        # Load the configuration
        self.config = self.load_config()
        # Set up logging
        self.setup_logging()
        # Configure the OpenAI client
        self.setup_openai()
        # Register the API routes
        self.register_routes()

    def load_config(self):
        """Load the configuration file."""
        config_path = os.path.join(os.path.dirname(__file__), "config.toml")
        if not os.path.exists(config_path):
            print(f"Error: configuration file not found: {config_path}")
            sys.exit(1)

        with open(config_path, "r", encoding="utf-8") as f:
            return toml.load(f)
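
    # Illustrative config.toml layout, inferred from the keys read elsewhere in
    # this class (setup_logging, setup_openai, run); the actual file ships
    # separately in the repository and may differ:
    #
    #   [server]
    #   host = "0.0.0.0"
    #   port = 8080
    #   debug = false
    #
    #   [logging]
    #   level = "info"
    #   format = "text"            # or "json"
    #   file_path = "/var/log/mcp_server.log"
    #
    #   [openai]
    #   api_key = "sk-..."
    #   base_url = "https://api.openai.com/v1"   # optional override
    #   model = "gpt-3.5-turbo"
    #   max_tokens = 2048
    #   temperature = 0.7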

    def setup_logging(self):
        """Configure the logging system."""
        log_config = self.config.get("logging", {})
        log_level = getattr(logging, log_config.get("level", "info").upper())
        log_format = log_config.get("format", "text")
        log_file = log_config.get("file_path", "/var/log/mcp_server.log")

        # Make sure the log directory exists
        log_dir = os.path.dirname(log_file)
        if log_dir and not os.path.exists(log_dir):
            os.makedirs(log_dir, exist_ok=True)

        # Pick the log record format
        if log_format == "json":
            formatter = logging.Formatter(
                '{"time":"%(asctime)s", "level":"%(levelname)s", "message":"%(message)s"}'
            )
        else:
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )

        # File handler
        file_handler = logging.FileHandler(log_file, encoding="utf-8")
        file_handler.setFormatter(formatter)

        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)

        # Configure the root logger
        logger = logging.getLogger()
        logger.setLevel(log_level)
        logger.addHandler(file_handler)
        logger.addHandler(console_handler)

        self.logger = logging.getLogger("MCPServer")
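
        # For reference, the two formats produce log lines roughly like:
        #   text: 2024-01-01 12:00:00,000 - MCPServer - INFO - server started
        #   json: {"time":"2024-01-01 12:00:00,000", "level":"INFO", "message":"server started"}
        # (illustrative examples; note the json formatter does not escape double
        # quotes inside the message itself)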

    def setup_openai(self):
        """Configure the OpenAI client."""
        openai_config = self.config.get("openai", {})
        openai.api_key = openai_config.get("api_key")

        # Use a custom API base URL if one is configured
        if "base_url" in openai_config:
            openai.api_base = openai_config["base_url"]

        self.openai_config = openai_config

    def register_routes(self):
        """Register the API routes."""
        @self.app.route("/mcp/v1/submit", methods=["POST"])
        def submit_data():
            """Receive and process data submitted to the MCP server."""
            try:
                # Parse the request body
                data = request.get_json()
                if not data:
                    self.logger.warning("Received invalid JSON data")
                    return jsonify({"error": "invalid JSON data"}), 400

                self.logger.info(f"Received MCP submission, type: {data.get('type')}")

                # Process the submitted data
                result = self.process_submission(data)

                return jsonify({"status": "success", "result": result})
            except Exception as e:
                self.logger.error(f"Error while handling the submission: {str(e)}")
                self.logger.debug(traceback.format_exc())
                return jsonify({"error": str(e)}), 500

        @self.app.route("/health", methods=["GET"])
        def health_check():
            """Health check endpoint."""
            return jsonify({"status": "healthy"}), 200
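
    # Illustrative request/response for the /mcp/v1/submit route, based on the
    # fields process_submission() reads and the dict it returns:
    #
    #   POST /mcp/v1/submit
    #   {"type": "disk_inspection_report", "content": "<report text>",
    #    "timestamp": "<ISO timestamp>"}
    #
    #   200 -> {"status": "success",
    #           "result": {"analysis": "...", "processed_at": "...",
    #                      "original_type": "disk_inspection_report"}}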

    def process_submission(self, data):
        """Process submitted data by sending it to the OpenAI API for analysis."""
        submission_type = data.get("type", "")
        content = data.get("content", "")
        timestamp = data.get("timestamp", "")

        # Build a prompt that matches the submission type
        if submission_type == "disk_inspection_report":
            prompt = self.generate_disk_report_prompt(content)
        else:
            prompt = f"Please analyze the following data (type: {submission_type}):\n{content}"

        # Call the OpenAI API
        response = self.call_openai_api(prompt)

        return {
            "analysis": response,
            "processed_at": self.get_current_timestamp(),
            "original_type": submission_type
        }

    def generate_disk_report_prompt(self, report_content):
        """Build the prompt used for disk inspection reports."""
        return f"""You are a professional system administrator. Please analyze the following disk inspection report:

{report_content}

Provide a detailed analysis covering:
1. Overall health summary
2. Analysis of abnormal items (if any)
3. Potential risk assessment
4. Maintenance recommendations
5. Issues that need special attention (if any)"""

    def call_openai_api(self, prompt):
        """Call the OpenAI API."""
        try:
            # Call the OpenAI ChatCompletion API
            response = openai.ChatCompletion.create(
                model=self.openai_config.get("model", "gpt-3.5-turbo"),
                messages=[
                    {"role": "system", "content": "You are a professional AI assistant that helps users analyze and process all kinds of data."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=self.openai_config.get("max_tokens", 2048),
                temperature=self.openai_config.get("temperature", 0.7)
            )

            # Extract the reply content
            return response["choices"][0]["message"]["content"].strip()
        except Exception as e:
            self.logger.error(f"Error calling the OpenAI API: {str(e)}")
            raise
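
        # Note: openai.ChatCompletion.create() and the module-level
        # openai.api_key / openai.api_base settings used by this class are the
        # pre-1.0 interface of the openai package. On openai>=1.0 the
        # equivalent call would look roughly like this (sketch only, not used
        # by this file):
        #
        #   from openai import OpenAI
        #   client = OpenAI(api_key=..., base_url=...)
        #   resp = client.chat.completions.create(model=..., messages=[...])
        #   text = resp.choices[0].message.content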

    def get_current_timestamp(self):
        """Return the current timestamp in ISO format."""
        import datetime
        return datetime.datetime.now().isoformat()

    def run(self):
        """Start the server."""
        server_config = self.config.get("server", {})
        host = server_config.get("host", "0.0.0.0")
        port = server_config.get("port", 8080)
        debug = server_config.get("debug", False)

        self.logger.info(f"MCP server started, listening on {host}:{port}")
        self.logger.info(f"OpenAI API configuration: model={self.openai_config.get('model')}")

        # Start the Flask server
        self.app.run(host=host, port=port, debug=debug)


if __name__ == "__main__":
    # Create and run the MCP server
    server = MCPServer()
    server.run()
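
A minimal client sketch, not part of this file, showing how a caller such as disk_inspection.py might submit a report. It assumes the server is running with the default host/port from run() (0.0.0.0:8080), uses only the Python standard library, and its field names match what process_submission() reads.

import json
import urllib.request

# Hypothetical payload; a real caller would insert its own report text.
payload = {
    "type": "disk_inspection_report",
    "content": "<smartctl / inspection output goes here>",
    "timestamp": "2024-01-01T00:00:00",
}

req = urllib.request.Request(
    "http://127.0.0.1:8080/mcp/v1/submit",
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)

# On success the server replies with {"status": "success", "result": {...}}.
with urllib.request.urlopen(req) as resp:
    print(json.loads(resp.read().decode("utf-8")))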