diff --git a/dataflow_verify_ulite6_zed.yml b/dataflow_verify_ulite6_zed.yml new file mode 100644 index 0000000..e5de1ad --- /dev/null +++ b/dataflow_verify_ulite6_zed.yml @@ -0,0 +1,71 @@ +nodes: + - id: zed_camera_cpp + build: bash -lc "cmake -S dora_zed_cpp -B dora_zed_cpp/build && cmake --build dora_zed_cpp/build" + path: dora_zed_cpp/build/dora_zed_cpp + env: + ZED_RESOLUTION: "720" + ZED_FPS: "15" + ZED_DEPTH_MODE: "NEURAL" + ZED_DEPTH_MIN_MM: "10" + ZED_DEPTH_MAX_MM: "600" + ZED_DEPTH_FILL: "false" + ZED_FLIP: "ON" + ZED_WARMUP_FRAMES: "30" + inputs: + tick: dora/timer/millis/100 + outputs: + - image_bgr + - camera_info + - id: ulite6 + build: uv pip install -e dora_ulite6 + path: dora_ulite6/dora_ulite6/main.py + inputs: + tick: dora/timer/millis/10 + command: verify/robot_cmd + outputs: + - tcp_pose + - status + env: + ROBOT_IP: "192.168.1.192" + DEFAULT_SPEED: "30" + DEFAULT_UNITS: "mm" + API_HOST: "0.0.0.0" + API_PORT: "9000" + VACUUM_ENABLED: "false" + - id: verify + build: | + uv venv -p 3.12 --seed --allow-existing + uv pip install -e dora_calibration + path: dora_calibration/dora_calibration/verify_eye_in_hand.py + env: + VIRTUAL_ENV: ./.venv + IMAGE_INPUT: "image_bgr" + CAMERA_INFO_INPUT: "camera_info" + POSE_INPUT: "tcp_pose" + STATUS_INPUT: "status" + COMMAND_OUTPUT: "robot_cmd" + CALIBRATION_FILE: "calibration_ulite6_zed.npz" + SQUARES_X: "4" + SQUARES_Y: "6" + SQUARE_LENGTH: "0.04" + MARKER_LENGTH: "0.03" + TCP_OFFSET_MM: "63.0" + Z_OFFSET_MM: "0.0" + HOME_X: "200" + HOME_Y: "0" + HOME_Z: "300" + HOME_ROLL: "180" + HOME_PITCH: "0" + HOME_YAW: "0" + MOVE_HOME_ON_START: "true" + POSE_STABILITY_WINDOW: "0.5" + POSE_STABILITY_EPS_MM: "0.5" + POSE_STABILITY_EPS_DEG: "0.5" + inputs: + image_bgr: zed_camera_cpp/image_bgr + camera_info: zed_camera_cpp/camera_info + tcp_pose: ulite6/tcp_pose + status: ulite6/status + tick: dora/timer/millis/100 + outputs: + - robot_cmd diff --git a/dora_calibration/dora_calibration/verify_eye_in_hand.py b/dora_calibration/dora_calibration/verify_eye_in_hand.py new file mode 100644 index 0000000..3916faa --- /dev/null +++ b/dora_calibration/dora_calibration/verify_eye_in_hand.py @@ -0,0 +1,754 @@ +"""Dora node for verifying eye-in-hand calibration with a ChArUco board.""" + +from __future__ import annotations + +import argparse +from collections import deque +import json +import os +import time +import uuid +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional, Tuple + +import cv2 +import numpy as np +import pyarrow as pa +from dora import Node + +WINDOW_NAME = "Dora Eye-in-Hand Verification" +DEFAULT_DICT = cv2.aruco.DICT_6X6_1000 + + +@dataclass +class CameraIntrinsics: + fx: float + fy: float + cx: float + cy: float + distortion: np.ndarray + + def camera_matrix(self) -> np.ndarray: + return np.array( + [[self.fx, 0.0, self.cx], [0.0, self.fy, self.cy], [0.0, 0.0, 1.0]], + dtype=np.float64, + ) + + +@dataclass +class CornerTarget: + corner_mm: np.ndarray + flange_target_mm: np.ndarray + flange_approach_mm: np.ndarray + + +@dataclass +class VerifyState: + intrinsics: Optional[CameraIntrinsics] = None + latest_tcp_pose: Optional[List[float]] = None + latest_tcp_pose_at: Optional[float] = None + pose_history: deque[Tuple[float, List[float]]] = field( + default_factory=lambda: deque(maxlen=10) + ) + last_status_at: Optional[float] = None + pending_command: Optional[Dict[str, Any]] = None + status_text: str = "" + detected_ids: List[int] = field(default_factory=list) + corner_targets: Dict[int, CornerTarget] = field(default_factory=dict) + selected_corner_id: Optional[int] = None + stage: str = "idle" + last_tip_error_mm: Optional[float] = None + last_flange_error_mm: Optional[float] = None + intrinsics_logged: bool = False + + +def _rotation_matrix_xyz( + roll_deg: float, pitch_deg: float, yaw_deg: float +) -> np.ndarray: + roll = np.deg2rad(roll_deg) + pitch = np.deg2rad(pitch_deg) + yaw = np.deg2rad(yaw_deg) + cx, sx = np.cos(roll), np.sin(roll) + cy, sy = np.cos(pitch), np.sin(pitch) + cz, sz = np.cos(yaw), np.sin(yaw) + rot_x = np.array([[1.0, 0.0, 0.0], [0.0, cx, -sx], [0.0, sx, cx]]) + rot_y = np.array([[cy, 0.0, sy], [0.0, 1.0, 0.0], [-sy, 0.0, cy]]) + rot_z = np.array([[cz, -sz, 0.0], [sz, cz, 0.0], [0.0, 0.0, 1.0]]) + return rot_z @ rot_y @ rot_x + + +def _pose_to_matrix(tcp_pose_mm_deg: List[float]) -> np.ndarray: + x, y, z, roll, pitch, yaw = tcp_pose_mm_deg + rot = _rotation_matrix_xyz(roll, pitch, yaw) + mat = np.eye(4) + mat[:3, :3] = rot + mat[:3, 3] = np.array([x, y, z], dtype=np.float64) / 1000.0 + return mat + + +def _try_get_charuco_board( + squares_x: int, squares_y: int, square_length: float, marker_length: float +) -> Any: + dictionary = cv2.aruco.getPredefinedDictionary(DEFAULT_DICT) + try: + board = cv2.aruco.CharucoBoard( + (squares_x, squares_y), square_length, marker_length, dictionary + ) + except AttributeError: + board = cv2.aruco.CharucoBoard_create( + squares_x, squares_y, square_length, marker_length, dictionary + ) + if hasattr(board, "setLegacyPattern"): + board.setLegacyPattern(True) + return board + + +def _create_charuco_detector(board: Any) -> Optional[Any]: + if not hasattr(cv2.aruco, "CharucoDetector"): + return None + aruco_params = cv2.aruco.DetectorParameters() + charuco_params = cv2.aruco.CharucoParameters() + charuco_params.minMarkers = 2 + return cv2.aruco.CharucoDetector(board, charuco_params, aruco_params) + + +def _decode_image(storage: pa.Array, metadata: Dict[str, Any]) -> np.ndarray: + encoding = str(metadata.get("encoding", "bgr8")).lower() + width = metadata.get("width") + height = metadata.get("height") + if (width is None or height is None) and "shape" in metadata: + shape = metadata.get("shape") + if isinstance(shape, (list, tuple)) and len(shape) >= 2: + height = height if height is not None else int(shape[0]) + width = width if width is not None else int(shape[1]) + if width is None or height is None: + raise KeyError("width/height (or shape) missing from metadata") + + if encoding == "bgr8": + frame = storage.to_numpy().astype(np.uint8).reshape((height, width, 3)) + return frame.copy() + if encoding == "rgb8": + frame = storage.to_numpy().astype(np.uint8).reshape((height, width, 3)) + return cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) + if encoding in {"jpeg", "jpg", "jpe", "bmp", "webp", "png"}: + frame = storage.to_numpy().astype(np.uint8) + return cv2.imdecode(frame, cv2.IMREAD_COLOR) + if encoding == "yuv420": + yuv = storage.to_numpy().astype(np.uint8) + yuv = yuv[: width * height * 3 // 2].reshape((height * 3 // 2, width)) + return cv2.cvtColor(yuv, cv2.COLOR_YUV420p2BGR) + + raise RuntimeError(f"Unsupported image encoding: {encoding}") + + +def _intrinsics_from_metadata(metadata: Dict[str, Any]) -> Optional[CameraIntrinsics]: + if not all(k in metadata for k in ("fx", "fy", "cx", "cy")): + return None + distortion = metadata.get("distortion", []) + if distortion is None: + distortion = [] + dist = np.array(distortion, dtype=np.float64).reshape(-1) + if dist.size == 0: + dist = np.zeros(5, dtype=np.float64) + return CameraIntrinsics( + fx=float(metadata["fx"]), + fy=float(metadata["fy"]), + cx=float(metadata["cx"]), + cy=float(metadata["cy"]), + distortion=dist, + ) + + +def _intrinsics_from_camera_info( + values: np.ndarray, metadata: Dict[str, Any] +) -> Optional[CameraIntrinsics]: + if values.size < 4: + return None + fx, fy, cx, cy = [float(v) for v in values[:4]] + distortion = metadata.get("distortion", []) + if distortion is None: + distortion = [] + dist = np.array(distortion, dtype=np.float64).reshape(-1) + if dist.size == 0: + dist = np.zeros(5, dtype=np.float64) + return CameraIntrinsics(fx=fx, fy=fy, cx=cx, cy=cy, distortion=dist) + + +def _estimate_charuco_pose_with_ids( + frame: np.ndarray, + board: Any, + intrinsics: CameraIntrinsics, + detector: Optional[Any], + min_corners: int, +) -> Tuple[ + Optional[np.ndarray], + Optional[np.ndarray], + Optional[np.ndarray], + np.ndarray, + bool, + int, + int, + bool, +]: + gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + if detector is not None: + charuco_corners, charuco_ids, marker_corners, marker_ids = detector.detectBoard( + frame + ) + else: + dictionary = ( + board.dictionary if hasattr(board, "dictionary") else board.getDictionary() + ) + marker_corners, marker_ids, _ = cv2.aruco.detectMarkers(gray, dictionary) + charuco_corners, charuco_ids = None, None + if marker_ids is not None and len(marker_ids) > 0: + _, charuco_corners, charuco_ids = cv2.aruco.interpolateCornersCharuco( + marker_corners, marker_ids, gray, board + ) + debug = frame.copy() + marker_count = 0 if marker_ids is None else len(marker_ids) + if marker_count == 0: + return None, None, None, debug, False, 0, 0, False + + cv2.aruco.drawDetectedMarkers(debug, marker_corners, marker_ids) + charuco_count = 0 if charuco_ids is None else len(charuco_ids) + if charuco_count < min_corners: + return None, None, None, debug, True, marker_count, charuco_count, False + + cv2.aruco.drawDetectedCornersCharuco(debug, charuco_corners, charuco_ids) + obj_points = board.getChessboardCorners()[charuco_ids.flatten()] + ok, rvec, tvec = cv2.solvePnP( + obj_points, + charuco_corners, + intrinsics.camera_matrix(), + intrinsics.distortion, + flags=cv2.SOLVEPNP_IPPE, + ) + if not ok: + return None, None, None, debug, True, marker_count, charuco_count, False + + rvec, tvec = cv2.solvePnPRefineVVS( + obj_points, + charuco_corners, + intrinsics.camera_matrix(), + intrinsics.distortion, + rvec, + tvec, + ) + + cv2.drawFrameAxes( + debug, + intrinsics.camera_matrix(), + intrinsics.distortion, + rvec, + tvec, + 0.05, + ) + return rvec, tvec, charuco_ids, debug, True, marker_count, charuco_count, True + + +def _send_dora_command( + node: Node, output_name: str, action: str, payload: Dict[str, Any] +) -> str: + command_id = str(uuid.uuid4()) + message = {"id": command_id, "action": action, "payload": payload} + node.send_output( + output_name, + pa.array([json.dumps(message)]), + metadata={"encoding": "json", "timestamp_ns": time.time_ns()}, + ) + return command_id + + +def _parse_status_payload(value: pa.Array) -> Optional[Dict[str, Any]]: + if len(value) == 0: + return None + raw = value[0].as_py() + if not raw: + return None + try: + return json.loads(raw) + except Exception: + return None + + +def _pose_is_stable( + pose_history: deque[Tuple[float, List[float]]], + window_s: float, + eps_mm: float, + eps_deg: float, +) -> bool: + if not pose_history: + return False + latest_t = pose_history[-1][0] + window = [p for (t, p) in pose_history if latest_t - t <= window_s] + if len(window) < 2: + return False + positions = np.array([p[:3] for p in window], dtype=np.float64) + rotations = np.array([p[3:6] for p in window], dtype=np.float64) + pos_span = positions.max(axis=0) - positions.min(axis=0) + rot_span = rotations.max(axis=0) - rotations.min(axis=0) + return bool(np.all(pos_span <= eps_mm) and np.all(rot_span <= eps_deg)) + + +def _within_bounds( + point_mm: np.ndarray, + min_xyz: Tuple[Optional[float], Optional[float], Optional[float]], + max_xyz: Tuple[Optional[float], Optional[float], Optional[float]], +) -> bool: + x, y, z = point_mm.tolist() + min_x, min_y, min_z = min_xyz + max_x, max_y, max_z = max_xyz + if min_x is not None and x < min_x: + return False + if max_x is not None and x > max_x: + return False + if min_y is not None and y < min_y: + return False + if max_y is not None and y > max_y: + return False + if min_z is not None and z < min_z: + return False + if max_z is not None and z > max_z: + return False + return True + + +def _parse_float_env(name: str) -> Optional[float]: + raw = os.getenv(name) + if raw is None or raw == "": + return None + try: + return float(raw) + except ValueError: + return None + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Eye-in-hand verification node with minimal OpenCV UI.", + ) + parser.add_argument("--name", type=str, default="dora-calibration-verify") + args = parser.parse_args() + + image_input = os.getenv("IMAGE_INPUT", "image") + camera_info_input = os.getenv("CAMERA_INFO_INPUT", "camera_info") + pose_input = os.getenv("POSE_INPUT", "tcp_pose") + status_input = os.getenv("STATUS_INPUT", "status") + command_output = os.getenv("COMMAND_OUTPUT", "robot_cmd") + pose_units = os.getenv("POSE_UNITS", "mm").lower() + use_camera_info = os.getenv("USE_CAMERA_INFO", "false").lower() in ( + "true", + "1", + "yes", + ) + min_corners = int(os.getenv("MIN_CORNERS", "6")) + move_speed = _parse_float_env("MOVE_SPEED") + move_home_on_start = os.getenv("MOVE_HOME_ON_START", "true").lower() in ( + "true", + "1", + "yes", + ) + tcp_offset_mm = float(os.getenv("TCP_OFFSET_MM", "63.0")) + z_offset_mm = float(os.getenv("Z_OFFSET_MM", "50.0")) + dry_run = os.getenv("DRY_RUN", "false").lower() in ("true", "1", "yes") + hold_after_touch = os.getenv("HOLD_AFTER_TOUCH", "false").lower() in ( + "true", + "1", + "yes", + ) + pose_stability_window = float(os.getenv("POSE_STABILITY_WINDOW", "0.5")) + pose_stability_eps_mm = float(os.getenv("POSE_STABILITY_EPS_MM", "0.5")) + pose_stability_eps_deg = float(os.getenv("POSE_STABILITY_EPS_DEG", "0.5")) + allow_unstable_pose = os.getenv("ALLOW_UNSTABLE_POSE", "false").lower() in ( + "true", + "1", + "yes", + ) + + squares_x = int(os.getenv("SQUARES_X", "4")) + squares_y = int(os.getenv("SQUARES_Y", "6")) + square_length = float(os.getenv("SQUARE_LENGTH", "0.04")) + marker_length = float(os.getenv("MARKER_LENGTH", "0.03")) + + home_x = float(os.getenv("HOME_X", "200")) + home_y = float(os.getenv("HOME_Y", "0")) + home_z = float(os.getenv("HOME_Z", "300")) + home_roll = float(os.getenv("HOME_ROLL", "180")) + home_pitch = float(os.getenv("HOME_PITCH", "0")) + home_yaw = float(os.getenv("HOME_YAW", "0")) + + min_xyz = ( + _parse_float_env("WORKSPACE_MIN_X"), + _parse_float_env("WORKSPACE_MIN_Y"), + _parse_float_env("WORKSPACE_MIN_Z"), + ) + max_xyz = ( + _parse_float_env("WORKSPACE_MAX_X"), + _parse_float_env("WORKSPACE_MAX_Y"), + _parse_float_env("WORKSPACE_MAX_Z"), + ) + + calibration_file = os.getenv("CALIBRATION_FILE", "calibration.npz") + if calibration_file and not os.path.isabs(calibration_file): + config_path = os.path.join("config", calibration_file) + calibration_file = config_path if os.path.exists(config_path) else calibration_file + + print(f"Loading calibration from {calibration_file}...", flush=True) + try: + calib_data = np.load(calibration_file, allow_pickle=True) + t_cam2gripper = calib_data["T_cam2gripper"] + calib_type = str(calib_data.get("calibration_type", "unknown")) + file_camera_matrix = calib_data.get("camera_matrix") + file_distortion = calib_data.get("distortion") + except Exception as exc: + raise RuntimeError(f"Failed to load calibration file: {exc}") from exc + + if calib_type != "eye_in_hand": + print( + f'Warning: calibration_type="{calib_type}", expected "eye_in_hand".', + flush=True, + ) + + file_intrinsics: Optional[CameraIntrinsics] = None + if file_camera_matrix is not None: + try: + fx = float(file_camera_matrix[0][0]) + fy = float(file_camera_matrix[1][1]) + cx = float(file_camera_matrix[0][2]) + cy = float(file_camera_matrix[1][2]) + distortion = ( + np.array(file_distortion, dtype=np.float64).reshape(-1) + if file_distortion is not None + else np.zeros(5, dtype=np.float64) + ) + if distortion.size == 0: + distortion = np.zeros(5, dtype=np.float64) + file_intrinsics = CameraIntrinsics( + fx=fx, fy=fy, cx=cx, cy=cy, distortion=distortion + ) + except Exception: + file_intrinsics = None + + state = VerifyState() + if not use_camera_info and file_intrinsics is not None: + state.intrinsics = file_intrinsics + state.intrinsics_logged = True + print( + f"Using intrinsics from calibration file fx={file_intrinsics.fx:.4f} " + f"fy={file_intrinsics.fy:.4f} cx={file_intrinsics.cx:.4f} " + f"cy={file_intrinsics.cy:.4f} dist={file_intrinsics.distortion.tolist()}", + flush=True, + ) + + board = _try_get_charuco_board(squares_x, squares_y, square_length, marker_length) + detector = _create_charuco_detector(board) + + node = Node(args.name) + cv2.namedWindow(WINDOW_NAME, cv2.WINDOW_NORMAL) + started = False + + def send_move(x: float, y: float, z: float, roll: float, pitch: float, yaw: float) -> None: + payload: Dict[str, Any] = { + "x": float(x), + "y": float(y), + "z": float(z), + "roll": float(roll), + "pitch": float(pitch), + "yaw": float(yaw), + } + if move_speed is not None: + payload["speed"] = move_speed + if dry_run: + state.status_text = "DRY_RUN: move suppressed" + return + cmd_id = _send_dora_command(node, command_output, "move_to_pose", payload) + state.pending_command = { + "id": cmd_id, + "action": "move_to_pose", + "sent_at": time.monotonic(), + "payload": payload, + } + + for event in node: + if event["type"] != "INPUT": + continue + + now = time.monotonic() + if not started: + started = True + if move_home_on_start and not state.pending_command: + send_move(home_x, home_y, home_z, home_roll, home_pitch, home_yaw) + state.stage = "home_pending" + state.status_text = "Sent home on start" + if state.pending_command and now - state.pending_command["sent_at"] > 30.0: + state.status_text = f"Command timeout ({state.pending_command['action']})" + state.pending_command = None + + if event["id"] == status_input: + payload = _parse_status_payload(event["value"]) + state.last_status_at = now + if payload and state.pending_command: + if payload.get("command_id") == state.pending_command["id"]: + ok = bool(payload.get("ok", False)) + msg = payload.get("message", "") + state.status_text = "Command ok" if ok else f"Command failed: {msg}" + state.pending_command = None + if ok: + if state.stage == "approach_pending": + state.stage = "at_approach" + elif state.stage == "touch_pending": + state.stage = "at_target" + if state.selected_corner_id in state.corner_targets: + target = state.corner_targets[state.selected_corner_id] + if state.latest_tcp_pose: + actual = np.array(state.latest_tcp_pose[:3]) + flange_error = np.linalg.norm( + actual - target.flange_target_mm + ) + tip_actual = actual.copy() + # Assumes tool tip offset along -Z in base frame (roll=180,pitch=0,yaw=0). + tip_actual[2] -= tcp_offset_mm + tip_error = np.linalg.norm(tip_actual - target.corner_mm) + state.last_flange_error_mm = float(flange_error) + state.last_tip_error_mm = float(tip_error) + if not hold_after_touch and state.selected_corner_id in state.corner_targets: + target = state.corner_targets[state.selected_corner_id] + send_move( + target.flange_approach_mm[0], + target.flange_approach_mm[1], + target.flange_approach_mm[2], + home_roll, + home_pitch, + home_yaw, + ) + state.stage = "retract_pending" + elif state.stage == "retract_pending": + state.stage = "at_approach" + continue + + if event["id"] == pose_input: + tcp_pose = event["value"].to_numpy().astype(np.float64).reshape(-1) + if tcp_pose.size >= 6: + pose = tcp_pose[:6].tolist() + if pose_units == "m": + pose[0] *= 1000.0 + pose[1] *= 1000.0 + pose[2] *= 1000.0 + state.latest_tcp_pose = pose + state.latest_tcp_pose_at = now + state.pose_history.append((now, pose)) + continue + + if event["id"] == camera_info_input: + if use_camera_info or state.intrinsics is None: + raw_bytes = event["value"].to_numpy().astype(np.uint8).tobytes() + dtype_str = str(event["metadata"].get("dtype", "float32")) + dtype_map = {"float32": np.float32, "float64": np.float64} + dtype = dtype_map.get(dtype_str, np.float32) + values = np.frombuffer(raw_bytes, dtype=dtype).astype(np.float64) + intrinsics = _intrinsics_from_camera_info(values, event["metadata"]) + if intrinsics: + state.intrinsics = intrinsics + if not state.intrinsics_logged: + state.intrinsics_logged = True + print( + f"intrinsics fx={intrinsics.fx:.4f} fy={intrinsics.fy:.4f} " + f"cx={intrinsics.cx:.4f} cy={intrinsics.cy:.4f} " + f"dist={intrinsics.distortion.tolist()}", + flush=True, + ) + continue + + if event["id"] != image_input: + continue + + storage = event["value"] + metadata = event.get("metadata", {}) + if state.intrinsics is None: + intrinsics = _intrinsics_from_metadata(metadata) + if intrinsics: + state.intrinsics = intrinsics + if not state.intrinsics_logged: + state.intrinsics_logged = True + print( + f"intrinsics fx={intrinsics.fx:.4f} fy={intrinsics.fy:.4f} " + f"cx={intrinsics.cx:.4f} cy={intrinsics.cy:.4f} " + f"dist={intrinsics.distortion.tolist()}", + flush=True, + ) + + frame = _decode_image(storage, metadata) + debug = frame.copy() + + if state.intrinsics is None: + cv2.putText( + debug, + "Missing camera intrinsics (fx,fy,cx,cy)", + (10, 30), + cv2.FONT_HERSHEY_SIMPLEX, + 0.7, + (0, 0, 255), + 2, + ) + cv2.imshow(WINDOW_NAME, debug) + if cv2.waitKey(1) & 0xFF == ord("q"): + break + continue + + rvec, tvec, charuco_ids, debug, detected, marker_count, charuco_count, pose_ok = ( + _estimate_charuco_pose_with_ids( + frame, board, state.intrinsics, detector, min_corners + ) + ) + + if marker_count == 0: + status = "NO MARKERS" + color = (0, 0, 255) + elif not pose_ok: + status = "MARKERS ONLY" + color = (0, 165, 255) + else: + status = "DETECTED" + color = (0, 255, 0) + + state.corner_targets = {} + state.detected_ids = [] + + if pose_ok and charuco_ids is not None and state.latest_tcp_pose: + pose_stable = _pose_is_stable( + state.pose_history, + pose_stability_window, + pose_stability_eps_mm, + pose_stability_eps_deg, + ) + if pose_stable or allow_unstable_pose: + base_T_flange = _pose_to_matrix(state.latest_tcp_pose) + rmat, _ = cv2.Rodrigues(rvec) + cam_T_board = np.eye(4) + cam_T_board[:3, :3] = rmat + cam_T_board[:3, 3] = tvec.flatten() + all_board_corners = board.getChessboardCorners() + for corner_id in charuco_ids.flatten(): + corner_board = np.append(all_board_corners[int(corner_id)], 1.0) + corner_cam = cam_T_board @ corner_board + corner_base = base_T_flange @ t_cam2gripper @ corner_cam + corner_mm = corner_base[:3] * 1000.0 + flange_target_mm = corner_mm.copy() + flange_target_mm[2] += tcp_offset_mm + flange_approach_mm = flange_target_mm.copy() + flange_approach_mm[2] += z_offset_mm + state.corner_targets[int(corner_id)] = CornerTarget( + corner_mm=corner_mm, + flange_target_mm=flange_target_mm, + flange_approach_mm=flange_approach_mm, + ) + state.detected_ids = sorted(state.corner_targets.keys()) + else: + state.status_text = "Waiting for pose to stabilize" + + cv2.putText( + debug, + f"ChArUco: {status} | corners={charuco_count}", + (10, 30), + cv2.FONT_HERSHEY_SIMPLEX, + 0.7, + color, + 2, + ) + cv2.putText( + debug, + f"Detected IDs: {state.detected_ids}", + (10, 55), + cv2.FONT_HERSHEY_SIMPLEX, + 0.6, + (255, 255, 255), + 1, + ) + cv2.putText( + debug, + f"Stage: {state.stage} | {state.status_text}", + (10, 80), + cv2.FONT_HERSHEY_SIMPLEX, + 0.55, + (255, 255, 255), + 1, + ) + if state.last_tip_error_mm is not None: + cv2.putText( + debug, + f"Last tip error: {state.last_tip_error_mm:.2f} mm", + (10, 105), + cv2.FONT_HERSHEY_SIMPLEX, + 0.55, + (255, 255, 255), + 1, + ) + + cv2.imshow(WINDOW_NAME, debug) + key = cv2.waitKey(1) & 0xFF + + if key == ord("q"): + break + if key == ord("h") and not state.pending_command: + if _within_bounds(np.array([home_x, home_y, home_z]), min_xyz, max_xyz): + send_move(home_x, home_y, home_z, home_roll, home_pitch, home_yaw) + state.stage = "home_pending" + else: + state.status_text = "Home pose outside workspace bounds" + elif key == ord("r"): + state.selected_corner_id = None + state.stage = "idle" + state.status_text = "Refreshed selection" + elif key == ord("t") and state.stage == "at_target" and not state.pending_command: + if state.selected_corner_id in state.corner_targets: + target = state.corner_targets[state.selected_corner_id] + send_move( + target.flange_approach_mm[0], + target.flange_approach_mm[1], + target.flange_approach_mm[2], + home_roll, + home_pitch, + home_yaw, + ) + state.stage = "retract_pending" + elif ord("0") <= key <= ord("9") and not state.pending_command: + corner_id = int(chr(key)) + if corner_id in state.corner_targets: + target = state.corner_targets[corner_id] + if _within_bounds(target.flange_approach_mm, min_xyz, max_xyz) and _within_bounds( + target.flange_target_mm, min_xyz, max_xyz + ): + state.selected_corner_id = corner_id + send_move( + target.flange_approach_mm[0], + target.flange_approach_mm[1], + target.flange_approach_mm[2], + home_roll, + home_pitch, + home_yaw, + ) + state.stage = "approach_pending" + state.status_text = f"Approaching corner {corner_id}" + else: + state.status_text = "Target outside workspace bounds" + else: + state.status_text = f"Corner {corner_id} not detected" + elif key == ord("y") and state.stage == "at_approach" and not state.pending_command: + if state.selected_corner_id in state.corner_targets: + target = state.corner_targets[state.selected_corner_id] + send_move( + target.flange_target_mm[0], + target.flange_target_mm[1], + target.flange_target_mm[2], + home_roll, + home_pitch, + home_yaw, + ) + state.stage = "touch_pending" + + cv2.destroyAllWindows() + + +if __name__ == "__main__": + main()