#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""Inspect the health of disks behind a MegaRAID controller with smartctl.

For every id in ``range(controller_count)`` the script runs
``smartctl --all -d megaraid,<id> <device>``, appends the raw output to a
dated log file, extracts a few key fields, writes a Markdown report and
finally tries to POST that report to a local MCP endpoint.
"""
import subprocess
import sys
import os
import datetime
import re
import json
import shutil

# smartctl's exit status is a bitmask. Bits 0 and 1 (command line did not
# parse / device open failed) mean there is no usable output; the higher bits
# only flag disk problems and are set together with a complete, valid report.
_SMARTCTL_FATAL_BITS = 0b11

# Health verdicts that count as healthy: ATA prints "PASSED", SCSI/SAS "OK".
_HEALTHY_STATUSES = frozenset({"OK", "PASSED"})

# Field patterns cover both the ATA and the SCSI/SAS layouts of smartctl.
_MODEL_RE = re.compile(
    r"^[ \t]*(?:Device Model|Model Number|Product):[ \t]*(\S.*)$", re.MULTILINE
)
_SERIAL_RE = re.compile(r"^[ \t]*Serial [Nn]umber:[ \t]*(\S.*)$", re.MULTILINE)
_FIRMWARE_RE = re.compile(
    r"^[ \t]*(?:Firmware Version|Revision):[ \t]*(\S.*)$", re.MULTILINE
)
_HEALTH_RE = re.compile(
    r"^[ \t]*(?:SMART overall-health self-assessment test result"
    r"|SMART Health Status"
    r"|SMART overall-health status):[ \t]*(\S.*)$",
    re.MULTILINE,
)
_SCSI_TEMPERATURE_RE = re.compile(
    r"^[ \t]*Current Drive Temperature:[ \t]*(\d+)", re.MULTILINE
)

# Attribute names copied into the report for closer inspection.
# "CRC_Error_Count" is kept for backward compatibility; the name smartctl
# actually prints is "UDMA_CRC_Error_Count".
_CRITICAL_ATTRIBUTES = frozenset({
    "Reallocated_Sector_Ct",
    "Spin_Retry_Count",
    "End-to-End_Error",
    "CRC_Error_Count",
    "UDMA_CRC_Error_Count",
    "Multi_Zone_Error_Rate",
})

# Attribute names whose RAW_VALUE column carries the temperature in Celsius,
# in order of preference.
_TEMPERATURE_ATTRIBUTES = ("Temperature_Celsius", "Airflow_Temperature_Cel")


def _first_group(pattern, text):
    """Return the stripped first capture group of *pattern* in *text*, or None."""
    match = pattern.search(text)
    return match.group(1).strip() if match else None


def _iter_attribute_rows(output):
    """Yield the whitespace-split rows of the ATA SMART attribute table.

    The table starts after the ``ID# ATTRIBUTE_NAME ...`` header line and ends
    at the first blank line. Only rows with a numeric id and at least the ten
    standard columns are yielded.
    """
    in_table = False
    for line in output.splitlines():
        fields = line.split()
        if not in_table:
            in_table = fields[:2] == ["ID#", "ATTRIBUTE_NAME"]
            continue
        if not fields:
            break
        if len(fields) >= 10 and fields[0].isdigit():
            yield fields


def _is_healthy(status):
    """Return True when *status* is a smartctl verdict that means healthy."""
    return status is not None and status.strip().upper() in _HEALTHY_STATUSES


class DiskInspection:
    """Run smartctl against MegaRAID device ids and report the results.

    Attributes:
        device: Block device passed to smartctl (e.g. ``/dev/sda``).
        controller_count: Number of ``megaraid,<id>`` ids to probe, from 0.
        log_dir: Directory receiving the log, the report and the MCP files.
        results: Parsed info per probed id; ids that produced no usable
            output are absent.
    """

    # Endpoint the Markdown report is POSTed to.
    MCP_SERVER_URL = "http://localhost:8080/mcp/v1/submit"
    # Seconds to wait for the MCP server before giving up.
    MCP_TIMEOUT = 30

    def __init__(self, device="/dev/sda", controller_count=8, log_dir="/root"):
        self.device = device
        self.controller_count = controller_count
        self.log_dir = log_dir
        self.current_date = datetime.datetime.now().strftime("%y%m%d")
        self.log_file = os.path.join(log_dir, f"smartctl.{self.current_date}.log")
        self.md_report = os.path.join(
            log_dir, f"disk_inspection_report.{self.current_date}.md"
        )
        self.results = {}

    def _append_log(self, header, text):
        """Append a titled section to the smartctl log file."""
        with open(self.log_file, "a", encoding="utf-8") as f:
            f.write(f"\n\n===== {header} =====\n")
            f.write(text)

    def _record_failure(self, controller_id, detail):
        """Print and log a smartctl failure for *controller_id*."""
        error_msg = f"执行smartctl命令失败 (Controller {controller_id}): {detail}\n"
        print(error_msg)
        self._append_log(f"Controller {controller_id} (Error)", error_msg)

    def run_smartctl(self, controller_id):
        """Run smartctl for one MegaRAID device id and log its output.

        Args:
            controller_id: The ``N`` of ``-d megaraid,N``.

        Returns:
            The smartctl standard output, or ``None`` when smartctl could not
            be started or could not open the device.
        """
        # An argument list (no shell) keeps the device string from being
        # interpreted by a shell.
        command = [
            "smartctl", "--all", "-d", f"megaraid,{controller_id}", self.device,
        ]
        try:
            result = subprocess.run(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
            )
        except OSError as e:
            # smartctl missing or not executable.
            self._record_failure(controller_id, str(e))
            return None

        # A non-zero status alone is not a failure: a failing disk sets the
        # higher bits while still producing the report we want to analyse.
        if result.returncode & _SMARTCTL_FATAL_BITS:
            # smartctl usually prints its diagnostics on stdout, not stderr.
            detail = (result.stderr or result.stdout or "").strip()
            self._record_failure(controller_id, detail)
            return None

        self._append_log(f"Controller {controller_id}", result.stdout)
        return result.stdout

    def parse_smart_info(self, output, controller_id):
        """Extract the key fields from one smartctl report.

        Args:
            output: Text printed by ``smartctl --all``; may be empty or None.
            controller_id: Id the output belongs to (currently unused).

        Returns:
            ``{"error": "No data available"}`` for empty input; otherwise a
            dict with any of the keys ``device_model``, ``serial_number``,
            ``firmware_version``, ``health_status``, ``temperature`` (all
            strings) and ``important_attributes`` (name -> value / worst /
            threshold strings) that could be found.
        """
        if not output:
            return {"error": "No data available"}

        parsed_info = {}

        # Identity and health lines.
        simple_fields = (
            ("device_model", _MODEL_RE),
            ("serial_number", _SERIAL_RE),
            ("firmware_version", _FIRMWARE_RE),
            ("health_status", _HEALTH_RE),
        )
        for key, pattern in simple_fields:
            value = _first_group(pattern, output)
            if value is not None:
                parsed_info[key] = value

        # ATA attribute table: collect critical attributes and temperatures.
        important_attributes = {}
        temperatures = {}
        for fields in _iter_attribute_rows(output):
            attr_name = fields[1]
            if attr_name in _CRITICAL_ATTRIBUTES:
                important_attributes[attr_name] = {
                    "value": fields[3],
                    "worst": fields[4],
                    "threshold": fields[5],
                }
            # The temperature is the RAW_VALUE column (index 9), not the
            # normalized VALUE and certainly not the FLAG column.
            if attr_name in _TEMPERATURE_ATTRIBUTES and fields[9].isdigit():
                temperatures.setdefault(attr_name, fields[9])

        temperature = None
        for attr_name in _TEMPERATURE_ATTRIBUTES:
            if attr_name in temperatures:
                temperature = temperatures[attr_name]
                break
        if temperature is None:
            # SCSI/SAS drives report the temperature on a dedicated line.
            temperature = _first_group(_SCSI_TEMPERATURE_RE, output)
        if temperature is not None:
            parsed_info["temperature"] = temperature

        if important_attributes:
            parsed_info["important_attributes"] = important_attributes

        return parsed_info

    def generate_md_report(self):
        """Write the Markdown report for ``self.results``.

        Ids without a healthy verdict (including ids with no data at all) are
        counted as abnormal in the overview and listed in the details
        section, so both sections agree.

        Returns:
            The path of the written report.
        """
        ids = range(self.controller_count)
        healthy_count = sum(
            1 for info in self.results.values()
            if _is_healthy(info.get("health_status"))
        )

        # UTF-8 is forced because the report contains Chinese text and the
        # locale encoding may be ASCII (e.g. when run from cron).
        with open(self.md_report, "w", encoding="utf-8") as f:
            f.write("# 硬盘巡检报告\n\n")
            now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            f.write(f"**检查日期**: {now}\n\n")
            f.write(f"**设备**: {self.device}\n\n")

            f.write("## 控制器检查结果\n\n")

            # Health overview.
            f.write("### 健康状态概览\n")
            f.write(f"- 总控制器数: {self.controller_count}\n")
            f.write(f"- 健康控制器数: {healthy_count}\n")
            f.write(f"- 异常控制器数: {self.controller_count - healthy_count}\n\n")

            # Per-id table.
            f.write("### 详细信息\n")
            f.write("| 控制器ID | 设备型号 | 序列号 | 固件版本 | 健康状态 | 温度(°C) |\n")
            f.write("|----------|----------|--------|----------|----------|----------|\n")
            for controller_id in ids:
                info = self.results.get(controller_id, {})
                f.write(f"| {controller_id} ")
                f.write(f"| {info.get('device_model', 'N/A')} ")
                f.write(f"| {info.get('serial_number', 'N/A')} ")
                f.write(f"| {info.get('firmware_version', 'N/A')} ")
                f.write(f"| {info.get('health_status', 'N/A')} ")
                f.write(f"| {info.get('temperature', 'N/A')} |\n")

            f.write("\n## 异常详情\n\n")

            # Details for every id that is not known to be healthy.
            has_issues = False
            for controller_id in ids:
                info = self.results.get(controller_id, {})
                if _is_healthy(info.get("health_status")):
                    continue
                has_issues = True
                f.write(f"### 控制器 {controller_id}\n")
                f.write(f"- 健康状态: **{info.get('health_status', 'N/A')}**\n")

                if "important_attributes" in info:
                    f.write("- 关键属性异常:\n")
                    for attr_name, attr_info in info["important_attributes"].items():
                        # Two leading spaces nest the item under the bullet above.
                        f.write(
                            f"  - {attr_name}: 值={attr_info['value']}, "
                            f"最差={attr_info['worst']}, "
                            f"阈值={attr_info['threshold']}\n"
                        )
                f.write("\n")

            if not has_issues:
                f.write("暂无异常情况。\n")

            f.write("\n## 建议\n\n")
            if has_issues:
                f.write("1. 请关注异常控制器的状态变化\n")
                f.write("2. 建议备份重要数据\n")
                f.write("3. 必要时考虑更换有问题的硬盘\n")
            else:
                f.write("1. 保持定期检查硬盘健康状态\n")
                f.write("2. 继续做好数据备份工作\n")
                f.write("3. 注意监控系统温度变化\n")

        print(f"Markdown报告已生成: {self.md_report}")
        return self.md_report

    def _print_manual_submit_hint(self, temp_json):
        """Print the curl command that submits *temp_json* by hand."""
        print("或者使用以下命令手动提交数据:")
        print(
            f"curl -X POST -H 'Content-Type: application/json' "
            f"-d @{temp_json} {self.MCP_SERVER_URL}"
        )

    def submit_to_mcp(self, report_path):
        """Submit the report to the MCP server for AI analysis.

        The payload is always saved as JSON in ``log_dir`` first so it can be
        submitted manually if the HTTP request fails.

        Args:
            report_path: Path of the Markdown report to submit.

        Returns:
            ``True`` when the payload file was prepared (even if the HTTP
            request itself failed), ``False`` when preparing it raised.
        """
        try:
            with open(report_path, 'r', encoding="utf-8") as f:
                report_content = f.read()

            mcp_data = {
                "type": "disk_inspection_report",
                "timestamp": datetime.datetime.now().isoformat(),
                "content": report_content
            }

            print("正在准备通过MCP提交报告给AI分析...")

            # Keep a copy of the payload on disk for manual submission.
            temp_json = os.path.join(
                self.log_dir, f"mcp_data_{self.current_date}.json"
            )
            with open(temp_json, 'w', encoding="utf-8") as f:
                json.dump(mcp_data, f, ensure_ascii=False, indent=2)

            print(f"MCP数据已准备好: {temp_json}")

            try:
                # Imported lazily: requests is optional.
                import requests

                print(f"正在向MCP服务器发送请求: {self.MCP_SERVER_URL}")
                # Without a timeout an unresponsive server blocks forever.
                response = requests.post(
                    self.MCP_SERVER_URL,
                    json=mcp_data,
                    headers={"Content-Type": "application/json"},
                    timeout=self.MCP_TIMEOUT,
                )

                if response.status_code == 200:
                    result = response.json()
                    print("MCP提交成功!")
                    print(f"AI分析结果: {result.get('result', {}).get('analysis', '无分析结果')}")

                    analysis_file = os.path.join(
                        self.log_dir, f"ai_analysis_{self.current_date}.json"
                    )
                    with open(analysis_file, 'w', encoding="utf-8") as f:
                        json.dump(result, f, ensure_ascii=False, indent=2)
                    print(f"AI分析结果已保存: {analysis_file}")
                else:
                    print(f"MCP服务器请求失败: HTTP {response.status_code}")
                    print(f"响应内容: {response.text}")
                    print("请检查MCP服务器是否正常运行")

            except ImportError:
                print("未安装requests库,无法直接发送HTTP请求")
                print("请运行: pip install requests 来安装必要的依赖")
                self._print_manual_submit_hint(temp_json)
            except Exception as e:
                # Best effort: a failed submission must not abort the run.
                print(f"发送请求到MCP服务器时出错: {str(e)}")
                print("请检查MCP服务器是否正常运行")
                self._print_manual_submit_hint(temp_json)

            return True
        except Exception as e:
            print(f"提交MCP失败: {str(e)}")
            return False

    def run_inspection(self):
        """Run the full flow: probe every id, write the report, submit it."""
        print(f"开始硬盘巡检,设备: {self.device}")
        print(f"日志文件: {self.log_file}")

        # Start from an empty log and an empty result set on every run.
        os.makedirs(self.log_dir, exist_ok=True)
        with open(self.log_file, 'w', encoding="utf-8"):
            pass
        self.results = {}

        for controller_id in range(self.controller_count):
            print(f"检查控制器 {controller_id}...")
            output = self.run_smartctl(controller_id)
            if output:
                self.results[controller_id] = self.parse_smart_info(
                    output, controller_id
                )

        report_path = self.generate_md_report()
        self.submit_to_mcp(report_path)

        print("硬盘巡检完成!")


def main():
    """Check the prerequisites and run an inspection with default settings."""
    # smartctl needs raw device access.
    if os.geteuid() != 0:
        print("错误:请以root权限运行此脚本")
        sys.exit(1)

    if shutil.which("smartctl") is None:
        print("错误:未找到smartctl工具,请先安装")
        print("Ubuntu/Debian: apt install smartmontools")
        print("CentOS/RHEL: yum install smartmontools")
        sys.exit(1)

    inspector = DiskInspection()
    inspector.run_inspection()


if __name__ == "__main__":
    main()