base_agent/tools/uav_self_check.py

123 lines
4.9 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
from pymavlink import mavutil
from pymavlink.dialects.v20.all import MAVLink_sys_status_message
from agent.device import controller
from agent.tools.base_tool import BaseTool, ToolResult
class Tool(BaseTool):
name = "uav_self_check"
description = ("对无人机设备进行使用前自检")
parameters = {}
def execute(self, **kwargs) -> ToolResult:
"""
MAVLink 飞控系统健康自检
解决Arming denied: Resolve system health failures first
"""
# 等待健康数据
assert controller.connect(), "连接应成功"
try:
connection = controller.adapter.get_connection()
check_list = []
health = connection.wait_heartbeat(timeout=5)
if not health:
return ToolResult(success=False, output="❌ 无法获取飞控状态,连接异常")
# 2. 获取系统健康状态(关键:判断解锁失败原因)
check_list.append("\n===== 系统健康自检结果 =====")
system_status = health.system_status
check_list.append(f"系统状态码:{system_status}")
# 系统状态解释
status_map = {
0: "未初始化",
1: "初始化中",
2: "测试模式",
3: "待机",
4: "活动",
5: "临界",
6: "紧急",
7: "故障"
}
check_list.append(f"系统状态:{status_map.get(system_status, '未知')}")
# 3. 读取详细传感器健康(核心自检)
check_list.append("\n===== 传感器健康检测 =====")
for _ in range(30):
msg = connection.recv_match(type="SYS_STATUS", blocking=True)
if msg:
# 传感器是否存在
sensors_present = msg.onboard_control_sensors_present
sensors_enable = msg.onboard_control_sensors_enabled
# 传感器列表
sensor_list = [
("3D陀螺仪", 0),
("3D加速度计", 1),
("3D磁力计", 2),
("气压计", 3),
("GPS", 4),
("光学流量", 5),
("视觉定位", 6),
("测距传感器", 7),
("电机控制", 8),
("电池", 9),
("遥控信号", 10),
("飞行控制", 11),
("避障系统", 12),
("云台", 13),
]
# 逐个检测
for name, bit in sensor_list:
present = (sensors_present >> bit) & 1
enable = (sensors_enable >> bit) & 1
if present:
if not enable:
check_list.append(f"{name}:故障")
else:
check_list.append(f"{name}:正常")
else:
check_list.append(f" {name}:未安装")
break
time.sleep(0.1)
# 4. 解锁能力判断(解决 Arming denied
check_list.append("\n===== 解锁能力检测 =====")
can_arm = False
for _ in range(20):
msg = connection.recv_match(type="HEARTBEAT", blocking=True)
if msg:
base_mode = msg.base_mode
custom_mode = msg.custom_mode
# 0b10000000 = 已解锁
if (base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED) != 0:
check_list.append("✅ 当前状态:已解锁")
can_arm = True
else:
check_list.append("⚠️ 当前状态:未解锁")
# 判断是否禁止解锁
if system_status in [5, 6, 7]:
check_list.append("❌ 禁止解锁原因:系统处于临界/紧急/故障状态")
else:
check_list.append("✅ 系统允许解锁,可执行 arm 指令")
can_arm = True
break
print("\n===== 自检完成 =====")
if can_arm:
check_list.append("🎉 结论无健康故障可以执行解锁arm")
else:
check_list.append("⚠️ 结论:存在系统故障,必须修复后才能 arm")
return ToolResult(success=True, output="\n".join(check_list))
except Exception as e:
return ToolResult(success=False, output=str(e))
finally:
controller.disconnect()