Work on putting one object on top of the other

This commit is contained in:
cristhian aguilera
2026-02-03 10:31:10 -03:00
parent 10e6792217
commit 45bbdcb9f5
9 changed files with 275 additions and 44 deletions

View File

@@ -20,3 +20,11 @@ big_height = 125.9
small_height = 106.0 small_height = 106.0
bottom_height = 68.0 bottom_height = 68.0
normal_height = 220.0 normal_height = 220.0
# Physical object heights in mm (used for stack-aware placement)
[object_heights]
cube_big = 40.0
cube_small = 40.0
cylinder_big = 40.0
cylinder_small = 40.0
default = 40.0

View File

@@ -82,7 +82,7 @@ nodes:
- id: voice - id: voice
build: | build: |
uv venv -p 3.12 --seed --allow-existing uv venv -p 3.12 --seed --allow-existing
uv pip install -e dora_voice_control uv pip install -e "dora_voice_control[llm]"
path: dora_voice_control/dora_voice_control/main.py path: dora_voice_control/dora_voice_control/main.py
env: env:
VIRTUAL_ENV: ./.venv VIRTUAL_ENV: ./.venv
@@ -104,6 +104,14 @@ nodes:
DEFAULT_PITCH: "0.0" DEFAULT_PITCH: "0.0"
DEFAULT_YAW: "0.0" DEFAULT_YAW: "0.0"
DRY_RUN: "false" DRY_RUN: "false"
# Object heights for stack-aware placement (in mm)
OBJECT_HEIGHT_DEFAULT: "40.0"
OBJECT_HEIGHT_CUBE: "40.0"
OBJECT_HEIGHT_CYLINDER: "40.0"
STACK_CLEARANCE_MM: "5.0"
# LLM provider for command parsing: "rules", "gemini", "ollama", "openai"
LLM_PROVIDER: "gemini"
LLM_MODEL: "gemini-2.0-flash"
# Initial position (used on startup and reset command) # Initial position (used on startup and reset command)
INIT_ON_START: "true" INIT_ON_START: "true"
INIT_X: "250.0" INIT_X: "250.0"

View File

@@ -129,8 +129,9 @@ class RobotBehavior(ABC):
def _init_dspy(self, llm_config: LLMConfig) -> None: def _init_dspy(self, llm_config: LLMConfig) -> None:
"""Initialize DSPy predictor for this behavior.""" """Initialize DSPy predictor for this behavior."""
_log(f"Initializing DSPy with provider={llm_config.provider}, model={llm_config.model}")
if not DSPY_AVAILABLE: if not DSPY_AVAILABLE:
_log("DSPy not available, falling back to rules") _log("DSPy not available (import failed), falling back to rules")
return return
if self.CommandSignature is None: if self.CommandSignature is None:
_log("No CommandSignature defined, falling back to rules") _log("No CommandSignature defined, falling back to rules")
@@ -141,8 +142,12 @@ class RobotBehavior(ABC):
dspy.configure(lm=lm) dspy.configure(lm=lm)
self._predictor = dspy.Predict(self.CommandSignature) self._predictor = dspy.Predict(self.CommandSignature)
_log(f"DSPy initialized with {llm_config.provider}/{llm_config.model}") _log(f"DSPy initialized with {llm_config.provider}/{llm_config.model}")
else:
_log(f"Failed to create LM for provider={llm_config.provider}")
except Exception as e: except Exception as e:
import traceback
_log(f"Failed to initialize DSPy: {e}") _log(f"Failed to initialize DSPy: {e}")
_log(f"Traceback: {traceback.format_exc()}")
def _create_lm(self, config: LLMConfig) -> Optional[Any]: def _create_lm(self, config: LLMConfig) -> Optional[Any]:
"""Create DSPy language model.""" """Create DSPy language model."""
@@ -171,8 +176,10 @@ class RobotBehavior(ABC):
def parse_command(self, transcript: str) -> Dict[str, str]: def parse_command(self, transcript: str) -> Dict[str, str]:
"""Parse voice command using DSPy or fallback to rules.""" """Parse voice command using DSPy or fallback to rules."""
if self._predictor: if self._predictor:
_log(f"Using DSPy to parse: '{transcript}'")
try: try:
result = self._predictor(comando=transcript) result = self._predictor(comando=transcript)
_log(f"DSPy result: accion={result.accion}, objeto={result.objeto}, color={result.color}, tamano={result.tamano}")
return { return {
"resultado": "ok" if result.accion != "error" else "error", "resultado": "ok" if result.accion != "error" else "error",
"accion": result.accion, "accion": result.accion,
@@ -181,7 +188,11 @@ class RobotBehavior(ABC):
"tamano": result.tamano, "tamano": result.tamano,
} }
except Exception as e: except Exception as e:
import traceback
_log(f"DSPy parsing failed: {e}, falling back to rules") _log(f"DSPy parsing failed: {e}, falling back to rules")
_log(f"Traceback: {traceback.format_exc()}")
else:
_log(f"No DSPy predictor, using rules to parse: '{transcript}'")
return self.rule_parse(transcript) return self.rule_parse(transcript)
def rule_parse(self, transcript: str) -> Dict[str, str]: def rule_parse(self, transcript: str) -> Dict[str, str]:

