Files
deploy.stack/crontab/disk_inspection.py
cnphpbb 8333ccdea9 feat(disk_inspection): 增强硬盘巡检功能并优化报告格式
- 添加对smartctl日志格式的适配,支持更多设备信息提取
- 重构报告生成逻辑,增加详细设备信息和错误统计
- 优化MCP服务器提交数据的格式和错误处理
- 添加配置常量集中管理参数
- 更新.gitignore忽略smartctl日志文件
2025-09-11 11:59:05 +08:00

362 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# Configuration
DEFAULT_DEVICE = "/dev/sda"  # device path inspected by default
DEFAULT_CONTROLLER_COUNT = 8  # default number of controllers
DEFAULT_LOG_DIR = "/root"  # default log directory
MCP_SERVER_URL = "http://10.10.13.143:4527/mcp/v1/submit"  # MCP server submit endpoint
# SMART attributes considered important
CRITICAL_ATTRIBUTES = [
    "Reallocated_Sector_Ct",
    "Spin_Retry_Count",
    "End-to-End_Error",
    "CRC_Error_Count",
    "Multi_Zone_Error_Rate",
]
import datetime
import json
import os
import re
import shutil
import subprocess
import sys

import requests
class DiskInspection:
    """Inspect the disks reachable through ``smartctl -d megaraid,<id>``.

    For every controller ID in ``range(controller_count)`` the inspector runs
    ``smartctl --all -d megaraid,<id> <device>``, appends the raw output to a
    dated log file, parses the SCSI-style report into a dict, renders a
    Markdown report and submits that report to the MCP server.

    Attributes:
        device: Device path handed to smartctl.
        controller_count: Number of controller IDs to probe (0..count-1).
        log_dir: Directory that receives the log, the report and JSON files.
        current_date: ``YYMMDD`` stamp (taken at construction) used in file names.
        log_file: Path of the raw smartctl log.
        md_report: Path of the generated Markdown report.
        results: Maps controller ID -> parsed info dict.
    """

    # smartctl's exit status is a bitmask. Only bit 0 (command line did not
    # parse) and bit 1 (device open failed) mean "no usable output"; the
    # higher bits flag disk problems while the report itself is still valid.
    SMARTCTL_FATAL_EXIT_MASK = 0b11

    # Seconds to wait for the MCP server; without a timeout requests can
    # block forever, which would hang an unattended cron run.
    MCP_REQUEST_TIMEOUT = 30

    # (result key, pattern) pairs for single-value fields, in output order.
    _TEXT_FIELDS = (
        ("vendor", r"Vendor:\s+(.*)"),
        ("device_model", r"Product:\s+(.*)"),
        ("firmware_version", r"Revision:\s+(.*)"),
        ("serial_number", r"Serial number:\s+(.*)"),
        ("capacity", r"User Capacity:\s+(.*?)\s+bytes"),
        ("rotation_rate", r"Rotation Rate:\s+(.*)"),
        ("health_status", r"SMART Health Status:\s+(.*)"),
        ("temperature", r"Current Drive Temperature:\s+([0-9]+)"),
        ("power_on_time", r"Accumulated power on time, hours:minutes\s+([0-9:]+)"),
    )

    # One row of the "Error counter log" table; ``{}`` is "read" or "write".
    _ERROR_COUNTER_ROW = (
        r"{}:\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+\.\d+)\s+(\d+)"
    )

    # Body of the self-test table: everything after the header line up to the
    # first blank line (or end of output).
    _SELF_TEST_TABLE = re.compile(
        r"SMART Self-test log\nNum\s+Test\s+Status\s+segment\s+LifeTime\s+LBA_first_err \[SK ASC ASQ\]\n(.*?)(\n\s*\n|$)",
        re.DOTALL,
    )

    # One self-test row, e.g.
    # "# 1  Background short  Completed      -   17245      - [-   -    -]".
    # Test name and status may contain single spaces, so the columns are
    # recognised by the >=2-space gap and by the segment/lifetime fields.
    _SELF_TEST_ROW = re.compile(
        r"^#\s*(?P<id>\d+)\s+(?P<type>.+?)\s{2,}(?P<status>.+?)"
        r"\s+(?P<segment>-|\d+)\s+(?P<hours>NOW|\d+)(?:\s|$)"
    )

    def __init__(self, device=DEFAULT_DEVICE, controller_count=DEFAULT_CONTROLLER_COUNT, log_dir=DEFAULT_LOG_DIR):
        """Store the inspection parameters and derive the dated file paths."""
        self.device = device
        self.controller_count = controller_count
        self.log_dir = log_dir
        self.current_date = datetime.datetime.now().strftime("%y%m%d")
        self.log_file = os.path.join(log_dir, f"smartctl.{self.current_date}.log")
        self.md_report = os.path.join(log_dir, f"disk_inspection_report.{self.current_date}.md")
        self.results = {}

    def _append_log(self, header, body):
        """Append *header* followed by *body* to the raw smartctl log."""
        # Explicit UTF-8: the error text is Chinese and cron often runs with
        # a C/POSIX locale whose default encoding cannot represent it.
        with open(self.log_file, "a", encoding="utf-8") as log:
            log.write(header)
            log.write(body)

    def run_smartctl(self, controller_id):
        """Run smartctl for one controller ID and log the outcome.

        Args:
            controller_id: The ``N`` in ``-d megaraid,N``.

        Returns:
            smartctl's stdout, or ``None`` when smartctl could not be started
            or reported a fatal status (bad command line / device open
            failed). A non-zero status that only flags disk problems still
            returns the output, so failing disks end up in the report.
        """
        # Argument list instead of a shell string: the device path is never
        # interpreted by a shell.
        command = ["smartctl", "--all", "-d", f"megaraid,{controller_id}", self.device]
        try:
            completed = subprocess.run(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
            )
        except OSError as exc:
            # smartctl missing or not executable.
            failure_detail = str(exc)
        else:
            # A negative return code means smartctl was killed by a signal.
            fatal = (
                completed.returncode < 0
                or completed.returncode & self.SMARTCTL_FATAL_EXIT_MASK
            )
            if not fatal:
                self._append_log(
                    f"\n\n===== Controller {controller_id} =====\n",
                    completed.stdout,
                )
                return completed.stdout
            # smartctl usually reports open failures on stdout, not stderr.
            failure_detail = completed.stderr or completed.stdout

        error_msg = f"执行smartctl命令失败 (Controller {controller_id}): {failure_detail}\n"
        print(error_msg)
        # Keep the failure in the log as well.
        self._append_log(
            f"\n\n===== Controller {controller_id} (Error) =====\n",
            error_msg,
        )
        return None

    def _parse_error_counters(self, output, operation):
        """Return the corrected/uncorrected/volume counters for *operation*.

        *operation* is ``"read"`` or ``"write"``; returns ``None`` when the
        corresponding row is absent from *output*.
        """
        row = re.search(self._ERROR_COUNTER_ROW.format(operation), output)
        if row is None:
            return None
        return {
            "corrected": row.group(4),
            "uncorrected": row.group(7),
            "data_processed": f"{row.group(6)} GB",
        }

    def _parse_self_tests(self, output):
        """Return the list of self-test entries found in *output* (may be empty)."""
        table = self._SELF_TEST_TABLE.search(output)
        if table is None:
            return []
        tests = []
        for line in table.group(1).splitlines():
            # Non-row lines (e.g. the second header line) do not start with
            # '#' and are skipped by the row pattern.
            row = self._SELF_TEST_ROW.match(line.strip())
            if row is None:
                continue
            tests.append({
                "id": row.group("id"),
                "type": row.group("type").strip(),
                "status": row.group("status").strip(),
                "lifetime_hours": row.group("hours"),
            })
        return tests

    def parse_smart_info(self, output, controller_id):
        """Parse smartctl's SCSI-style report into a dict.

        Args:
            output: Text returned by :meth:`run_smartctl`.
            controller_id: Unused; kept for interface compatibility.

        Returns:
            ``{"error": "No data available"}`` for empty input. Otherwise a
            dict holding whichever of these keys could be extracted:
            ``vendor``, ``device_model``, ``firmware_version``,
            ``serial_number``, ``capacity``, ``rotation_rate``,
            ``health_status``, ``temperature``, ``power_on_time``,
            ``manufactured_date``, ``read_errors``, ``write_errors``,
            ``self_test_results``.
        """
        if not output:
            return {"error": "No data available"}

        parsed_info = {}

        for key, pattern in self._TEXT_FIELDS:
            found = re.search(pattern, output)
            if found:
                # Strip so that e.g. "OK " still compares equal to "OK".
                parsed_info[key] = found.group(1).strip()

        manufactured = re.search(r"Manufactured in week (\d+) of year (\d+)", output)
        if manufactured:
            parsed_info["manufactured_date"] = f"{manufactured.group(2)}-W{manufactured.group(1)}"

        for key, operation in (("read_errors", "read"), ("write_errors", "write")):
            counters = self._parse_error_counters(output, operation)
            if counters is not None:
                parsed_info[key] = counters

        self_tests = self._parse_self_tests(output)
        if self_tests:
            parsed_info["self_test_results"] = self_tests

        return parsed_info

    def _detected_controllers(self):
        """Return ``(controller_id, info)`` pairs, in ID order, that have a health status."""
        detected = []
        for controller_id in range(self.controller_count):
            info = self.results.get(controller_id, {})
            if "health_status" in info:
                detected.append((controller_id, info))
        return detected

    def _render_overview(self, detected):
        """Return the header, the health overview and the summary table."""
        detected_count = sum(1 for info in self.results.values() if "health_status" in info)
        healthy_count = sum(1 for info in self.results.values() if info.get("health_status") == "OK")
        timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        lines = [
            "# 硬盘巡检报告\n\n",
            f"**检查日期**: {timestamp}\n\n",
            f"**设备**: {self.device}\n\n",
            "## 控制器检查结果\n\n",
            "### 健康状态概览\n",
            f"- 总控制器数: {self.controller_count}\n",
            f"- 检测到的控制器数: {detected_count}\n",
            f"- 健康控制器数: {healthy_count}\n",
            f"- 异常控制器数: {detected_count - healthy_count}\n\n",
            "### 基本信息概览\n",
            "| 控制器ID | 厂商 | 设备型号 | 序列号 | 健康状态 | 温度(°C) | 通电时间 |\n",
            "|----------|------|----------|--------|----------|----------|----------|\n",
        ]
        for controller_id, info in detected:
            lines.append(
                f"| {controller_id} "
                f"| {info.get('vendor', 'N/A')} "
                f"| {info.get('device_model', 'N/A')} "
                f"| {info.get('serial_number', 'N/A')} "
                f"| {info.get('health_status', 'N/A')} "
                f"| {info.get('temperature', 'N/A')} "
                f"| {info.get('power_on_time', 'N/A')} |\n"
            )
        return lines

    def _render_controller_details(self, controller_id, info):
        """Return the detail section for one detected controller."""
        lines = [
            f"### 控制器 {controller_id}\n",
            f"- **厂商**: {info.get('vendor', 'N/A')}\n",
            f"- **设备型号**: {info.get('device_model', 'N/A')}\n",
            f"- **序列号**: {info.get('serial_number', 'N/A')}\n",
            f"- **固件版本**: {info.get('firmware_version', 'N/A')}\n",
            f"- **容量**: {info.get('capacity', 'N/A')}\n",
            f"- **旋转速率**: {info.get('rotation_rate', 'N/A')}\n",
            f"- **制造日期**: {info.get('manufactured_date', 'N/A')}\n",
            f"- **健康状态**: **{info.get('health_status', 'N/A')}**\n",
            f"- **当前温度**: {info.get('temperature', 'N/A')}°C\n",
            f"- **累计通电时间**: {info.get('power_on_time', 'N/A')}\n",
        ]
        for key, title in (("read_errors", "- **读取错误**:\n"),
                           ("write_errors", "- **写入错误**:\n")):
            if key in info:
                counters = info[key]
                lines.append(title)
                lines.append(f" - 已纠正: {counters['corrected']}\n")
                lines.append(f" - 未纠正: {counters['uncorrected']}\n")
                lines.append(f" - 处理数据量: {counters['data_processed']}\n")
        if "self_test_results" in info:
            lines.append("- **SMART自检结果**:\n")
            for test in info["self_test_results"]:
                lines.append(
                    f" - 测试 #{test['id']}: {test['type']} - {test['status']} (运行时间: {test['lifetime_hours']}小时)\n"
                )
        if info.get("health_status", "") != "OK":
            lines.append("\n**⚠️ 警告:此控制器状态异常,请及时关注!**\n")
        lines.append("\n")
        return lines

    def _render_summary(self, abnormal):
        """Return the anomaly summary and the recommendations section."""
        lines = ["\n## 异常汇总\n\n"]
        if abnormal:
            lines.append("**发现异常控制器,请关注以下问题:**\n\n")
            # Only controllers that actually reported a status are listed, so
            # the list agrees with the abnormal count in the overview.
            for controller_id, info in abnormal:
                lines.append(f"- **控制器 {controller_id}**: 状态为 {info.get('health_status', '未知')}\n")
            lines.append("\n")
        else:
            lines.append("**未发现异常情况。**\n\n")

        lines.append("\n## 建议\n\n")
        if abnormal:
            lines += [
                "1. **立即关注异常控制器**的状态变化,并考虑进行进一步测试\n",
                "2. **备份重要数据**,防止因硬盘故障导致数据丢失\n",
                "3. **联系技术支持**,评估是否需要更换有问题的硬盘\n",
                "4. 增加监控频率,密切关注错误计数的变化趋势\n",
            ]
        else:
            lines += [
                "1. **保持定期检查**硬盘健康状态,建议每两周至少检查一次\n",
                "2. **继续做好数据备份工作**,采用异地备份和多版本策略\n",
                "3. **注意监控系统温度变化**,保持良好的散热环境\n",
                "4. 记录并跟踪硬盘的通电时间和错误计数趋势\n",
            ]
        return lines

    def generate_md_report(self):
        """Write the Markdown report built from ``self.results``.

        Only controllers whose parsed info contains ``health_status`` appear
        in the tables; a status other than ``"OK"`` marks a controller as
        abnormal.

        Returns:
            The path of the written report (``self.md_report``).
        """
        detected = self._detected_controllers()
        abnormal = [
            (controller_id, info)
            for controller_id, info in detected
            if info.get("health_status", "") != "OK"
        ]

        report = self._render_overview(detected)
        report.append("\n## 控制器详细信息\n\n")
        for controller_id, info in detected:
            report.extend(self._render_controller_details(controller_id, info))
        report.extend(self._render_summary(abnormal))

        # Explicit UTF-8: the report contains Chinese text and an emoji.
        with open(self.md_report, "w", encoding="utf-8") as f:
            f.writelines(report)

        print(f"Markdown报告已生成: {self.md_report}")
        return self.md_report

    def submit_to_mcp(self, report_path):
        """Submit the report to the MCP server for AI analysis.

        The payload is also saved as ``mcp_data_<date>.json`` in ``log_dir``
        so it can be re-sent manually; a successful response is saved as
        ``ai_analysis_<date>.json``.

        Args:
            report_path: Path of the Markdown report to submit.

        Returns:
            ``True`` once the payload file has been prepared, even if the
            HTTP delivery itself failed (a manual ``curl`` hint is printed in
            that case); ``False`` when reading the report or writing the
            payload failed.
        """
        try:
            with open(report_path, "r", encoding="utf-8") as f:
                report_content = f.read()

            # The server expects ``timestamp`` as an integer Unix timestamp.
            mcp_data = {
                "data": {
                    "type": "disk_inspection_report",
                    "content": report_content
                },
                "type": "disk_inspection",
                "timestamp": int(datetime.datetime.now().timestamp())
            }
            print("正在准备通过MCP提交报告给AI分析...")

            # Keep a copy of the payload on disk for manual re-submission.
            temp_json = os.path.join(self.log_dir, f"mcp_data_{self.current_date}.json")
            with open(temp_json, "w", encoding="utf-8") as f:
                json.dump(mcp_data, f, ensure_ascii=False, indent=2)
            print(f"MCP数据已准备好: {temp_json}")

            # Delivery is best effort: any failure is reported, not raised.
            try:
                print(f"正在向MCP服务器发送请求: {MCP_SERVER_URL}")
                response = requests.post(
                    MCP_SERVER_URL,
                    json=mcp_data,
                    headers={"Content-Type": "application/json"},
                    timeout=self.MCP_REQUEST_TIMEOUT,
                )
                if response.status_code == 200:
                    result = response.json()
                    analysis = result.get("result", {}).get("analysis", "无分析结果")
                    print("MCP提交成功")
                    print(f"AI分析结果: {analysis}")
                    analysis_file = os.path.join(self.log_dir, f"ai_analysis_{self.current_date}.json")
                    with open(analysis_file, "w", encoding="utf-8") as f:
                        json.dump(result, f, ensure_ascii=False, indent=2)
                    print(f"AI分析结果已保存: {analysis_file}")
                else:
                    print(f"MCP服务器请求失败: HTTP {response.status_code}")
                    print(f"响应内容: {response.text}")
                    print("请检查MCP服务器是否正常运行")
            except Exception as e:
                print(f"发送请求到MCP服务器时出错: {str(e)}")
                print("请检查MCP服务器是否正常运行")
                print("或者使用以下命令手动提交数据:")
                print(f"curl -X POST -H 'Content-Type: application/json' -d @{temp_json} {MCP_SERVER_URL}")
            return True
        except Exception as e:
            print(f"提交MCP失败: {str(e)}")
            return False

    def run_inspection(self):
        """Run the whole workflow: probe, parse, report, submit."""
        print(f"开始硬盘巡检,设备: {self.device}")
        print(f"日志文件: {self.log_file}")

        # Start from an empty log when one from an earlier run exists.
        if os.path.exists(self.log_file):
            with open(self.log_file, "w", encoding="utf-8"):
                pass

        for controller_id in range(self.controller_count):
            print(f"检查控制器 {controller_id}...")
            output = self.run_smartctl(controller_id)
            if output:
                self.results[controller_id] = self.parse_smart_info(output, controller_id)

        report_path = self.generate_md_report()
        self.submit_to_mcp(report_path)
        print("硬盘巡检完成!")
if __name__ == "__main__":
    # The script refuses to run unless the effective user is root
    # (presumably because smartctl needs raw device access).
    if os.geteuid() != 0:
        print("错误请以root权限运行此脚本")
        sys.exit(1)

    # Look smartctl up on PATH directly instead of spawning a shell to run
    # `which`.
    if shutil.which("smartctl") is None:
        print("错误未找到smartctl工具请先安装")
        print("Ubuntu/Debian: apt install smartmontools")
        print("CentOS/RHEL: yum install smartmontools")
        sys.exit(1)

    # Build an inspector with the default settings and run it.
    inspector = DiskInspection()
    inspector.run_inspection()