verification working

This commit is contained in:
cristhian aguilera
2026-01-30 17:03:19 -03:00
parent 61bc384826
commit 380c466170
2 changed files with 825 additions and 0 deletions

View File

@@ -0,0 +1,754 @@
"""Dora node for verifying eye-in-hand calibration with a ChArUco board."""
from __future__ import annotations
import argparse
from collections import deque
import json
import os
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
import cv2
import numpy as np
import pyarrow as pa
from dora import Node
WINDOW_NAME = "Dora Eye-in-Hand Verification"
DEFAULT_DICT = cv2.aruco.DICT_6X6_1000
@dataclass
class CameraIntrinsics:
fx: float
fy: float
cx: float
cy: float
distortion: np.ndarray
def camera_matrix(self) -> np.ndarray:
return np.array(
[[self.fx, 0.0, self.cx], [0.0, self.fy, self.cy], [0.0, 0.0, 1.0]],
dtype=np.float64,
)
@dataclass
class CornerTarget:
corner_mm: np.ndarray
flange_target_mm: np.ndarray
flange_approach_mm: np.ndarray
@dataclass
class VerifyState:
intrinsics: Optional[CameraIntrinsics] = None
latest_tcp_pose: Optional[List[float]] = None
latest_tcp_pose_at: Optional[float] = None
pose_history: deque[Tuple[float, List[float]]] = field(
default_factory=lambda: deque(maxlen=10)
)
last_status_at: Optional[float] = None
pending_command: Optional[Dict[str, Any]] = None
status_text: str = ""
detected_ids: List[int] = field(default_factory=list)
corner_targets: Dict[int, CornerTarget] = field(default_factory=dict)
selected_corner_id: Optional[int] = None
stage: str = "idle"
last_tip_error_mm: Optional[float] = None
last_flange_error_mm: Optional[float] = None
intrinsics_logged: bool = False
def _rotation_matrix_xyz(
roll_deg: float, pitch_deg: float, yaw_deg: float
) -> np.ndarray:
roll = np.deg2rad(roll_deg)
pitch = np.deg2rad(pitch_deg)
yaw = np.deg2rad(yaw_deg)
cx, sx = np.cos(roll), np.sin(roll)
cy, sy = np.cos(pitch), np.sin(pitch)
cz, sz = np.cos(yaw), np.sin(yaw)
rot_x = np.array([[1.0, 0.0, 0.0], [0.0, cx, -sx], [0.0, sx, cx]])
rot_y = np.array([[cy, 0.0, sy], [0.0, 1.0, 0.0], [-sy, 0.0, cy]])
rot_z = np.array([[cz, -sz, 0.0], [sz, cz, 0.0], [0.0, 0.0, 1.0]])
return rot_z @ rot_y @ rot_x
def _pose_to_matrix(tcp_pose_mm_deg: List[float]) -> np.ndarray:
x, y, z, roll, pitch, yaw = tcp_pose_mm_deg
rot = _rotation_matrix_xyz(roll, pitch, yaw)
mat = np.eye(4)
mat[:3, :3] = rot
mat[:3, 3] = np.array([x, y, z], dtype=np.float64) / 1000.0
return mat
def _try_get_charuco_board(
squares_x: int, squares_y: int, square_length: float, marker_length: float
) -> Any:
dictionary = cv2.aruco.getPredefinedDictionary(DEFAULT_DICT)
try:
board = cv2.aruco.CharucoBoard(
(squares_x, squares_y), square_length, marker_length, dictionary
)
except AttributeError:
board = cv2.aruco.CharucoBoard_create(
squares_x, squares_y, square_length, marker_length, dictionary
)
if hasattr(board, "setLegacyPattern"):
board.setLegacyPattern(True)
return board
def _create_charuco_detector(board: Any) -> Optional[Any]:
if not hasattr(cv2.aruco, "CharucoDetector"):
return None
aruco_params = cv2.aruco.DetectorParameters()
charuco_params = cv2.aruco.CharucoParameters()
charuco_params.minMarkers = 2
return cv2.aruco.CharucoDetector(board, charuco_params, aruco_params)
def _decode_image(storage: pa.Array, metadata: Dict[str, Any]) -> np.ndarray:
encoding = str(metadata.get("encoding", "bgr8")).lower()
width = metadata.get("width")
height = metadata.get("height")
if (width is None or height is None) and "shape" in metadata:
shape = metadata.get("shape")
if isinstance(shape, (list, tuple)) and len(shape) >= 2:
height = height if height is not None else int(shape[0])
width = width if width is not None else int(shape[1])
if width is None or height is None:
raise KeyError("width/height (or shape) missing from metadata")
if encoding == "bgr8":
frame = storage.to_numpy().astype(np.uint8).reshape((height, width, 3))
return frame.copy()
if encoding == "rgb8":
frame = storage.to_numpy().astype(np.uint8).reshape((height, width, 3))
return cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
if encoding in {"jpeg", "jpg", "jpe", "bmp", "webp", "png"}:
frame = storage.to_numpy().astype(np.uint8)
return cv2.imdecode(frame, cv2.IMREAD_COLOR)
if encoding == "yuv420":
yuv = storage.to_numpy().astype(np.uint8)
yuv = yuv[: width * height * 3 // 2].reshape((height * 3 // 2, width))
return cv2.cvtColor(yuv, cv2.COLOR_YUV420p2BGR)
raise RuntimeError(f"Unsupported image encoding: {encoding}")
def _intrinsics_from_metadata(metadata: Dict[str, Any]) -> Optional[CameraIntrinsics]:
if not all(k in metadata for k in ("fx", "fy", "cx", "cy")):
return None
distortion = metadata.get("distortion", [])
if distortion is None:
distortion = []
dist = np.array(distortion, dtype=np.float64).reshape(-1)
if dist.size == 0:
dist = np.zeros(5, dtype=np.float64)
return CameraIntrinsics(
fx=float(metadata["fx"]),
fy=float(metadata["fy"]),
cx=float(metadata["cx"]),
cy=float(metadata["cy"]),
distortion=dist,
)
def _intrinsics_from_camera_info(
values: np.ndarray, metadata: Dict[str, Any]
) -> Optional[CameraIntrinsics]:
if values.size < 4:
return None
fx, fy, cx, cy = [float(v) for v in values[:4]]
distortion = metadata.get("distortion", [])
if distortion is None:
distortion = []
dist = np.array(distortion, dtype=np.float64).reshape(-1)
if dist.size == 0:
dist = np.zeros(5, dtype=np.float64)
return CameraIntrinsics(fx=fx, fy=fy, cx=cx, cy=cy, distortion=dist)
def _estimate_charuco_pose_with_ids(
frame: np.ndarray,
board: Any,
intrinsics: CameraIntrinsics,
detector: Optional[Any],
min_corners: int,
) -> Tuple[
Optional[np.ndarray],
Optional[np.ndarray],
Optional[np.ndarray],
np.ndarray,
bool,
int,
int,
bool,
]:
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
if detector is not None:
charuco_corners, charuco_ids, marker_corners, marker_ids = detector.detectBoard(
frame
)
else:
dictionary = (
board.dictionary if hasattr(board, "dictionary") else board.getDictionary()
)
marker_corners, marker_ids, _ = cv2.aruco.detectMarkers(gray, dictionary)
charuco_corners, charuco_ids = None, None
if marker_ids is not None and len(marker_ids) > 0:
_, charuco_corners, charuco_ids = cv2.aruco.interpolateCornersCharuco(
marker_corners, marker_ids, gray, board
)
debug = frame.copy()
marker_count = 0 if marker_ids is None else len(marker_ids)
if marker_count == 0:
return None, None, None, debug, False, 0, 0, False
cv2.aruco.drawDetectedMarkers(debug, marker_corners, marker_ids)
charuco_count = 0 if charuco_ids is None else len(charuco_ids)
if charuco_count < min_corners:
return None, None, None, debug, True, marker_count, charuco_count, False
cv2.aruco.drawDetectedCornersCharuco(debug, charuco_corners, charuco_ids)
obj_points = board.getChessboardCorners()[charuco_ids.flatten()]
ok, rvec, tvec = cv2.solvePnP(
obj_points,
charuco_corners,
intrinsics.camera_matrix(),
intrinsics.distortion,
flags=cv2.SOLVEPNP_IPPE,
)
if not ok:
return None, None, None, debug, True, marker_count, charuco_count, False
rvec, tvec = cv2.solvePnPRefineVVS(
obj_points,
charuco_corners,
intrinsics.camera_matrix(),
intrinsics.distortion,
rvec,
tvec,
)
cv2.drawFrameAxes(
debug,
intrinsics.camera_matrix(),
intrinsics.distortion,
rvec,
tvec,
0.05,
)
return rvec, tvec, charuco_ids, debug, True, marker_count, charuco_count, True
def _send_dora_command(
node: Node, output_name: str, action: str, payload: Dict[str, Any]
) -> str:
command_id = str(uuid.uuid4())
message = {"id": command_id, "action": action, "payload": payload}
node.send_output(
output_name,
pa.array([json.dumps(message)]),
metadata={"encoding": "json", "timestamp_ns": time.time_ns()},
)
return command_id
def _parse_status_payload(value: pa.Array) -> Optional[Dict[str, Any]]:
if len(value) == 0:
return None
raw = value[0].as_py()
if not raw:
return None
try:
return json.loads(raw)
except Exception:
return None
def _pose_is_stable(
pose_history: deque[Tuple[float, List[float]]],
window_s: float,
eps_mm: float,
eps_deg: float,
) -> bool:
if not pose_history:
return False
latest_t = pose_history[-1][0]
window = [p for (t, p) in pose_history if latest_t - t <= window_s]
if len(window) < 2:
return False
positions = np.array([p[:3] for p in window], dtype=np.float64)
rotations = np.array([p[3:6] for p in window], dtype=np.float64)
pos_span = positions.max(axis=0) - positions.min(axis=0)
rot_span = rotations.max(axis=0) - rotations.min(axis=0)
return bool(np.all(pos_span <= eps_mm) and np.all(rot_span <= eps_deg))
def _within_bounds(
point_mm: np.ndarray,
min_xyz: Tuple[Optional[float], Optional[float], Optional[float]],
max_xyz: Tuple[Optional[float], Optional[float], Optional[float]],
) -> bool:
x, y, z = point_mm.tolist()
min_x, min_y, min_z = min_xyz
max_x, max_y, max_z = max_xyz
if min_x is not None and x < min_x:
return False
if max_x is not None and x > max_x:
return False
if min_y is not None and y < min_y:
return False
if max_y is not None and y > max_y:
return False
if min_z is not None and z < min_z:
return False
if max_z is not None and z > max_z:
return False
return True
def _parse_float_env(name: str) -> Optional[float]:
raw = os.getenv(name)
if raw is None or raw == "":
return None
try:
return float(raw)
except ValueError:
return None
def main() -> None:
parser = argparse.ArgumentParser(
description="Eye-in-hand verification node with minimal OpenCV UI.",
)
parser.add_argument("--name", type=str, default="dora-calibration-verify")
args = parser.parse_args()
image_input = os.getenv("IMAGE_INPUT", "image")
camera_info_input = os.getenv("CAMERA_INFO_INPUT", "camera_info")
pose_input = os.getenv("POSE_INPUT", "tcp_pose")
status_input = os.getenv("STATUS_INPUT", "status")
command_output = os.getenv("COMMAND_OUTPUT", "robot_cmd")
pose_units = os.getenv("POSE_UNITS", "mm").lower()
use_camera_info = os.getenv("USE_CAMERA_INFO", "false").lower() in (
"true",
"1",
"yes",
)
min_corners = int(os.getenv("MIN_CORNERS", "6"))
move_speed = _parse_float_env("MOVE_SPEED")
move_home_on_start = os.getenv("MOVE_HOME_ON_START", "true").lower() in (
"true",
"1",
"yes",
)
tcp_offset_mm = float(os.getenv("TCP_OFFSET_MM", "63.0"))
z_offset_mm = float(os.getenv("Z_OFFSET_MM", "50.0"))
dry_run = os.getenv("DRY_RUN", "false").lower() in ("true", "1", "yes")
hold_after_touch = os.getenv("HOLD_AFTER_TOUCH", "false").lower() in (
"true",
"1",
"yes",
)
pose_stability_window = float(os.getenv("POSE_STABILITY_WINDOW", "0.5"))
pose_stability_eps_mm = float(os.getenv("POSE_STABILITY_EPS_MM", "0.5"))
pose_stability_eps_deg = float(os.getenv("POSE_STABILITY_EPS_DEG", "0.5"))
allow_unstable_pose = os.getenv("ALLOW_UNSTABLE_POSE", "false").lower() in (
"true",
"1",
"yes",
)
squares_x = int(os.getenv("SQUARES_X", "4"))
squares_y = int(os.getenv("SQUARES_Y", "6"))
square_length = float(os.getenv("SQUARE_LENGTH", "0.04"))
marker_length = float(os.getenv("MARKER_LENGTH", "0.03"))
home_x = float(os.getenv("HOME_X", "200"))
home_y = float(os.getenv("HOME_Y", "0"))
home_z = float(os.getenv("HOME_Z", "300"))
home_roll = float(os.getenv("HOME_ROLL", "180"))
home_pitch = float(os.getenv("HOME_PITCH", "0"))
home_yaw = float(os.getenv("HOME_YAW", "0"))
min_xyz = (
_parse_float_env("WORKSPACE_MIN_X"),
_parse_float_env("WORKSPACE_MIN_Y"),
_parse_float_env("WORKSPACE_MIN_Z"),
)
max_xyz = (
_parse_float_env("WORKSPACE_MAX_X"),
_parse_float_env("WORKSPACE_MAX_Y"),
_parse_float_env("WORKSPACE_MAX_Z"),
)
calibration_file = os.getenv("CALIBRATION_FILE", "calibration.npz")
if calibration_file and not os.path.isabs(calibration_file):
config_path = os.path.join("config", calibration_file)
calibration_file = config_path if os.path.exists(config_path) else calibration_file
print(f"Loading calibration from {calibration_file}...", flush=True)
try:
calib_data = np.load(calibration_file, allow_pickle=True)
t_cam2gripper = calib_data["T_cam2gripper"]
calib_type = str(calib_data.get("calibration_type", "unknown"))
file_camera_matrix = calib_data.get("camera_matrix")
file_distortion = calib_data.get("distortion")
except Exception as exc:
raise RuntimeError(f"Failed to load calibration file: {exc}") from exc
if calib_type != "eye_in_hand":
print(
f'Warning: calibration_type="{calib_type}", expected "eye_in_hand".',
flush=True,
)
file_intrinsics: Optional[CameraIntrinsics] = None
if file_camera_matrix is not None:
try:
fx = float(file_camera_matrix[0][0])
fy = float(file_camera_matrix[1][1])
cx = float(file_camera_matrix[0][2])
cy = float(file_camera_matrix[1][2])
distortion = (
np.array(file_distortion, dtype=np.float64).reshape(-1)
if file_distortion is not None
else np.zeros(5, dtype=np.float64)
)
if distortion.size == 0:
distortion = np.zeros(5, dtype=np.float64)
file_intrinsics = CameraIntrinsics(
fx=fx, fy=fy, cx=cx, cy=cy, distortion=distortion
)
except Exception:
file_intrinsics = None
state = VerifyState()
if not use_camera_info and file_intrinsics is not None:
state.intrinsics = file_intrinsics
state.intrinsics_logged = True
print(
f"Using intrinsics from calibration file fx={file_intrinsics.fx:.4f} "
f"fy={file_intrinsics.fy:.4f} cx={file_intrinsics.cx:.4f} "
f"cy={file_intrinsics.cy:.4f} dist={file_intrinsics.distortion.tolist()}",
flush=True,
)
board = _try_get_charuco_board(squares_x, squares_y, square_length, marker_length)
detector = _create_charuco_detector(board)
node = Node(args.name)
cv2.namedWindow(WINDOW_NAME, cv2.WINDOW_NORMAL)
started = False
def send_move(x: float, y: float, z: float, roll: float, pitch: float, yaw: float) -> None:
payload: Dict[str, Any] = {
"x": float(x),
"y": float(y),
"z": float(z),
"roll": float(roll),
"pitch": float(pitch),
"yaw": float(yaw),
}
if move_speed is not None:
payload["speed"] = move_speed
if dry_run:
state.status_text = "DRY_RUN: move suppressed"
return
cmd_id = _send_dora_command(node, command_output, "move_to_pose", payload)
state.pending_command = {
"id": cmd_id,
"action": "move_to_pose",
"sent_at": time.monotonic(),
"payload": payload,
}
for event in node:
if event["type"] != "INPUT":
continue
now = time.monotonic()
if not started:
started = True
if move_home_on_start and not state.pending_command:
send_move(home_x, home_y, home_z, home_roll, home_pitch, home_yaw)
state.stage = "home_pending"
state.status_text = "Sent home on start"
if state.pending_command and now - state.pending_command["sent_at"] > 30.0:
state.status_text = f"Command timeout ({state.pending_command['action']})"
state.pending_command = None
if event["id"] == status_input:
payload = _parse_status_payload(event["value"])
state.last_status_at = now
if payload and state.pending_command:
if payload.get("command_id") == state.pending_command["id"]:
ok = bool(payload.get("ok", False))
msg = payload.get("message", "")
state.status_text = "Command ok" if ok else f"Command failed: {msg}"
state.pending_command = None
if ok:
if state.stage == "approach_pending":
state.stage = "at_approach"
elif state.stage == "touch_pending":
state.stage = "at_target"
if state.selected_corner_id in state.corner_targets:
target = state.corner_targets[state.selected_corner_id]
if state.latest_tcp_pose:
actual = np.array(state.latest_tcp_pose[:3])
flange_error = np.linalg.norm(
actual - target.flange_target_mm
)
tip_actual = actual.copy()
# Assumes tool tip offset along -Z in base frame (roll=180,pitch=0,yaw=0).
tip_actual[2] -= tcp_offset_mm
tip_error = np.linalg.norm(tip_actual - target.corner_mm)
state.last_flange_error_mm = float(flange_error)
state.last_tip_error_mm = float(tip_error)
if not hold_after_touch and state.selected_corner_id in state.corner_targets:
target = state.corner_targets[state.selected_corner_id]
send_move(
target.flange_approach_mm[0],
target.flange_approach_mm[1],
target.flange_approach_mm[2],
home_roll,
home_pitch,
home_yaw,
)
state.stage = "retract_pending"
elif state.stage == "retract_pending":
state.stage = "at_approach"
continue
if event["id"] == pose_input:
tcp_pose = event["value"].to_numpy().astype(np.float64).reshape(-1)
if tcp_pose.size >= 6:
pose = tcp_pose[:6].tolist()
if pose_units == "m":
pose[0] *= 1000.0
pose[1] *= 1000.0
pose[2] *= 1000.0
state.latest_tcp_pose = pose
state.latest_tcp_pose_at = now
state.pose_history.append((now, pose))
continue
if event["id"] == camera_info_input:
if use_camera_info or state.intrinsics is None:
raw_bytes = event["value"].to_numpy().astype(np.uint8).tobytes()
dtype_str = str(event["metadata"].get("dtype", "float32"))
dtype_map = {"float32": np.float32, "float64": np.float64}
dtype = dtype_map.get(dtype_str, np.float32)
values = np.frombuffer(raw_bytes, dtype=dtype).astype(np.float64)
intrinsics = _intrinsics_from_camera_info(values, event["metadata"])
if intrinsics:
state.intrinsics = intrinsics
if not state.intrinsics_logged:
state.intrinsics_logged = True
print(
f"intrinsics fx={intrinsics.fx:.4f} fy={intrinsics.fy:.4f} "
f"cx={intrinsics.cx:.4f} cy={intrinsics.cy:.4f} "
f"dist={intrinsics.distortion.tolist()}",
flush=True,
)
continue
if event["id"] != image_input:
continue
storage = event["value"]
metadata = event.get("metadata", {})
if state.intrinsics is None:
intrinsics = _intrinsics_from_metadata(metadata)
if intrinsics:
state.intrinsics = intrinsics
if not state.intrinsics_logged:
state.intrinsics_logged = True
print(
f"intrinsics fx={intrinsics.fx:.4f} fy={intrinsics.fy:.4f} "
f"cx={intrinsics.cx:.4f} cy={intrinsics.cy:.4f} "
f"dist={intrinsics.distortion.tolist()}",
flush=True,
)
frame = _decode_image(storage, metadata)
debug = frame.copy()
if state.intrinsics is None:
cv2.putText(
debug,
"Missing camera intrinsics (fx,fy,cx,cy)",
(10, 30),
cv2.FONT_HERSHEY_SIMPLEX,
0.7,
(0, 0, 255),
2,
)
cv2.imshow(WINDOW_NAME, debug)
if cv2.waitKey(1) & 0xFF == ord("q"):
break
continue
rvec, tvec, charuco_ids, debug, detected, marker_count, charuco_count, pose_ok = (
_estimate_charuco_pose_with_ids(
frame, board, state.intrinsics, detector, min_corners
)
)
if marker_count == 0:
status = "NO MARKERS"
color = (0, 0, 255)
elif not pose_ok:
status = "MARKERS ONLY"
color = (0, 165, 255)
else:
status = "DETECTED"
color = (0, 255, 0)
state.corner_targets = {}
state.detected_ids = []
if pose_ok and charuco_ids is not None and state.latest_tcp_pose:
pose_stable = _pose_is_stable(
state.pose_history,
pose_stability_window,
pose_stability_eps_mm,
pose_stability_eps_deg,
)
if pose_stable or allow_unstable_pose:
base_T_flange = _pose_to_matrix(state.latest_tcp_pose)
rmat, _ = cv2.Rodrigues(rvec)
cam_T_board = np.eye(4)
cam_T_board[:3, :3] = rmat
cam_T_board[:3, 3] = tvec.flatten()
all_board_corners = board.getChessboardCorners()
for corner_id in charuco_ids.flatten():
corner_board = np.append(all_board_corners[int(corner_id)], 1.0)
corner_cam = cam_T_board @ corner_board
corner_base = base_T_flange @ t_cam2gripper @ corner_cam
corner_mm = corner_base[:3] * 1000.0
flange_target_mm = corner_mm.copy()
flange_target_mm[2] += tcp_offset_mm
flange_approach_mm = flange_target_mm.copy()
flange_approach_mm[2] += z_offset_mm
state.corner_targets[int(corner_id)] = CornerTarget(
corner_mm=corner_mm,
flange_target_mm=flange_target_mm,
flange_approach_mm=flange_approach_mm,
)
state.detected_ids = sorted(state.corner_targets.keys())
else:
state.status_text = "Waiting for pose to stabilize"
cv2.putText(
debug,
f"ChArUco: {status} | corners={charuco_count}",
(10, 30),
cv2.FONT_HERSHEY_SIMPLEX,
0.7,
color,
2,
)
cv2.putText(
debug,
f"Detected IDs: {state.detected_ids}",
(10, 55),
cv2.FONT_HERSHEY_SIMPLEX,
0.6,
(255, 255, 255),
1,
)
cv2.putText(
debug,
f"Stage: {state.stage} | {state.status_text}",
(10, 80),
cv2.FONT_HERSHEY_SIMPLEX,
0.55,
(255, 255, 255),
1,
)
if state.last_tip_error_mm is not None:
cv2.putText(
debug,
f"Last tip error: {state.last_tip_error_mm:.2f} mm",
(10, 105),
cv2.FONT_HERSHEY_SIMPLEX,
0.55,
(255, 255, 255),
1,
)
cv2.imshow(WINDOW_NAME, debug)
key = cv2.waitKey(1) & 0xFF
if key == ord("q"):
break
if key == ord("h") and not state.pending_command:
if _within_bounds(np.array([home_x, home_y, home_z]), min_xyz, max_xyz):
send_move(home_x, home_y, home_z, home_roll, home_pitch, home_yaw)
state.stage = "home_pending"
else:
state.status_text = "Home pose outside workspace bounds"
elif key == ord("r"):
state.selected_corner_id = None
state.stage = "idle"
state.status_text = "Refreshed selection"
elif key == ord("t") and state.stage == "at_target" and not state.pending_command:
if state.selected_corner_id in state.corner_targets:
target = state.corner_targets[state.selected_corner_id]
send_move(
target.flange_approach_mm[0],
target.flange_approach_mm[1],
target.flange_approach_mm[2],
home_roll,
home_pitch,
home_yaw,
)
state.stage = "retract_pending"
elif ord("0") <= key <= ord("9") and not state.pending_command:
corner_id = int(chr(key))
if corner_id in state.corner_targets:
target = state.corner_targets[corner_id]
if _within_bounds(target.flange_approach_mm, min_xyz, max_xyz) and _within_bounds(
target.flange_target_mm, min_xyz, max_xyz
):
state.selected_corner_id = corner_id
send_move(
target.flange_approach_mm[0],
target.flange_approach_mm[1],
target.flange_approach_mm[2],
home_roll,
home_pitch,
home_yaw,
)
state.stage = "approach_pending"
state.status_text = f"Approaching corner {corner_id}"
else:
state.status_text = "Target outside workspace bounds"
else:
state.status_text = f"Corner {corner_id} not detected"
elif key == ord("y") and state.stage == "at_approach" and not state.pending_command:
if state.selected_corner_id in state.corner_targets:
target = state.corner_targets[state.selected_corner_id]
send_move(
target.flange_target_mm[0],
target.flange_target_mm[1],
target.flange_target_mm[2],
home_roll,
home_pitch,
home_yaw,
)
state.stage = "touch_pending"
cv2.destroyAllWindows()
if __name__ == "__main__":
main()