View File

@@ -73,6 +73,10 @@ class SceneState:
def __init__(self) -> None: def __init__(self) -> None:
self._lock = threading.Lock() self._lock = threading.Lock()
self._objects: Dict[str, SceneObject] = {} self._objects: Dict[str, SceneObject] = {}
# Scene update mode: "static" (default) or "dynamic"
self._update_mode: str = os.getenv("SCENE_UPDATE_MODE", "static").lower()
# Whether scene has been captured (STATIC mode only)
self._scene_captured: bool = False
# === Core Operations === # === Core Operations ===
@@ -146,6 +150,8 @@ class SceneState:
for obj in self._objects.values(): for obj in self._objects.values():
if obj.on_top_of and obj.on_top_of not in self._objects: if obj.on_top_of and obj.on_top_of not in self._objects:
obj.on_top_of = None obj.on_top_of = None
# Reset capture flag to allow next detection
self._scene_captured = False
# === Query === # === Query ===
@@ -176,6 +182,22 @@ class SceneState:
with self._lock: with self._lock:
return len(self._objects) return len(self._objects)
# === Scene Update Mode ===
def is_static_mode(self) -> bool:
"""Check if in STATIC update mode."""
return self._update_mode == "static"
def is_captured(self) -> bool:
"""Check if scene has been captured (STATIC mode)."""
with self._lock:
return self._scene_captured
def reset_capture(self) -> None:
"""Reset capture flag to allow next detection to update scene."""
with self._lock:
self._scene_captured = False
# === Spatial Relationships === # === Spatial Relationships ===
def set_on_top_of(self, object_id: str, below_id: Optional[str]) -> bool: def set_on_top_of(self, object_id: str, below_id: Optional[str]) -> bool:
@@ -560,6 +582,10 @@ class ObjectsHandler:
if not raw: if not raw:
return return
# In STATIC mode, ignore updates after scene is captured
if self._scene.is_static_mode() and self._scene.is_captured():
return
try: try:
payload = json.loads(raw) payload = json.loads(raw)
objects = payload.get("objects", []) objects = payload.get("objects", [])
@@ -569,5 +595,11 @@ class ObjectsHandler:
self._scene.replace_detected(objects) self._scene.replace_detected(objects)
# Mark scene as captured after first successful update (STATIC mode)
if self._scene.is_static_mode():
with self._scene._lock:
self._scene._scene_captured = True
self._logger.log("Scene captured (STATIC mode)")
# Emit scene update # Emit scene update
self._notifier.send_scene_update() self._notifier.send_scene_update()

View File

