Files
deploy.stack/crontab/disk_inspection.py
cnphpbb fb530c43ce feat(crontab): 添加硬盘巡检脚本及相关文档
添加disk_inspection.py脚本用于检查硬盘健康状态,包括:
- 支持多个megaraid控制器检查
- 生成Markdown格式报告
- 支持通过MCP提交数据
- 添加README和INSTALL文档
- 添加requirements.txt依赖文件
2025-09-10 20:49:12 +08:00

277 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import subprocess
import sys
import os
import datetime
import re
import json
class DiskInspection:
    """Inspect the SMART health of disks behind a MegaRAID controller.

    For every MegaRAID device id in ``range(controller_count)`` the class runs
    ``smartctl --all -d megaraid,<id> <device>``, appends the raw output to a
    dated log file, parses a handful of fields, writes a Markdown report and
    finally tries to POST that report to an MCP server.
    """

    # Default endpoint used by ``submit_to_mcp`` when no URL is passed.
    MCP_SERVER_URL = "http://localhost:8080/mcp/v1/submit"

    # Health strings that mean "healthy": smartctl prints "PASSED" for ATA
    # drives and "OK" for SCSI/SAS drives.
    HEALTHY_STATUSES = ("OK", "PASSED")

    # SMART attributes copied into the report's "important_attributes".
    CRITICAL_ATTRS = frozenset((
        "Reallocated_Sector_Ct",
        "Spin_Retry_Count",
        "End-to-End_Error",
        "CRC_Error_Count",
        "Multi_Zone_Error_Rate",
    ))

    # smartctl's exit status is a bitmask. Only bit 0 (command line did not
    # parse) and bit 1 (device open failed) mean "no usable output"; the higher
    # bits flag disk problems and must NOT cause the output to be discarded.
    SMARTCTL_FATAL_MASK = 0b11

    # (result key, line pattern) pairs; each pattern accepts both the ATA and
    # the SCSI/SAS spelling of the field. [ \t]* keeps the match on one line.
    _FIELD_PATTERNS = (
        ("device_model", r"^(?:Device Model|Product):[ \t]*(.*)"),
        ("serial_number", r"^Serial [Nn]umber:[ \t]*(.*)"),
        ("firmware_version", r"^(?:Firmware Version|Revision):[ \t]*(.*)"),
        (
            "health_status",
            r"^SMART (?:overall-health self-assessment test result"
            r"|Health Status|overall-health status):[ \t]*(.*)",
        ),
    )

    def __init__(self, device="/dev/sda", controller_count=8, log_dir="/root"):
        """Store the configuration and derive the dated output file paths.

        Args:
            device: Block device passed to smartctl.
            controller_count: Number of MegaRAID device ids to probe
                (ids ``0 .. controller_count - 1``).
            log_dir: Directory that receives the log, report and JSON files.
        """
        self.device = device
        self.controller_count = controller_count
        self.log_dir = log_dir
        self.current_date = datetime.datetime.now().strftime("%y%m%d")
        self.log_file = os.path.join(log_dir, f"smartctl.{self.current_date}.log")
        self.md_report = os.path.join(
            log_dir, f"disk_inspection_report.{self.current_date}.md"
        )
        # Maps controller id -> dict produced by parse_smart_info().
        self.results = {}

    def _log_smartctl_error(self, controller_id, detail):
        """Print a smartctl failure, append it to the log file, return None."""
        error_msg = f"执行smartctl命令失败 (Controller {controller_id}): {detail}\n"
        print(error_msg)
        with open(self.log_file, "a", encoding="utf-8") as f:
            f.write(f"\n\n===== Controller {controller_id} (Error) =====\n")
            f.write(error_msg)
        return None

    def run_smartctl(self, controller_id):
        """Run smartctl for one MegaRAID device id and log its output.

        Args:
            controller_id: MegaRAID device id used in ``-d megaraid,<id>``.

        Returns:
            The smartctl stdout text, or ``None`` when smartctl could not be
            started, could not parse its command line, or could not open the
            device.
        """
        # Argument list instead of a shell string: no shell is spawned and
        # ``self.device`` cannot inject extra commands.
        command = [
            "smartctl", "--all", "-d", f"megaraid,{controller_id}", self.device,
        ]
        try:
            result = subprocess.run(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
            )
        except OSError as e:
            # e.g. smartctl is not installed or not executable.
            return self._log_smartctl_error(controller_id, str(e))

        if result.returncode & self.SMARTCTL_FATAL_MASK:
            # smartctl reports most errors on stdout, so fall back to it.
            detail = result.stderr.strip() or result.stdout.strip()
            return self._log_smartctl_error(controller_id, detail)

        with open(self.log_file, "a", encoding="utf-8") as f:
            f.write(f"\n\n===== Controller {controller_id} =====\n")
            f.write(result.stdout)
        return result.stdout

    @staticmethod
    def _attribute_rows(output):
        """Yield the split columns of each row of the SMART attribute table.

        The table starts after the ``ID# ATTRIBUTE_NAME ...`` header and ends
        at the first blank line. Rows with fewer than 10 columns are skipped.
        """
        in_table = False
        for line in output.splitlines():
            stripped = line.strip()
            if not in_table:
                if re.match(r"ID#\s+ATTRIBUTE_NAME\b", stripped):
                    in_table = True
                continue
            if not stripped:
                break
            parts = stripped.split()
            if len(parts) >= 10:
                yield parts

    def parse_smart_info(self, output, controller_id):
        """Extract the reported fields from smartctl output.

        Args:
            output: Text printed by ``smartctl --all``.
            controller_id: Unused; kept for interface compatibility.

        Returns:
            ``{"error": "No data available"}`` for empty output, otherwise a
            dict that may contain ``device_model``, ``serial_number``,
            ``firmware_version``, ``health_status``, ``temperature`` (string,
            degrees Celsius) and ``important_attributes`` (attribute name ->
            ``{"value", "worst", "threshold"}``). Missing fields are omitted.
        """
        if not output:
            return {"error": "No data available"}

        parsed_info = {}
        for key, pattern in self._FIELD_PATTERNS:
            match = re.search(pattern, output, re.MULTILINE)
            if match and match.group(1).strip():
                parsed_info[key] = match.group(1).strip()

        temperature = None
        important_attributes = {}
        # Columns: ID# NAME FLAG VALUE WORST THRESH TYPE UPDATED WHEN_FAILED RAW
        for parts in self._attribute_rows(output):
            attr_name = parts[1]
            # The raw value (column 10) carries the temperature; the column
            # right after the name is the hex FLAG, not a reading.
            if (
                temperature is None
                and attr_name == "Temperature_Celsius"
                and parts[9].isdigit()
            ):
                temperature = parts[9]
            if attr_name in self.CRITICAL_ATTRS:
                important_attributes[attr_name] = {
                    "value": parts[3],
                    "worst": parts[4],
                    "threshold": parts[5],
                }

        if temperature is None:
            # SCSI/SAS drives have no attribute table; they print this line.
            scsi_temp = re.search(
                r"^Current Drive Temperature:[ \t]*(\d+)", output, re.MULTILINE
            )
            if scsi_temp:
                temperature = scsi_temp.group(1)
        if temperature is not None:
            parsed_info["temperature"] = temperature

        if important_attributes:
            parsed_info["important_attributes"] = important_attributes
        return parsed_info

    def generate_md_report(self):
        """Write the Markdown report for ``self.results``.

        Controllers without a healthy status (including those with no data)
        are counted as abnormal in the overview; the detail section lists only
        controllers that reported a non-healthy status.

        Returns:
            The path of the written report (``self.md_report``).
        """
        healthy = self.HEALTHY_STATUSES
        healthy_count = sum(
            1 for info in self.results.values()
            if info.get("health_status") in healthy
        )

        lines = [
            "# 硬盘巡检报告\n\n",
            f"**检查日期**: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n",
            f"**设备**: {self.device}\n\n",
            "## 控制器检查结果\n\n",
            "### 健康状态概览\n",
            f"- 总控制器数: {self.controller_count}\n",
            f"- 健康控制器数: {healthy_count}\n",
            f"- 异常控制器数: {self.controller_count - healthy_count}\n\n",
            "### 详细信息\n",
            "| 控制器ID | 设备型号 | 序列号 | 固件版本 | 健康状态 | 温度(°C) |\n",
            "|----------|----------|--------|----------|----------|----------|\n",
        ]

        for controller_id in range(self.controller_count):
            info = self.results.get(controller_id, {})
            lines.append(
                f"| {controller_id} "
                f"| {info.get('device_model', 'N/A')} "
                f"| {info.get('serial_number', 'N/A')} "
                f"| {info.get('firmware_version', 'N/A')} "
                f"| {info.get('health_status', 'N/A')} "
                f"| {info.get('temperature', 'N/A')} |\n"
            )

        lines.append("\n## 异常详情\n\n")
        has_issues = False
        for controller_id in range(self.controller_count):
            info = self.results.get(controller_id, {})
            if "health_status" not in info or info["health_status"] in healthy:
                continue
            has_issues = True
            lines.append(f"### 控制器 {controller_id}\n")
            lines.append(f"- 健康状态: **{info['health_status']}**\n")
            if "important_attributes" in info:
                lines.append("- 关键属性异常:\n")
                for attr_name, attr_info in info["important_attributes"].items():
                    lines.append(
                        f" - {attr_name}: 值={attr_info['value']}, "
                        f"最差={attr_info['worst']}, 阈值={attr_info['threshold']}\n"
                    )
            lines.append("\n")
        if not has_issues:
            lines.append("暂无异常情况。\n")

        lines.append("\n## 建议\n\n")
        if has_issues:
            lines.append("1. 请关注异常控制器的状态变化\n")
            lines.append("2. 建议备份重要数据\n")
            lines.append("3. 必要时考虑更换有问题的硬盘\n")
        else:
            lines.append("1. 保持定期检查硬盘健康状态\n")
            lines.append("2. 继续做好数据备份工作\n")
            lines.append("3. 注意监控系统温度变化\n")

        # Explicit UTF-8: the report is Chinese text and cron jobs often run
        # with a C/POSIX locale.
        with open(self.md_report, "w", encoding="utf-8") as f:
            f.writelines(lines)

        print(f"Markdown报告已生成: {self.md_report}")
        return self.md_report

    @staticmethod
    def _print_manual_submit_hint(temp_json, url):
        """Print the curl command that submits the prepared JSON by hand."""
        print("或者使用以下命令手动提交数据:")
        print(
            f"curl -X POST -H 'Content-Type: application/json' -d @{temp_json} {url}"
        )

    def submit_to_mcp(self, report_path, mcp_server_url=None, timeout=30):
        """Send the report to the MCP server for AI analysis.

        The payload is first written to ``mcp_data_<date>.json`` in
        ``log_dir`` so it can be submitted manually if the request fails.

        Args:
            report_path: Path of the Markdown report to submit.
            mcp_server_url: Endpoint to POST to; defaults to
                ``MCP_SERVER_URL``.
            timeout: Seconds to wait for the server, so an unresponsive
                server cannot hang the job.

        Returns:
            ``True`` once the payload file has been prepared (even when the
            HTTP request itself fails and only a hint is printed), ``False``
            when reading the report or writing the payload raised.
        """
        url = mcp_server_url or self.MCP_SERVER_URL
        try:
            with open(report_path, "r", encoding="utf-8") as f:
                report_content = f.read()

            mcp_data = {
                "type": "disk_inspection_report",
                "timestamp": datetime.datetime.now().isoformat(),
                "content": report_content,
            }
            print("正在准备通过MCP提交报告给AI分析...")

            temp_json = os.path.join(
                self.log_dir, f"mcp_data_{self.current_date}.json"
            )
            with open(temp_json, "w", encoding="utf-8") as f:
                json.dump(mcp_data, f, ensure_ascii=False, indent=2)
            print(f"MCP数据已准备好: {temp_json}")

            try:
                # Optional dependency: imported lazily so the inspection
                # itself works without it.
                import requests

                print(f"正在向MCP服务器发送请求: {url}")
                response = requests.post(
                    url,
                    json=mcp_data,
                    headers={"Content-Type": "application/json"},
                    timeout=timeout,
                )
                if response.status_code == 200:
                    result = response.json()
                    print("MCP提交成功")
                    print(f"AI分析结果: {result.get('result', {}).get('analysis', '无分析结果')}")
                    analysis_file = os.path.join(
                        self.log_dir, f"ai_analysis_{self.current_date}.json"
                    )
                    with open(analysis_file, "w", encoding="utf-8") as f:
                        json.dump(result, f, ensure_ascii=False, indent=2)
                    print(f"AI分析结果已保存: {analysis_file}")
                else:
                    print(f"MCP服务器请求失败: HTTP {response.status_code}")
                    print(f"响应内容: {response.text}")
                    print("请检查MCP服务器是否正常运行")
            except ImportError:
                print("未安装requests库无法直接发送HTTP请求")
                print("请运行: pip install requests 来安装必要的依赖")
                self._print_manual_submit_hint(temp_json, url)
            except Exception as e:
                # Best effort: a failed submission must not abort the run.
                print(f"发送请求到MCP服务器时出错: {str(e)}")
                print("请检查MCP服务器是否正常运行")
                self._print_manual_submit_hint(temp_json, url)
            return True
        except Exception as e:
            print(f"提交MCP失败: {str(e)}")
            return False

    def run_inspection(self):
        """Run the full flow: probe every id, write the report, submit it."""
        print(f"开始硬盘巡检,设备: {self.device}")
        print(f"日志文件: {self.log_file}")

        # Start from a clean log; run_smartctl() appends to it.
        if os.path.exists(self.log_file):
            with open(self.log_file, "w", encoding="utf-8"):
                pass

        # Drop results of a previous run so stale data cannot leak into the
        # report for ids that fail this time.
        self.results = {}
        for controller_id in range(self.controller_count):
            print(f"检查控制器 {controller_id}...")
            output = self.run_smartctl(controller_id)
            if output:
                self.results[controller_id] = self.parse_smart_info(
                    output, controller_id
                )

        report_path = self.generate_md_report()
        self.submit_to_mcp(report_path)
        print("硬盘巡检完成!")
if __name__ == "__main__":
    # Script entry point: verify the preconditions, then run one inspection.
    # Local import: only this entry point needs shutil.
    import shutil

    # smartctl needs raw device access, so require root.
    if os.geteuid() != 0:
        print("错误请以root权限运行此脚本")
        sys.exit(1)

    # Look smartctl up on PATH directly instead of spawning a shell that runs
    # the external `which` command.
    if shutil.which("smartctl") is None:
        print("错误未找到smartctl工具请先安装")
        print("Ubuntu/Debian: apt install smartmontools")
        print("CentOS/RHEL: yum install smartmontools")
        sys.exit(1)

    # Create an inspector with the default settings and run it.
    inspector = DiskInspection()
    inspector.run_inspection()