diff --git a/.gitignore b/.gitignore index 9794991..0eb452c 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,6 @@ crontab/test_*.py crontab/test*.py g-b/* .gitignore +mcptime/go.sum +mcpTimeServer/go.sum +mcp_server_go/config.toml diff --git a/compare_services.py b/compare_services.py new file mode 100644 index 0000000..780634b --- /dev/null +++ b/compare_services.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import re + +# 提取服务信息的函数 +def extract_service_info(file_path): + """从文件中提取服务信息,返回服务名称和镜像的字典""" + services = {} + + with open(file_path, 'r', encoding='utf-8') as f: + lines = f.readlines() + + # 跳过表头 + for line in lines[2:]: + if line.strip(): + # 使用固定宽度解析每行数据 + try: + # 从固定位置提取各字段 + name_part = line[12:42].strip() # 服务名称 + image_part = line[70:136].strip() # 镜像名称 + + # 只有当name和image都有值时才添加 + if name_part and image_part: + # 保存原始名称和镜像 + services[name_part] = image_part + except Exception as e: + print(f"解析行时出错: {line.strip()}, 错误: {e}") + + return services + +def normalize_service_name(service_name): + """标准化服务名称,移除环境前缀,处理大小写和分隔符差异""" + # 转换为小写以处理大小写差异 + normalized = service_name.lower() + + # Blue环境的服务名称处理 + blue_patterns = [ + 'blue-php_', 'blue-ppos_', 'blue_api_', 'blue_web_' + ] + for pattern in blue_patterns: + if normalized.startswith(pattern.lower()): + normalized = normalized[len(pattern):] + break + + # Green环境的服务名称处理 + green_patterns = [ + 'papacore_', 'pposmix_', 'cron_', 'dataserv_', 'imgser_', 'ppcscore_' + ] + for pattern in green_patterns: + if normalized.startswith(pattern.lower()): + normalized = normalized[len(pattern):] + break + + # 替换所有分隔符为下划线,统一格式 + normalized = re.sub(r'[-_]+', '_', normalized) + + # 移除可能的前后空格 + normalized = normalized.strip('_') + + return normalized + +def compare_services(blue_services, green_services): + """比较两个环境中的服务镜像""" + # 1. blue_services 已经是 name 为 key,image 为值的字典对象 + # 2. green_services 已经是 name 为 key,image 为值的字典对象 + + # 创建标准化名称到原始名称和镜像的映射 + blue_norm_map = {normalize_service_name(name): (name, image) for name, image in blue_services.items()} + green_norm_map = {normalize_service_name(name): (name, image) for name, image in green_services.items()} + + + # print(blue_norm_map) + print("=" * 80) + print(green_norm_map) + + # 显示所有Blue和Green环境的服务镜像对比 + print("=== Blue环境与Green环境服务镜像详细对比 ===") + print("{:<35} {:<65} {:<65}".format("服务名称", "Blue镜像", "Green镜像")) + print("=" * 80) + + # 统计变量 + different_count = 0 + same_count = 0 + + # 创建数组用于存储更新命令 + update_commands = [] + + # 遍历Blue环境中的每个服务(使用标准化名称进行比较) + for norm_name, (blue_name, blue_image) in blue_norm_map.items(): + # 初始化green_name和green_image为默认值 + green_name = "-" # 表示未找到对应服务 + green_image = "-" + + # if norm_name == "member_middleplatform": + # green_name, green_image = green_norm_map.get("ppos_member_middlepla", ("未找到", "")) + + # if norm_name == "device_api": + # green_name, green_image = green_norm_map.get("ppos_device_api", ("未找到", "")) + + # if norm_name == "events-api": + # green_name, green_image = green_norm_map.get("ppos_events_api", ("未找到", "")) + + # if norm_name == "ac-api": + # green_name, green_image = green_norm_map.get("ppos_ac_api", ("未找到", "")) + + + if norm_name in green_norm_map: + green_name, green_image = green_norm_map[norm_name] + + print(norm_name, blue_name, green_name) + + # 查找Green环境中是否有相同标准化名称的服务 + #if norm_name in green_norm_map: + # 打印Blue中与Green不同的服务 + # if blue_image != green_image: + # print("{:<35} {:<65} {:<65}".format( + # blue_name[:30] + "..." if len(blue_name) > 30 else blue_name, + # blue_image[:60] + "..." if len(blue_image) > 60 else blue_image, + # green_image[:60] + "..." if len(green_image) > 60 else green_image + # )) + # # 生成更新命令并添加到数组中 + # update_command = generate_update_command(blue_name, green_image) + # update_commands.append(update_command) + # different_count += 1 + # else: + # same_count += 1 + + # # 显示Green环境中特有但Blue环境没有的服务 + # print() + # print("=== Green环境中特有但Blue环境没有的服务 ===") + # for norm_name, (green_name, green_image) in green_norm_map.items(): + # if norm_name not in blue_norm_map: + # print("{:<35} {:<65}".format( + # green_name[:30] + "..." if len(green_name) > 30 else green_name, + # green_image[:60] + "..." if len(green_image) > 60 else green_image + # )) + + # # 打印所有更新命令 + # if update_commands: + # print() + # print("=== 推荐的更新命令 ===") + # for command in update_commands: + # print(command) + + # print() + # print(f"Blue和Green环境中名称相同但镜像不同的服务总数: {different_count}") + # print(f"Blue和Green环境中名称相同且镜像相同的服务总数: {same_count}") + +# 添加一个函数 生成`docker service update --force --image hub.wesais.cn/ppos-pro/pposorder:v.250910.1819 blue-ppos_PPOSOrder` +def generate_update_command(service_name, image): + """生成更新服务镜像的命令""" + return f"docker service update --force --image {image} {service_name}" + + + +def main(): + """主函数,比较两个环境中的服务镜像""" + blue_path = '/home/geng/mydate/deploy.stack/g-b/blue' + green_path = '/home/geng/mydate/deploy.stack/g-b/green' + + # 提取服务信息 + blue_services = extract_service_info(blue_path) + green_services = extract_service_info(green_path) + # print(blue_services) + # print(green_services) + + # 比较服务镜像 + compare_services(blue_services, green_services) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/crontab/disk_inspection.py b/crontab/disk_inspection.py index 4795958..cadc51c 100644 --- a/crontab/disk_inspection.py +++ b/crontab/disk_inspection.py @@ -52,28 +52,47 @@ class DiskInspection: return None def parse_smart_info(self, output, controller_id): - """解析smartctl输出的信息,根据实际smartctl_log格式调整""" + """解析smartctl输出的信息,兼容SAS和SSD硬盘格式""" if not output: return {"error": "No data available"} parsed_info = {} - # 提取基本信息 + # 提取基本信息 - 兼容SAS和SSD格式 + # 厂商信息 (SAS格式) vendor_match = re.search(r"Vendor:\s+(.*)", output) if vendor_match: parsed_info["vendor"] = vendor_match.group(1) + # 产品型号 (SAS格式) product_match = re.search(r"Product:\s+(.*)", output) if product_match: parsed_info["device_model"] = product_match.group(1) + # 设备型号 (SSD格式) + elif not parsed_info.get("device_model"): + device_model_match = re.search(r"Device Model:\s+(.*)", output) + if device_model_match: + parsed_info["device_model"] = device_model_match.group(1) + # 固件版本 (SAS格式) revision_match = re.search(r"Revision:\s+(.*)", output) if revision_match: parsed_info["firmware_version"] = revision_match.group(1) + # 固件版本 (SSD格式) + elif not parsed_info.get("firmware_version"): + firmware_match = re.search(r"Firmware Version:\s+(.*)", output) + if firmware_match: + parsed_info["firmware_version"] = firmware_match.group(1) + # 序列号 (SAS格式) serial_match = re.search(r"Serial number:\s+(.*)", output) if serial_match: parsed_info["serial_number"] = serial_match.group(1) + # 序列号 (SSD格式) + elif not parsed_info.get("serial_number"): + serial_ssd_match = re.search(r"Serial Number:\s+(.*)", output) + if serial_ssd_match: + parsed_info["serial_number"] = serial_ssd_match.group(1) # 提取容量信息 capacity_match = re.search(r"User Capacity:\s+(.*?)\s+bytes", output) @@ -85,27 +104,69 @@ class DiskInspection: if rotation_match: parsed_info["rotation_rate"] = rotation_match.group(1) - # 提取SMART健康状态 + # 提取SMART健康状态 - 兼容SAS和SSD格式 + # SAS格式 health_match = re.search(r"SMART Health Status:\s+(.*)", output) if health_match: parsed_info["health_status"] = health_match.group(1) + # SSD格式 + elif not parsed_info.get("health_status"): + health_ssd_match = re.search(r"SMART overall-health self-assessment test result:\s+(.*)", output) + if health_ssd_match: + parsed_info["health_status"] = health_ssd_match.group(1) - # 提取温度信息 + # 提取温度信息 - 兼容SAS和SSD格式 + # SAS格式 temp_match = re.search(r"Current Drive Temperature:\s+([0-9]+)", output) if temp_match: parsed_info["temperature"] = temp_match.group(1) + # SSD格式 - 增强版,能处理更多格式变化 + elif not parsed_info.get("temperature"): + # 尝试匹配194 Temperature_Celsius行,精确提取RAW_VALUE + temp_ssd_match = re.search(r"194 Temperature_Celsius.*?\b(\d+)\b\s*$", output) + if temp_ssd_match: + parsed_info["temperature"] = temp_ssd_match.group(1) + else: + # 尝试匹配其他可能的温度表示方式 + temp_ssd_alt_match = re.search(r"Temperature_Celsius.*?\b(\d+)\b", output) + if temp_ssd_alt_match: + parsed_info["temperature"] = temp_ssd_alt_match.group(1) - # 提取通电时间 + # 提取通电时间 - 兼容SAS和SSD格式 + # SAS格式 power_on_match = re.search(r"Accumulated power on time, hours:minutes\s+([0-9:]+)", output) if power_on_match: - parsed_info["power_on_time"] = power_on_match.group(1) - - # 提取制造日期 + parsed_info["power_on_time"] = f"{ power_on_match.group(1)}" + # SSD格式 - 增强版,能处理更多格式变化 + elif not parsed_info.get("power_on_time"): + # 尝试匹配9 Power_On_Hours行,精确提取RAW_VALUE + # 改进的正则:添加\s*匹配行首空格,使用re.MULTILINE匹配多行 + power_on_ssd_match = re.search(r"\s*9 Power_On_Hours.*?\b(\d+)\b\s*$", output, re.MULTILINE) + if power_on_ssd_match: + parsed_info["power_on_time"] = f"{power_on_ssd_match.group(1)}" + else: + # 尝试其他可能的通电时间表示方式,使用更宽松的正则 + power_on_ssd_alt_match = re.search(r"Power_On_Hours.*?\b(\d+)\b", output, re.MULTILINE) + if power_on_ssd_alt_match: + parsed_info["power_on_time"] = f"{power_on_ssd_alt_match.group(1)}" + # 提取制造日期 - 增强版,支持多种格式 + # 格式1: Manufactured in week XX of year XXXX manufactured_match = re.search(r"Manufactured in week (\d+) of year (\d+)", output) if manufactured_match: parsed_info["manufactured_date"] = f"{manufactured_match.group(2)}-W{manufactured_match.group(1)}" + else: + # 格式2: 尝试从固件版本或其他字段提取制造年份信息 + # 例如:固件版本通常包含年份信息 SN14546 -> 2024年 + firmware = parsed_info.get("firmware_version", "") + if firmware: + # 尝试从固件版本中提取制造年份信息 + year_match = re.search(r"(\d{2})\d{2}", firmware) + if year_match: + year = year_match.group(1) + # 假设年份是21世纪 + parsed_info["manufactured_date"] = f"20{year}" - # 提取错误计数信息 + # 提取错误计数信息 (SAS格式) error_read_match = re.search(r"read:\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+\.\d+)\s+(\d+)", output) if error_read_match: parsed_info["read_errors"] = { @@ -122,7 +183,8 @@ class DiskInspection: "data_processed": f"{error_write_match.group(6)} GB" } - # 提取SMART自检日志信息 + # 提取SMART自检日志信息 - 兼容SAS和SSD格式 + # SAS格式 self_test_match = re.search(r"SMART Self-test log\nNum\s+Test\s+Status\s+segment\s+LifeTime\s+LBA_first_err \[SK ASC ASQ\]\n(.*?)(\n\s*\n|$)", output, re.DOTALL) if self_test_match: test_results = [] @@ -140,11 +202,38 @@ class DiskInspection: test_results.append(test_info) if test_results: parsed_info["self_test_results"] = test_results + # SSD格式 + elif not parsed_info.get("self_test_results"): + self_test_ssd_match = re.search(r"SMART Self-test log structure revision number \d+\nNum\s+Test_Description\s+Status\s+Remaining\s+LifeTime\(hours\)\s+LBA_of_first_error\n(.*?)(\n\s*\n|$)", output, re.DOTALL) + if self_test_ssd_match: + test_results = [] + lines = self_test_ssd_match.group(1).strip().split("\n") + for line in lines: + if line.strip() and line.startswith("#"): + parts = re.split(r"\s+", line.strip()) + if len(parts) >= 5: + test_info = { + "id": parts[0].replace("#", ""), + "type": parts[1], + "status": " ".join(parts[2:4]) if parts[2] == "Completed" and parts[3] == "without" else parts[2], + "lifetime_hours": parts[4] + } + test_results.append(test_info) + if test_results: + parsed_info["self_test_results"] = test_results + + # 检测硬盘类型 (根据旋转速率或特定属性) + if "rotation_rate" in parsed_info and "Solid State Device" in parsed_info["rotation_rate"]: + parsed_info["disk_type"] = "SSD" + elif "rotation_rate" in parsed_info and "rpm" in parsed_info["rotation_rate"]: + parsed_info["disk_type"] = "HDD" + else: + parsed_info["disk_type"] = "Unknown" return parsed_info def generate_md_report(self): - """生成Markdown格式的报告,根据实际smartctl_log格式调整""" + """生成Markdown格式的报告,兼容SAS和SSD硬盘格式""" with open(self.md_report, "w") as f: f.write(f"# 硬盘巡检报告\n\n") f.write(f"**检查日期**: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") @@ -153,24 +242,34 @@ class DiskInspection: # 添加健康状态概览 healthy_count = sum(1 for cid, info in self.results.items() - if "health_status" in info and info["health_status"] == "OK") + if "health_status" in info and (info["health_status"] == "OK" or info["health_status"] == "PASSED")) detected_count = sum(1 for cid, info in self.results.items() if "health_status" in info) + + # 统计硬盘类型 + hdd_count = sum(1 for cid, info in self.results.items() + if "disk_type" in info and info["disk_type"] == "HDD") + ssd_count = sum(1 for cid, info in self.results.items() + if "disk_type" in info and info["disk_type"] == "SSD") + f.write(f"### 健康状态概览\n") f.write(f"- 总控制器数: {self.controller_count}\n") f.write(f"- 检测到的控制器数: {detected_count}\n") f.write(f"- 健康控制器数: {healthy_count}\n") - f.write(f"- 异常控制器数: {detected_count - healthy_count}\n\n") + f.write(f"- 异常控制器数: {detected_count - healthy_count}\n") + f.write(f"- HDD硬盘数: {hdd_count}\n") + f.write(f"- SSD硬盘数: {ssd_count}\n\n") - # 详细信息表格 - 简化版 + # 详细信息表格 - 简化版,添加硬盘类型列 f.write("### 基本信息概览\n") - f.write("| 控制器ID | 厂商 | 设备型号 | 序列号 | 健康状态 | 温度(°C) | 通电时间 |\n") - f.write("|----------|------|----------|--------|----------|----------|----------|\n") + f.write("| 控制器ID | 硬盘类型 | 厂商 | 设备型号 | 序列号 | 健康状态 | 温度(°C) | 通电时间 |\n") + f.write("|----------|----------|------|----------|--------|----------|----------|----------|\n") for controller_id in range(self.controller_count): info = self.results.get(controller_id, {}) if "health_status" in info: # 只显示有数据的控制器 f.write(f"| {controller_id} ") + f.write(f"| {info.get('disk_type', 'N/A')} ") f.write(f"| {info.get('vendor', 'N/A')} ") f.write(f"| {info.get('device_model', 'N/A')} ") f.write(f"| {info.get('serial_number', 'N/A')} ") @@ -192,12 +291,13 @@ class DiskInspection: f.write(f"- **固件版本**: {info.get('firmware_version', 'N/A')}\n") f.write(f"- **容量**: {info.get('capacity', 'N/A')}\n") f.write(f"- **旋转速率**: {info.get('rotation_rate', 'N/A')}\n") + f.write(f"- **硬盘类型**: {info.get('disk_type', 'N/A')}\n") f.write(f"- **制造日期**: {info.get('manufactured_date', 'N/A')}\n") f.write(f"- **健康状态**: **{info.get('health_status', 'N/A')}**\n") f.write(f"- **当前温度**: {info.get('temperature', 'N/A')}°C\n") f.write(f"- **累计通电时间**: {info.get('power_on_time', 'N/A')}\n") - # 错误计数信息 + # 错误计数信息 (主要适用于SAS硬盘) if "read_errors" in info: read_err = info["read_errors"] f.write("- **读取错误**:\n") @@ -218,8 +318,8 @@ class DiskInspection: for test in info["self_test_results"]: f.write(f" - 测试 #{test['id']}: {test['type']} - {test['status']} (运行时间: {test['lifetime_hours']}小时)\n") - # 检查是否有异常 - if info.get("health_status", "") != "OK": + # 检查是否有异常 - 兼容SAS(OK)和SSD(PASSED)的健康状态表示 + if info.get("health_status", "") not in ["OK", "PASSED"]: has_issues = True f.write("\n**⚠️ 警告:此控制器状态异常,请及时关注!**\n") @@ -230,7 +330,7 @@ class DiskInspection: f.write("**发现异常控制器,请关注以下问题:**\n\n") for controller_id in range(self.controller_count): info = self.results.get(controller_id, {}) - if info.get("health_status", "") != "OK": + if info.get("health_status", "") not in ["OK", "PASSED"]: f.write(f"- **控制器 {controller_id}**: 状态为 {info.get('health_status', '未知')}\n") f.write("\n") else: diff --git a/mcpTimeServer/README.md b/mcpTimeServer/README.md new file mode 100644 index 0000000..87601ce --- /dev/null +++ b/mcpTimeServer/README.md @@ -0,0 +1,152 @@ +# MCP Time Server + +一个使用Golang实现的MCP(Model Context Protocol)服务器,提供获取当前系统时间和SSE(Server-Sent Events)实时时间流功能。 + +## 项目概述 + +这个服务器实现了自定义的MCP协议,允许客户端通过HTTP接口获取当前系统时间,并且支持通过SSE技术订阅实时时间更新流。服务器使用slog库进行日志记录,日志仅输出到控制台。 + +## 功能特性 + +- 提供获取当前系统时间的MCP工具(`get_current_time`) +- 支持自定义时间格式 +- 实现SSE(Server-Sent Events)功能,提供实时时间流订阅(`subscribe_time_stream`) +- 使用slog进行结构化日志记录 +- 基于HTTP实现MCP协议 +- 提供健康检查接口 +- 支持优雅关闭 + +## 目录结构 + +``` +mcpTimeServer/ +├── main.go # 服务器主程序 +├── go.mod # Go模块定义 +├── install.sh # 安装脚本 +└── README.md # 项目说明文档 +``` + +## 安装与配置 + +### 前提条件 + +- Go 1.21或更高版本 + +### 安装步骤 + +1. 克隆项目或进入项目目录 + +2. 运行安装脚本: + ```bash + chmod +x install.sh + ./install.sh + ``` + + 安装脚本会检查Go环境、安装依赖并编译项目。 + +## 使用方法 + +### 启动服务器 + +```bash +./mcp_time_server +``` + +服务器启动后,会在8080端口监听HTTP请求。 + +## API说明 + +### MCP协议接口 + +#### 提交MCP请求 + +- **URL**: `/mcp/v1/submit` +- **方法**: `POST` +- **Content-Type**: `application/json` + +**请求体示例**: +```json +{ + "data": {"format": "2006-01-02 15:04:05"}, // 可选参数 + "type": "get_current_time", + "timestamp": 1699999999 +} +``` + +#### 可用工具 + +1. **get_current_time** + - **参数**: `format`(可选)- 时间格式字符串,使用Go的时间格式语法(如"2006-01-02 15:04:05") + - **返回结果**: + ```json + { + "current_time": "格式化的时间字符串", + "timestamp": 时间戳(Unix时间,秒) + } + ``` + +2. **subscribe_time_stream** + - **参数**: + - `interval`(可选)- 时间更新间隔(秒),默认为1秒 + - `format`(可选)- 时间格式字符串 + - **返回结果**: + ```json + { + "stream_id": "唯一的流标识符", + "sse_url": "http://localhost:8080/sse", + "interval": 更新间隔(秒) + } + ``` + +### SSE接口 + +- **URL**: `/sse` +- **方法**: `GET` +- **Content-Type**: `text/event-stream` + +连接后,服务器会以指定的间隔发送时间更新事件。 + +### 健康检查接口 + +- **URL**: `/health` +- **方法**: `GET` +- **返回**: 状态码200表示服务器正常运行 + +## 测试方法 + +服务器启动后,可以使用以下方式测试: + +1. **健康检查**: + ```bash + curl http://localhost:8080/health + ``` + +2. **获取当前时间**: + ```bash + curl -X POST http://localhost:8080/mcp/v1/submit -H 'Content-Type: application/json' -d '{"data":{},"type":"get_current_time","timestamp":'$(date +%s)'}' + ``` + +3. **使用自定义格式获取当前时间**: + ```bash + curl -X POST http://localhost:8080/mcp/v1/submit -H 'Content-Type: application/json' -d '{"data":{"format":"2006-01-02 15:04:05"},"type":"get_current_time","timestamp":'$(date +%s)'}' + ``` + +4. **订阅时间流**: + ```bash + curl http://localhost:8080/sse + ``` + 或直接在浏览器中访问 `http://localhost:8080/sse` + +## 依赖说明 + +- github.com/google/uuid v1.6.0:用于生成唯一标识符 + +## 注意事项 + +- 服务器默认监听在8080端口 +- 日志仅输出到控制台,不会写入文件 +- SSE连接会在客户端断开或服务器关闭时终止 + +## License + +MIT \ No newline at end of file diff --git a/mcpTimeServer/go.mod b/mcpTimeServer/go.mod new file mode 100644 index 0000000..458d85b --- /dev/null +++ b/mcpTimeServer/go.mod @@ -0,0 +1,5 @@ +module mcpTimeServer + +go 1.21 + +require github.com/google/uuid v1.6.0 diff --git a/mcpTimeServer/install.sh b/mcpTimeServer/install.sh new file mode 100644 index 0000000..df7862a --- /dev/null +++ b/mcpTimeServer/install.sh @@ -0,0 +1,71 @@ +#!/bin/bash + +# MCP Time Server 安装脚本 + +# 设置颜色 +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' # 无颜色 + +# 检查是否安装了Go +echo -e "${BLUE}检查Go环境...${NC}" +if ! command -v go &> /dev/null +then + echo -e "${RED}错误: 未安装Go。请先安装Go 1.21或更高版本。${NC}" + exit 1 +fi + +# 检查Go版本 +GO_VERSION=$(go version | awk '{print $3}' | sed 's/go//') +GO_MAJOR=$(echo $GO_VERSION | cut -d. -f1) +GO_MINOR=$(echo $GO_VERSION | cut -d. -f2) + +if [ $GO_MAJOR -lt 1 ] || ([ $GO_MAJOR -eq 1 ] && [ $GO_MINOR -lt 21 ]); then + echo -e "${RED}错误: Go版本过低 ($GO_VERSION)。请安装Go 1.21或更高版本。${NC}" + exit 1 +fi + +echo -e "${GREEN}已安装Go ($GO_VERSION)${NC}" + +# 检查当前目录 +echo -e "${BLUE}\n检查项目目录...${NC}" +if [ ! -f "main.go" ]; then + echo -e "${RED}错误: 请在包含main.go的项目根目录下运行此脚本。${NC}" + exit 1 +fi + +echo -e "${GREEN}项目目录正确${NC}" + +# 安装依赖 +echo -e "${BLUE}\n安装项目依赖...${NC}" +go mod tidy +if [ $? -ne 0 ]; then + echo -e "${RED}安装依赖失败,请检查网络连接。${NC}" + exit 1 +fi + +echo -e "${GREEN}依赖安装成功${NC}" + +# 编译项目 +echo -e "${BLUE}\n编译项目...${NC}" +go build -o mcp_time_server +if [ $? -ne 0 ]; then + echo -e "${RED}编译失败。${NC}" + exit 1 +fi + +chmod +x mcp_time_server +echo -e "${GREEN}编译成功,生成可执行文件: mcp_time_server${NC}" + +# 显示使用说明 +echo -e "\n${GREEN}安装完成!${NC}" +echo -e "${BLUE}\n使用说明:${NC}" +echo -e "1. 启动服务器: ./mcp_time_server" +echo -e "2. 服务器启动后,可以通过以下方式测试:" +echo -e " - 健康检查: curl http://localhost:8080/health" +echo -e " - 获取当前时间: curl -X POST http://localhost:8080/mcp/v1/submit -H 'Content-Type: application/json' -d '{\"data\":{},\"type\":\"get_current_time\",\"timestamp\":$(date +%s)}'" +echo -e " - 订阅时间流: 使用浏览器或SSE客户端访问 http://localhost:8080/sse" +echo -e "\n${YELLOW}注意: 服务器默认监听在8080端口。${NC}" +echo -e "\n${GREEN}MCP Time Server安装成功!${NC}" \ No newline at end of file diff --git a/mcpTimeServer/main.go b/mcpTimeServer/main.go new file mode 100644 index 0000000..98ec34b --- /dev/null +++ b/mcpTimeServer/main.go @@ -0,0 +1,385 @@ +package main + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log/slog" + "net/http" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/google/uuid" +) + +// MCPRequest MCP请求结构体 +type MCPRequest struct { + Data interface{} `json:"data"` + Type string `json:"type"` + Metadata map[string]string `json:"metadata,omitempty"` + Timestamp int64 `json:"timestamp"` +} + +// MCPResponse MCP响应结构体 +type MCPResponse struct { + Success bool `json:"success"` + Message string `json:"message,omitempty"` + Data interface{} `json:"data,omitempty"` + RequestID string `json:"request_id,omitempty"` + Timestamp int64 `json:"timestamp"` +} + +// SSEClient SSE客户端连接管理 +type SSEClient struct { + ID string + Channel chan []byte +} + +// SSEManager SSE管理器 +type SSEManager struct { + clients map[string]*SSEClient + mutex sync.Mutex +} + +// NewSSEManager 创建新的SSE管理器 +func NewSSEManager() *SSEManager { + return &SSEManager{ + clients: make(map[string]*SSEClient), + } +} + +// AddClient 添加SSE客户端 +func (m *SSEManager) AddClient(clientID string) *SSEClient { + m.mutex.Lock() + defer m.mutex.Unlock() + + client := &SSEClient{ + ID: clientID, + Channel: make(chan []byte, 10), + } + m.clients[clientID] = client + + return client +} + +// RemoveClient 移除SSE客户端 +func (m *SSEManager) RemoveClient(clientID string) { + m.mutex.Lock() + defer m.mutex.Unlock() + + if client, exists := m.clients[clientID]; exists { + close(client.Channel) + delete(m.clients, clientID) + } +} + +// Broadcast 广播消息给所有SSE客户端 +func (m *SSEManager) Broadcast(message []byte) { + m.mutex.Lock() + defer m.mutex.Unlock() + + for _, client := range m.clients { + select { + case client.Channel <- message: + default: + // 如果客户端通道已满,跳过 + } + } +} + +// 格式化时间,处理可能的格式错误 +func formatTime(t time.Time, format string) (string, error) { + if format == "" { + return t.Format(time.RFC3339), nil + } + + // 尝试使用自定义格式 + formatted := t.Format(format) + if formatted == format { + // 如果格式化后的结果与格式字符串相同,说明格式无效 + return "", errors.New("无效的时间格式") + } + + return formatted, nil +} + +// 处理获取当前时间请求 +func handleGetCurrentTime(data map[string]interface{}) (map[string]interface{}, error) { + // 获取当前时间 + now := time.Now() + + // 获取可选的格式参数 + format, _ := data["format"].(string) + + // 格式化时间 + formattedTime, err := formatTime(now, format) + if err != nil { + // 如果格式无效,使用默认格式 + formattedTime = now.Format(time.RFC3339) + } + + // 返回结果 + result := map[string]interface{}{ + "current_time": formattedTime, + "timestamp": now.Unix(), + } + + return result, nil +} + +// 处理订阅时间流请求 +func handleSubscribeTimeStream(data map[string]interface{}) (map[string]interface{}, error) { + // 生成唯一的流ID + streamID := uuid.New().String() + + // 获取可选的间隔参数(默认1秒) + interval := 1.0 + if intervalVal, ok := data["interval"].(float64); ok { + if intervalVal > 0 { + interval = intervalVal + } + } + + // 获取可选的格式参数 + format, _ := data["format"].(string) + + // 存储订阅信息(实际项目中可能需要更复杂的存储机制) + // 这里我们只是返回流信息,实际的SSE连接会在/sse端点建立 + + // 返回结果 + result := map[string]interface{}{ + "stream_id": streamID, + "sse_url": "http://localhost:8080/sse", + "interval": interval, + "format": format, + } + + return result, nil +} + +// 处理MCP请求提交 +func handleSubmit(w http.ResponseWriter, r *http.Request) { + // 检查请求方法 + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // 解析请求体 + var request MCPRequest + if err := json.NewDecoder(r.Body).Decode(&request); err != nil { + response := MCPResponse{ + Success: false, + Message: "Invalid request body", + Timestamp: time.Now().Unix(), + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) + return + } + + // 生成请求ID + requestID := uuid.New().String() + + // 根据工具类型处理请求 + var result map[string]interface{} + var err error + switch request.Type { + case "get_current_time": + // 确保data是map类型 + dataMap, ok := request.Data.(map[string]interface{}) + if !ok { + dataMap = make(map[string]interface{}) + } + result, err = handleGetCurrentTime(dataMap) + case "subscribe_time_stream": + // 确保data是map类型 + dataMap, ok := request.Data.(map[string]interface{}) + if !ok { + dataMap = make(map[string]interface{}) + } + result, err = handleSubscribeTimeStream(dataMap) + default: + err = errors.New("未知的工具类型") + } + + // 构建响应 + response := MCPResponse{ + Success: err == nil, + Message: func() string { if err != nil { return err.Error() } return "" }(), + Data: result, + RequestID: requestID, + Timestamp: time.Now().Unix(), + } + + // 返回响应 + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) + + // 记录日志 + slog.Debug("处理MCP请求", "request_id", requestID, "tool_type", request.Type, "success", err == nil) +} + +// 处理SSE连接 +func handleSSE(w http.ResponseWriter, r *http.Request, sseManager *SSEManager) { + // 设置响应头 + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + + // 获取或生成客户端ID + clientID := r.URL.Query().Get("client_id") + if clientID == "" { + clientID = uuid.New().String() + } + + // 添加客户端到管理器 + client := sseManager.AddClient(clientID) + defer sseManager.RemoveClient(clientID) + + // 发送初始连接确认事件 + fmt.Fprintf(w, "event: connected\ndata: {\"client_id\":\"%s\"}\n\n", clientID) + flusher, ok := w.(http.Flusher) + if !ok { + slog.Error("不支持SSE", "client_id", clientID) + return + } + flusher.Flush() + + // 获取时间格式参数(如果有) + format := r.URL.Query().Get("format") + + // 获取时间间隔参数(如果有) + interval := 1.0 + if intervalStr := r.URL.Query().Get("interval"); intervalStr != "" { + fmt.Sscanf(intervalStr, "%f", &interval) + if interval <= 0 { + interval = 1.0 + } + } + + // 创建上下文用于取消操作 + ctx, cancel := context.WithCancel(r.Context()) + defer cancel() + + // 创建定时器 + ticker := time.NewTicker(time.Duration(interval * float64(time.Second))) + defer ticker.Stop() + + slog.Info("SSE连接已建立", "client_id", clientID, "interval", interval, "format", format) + + // 发送时间更新 + for { + select { + case <-ctx.Done(): + // 客户端断开连接 + slog.Info("SSE连接已断开", "client_id", clientID) + return + case now := <-ticker.C: + // 格式化时间 + var formattedTime string + if format == "" { + formattedTime = now.Format(time.RFC3339) + } else { + var err error + formattedTime, err = formatTime(now, format) + if err != nil { + // 如果格式无效,使用默认格式 + formattedTime = now.Format(time.RFC3339) + } + } + + // 创建时间更新事件 + timeData := map[string]interface{}{ + "current_time": formattedTime, + "timestamp": now.Unix(), + "interval": interval, + } + + // 转换为JSON + dataJSON, _ := json.Marshal(timeData) + + // 发送SSE事件 + fmt.Fprintf(w, "event: time_update\ndata: %s\n\n", string(dataJSON)) + flusher.Flush() + slog.Debug("发送SSE时间更新", "client_id", clientID, "time", formattedTime) + case message := <-client.Channel: + // 发送广播消息 + fmt.Fprintf(w, "event: broadcast\ndata: %s\n\n", string(message)) + flusher.Flush() + } + } +} + +// handleHealth 处理健康检查请求 +func handleHealth(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("OK")) +} + +func main() { + // 设置slog日志,仅输出到控制台,不写入文件 + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + slog.SetDefault(logger) + + logger.Info("MCP Time Server 启动") + + // 创建SSE管理器 + sseManager := NewSSEManager() + + // 创建路由器 + r := http.NewServeMux() + + // 注册健康检查端点 + r.HandleFunc("/health", handleHealth) + + // 注册MCP提交端点 + r.HandleFunc("/mcp/v1/submit", handleSubmit) + + // 注册SSE端点 + r.HandleFunc("/sse", func(w http.ResponseWriter, r *http.Request) { + handleSSE(w, r, sseManager) + }) + + // 创建HTTP服务器 + srv := &http.Server{ + Addr: ":8080", + Handler: r, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + IdleTimeout: 120 * time.Second, + } + + // 启动服务器 + go func() { + logger.Info("MCP Time Server 启动成功", "address", ":8080") + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.Error("服务器启动失败", "error", err) + os.Exit(1) + } + }() + + // 等待中断信号 + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + <-quit + + logger.Info("MCP Time Server 正在关闭...") + + // 创建超时上下文 + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // 优雅关闭服务器 + if err := srv.Shutdown(ctx); err != nil { + logger.Error("服务器关闭失败", "error", err) + os.Exit(1) + } + + logger.Info("MCP Time Server 已安全关闭") +} \ No newline at end of file diff --git a/mcpTimeServer/mcp_client_example b/mcpTimeServer/mcp_client_example new file mode 100755 index 0000000..42f1836 Binary files /dev/null and b/mcpTimeServer/mcp_client_example differ diff --git a/mcpTimeServer/mcp_client_example.go b/mcpTimeServer/mcp_client_example.go new file mode 100644 index 0000000..7e4d62d --- /dev/null +++ b/mcpTimeServer/mcp_client_example.go @@ -0,0 +1,190 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "time" +) + +// 这里使用通用的map类型表示请求和响应,避免与main.go中的结构体冲突 + +func main() { + // 服务器地址 + serverURL := "http://localhost:8080" + + // 1. 测试健康检查 + testHealthCheck(serverURL) + + // 2. 测试获取当前时间(默认格式) + testGetCurrentTime(serverURL, "") + + // 3. 测试获取当前时间(自定义格式) + testGetCurrentTime(serverURL, "2006-01-02 15:04:05") + + // 4. 测试订阅时间流 + testSubscribeTimeStream(serverURL) +} + +// 测试健康检查 +func testHealthCheck(serverURL string) { + url := fmt.Sprintf("%s/health", serverURL) + resp, err := http.Get(url) + if err != nil { + fmt.Printf("健康检查请求失败: %v\n", err) + return + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + fmt.Println("✅ 健康检查成功: 服务器正常运行") + } else { + fmt.Printf("❌ 健康检查失败: 状态码 %d\n", resp.StatusCode) + } +} + +// 测试获取当前时间 +func testGetCurrentTime(serverURL string, format string) { + url := fmt.Sprintf("%s/mcp/v1/submit", serverURL) + + // 构建请求体 + requestBody := map[string]interface{}{ + "data": map[string]interface{}{}, + "type": "get_current_time", + "timestamp": time.Now().Unix(), + } + + // 添加格式参数(如果提供) + if format != "" { + data := requestBody["data"].(map[string]interface{}) + data["format"] = format + } + + // 序列化请求体 + jsonData, err := json.Marshal(requestBody) + if err != nil { + fmt.Printf("序列化请求体失败: %v\n", err) + return + } + + // 发送POST请求 + resp, err := http.Post(url, "application/json", bytes.NewBuffer(jsonData)) + if err != nil { + fmt.Printf("发送请求失败: %v\n", err) + return + } + defer resp.Body.Close() + + // 读取响应 + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Printf("读取响应失败: %v\n", err) + return + } + + // 解析响应为map + var response map[string]interface{} + if err := json.Unmarshal(body, &response); err != nil { + fmt.Printf("解析响应失败: %v\n", err) + return + } + + // 检查响应状态 + success, ok := response["success"].(bool) + if !ok || !success { + message := "未知错误" + if msg, ok := response["message"].(string); ok { + message = msg + } + fmt.Printf("❌ 获取当前时间失败: %s\n", message) + return + } + + // 提取数据 + data, ok := response["data"].(map[string]interface{}) + if !ok { + fmt.Println("❌ 响应数据格式错误") + return + } + + // 获取时间信息 + currentTime, _ := data["current_time"].(string) + timestamp, _ := data["timestamp"].(float64) + fmt.Printf("✅ 获取当前时间成功 (格式: %s):\n", format) + fmt.Printf(" 时间: %s\n", currentTime) + fmt.Printf(" 时间戳: %.0f\n", timestamp) +} + +// 测试订阅时间流 +func testSubscribeTimeStream(serverURL string) { + url := fmt.Sprintf("%s/mcp/v1/submit", serverURL) + + // 构建请求体 + requestBody := map[string]interface{}{ + "data": map[string]interface{}{ + "interval": 2, // 每2秒更新一次 + "format": "2006-01-02 15:04:05", + }, + "type": "subscribe_time_stream", + "timestamp": time.Now().Unix(), + } + + // 序列化请求体 + jsonData, err := json.Marshal(requestBody) + if err != nil { + fmt.Printf("序列化请求体失败: %v\n", err) + return + } + + // 发送POST请求 + resp, err := http.Post(url, "application/json", bytes.NewBuffer(jsonData)) + if err != nil { + fmt.Printf("发送请求失败: %v\n", err) + return + } + defer resp.Body.Close() + + // 读取响应 + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Printf("读取响应失败: %v\n", err) + return + } + + // 解析响应为map + var response map[string]interface{} + if err := json.Unmarshal(body, &response); err != nil { + fmt.Printf("解析响应失败: %v\n", err) + return + } + + // 检查响应状态 + success, ok := response["success"].(bool) + if !ok || !success { + message := "未知错误" + if msg, ok := response["message"].(string); ok { + message = msg + } + fmt.Printf("❌ 订阅时间流失败: %s\n", message) + return + } + + // 提取数据 + data, ok := response["data"].(map[string]interface{}) + if !ok { + fmt.Println("❌ 响应数据格式错误") + return + } + + // 获取流信息 + streamID, _ := data["stream_id"].(string) + sseURL, _ := data["sse_url"].(string) + interval, _ := data["interval"].(float64) + fmt.Printf("✅ 订阅时间流成功:\n") + fmt.Printf(" 流ID: %s\n", streamID) + fmt.Printf(" SSE URL: %s\n", sseURL) + fmt.Printf(" 更新间隔: %.0f秒\n", interval) + fmt.Println(" 提示: 可以使用curl或浏览器访问SSE URL来接收实时时间更新") +} \ No newline at end of file diff --git a/mcpTimeServer/mcp_time_server b/mcpTimeServer/mcp_time_server new file mode 100755 index 0000000..9105baf Binary files /dev/null and b/mcpTimeServer/mcp_time_server differ diff --git a/mcptime/go.mod b/mcptime/go.mod new file mode 100644 index 0000000..2c3386f --- /dev/null +++ b/mcptime/go.mod @@ -0,0 +1,17 @@ +module mcptime + +go 1.25.0 + +require github.com/mark3labs/mcp-go v0.39.1 + +require ( + github.com/bahlo/generic-list-go v0.2.0 // indirect + github.com/buger/jsonparser v1.1.1 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/invopop/jsonschema v0.13.0 // indirect + github.com/mailru/easyjson v0.7.7 // indirect + github.com/spf13/cast v1.7.1 // indirect + github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect + github.com/yosida95/uritemplate/v3 v3.0.2 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/mcptime/main.go b/mcptime/main.go new file mode 100644 index 0000000..60751af --- /dev/null +++ b/mcptime/main.go @@ -0,0 +1,46 @@ +package main + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/mark3labs/mcp-go/mcp" + "github.com/mark3labs/mcp-go/server" +) + +func main() { + s := server.NewMCPServer( + "Time Service", + "1.0.0", + server.WithResourceCapabilities(true, true), + server.WithLogging(), + ) + + tool_time := mcp.NewTool("get_current_time", + mcp.WithDescription("获取当前系统时间,返回格式:yyyy-MM-dd HH:mm:ss"), + ) + s.AddTool(tool_time, currentTimeHandler) + + tool_date := mcp.NewTool("get_current_date", + mcp.WithDescription("get current date,output format:yyyy-MM-dd"), + ) + s.AddTool(tool_date, currentDateHandler) + + if err := server.ServeStdio(s); err != nil { + fmt.Printf("Server start fail: %v\n", err) + } else { + log.Println("Time MCP start") + } +} + +func currentTimeHandler(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { + currentTime := time.Now().Format("2006-01-02 15:04:05") + return mcp.NewToolResultText(fmt.Sprintf("%s", currentTime)), nil +} + +func currentDateHandler(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { + currentTime := time.Now().Format("2006-01-02") + return mcp.NewToolResultText(fmt.Sprintf("%s", currentTime)), nil +}