@@ -24,6 +24,7 @@ class VoiceState:
latest_pose_at: Optional[float] = None latest_pose_at: Optional[float] = None
pending_command: Optional[Dict[str, Any]] = None pending_command: Optional[Dict[str, Any]] = None
queue: Deque[RobotStep] = field(default_factory=deque) queue: Deque[RobotStep] = field(default_factory=deque)
held_object_id: Optional[str] = None # ID of currently held object
@dataclass @dataclass
@@ -192,6 +193,21 @@ class SharedState:
with self._lock: with self._lock:
return self._debug_state.last_parse_result return self._debug_state.last_parse_result
def get_held_object_id(self) -> Optional[str]:
"""Get the ID of the currently held object."""
with self._lock:
return self._voice_state.held_object_id
def set_held_object_id(self, object_id: Optional[str]) -> None:
"""Set the ID of the currently held object."""
with self._lock:
self._voice_state.held_object_id = object_id
def clear_held_object(self) -> None:
"""Clear the held object (after releasing)."""
with self._lock:
self._voice_state.held_object_id = None
def _age_ms(timestamp: Optional[float]) -> Optional[int]: def _age_ms(timestamp: Optional[float]) -> Optional[int]:
"""Calculate age in milliseconds from monotonic timestamp.""" """Calculate age in milliseconds from monotonic timestamp."""

View File

