Files
deploy.stack/crontab/disk_inspection.py

464 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Device node passed to smartctl by default.
DEFAULT_DEVICE = "/dev/sda"

# First MegaRAID device id to probe. The misspelling ("BINEG" for "BEGIN")
# is part of the name other code refers to, so it is kept as is.
DEFAULT_MEGARAID_BINEG = 8

# Last MegaRAID device id to probe (inclusive).
DEFAULT_MEGARAID_END = 15

# Directory that receives the raw log, the Markdown report and JSON payloads.
DEFAULT_LOG_DIR = "/root"

# MCP server endpoint the finished report is submitted to.
MCP_SERVER_URL = "http://10.10.13.143:4527/mcp/v1/submit"

# SMART attributes considered important.
CRITICAL_ATTRIBUTES = [
    "Reallocated_Sector_Ct",
    "Spin_Retry_Count",
    "End-to-End_Error",
    "CRC_Error_Count",
    "Multi_Zone_Error_Rate",
]
import subprocess
import sys
import os
import datetime
import re
import json
import requests
class DiskInspection:
    """Inspect disks behind a MegaRAID controller with smartctl and report on them.

    For every MegaRAID device id in ``range(controller_count, megaraid_count)``
    the inspector runs ``smartctl --all``, appends the raw output to a dated
    log file, parses the fields of interest, renders a Markdown report and
    submits that report to the MCP server at ``MCP_SERVER_URL``.
    """

    # Health strings treated as healthy: "OK" (SAS output) / "PASSED" (ATA output).
    HEALTHY_STATUSES = ("OK", "PASSED")

    # smartctl's exit status is a bitmask. Bits 0 and 1 (command line did not
    # parse / device open failed) mean there is no usable output. The higher
    # bits describe the disk's condition (failing, attributes below threshold,
    # logged errors) and are set exactly when the output matters most.
    SMARTCTL_FATAL_MASK = 0b11

    # Upper bound, in seconds, for the HTTP submission so a cron run cannot hang.
    MCP_TIMEOUT_SECONDS = 30

    # ATA self-test row, e.g.
    # "# 1  Short offline       Completed without error       00%      1234         -"
    _ATA_SELF_TEST_ROW = re.compile(
        r"^#\s*(\d+)\s+(.+?)\s{2,}(.+?)\s+\d+%\s+(\d+)\b"
    )
    # SCSI/SAS self-test row, e.g.
    # "# 1  Background short  Completed                   -    1234        - [-   -    -]"
    _SCSI_SELF_TEST_ROW = re.compile(
        r"^#\s*(\d+)\s+(.+?)\s{2,}(.+?)\s+(?:-|\d+)\s+(\d+|NOW)\b"
    )

    def __init__(self, device=DEFAULT_DEVICE, controller_count=DEFAULT_MEGARAID_BINEG,
                 log_dir=DEFAULT_LOG_DIR, megaraid_end=DEFAULT_MEGARAID_END):
        """Set up paths and the range of MegaRAID device ids to scan.

        Args:
            device: Device node handed to smartctl.
            controller_count: First MegaRAID device id to probe. Despite the
                name, this is the start of the scanned range, not a count.
            log_dir: Directory for the raw log, the report and JSON payloads.
            megaraid_end: Last MegaRAID device id to probe (inclusive).
        """
        self.device = device
        self.controller_count = controller_count
        # Exclusive upper bound of the scanned id range.
        self.megaraid_count = megaraid_end + 1
        self.log_dir = log_dir
        self.current_date = datetime.datetime.now().strftime("%y%m%d")
        self.log_file = os.path.join(log_dir, f"smartctl.{self.current_date}.log")
        self.md_report = os.path.join(log_dir, f"disk_inspection_report.{self.current_date}.md")
        # Parsed smartctl data keyed by MegaRAID device id.
        self.results = {}

    # ------------------------------------------------------------------
    # smartctl execution
    # ------------------------------------------------------------------
    def _append_log(self, text):
        """Append *text* to the raw log file (always UTF-8)."""
        with open(self.log_file, "a", encoding="utf-8") as log:
            log.write(text)

    def run_smartctl(self, controller_id):
        """Run ``smartctl --all -d megaraid,<id> <device>`` and log the outcome.

        Args:
            controller_id: MegaRAID device id to query.

        Returns:
            smartctl's stdout, or ``None`` when smartctl could not be started
            or reported a fatal status (exit bits 0/1). A non-zero exit status
            that only carries disk-condition bits still returns the output,
            so failing disks are not dropped from the report.
        """
        argv = ["smartctl", "--all", "-d", f"megaraid,{controller_id}", self.device]
        try:
            proc = subprocess.run(
                argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                universal_newlines=True
            )
        except OSError as exc:
            # smartctl missing or not executable.
            failure = str(exc)
        else:
            if not (proc.returncode & self.SMARTCTL_FATAL_MASK):
                self._append_log(f"\n\n===== Controller {controller_id} =====\n{proc.stdout}")
                return proc.stdout
            # smartctl prints most diagnostics on stdout, so fall back to it.
            failure = (proc.stderr or proc.stdout or "").strip()

        error_msg = f"执行smartctl命令失败 (Controller {controller_id}): {failure}\n"
        print(error_msg)
        self._append_log(f"\n\n===== Controller {controller_id} (Error) =====\n{error_msg}")
        return None

    # ------------------------------------------------------------------
    # Parsing helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _first_match(output, *patterns):
        """Return the stripped first group of the first pattern that matches, else None."""
        for pattern in patterns:
            found = re.search(pattern, output)
            if found:
                return found.group(1).strip()
        return None

    @staticmethod
    def _ata_attribute_raw(output, *attribute_names):
        """Return the leading integer of RAW_VALUE for the first attribute found.

        An ATA attribute row has the columns
        ``ID# NAME FLAG VALUE WORST THRESH TYPE UPDATED WHEN_FAILED RAW_VALUE``,
        so RAW_VALUE is whatever follows the seven columns after the name.
        Matching is confined to a single line so a short row cannot swallow
        the next one.
        """
        for name in attribute_names:
            row = re.search(
                r"^[ \t]*\d+[ \t]+" + re.escape(name) + r"(?:[ \t]+\S+){7}[ \t]+(\d+)",
                output,
                re.MULTILINE,
            )
            if row:
                return row.group(1)
        return None

    @staticmethod
    def _scsi_error_counters(output, operation):
        """Parse the SAS error-counter row for *operation* ("read" or "write")."""
        row = re.search(
            operation + r":\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+\.\d+)\s+(\d+)",
            output,
        )
        if not row:
            return None
        return {
            "corrected": row.group(4),
            "uncorrected": row.group(7),
            "data_processed": f"{row.group(6)} GB",
        }

    @classmethod
    def _parse_self_test_log(cls, output):
        """Extract self-test entries from the "SMART Self-test log" section.

        Rows are matched against column-aware patterns instead of being split
        on whitespace, because test descriptions and statuses contain spaces
        and the row prefix is printed as "# 1" (hash, space, number).
        The section ends at the first blank line after its title.
        """
        entries = []
        in_section = False
        for raw_line in output.splitlines():
            line = raw_line.strip()
            if not in_section:
                in_section = line.startswith("SMART Self-test log")
                continue
            if not line:
                break
            if not line.startswith("#"):
                continue  # column headers or "No self-tests have been logged"
            row = cls._ATA_SELF_TEST_ROW.match(line) or cls._SCSI_SELF_TEST_ROW.match(line)
            if row:
                entries.append({
                    "id": row.group(1),
                    "type": row.group(2),
                    "status": row.group(3),
                    "lifetime_hours": row.group(4),
                })
        return entries

    def parse_smart_info(self, output, controller_id):
        """Parse smartctl output; handles both SAS and ATA/SSD layouts.

        Args:
            output: Text printed by ``smartctl --all``.
            controller_id: MegaRAID device id the output belongs to (unused
                by the parser, kept for interface compatibility).

        Returns:
            A dict with whichever of these keys could be extracted: vendor,
            device_model, firmware_version, serial_number, capacity,
            rotation_rate, health_status, temperature, power_on_time,
            manufactured_date, read_errors, write_errors, self_test_results,
            plus disk_type (always present). For empty *output* the result is
            ``{"error": "No data available"}``.
        """
        if not output:
            return {"error": "No data available"}

        parsed_info = {}

        # For each field the SAS spelling is tried first, then the ATA one.
        simple_fields = (
            ("vendor", (r"Vendor:\s+(.*)",)),
            ("device_model", (r"Product:\s+(.*)", r"Device Model:\s+(.*)")),
            ("firmware_version", (r"Revision:\s+(.*)", r"Firmware Version:\s+(.*)")),
            ("serial_number", (r"Serial number:\s+(.*)", r"Serial Number:\s+(.*)")),
            ("capacity", (r"User Capacity:\s+(.*?)\s+bytes",)),
            ("rotation_rate", (r"Rotation Rate:\s+(.*)",)),
            ("health_status", (
                r"SMART Health Status:\s+(.*)",
                r"SMART overall-health self-assessment test result:\s+(.*)",
            )),
        )
        for key, patterns in simple_fields:
            value = self._first_match(output, *patterns)
            if value is not None:
                parsed_info[key] = value

        # Temperature: SAS summary line, else the ATA attribute's RAW_VALUE
        # (not the normalised VALUE column).
        temperature = self._first_match(output, r"Current Drive Temperature:\s+([0-9]+)")
        if temperature is None:
            temperature = self._ata_attribute_raw(
                output, "Temperature_Celsius", "Airflow_Temperature_Cel"
            )
        if temperature is not None:
            parsed_info["temperature"] = temperature

        # Power-on time: SAS "hours:minutes" line, else ATA Power_On_Hours raw value.
        power_on = self._first_match(
            output, r"Accumulated power on time, hours:minutes\s+([0-9:]+)"
        )
        if power_on is None:
            power_on = self._ata_attribute_raw(output, "Power_On_Hours")
        if power_on is not None:
            parsed_info["power_on_time"] = power_on

        # Manufacture date: "Manufactured in week XX of year XXXX" when present.
        manufactured = re.search(r"Manufactured in week (\d+) of year (\d+)", output)
        if manufactured:
            parsed_info["manufactured_date"] = f"{manufactured.group(2)}-W{manufactured.group(1)}"
        else:
            # NOTE(review): fallback kept from the original code. It takes the
            # first two digits of the first four-digit run in the firmware
            # version as a 21st-century year. This is a guess, not data
            # reported by the drive - confirm it is wanted.
            firmware = parsed_info.get("firmware_version", "")
            if firmware:
                year_guess = re.search(r"(\d{2})\d{2}", firmware)
                if year_guess:
                    parsed_info["manufactured_date"] = f"20{year_guess.group(1)}"

        # Error counters (SAS layout only).
        for key, operation in (("read_errors", "read"), ("write_errors", "write")):
            counters = self._scsi_error_counters(output, operation)
            if counters:
                parsed_info[key] = counters

        self_tests = self._parse_self_test_log(output)
        if self_tests:
            parsed_info["self_test_results"] = self_tests

        # Disk type is derived from the rotation rate line.
        rotation = parsed_info.get("rotation_rate", "")
        if "Solid State Device" in rotation:
            parsed_info["disk_type"] = "SSD"
        elif "rpm" in rotation:
            parsed_info["disk_type"] = "HDD"
        else:
            parsed_info["disk_type"] = "Unknown"
        return parsed_info

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------
    def _render_controller_details(self, controller_id, info):
        """Return the Markdown lines of the detail section for one controller."""
        lines = [
            f"### 控制器 {controller_id}\n",
            f"- **厂商**: {info.get('vendor', 'N/A')}\n",
            f"- **设备型号**: {info.get('device_model', 'N/A')}\n",
            f"- **序列号**: {info.get('serial_number', 'N/A')}\n",
            f"- **固件版本**: {info.get('firmware_version', 'N/A')}\n",
            f"- **容量**: {info.get('capacity', 'N/A')}\n",
            f"- **旋转速率**: {info.get('rotation_rate', 'N/A')}\n",
            f"- **硬盘类型**: {info.get('disk_type', 'N/A')}\n",
            f"- **制造日期**: {info.get('manufactured_date', 'N/A')}\n",
            f"- **健康状态**: **{info.get('health_status', 'N/A')}**\n",
            f"- **当前温度**: {info.get('temperature', 'N/A')}°C\n",
            f"- **累计通电时间**: {info.get('power_on_time', 'N/A')}\n",
        ]
        # Error counters are mainly available for SAS disks. Sub-items are
        # indented two spaces so Markdown renders them as a nested list.
        for key, title in (("read_errors", "- **读取错误**:\n"),
                           ("write_errors", "- **写入错误**:\n")):
            counters = info.get(key)
            if counters:
                lines.append(title)
                lines.append(f"  - 已纠正: {counters['corrected']}\n")
                lines.append(f"  - 未纠正: {counters['uncorrected']}\n")
                lines.append(f"  - 处理数据量: {counters['data_processed']}\n")
        if "self_test_results" in info:
            lines.append("- **SMART自检结果**:\n")
            for test in info["self_test_results"]:
                lines.append(
                    f"  - 测试 #{test['id']}: {test['type']} - {test['status']} "
                    f"(运行时间: {test['lifetime_hours']}小时)\n"
                )
        if info.get("health_status", "") not in self.HEALTHY_STATUSES:
            lines.append("\n**⚠️ 警告:此控制器状态异常,请及时关注!**\n")
        lines.append("\n")
        return lines

    def generate_md_report(self):
        """Write the Markdown inspection report and return its path.

        Only controllers for which a health status was parsed appear in the
        tables, and only those with a status other than OK/PASSED are listed
        as abnormal. The file is written as UTF-8.

        Returns:
            Path of the generated report (``self.md_report``).
        """
        slots = range(self.controller_count, self.megaraid_count)
        detected = [
            (cid, self.results[cid]) for cid in slots
            if "health_status" in self.results.get(cid, {})
        ]
        abnormal = [
            (cid, info) for cid, info in detected
            if info["health_status"] not in self.HEALTHY_STATUSES
        ]
        has_issues = bool(abnormal)

        all_infos = list(self.results.values())
        detected_count = sum(1 for info in all_infos if "health_status" in info)
        healthy_count = sum(
            1 for info in all_infos
            if info.get("health_status") in self.HEALTHY_STATUSES
        )
        hdd_count = sum(1 for info in all_infos if info.get("disk_type") == "HDD")
        ssd_count = sum(1 for info in all_infos if info.get("disk_type") == "SSD")

        out = [
            "# 硬盘巡检报告\n\n",
            f"**检查日期**: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n",
            f"**设备**: {self.device}\n\n",
            "## 控制器检查结果\n\n",
            "### 健康状态概览\n",
            # Number of device ids scanned, not the range's start index.
            f"- 总控制器数: {len(slots)}\n",
            f"- 检测到的控制器数: {detected_count}\n",
            f"- 健康控制器数: {healthy_count}\n",
            f"- 异常控制器数: {detected_count - healthy_count}\n",
            f"- HDD硬盘数: {hdd_count}\n",
            f"- SSD硬盘数: {ssd_count}\n\n",
            "### 基本信息概览\n",
            "| 控制器ID | 硬盘类型 | 厂商 | 设备型号 | 序列号 | 健康状态 | 温度(°C) | 通电时间 |\n",
            "|----------|----------|------|----------|--------|----------|----------|----------|\n",
        ]
        for cid, info in detected:
            out.append(
                f"| {cid} "
                f"| {info.get('disk_type', 'N/A')} "
                f"| {info.get('vendor', 'N/A')} "
                f"| {info.get('device_model', 'N/A')} "
                f"| {info.get('serial_number', 'N/A')} "
                f"| {info.get('health_status', 'N/A')} "
                f"| {info.get('temperature', 'N/A')} "
                f"| {info.get('power_on_time', 'N/A')} |\n"
            )

        out.append("\n## 控制器详细信息\n\n")
        for cid, info in detected:
            out.extend(self._render_controller_details(cid, info))

        out.append("\n## 异常汇总\n\n")
        if has_issues:
            out.append("**发现异常控制器,请关注以下问题:**\n\n")
            for cid, info in abnormal:
                out.append(f"- **控制器 {cid}**: 状态为 {info.get('health_status', '未知')}\n")
            out.append("\n")
        else:
            out.append("**未发现异常情况。**\n\n")

        out.append("\n## 建议\n\n")
        if has_issues:
            out.extend([
                "1. **立即关注异常控制器**的状态变化,并考虑进行进一步测试\n",
                "2. **备份重要数据**,防止因硬盘故障导致数据丢失\n",
                "3. **联系技术支持**,评估是否需要更换有问题的硬盘\n",
                "4. 增加监控频率,密切关注错误计数的变化趋势\n",
            ])
        else:
            out.extend([
                "1. **保持定期检查**硬盘健康状态,建议每两周至少检查一次\n",
                "2. **继续做好数据备份工作**,采用异地备份和多版本策略\n",
                "3. **注意监控系统温度变化**,保持良好的散热环境\n",
                "4. 记录并跟踪硬盘的通电时间和错误计数趋势\n",
            ])

        with open(self.md_report, "w", encoding="utf-8") as report:
            report.write("".join(out))
        print(f"Markdown报告已生成: {self.md_report}")
        return self.md_report

    # ------------------------------------------------------------------
    # MCP submission
    # ------------------------------------------------------------------
    def submit_to_mcp(self, report_path):
        """Submit the report to the MCP server for AI analysis.

        The payload is always written to ``mcp_data_<date>.json`` in the log
        directory first, so it can be posted manually if the request fails.
        On HTTP 200 the JSON response is saved to ``ai_analysis_<date>.json``.

        Args:
            report_path: Path of the Markdown report to submit.

        Returns:
            True only when the server answered HTTP 200 and the response was
            processed; False for any failure (which is printed, not raised).
        """
        try:
            with open(report_path, "r", encoding="utf-8") as f:
                report_content = f.read()

            # The payload's timestamp is an integer Unix timestamp.
            mcp_data = {
                "data": {
                    "type": "disk_inspection_report",
                    "content": report_content
                },
                "type": "disk_inspection",
                "timestamp": int(datetime.datetime.now().timestamp())
            }
            print("正在准备通过MCP提交报告给AI分析...")

            temp_json = os.path.join(self.log_dir, f"mcp_data_{self.current_date}.json")
            with open(temp_json, "w", encoding="utf-8") as f:
                json.dump(mcp_data, f, ensure_ascii=False, indent=2)
            print(f"MCP数据已准备好: {temp_json}")

            manual_hint = (
                f"curl -X POST -H 'Content-Type: application/json' -d @{temp_json} {MCP_SERVER_URL}"
            )
            try:
                # Local import: gives the ImportError branch below something to catch.
                import requests

                print(f"正在向MCP服务器发送请求: {MCP_SERVER_URL}")
                response = requests.post(
                    MCP_SERVER_URL,
                    json=mcp_data,
                    headers={"Content-Type": "application/json"},
                    timeout=self.MCP_TIMEOUT_SECONDS,
                )
                if response.status_code == 200:
                    result = response.json()
                    print("MCP提交成功")
                    print(f"AI分析结果: {result.get('result', {}).get('analysis', '无分析结果')}")
                    analysis_file = os.path.join(self.log_dir, f"ai_analysis_{self.current_date}.json")
                    with open(analysis_file, "w", encoding="utf-8") as f:
                        json.dump(result, f, ensure_ascii=False, indent=2)
                    print(f"AI分析结果已保存: {analysis_file}")
                    return True
                print(f"MCP服务器请求失败: HTTP {response.status_code}")
                print(f"响应内容: {response.text}")
                print("请检查MCP服务器是否正常运行")
            except ImportError:
                print("未安装requests库无法直接发送HTTP请求")
                print("请运行: pip install requests 来安装必要的依赖")
                print("或者使用以下命令手动提交数据:")
                print(manual_hint)
            except Exception as e:
                # Best-effort submission: report the problem and keep going.
                print(f"发送请求到MCP服务器时出错: {str(e)}")
                print("请检查MCP服务器是否正常运行")
                print("或者使用以下命令手动提交数据:")
                print(manual_hint)
            return False
        except Exception as e:
            print(f"提交MCP失败: {str(e)}")
            return False

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------
    def run_inspection(self):
        """Run the full workflow: scan, parse, write the report, submit it."""
        print(f"开始硬盘巡检,设备: {self.device}")
        print(f"日志文件: {self.log_file}")

        # Truncate an existing log from an earlier run on the same day.
        if os.path.exists(self.log_file):
            with open(self.log_file, "w", encoding="utf-8"):
                pass

        for controller_id in range(self.controller_count, self.megaraid_count):
            print(f"检查控制器 {controller_id}...")
            output = self.run_smartctl(controller_id)
            if output:
                self.results[controller_id] = self.parse_smart_info(output, controller_id)

        report_path = self.generate_md_report()
        self.submit_to_mcp(report_path)
        print("硬盘巡检完成!")
if __name__ == "__main__":
    # Local import: only this entry block needs it.
    import shutil

    # Refuse to run unless the effective user is root.
    if os.geteuid() != 0:
        print("错误请以root权限运行此脚本")
        sys.exit(1)

    # Look smartctl up on PATH directly instead of spawning a shell to run
    # `which`, which may itself be missing on minimal systems.
    if shutil.which("smartctl") is None:
        print("错误未找到smartctl工具请先安装")
        print("Ubuntu/Debian: apt install smartmontools")
        print("CentOS/RHEL: yum install smartmontools")
        sys.exit(1)

    # Create the inspector with its defaults and run the whole workflow.
    inspector = DiskInspection()
    inspector.run_inspection()