@@ -8,39 +8,39 @@ from ...core.behavior import ActionInfo
LITTLEHAND_ACTIONS: dict[str, ActionInfo] = { LITTLEHAND_ACTIONS: dict[str, ActionInfo] = {
"subir": ActionInfo( "subir": ActionInfo(
name="subir", name="subir",
aliases=["sube", "arriba"], aliases=[],
requires_pose=True, requires_pose=True,
description="Subir el robot", description="Subir el robot",
), ),
"bajar": ActionInfo( "bajar": ActionInfo(
name="bajar", name="bajar",
aliases=["baja", "abajo"], aliases=[],
requires_pose=True, requires_pose=True,
description="Bajar el robot", description="Bajar el robot",
), ),
"ir": ActionInfo( "ir": ActionInfo(
name="ir", name="ir",
aliases=["ve", "mover", "muevete", "acercar"], aliases=[],
requires_object=True, requires_object=True,
description="Ir hacia un objeto", description="Ir hacia un objeto",
), ),
"tomar": ActionInfo( "tomar": ActionInfo(
name="tomar", name="tomar",
aliases=["toma", "agarra", "agarrar", "coger", "chupar", "succionar"], aliases=[],
requires_pose=False, requires_pose=False,
requires_object=False, requires_object=False,
description="Tomar un objeto", description="Tomar un objeto",
), ),
"soltar": ActionInfo( "soltar": ActionInfo(
name="soltar", name="soltar",
aliases=["deja", "dejar"], aliases=[],
requires_pose=False, requires_pose=False,
requires_object=False, requires_object=False,
description="Soltar el objeto", description="Soltar el objeto",
), ),
"reiniciar": ActionInfo( "reiniciar": ActionInfo(
name="reiniciar", name="reiniciar",
aliases=["reinicia", "reset"], aliases=[],
requires_pose=False, requires_pose=False,
requires_object=False, requires_object=False,
description="Reiniciar a posicion inicial", description="Reiniciar a posicion inicial",

View File

@@ -10,6 +10,7 @@ from .actions import LITTLEHAND_ACTIONS
from .signature import LittlehandSignature from .signature import LittlehandSignature
_XY_MATCH_RADIUS_MM = float(os.getenv("BAJAR_XY_RADIUS_MM", "40.0")) _XY_MATCH_RADIUS_MM = float(os.getenv("BAJAR_XY_RADIUS_MM", "40.0"))
_STACK_CLEARANCE_MM = float(os.getenv("STACK_CLEARANCE_MM", "5.0")) # Clearance when placing on top of objects
class LittlehandBehavior(RobotBehavior): class LittlehandBehavior(RobotBehavior):
"""Littlehand behavior using the default pick-and-place actions.""" """Littlehand behavior using the default pick-and-place actions."""
@@ -35,20 +36,92 @@ class LittlehandBehavior(RobotBehavior):
return self._queue_move(ctx, ctx.pose[0], ctx.pose[1], target_z) return self._queue_move(ctx, ctx.pose[0], ctx.pose[1], target_z)
def action_bajar(self, ctx: ActionContext) -> bool: def action_bajar(self, ctx: ActionContext) -> bool:
"""Move down by step_mm or to top of object under the tool.""" """Move down by step_mm or to top of object under the tool.
target = self._find_object_under_pose(ctx)
if target is not None: If holding an object, accounts for its height when placing on obstacles.
target_z = target.position_mm[2] + ctx.config.tcp_offset_mm
Note: position_mm[2] from the camera represents the TOP surface of the object
(camera looks down, so it sees the top). We use height_mm only for the HELD
object to calculate placement position.
"""
obstacle = self._find_object_under_pose(ctx)
# Get held object height for stack-aware placement
held_height = self._get_held_object_height(ctx)
if obstacle is not None:
# obstacle.position_mm[2] is the TOP surface of the obstacle
obstacle_top_z = obstacle.position_mm[2]
if held_height > 0:
# Stack-aware: place held object on top of obstacle
# When vacuum releases, bottom of held object should be at obstacle_top
# TCP needs to be at: obstacle_top + held_height + tcp_offset
target_z = obstacle_top_z + held_height + _STACK_CLEARANCE_MM + ctx.config.tcp_offset_mm
_log( _log(
f"bajar: using object '{target.object_type}' color={target.color} " f"bajar: STACK-AWARE placement on '{obstacle.object_type}' color={obstacle.color} "
f"obj_z={target.position_mm[2]:.1f} tcp_offset={ctx.config.tcp_offset_mm:.1f} " f"obstacle_top_z={obstacle_top_z:.1f} held_height={held_height:.1f} "
f"target_z={target_z:.1f} at pose_z={ctx.pose[2]:.1f}" f"clearance={_STACK_CLEARANCE_MM:.1f} tcp_offset={ctx.config.tcp_offset_mm:.1f} "
f"target_z={target_z:.1f}"
)
else:
# Not holding anything: move TCP to object top (for grabbing)
target_z = obstacle_top_z + ctx.config.tcp_offset_mm
_log(
f"bajar: move to object '{obstacle.object_type}' color={obstacle.color} "
f"obstacle_top_z={obstacle_top_z:.1f} tcp_offset={ctx.config.tcp_offset_mm:.1f} "
f"target_z={target_z:.1f}"
) )
return self._queue_move(ctx, ctx.pose[0], ctx.pose[1], target_z) return self._queue_move(ctx, ctx.pose[0], ctx.pose[1], target_z)
target_z = ctx.pose[2] - self.config.step_mm target_z = ctx.pose[2] - self.config.step_mm
_log(f"bajar: no object under pose, step to z={target_z:.1f}") _log(f"bajar: no object under pose, step to z={target_z:.1f}")
return self._queue_move(ctx, ctx.pose[0], ctx.pose[1], target_z) return self._queue_move(ctx, ctx.pose[0], ctx.pose[1], target_z)
def _get_held_object_height(self, ctx: ActionContext) -> float:
"""Get the height of the currently held object.
Uses configured height from config.toml [object_heights] section,
falling back to the detected height_mm.
"""
held_id = ctx.shared_state.get_held_object_id()
if not held_id:
return 0.0
held_obj = ctx.scene.get(held_id)
if not held_obj:
_log(f"bajar: held object id={held_id} not found in scene")
return 0.0
# Use configured height based on object type and size
height = self._get_configured_height(held_obj.object_type, held_obj.size)
_log(f"bajar: holding object id={held_id} type={held_obj.object_type} size={held_obj.size} height={height:.1f}mm")
return height
def _get_configured_height(self, object_type: str, size: str) -> float:
"""Get configured height for object type/size from environment or defaults."""
# Try specific key like OBJECT_HEIGHT_CUBE_BIG
key = f"OBJECT_HEIGHT_{object_type.upper()}_{size.upper()}"
height_str = os.getenv(key)
if height_str:
try:
return float(height_str)
except ValueError:
pass
# Try generic key like OBJECT_HEIGHT_CUBE
key = f"OBJECT_HEIGHT_{object_type.upper()}"
height_str = os.getenv(key)
if height_str:
try:
return float(height_str)
except ValueError:
pass
# Default height (configurable via OBJECT_HEIGHT_DEFAULT)
default = float(os.getenv("OBJECT_HEIGHT_DEFAULT", "40.0"))
return default
def action_ir(self, ctx: ActionContext) -> bool: def action_ir(self, ctx: ActionContext) -> bool:
"""Move to object X/Y while keeping current Z.""" """Move to object X/Y while keeping current Z."""
if ctx.pose is None or ctx.target is None: if ctx.pose is None or ctx.target is None:
@@ -57,24 +130,45 @@ class LittlehandBehavior(RobotBehavior):
return self._queue_move(ctx, pos[0], pos[1], ctx.pose[2]) return self._queue_move(ctx, pos[0], pos[1], ctx.pose[2])
def action_tomar(self, ctx: ActionContext) -> bool: def action_tomar(self, ctx: ActionContext) -> bool:
"""Activate tool (low-level grab).""" """Activate tool (low-level grab) and track held object."""
self._queue_steps(ctx, self.robot_adapter.grab()) self._queue_steps(ctx, self.robot_adapter.grab())
# Find the object under current pose to track for stack-aware placement
# (ctx.target may be None since requires_object=False in littlehand)
target_obj = ctx.target or self._find_object_under_pose(ctx)
if target_obj is not None:
ctx.shared_state.set_held_object_id(target_obj.id)
height = self._get_configured_height(target_obj.object_type, target_obj.size)
_log(f"tomar: now holding object id={target_obj.id} type={target_obj.object_type} height={height:.1f}mm")
else:
_log("tomar: no object found under pose, not tracking held object")
return True return True
def action_soltar(self, ctx: ActionContext) -> bool: def action_soltar(self, ctx: ActionContext) -> bool:
"""Deactivate tool (low-level release).""" """Deactivate tool (low-level release) and clear held object."""
self._queue_steps(ctx, self.robot_adapter.release()) self._queue_steps(ctx, self.robot_adapter.release())
# Clear the held object tracking
held_id = ctx.shared_state.get_held_object_id()
if held_id:
_log(f"soltar: released object id={held_id}")
ctx.shared_state.clear_held_object()
return True return True
def action_reiniciar(self, ctx: ActionContext) -> bool: def action_reiniciar(self, ctx: ActionContext) -> bool:
"""Reset: release tool, move home, clear objects.""" """Reset: release tool, move home, clear objects and held state."""
self._queue_steps(ctx, self.robot_adapter.reset_tool()) self._queue_steps(ctx, self.robot_adapter.reset_tool())
self._queue_steps(ctx, self.robot_adapter.move(ctx.home_pose)) self._queue_steps(ctx, self.robot_adapter.move(ctx.home_pose))
ctx.scene.clear_detected() ctx.scene.clear_detected()
ctx.shared_state.clear_held_object()
_log("reiniciar: cleared held object state")
return True return True
def _find_object_under_pose(self, ctx: ActionContext) -> Optional["SceneObject"]: def _find_object_under_pose(self, ctx: ActionContext) -> Optional["SceneObject"]:
"""Find the topmost object near the current pose x,y (mm).""" """Find the topmost object near the current pose x,y (mm).
Note: position_mm[2] is treated as the TOP surface of the object
(camera looks down, sees the top surface).
"""
if ctx.pose is None: if ctx.pose is None:
_log("bajar: missing pose, cannot find object under tool") _log("bajar: missing pose, cannot find object under tool")
return None return None
@@ -89,20 +183,19 @@ class LittlehandBehavior(RobotBehavior):
dist2 = dx * dx + dy * dy dist2 = dx * dx + dy * dy
if dist2 > _XY_MATCH_RADIUS_MM * _XY_MATCH_RADIUS_MM: if dist2 > _XY_MATCH_RADIUS_MM * _XY_MATCH_RADIUS_MM:
continue continue
top_surface = obj.position_mm[2] + obj.height_mm # position_mm[2] IS the top surface (camera sees top of object)
candidates.append((top_surface, obj)) top_surface_z = obj.position_mm[2]
candidates.append((top_surface_z, obj))
_log( _log(
"bajar: near id={} type={} color={} center=({:.1f},{:.1f}) " "bajar: near id={} type={} color={} center=({:.1f},{:.1f}) "
"dist_xy={:.1f} obj_z={:.1f} height={:.1f} top_z={:.1f}".format( "dist_xy={:.1f} top_z={:.1f}".format(
obj.id, obj.id,
obj.object_type, obj.object_type,
obj.color, obj.color,
obj.position_mm[0], obj.position_mm[0],
obj.position_mm[1], obj.position_mm[1],
(dist2 ** 0.5), (dist2 ** 0.5),
obj.position_mm[2], top_surface_z,
obj.height_mm,
top_surface,
) )
) )
if not candidates: if not candidates:

View File

@@ -14,7 +14,11 @@ if dspy is not None:
comando = dspy.InputField(desc="Voice command in Spanish") comando = dspy.InputField(desc="Voice command in Spanish")
accion = dspy.OutputField( accion = dspy.OutputField(
desc="Action name: subir, bajar, ir, tomar, soltar, reiniciar or error" desc=(
"Accion: subir, bajar, ir, tomar, soltar, reiniciar o error. "
"Mapea errores infantiles, parafrasis y sinonimos cercanos a la accion valida mas cercana. "
"Si la intencion es ambigua o no relacionada, devuelve error."
)
) )
objeto = dspy.OutputField( objeto = dspy.OutputField(
desc="Object name (cubo, cilindro, estrella, caja) or 'no especificado'" desc="Object name (cubo, cilindro, estrella, caja) or 'no especificado'"

View File

@@ -185,6 +185,60 @@ def _sample_point(
return np.median(np.stack(samples, axis=0), axis=0) return np.median(np.stack(samples, axis=0), axis=0)
def _estimate_object_height(
point_cloud: np.ndarray, bbox: List[int], cfg: DetectionConfig, sample_step: int = 4
) -> Optional[float]:
"""Estimate object height by sampling Z values within the bounding box.
Samples points in a grid within the bbox, finds the Z range (max - min),
which corresponds to the object height.
Args:
point_cloud: The point cloud array (H, W, channels) with XYZ in mm.
bbox: Bounding box [x1, y1, x2, y2] in pixels.
cfg: Detection config for depth validation.
sample_step: Step size for grid sampling (smaller = more samples).
Returns:
Estimated height in mm, or None if not enough valid points.
"""
x1, y1, x2, y2 = bbox
h, w, _ = point_cloud.shape
# Clamp bbox to image bounds
x1 = max(0, x1)
y1 = max(0, y1)
x2 = min(w, x2)
y2 = min(h, y2)
if x2 <= x1 or y2 <= y1:
return None
# Sample points in a grid within the bounding box
z_values = []
for y in range(y1, y2, sample_step):
for x in range(x1, x2, sample_step):
point_xyz = point_cloud[y, x, :3].astype(np.float64)
if _valid_point(point_xyz, cfg):
z_values.append(point_xyz[2])
if len(z_values) < 5:
return None
# Use percentiles to filter outliers (table surface, noise)
z_array = np.array(z_values)
z_min = np.percentile(z_array, 10) # Top of object (closer to camera = smaller Z)
z_max = np.percentile(z_array, 90) # Bottom/table level (farther = larger Z)
height = z_max - z_min
# Sanity check: height should be positive and reasonable (5mm to 200mm)
if height < 5.0 or height > 200.0:
return None
return float(height)
def _dominant_color(image: np.ndarray, bbox: List[int]) -> Tuple[int, int, int]: def _dominant_color(image: np.ndarray, bbox: List[int]) -> Tuple[int, int, int]:
x1, y1, x2, y2 = bbox x1, y1, x2, y2 = bbox
x1 = max(0, x1) x1 = max(0, x1)
@@ -482,8 +536,10 @@ def main() -> None:
area = max(1, (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])) area = max(1, (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]))
size_label = "big" if area >= cfg.size_threshold else "small" size_label = "big" if area >= cfg.size_threshold else "small"
objects.append( # Estimate object height from point cloud
{ height_mm = _estimate_object_height(latest_point_cloud, bbox, cfg)
obj_data = {
"object_type": results.names[int(r.cls.item())], "object_type": results.names[int(r.cls.item())],
"confidence": float(r.conf.item()), "confidence": float(r.conf.item()),
"color": color_name, "color": color_name,
@@ -497,7 +553,10 @@ def main() -> None:
], ],
"timestamp_ns": time.time_ns(), "timestamp_ns": time.time_ns(),
} }
) if height_mm is not None:
obj_data["height_mm"] = height_mm
objects.append(obj_data)
payload = json.dumps({"objects": objects, "timestamp_ns": time.time_ns()}) payload = json.dumps({"objects": objects, "timestamp_ns": time.time_ns()})
node.send_output( node.send_output(