Refactor voice control core and robot behavior

This commit is contained in:
cristhian aguilera
2026-02-02 12:29:59 -03:00
parent b9798a2f46
commit 695d309816
36 changed files with 3436 additions and 1065 deletions

View File

@@ -0,0 +1 @@
"""Tests for dora_voice_control."""

View File

@@ -0,0 +1,624 @@
"""Tests for scene state with spatial relationships."""
import sys
from pathlib import Path
import pytest
# Add parent directory to path for direct import from core.scene
sys.path.insert(0, str(Path(__file__).parent.parent / "dora_voice_control" / "core"))
sys.path.insert(0, str(Path(__file__).parent.parent))
from scene import SceneState, SceneObject
class TestSceneObjectBasics:
"""Test SceneObject dataclass."""
def test_create_scene_object(self):
"""Test basic SceneObject creation."""
obj = SceneObject(
id="cube_red_1",
object_type="cube",
color="red",
position_mm=[100.0, 200.0, 50.0],
height_mm=30.0,
)
assert obj.id == "cube_red_1"
assert obj.object_type == "cube"
assert obj.color == "red"
assert obj.position_mm == [100.0, 200.0, 50.0]
assert obj.height_mm == 30.0
assert obj.source == "detected" # default
def test_center_property(self):
"""Test center property is alias for position_mm."""
obj = SceneObject(
id="obj1",
object_type="cube",
color="blue",
position_mm=[10.0, 20.0, 30.0],
)
assert obj.center == obj.position_mm
def test_to_dict(self):
"""Test to_dict serialization."""
obj = SceneObject(
id="test_1",
object_type="cylinder",
color="yellow",
size="big",
position_mm=[1.0, 2.0, 3.0],
height_mm=40.0,
source="config",
)
d = obj.to_dict()
assert d["id"] == "test_1"
assert d["object_type"] == "cylinder"
assert d["color"] == "yellow"
assert d["size"] == "big"
assert d["position_mm"] == [1.0, 2.0, 3.0]
assert d["height_mm"] == 40.0
assert d["source"] == "config"
class TestSceneStateBasics:
"""Test basic SceneState operations."""
def test_add_and_get(self):
"""Test adding and retrieving objects."""
scene = SceneState()
obj = SceneObject(
id="cube_red_1",
object_type="cube",
color="red",
position_mm=[100.0, 200.0, 50.0],
)
assert scene.add(obj) is True
retrieved = scene.get("cube_red_1")
assert retrieved is not None
assert retrieved.id == "cube_red_1"
def test_add_duplicate_fails(self):
"""Test adding duplicate ID fails."""
scene = SceneState()
obj1 = SceneObject(id="obj1", object_type="cube", color="red")
obj2 = SceneObject(id="obj1", object_type="cylinder", color="blue")
assert scene.add(obj1) is True
assert scene.add(obj2) is False
def test_delete(self):
"""Test deleting objects."""
scene = SceneState()
obj = SceneObject(id="to_delete", object_type="cube", color="red")
scene.add(obj)
assert scene.get("to_delete") is not None
assert scene.delete("to_delete") is True
assert scene.get("to_delete") is None
def test_delete_nonexistent(self):
"""Test deleting non-existent object returns False."""
scene = SceneState()
assert scene.delete("nonexistent") is False
def test_update(self):
"""Test updating object properties."""
scene = SceneState()
obj = SceneObject(
id="to_update",
object_type="cube",
color="red",
position_mm=[0.0, 0.0, 0.0],
)
scene.add(obj)
assert scene.update("to_update", color="blue", position_mm=[1.0, 2.0, 3.0])
updated = scene.get("to_update")
assert updated.color == "blue"
assert updated.position_mm == [1.0, 2.0, 3.0]
def test_get_all(self):
"""Test getting all objects."""
scene = SceneState()
scene.add(SceneObject(id="a", object_type="cube", color="red"))
scene.add(SceneObject(id="b", object_type="cube", color="blue"))
all_objs = scene.get_all()
assert len(all_objs) == 2
ids = {o.id for o in all_objs}
assert ids == {"a", "b"}
def test_count(self):
"""Test object count."""
scene = SceneState()
assert scene.count() == 0
scene.add(SceneObject(id="a", object_type="cube", color="red"))
assert scene.count() == 1
scene.add(SceneObject(id="b", object_type="cube", color="blue"))
assert scene.count() == 2
def test_clear(self):
"""Test clearing all objects."""
scene = SceneState()
scene.add(SceneObject(id="a", object_type="cube", color="red"))
scene.add(SceneObject(id="b", object_type="cube", color="blue"))
scene.clear()
assert scene.count() == 0
class TestSceneStateQuery:
"""Test SceneState query operations."""
def test_query_by_type(self):
"""Test querying by object type."""
scene = SceneState()
scene.add(SceneObject(id="cube1", object_type="cube", color="red"))
scene.add(SceneObject(id="cyl1", object_type="cylinder", color="red"))
scene.add(SceneObject(id="cube2", object_type="cube", color="blue"))
cubes = scene.query(object_type="cube")
assert len(cubes) == 2
assert all(o.object_type == "cube" for o in cubes)
def test_query_by_color(self):
"""Test querying by color."""
scene = SceneState()
scene.add(SceneObject(id="r1", object_type="cube", color="red"))
scene.add(SceneObject(id="r2", object_type="cylinder", color="red"))
scene.add(SceneObject(id="b1", object_type="cube", color="blue"))
reds = scene.query(color="red")
assert len(reds) == 2
assert all(o.color == "red" for o in reds)
def test_query_by_size(self):
"""Test querying by size."""
scene = SceneState()
scene.add(SceneObject(id="big1", object_type="cube", color="red", size="big"))
scene.add(SceneObject(id="small1", object_type="cube", color="blue", size="small"))
big = scene.query(size="big")
assert len(big) == 1
assert big[0].id == "big1"
def test_query_by_source(self):
"""Test querying by source."""
scene = SceneState()
scene.add(SceneObject(id="d1", object_type="cube", color="red", source="detected"))
scene.add(SceneObject(id="c1", object_type="box", color="blue", source="config"))
detected = scene.query(source="detected")
assert len(detected) == 1
assert detected[0].id == "d1"
def test_query_multiple_criteria(self):
"""Test querying with multiple criteria."""
scene = SceneState()
scene.add(SceneObject(id="a", object_type="cube", color="red", size="big"))
scene.add(SceneObject(id="b", object_type="cube", color="red", size="small"))
scene.add(SceneObject(id="c", object_type="cube", color="blue", size="big"))
results = scene.query(object_type="cube", color="red", size="big")
assert len(results) == 1
assert results[0].id == "a"
def test_query_no_matches(self):
"""Test query with no matches returns empty list."""
scene = SceneState()
scene.add(SceneObject(id="a", object_type="cube", color="red"))
results = scene.query(color="purple")
assert results == []
class TestSceneStateRelationships:
"""Test spatial relationship methods."""
def test_set_on_top_of(self):
"""Test manually setting stacking relationship."""
scene = SceneState()
scene.add(SceneObject(id="bottom", object_type="cube", color="red"))
scene.add(SceneObject(id="top", object_type="cube", color="blue"))
assert scene.set_on_top_of("top", "bottom") is True
top = scene.get("top")
assert top.on_top_of == "bottom"
def test_set_on_top_of_invalid(self):
"""Test setting relationship with invalid IDs."""
scene = SceneState()
scene.add(SceneObject(id="a", object_type="cube", color="red"))
# Non-existent top object
assert scene.set_on_top_of("nonexistent", "a") is False
# Non-existent bottom object
assert scene.set_on_top_of("a", "nonexistent") is False
def test_get_object_below(self):
"""Test getting object below."""
scene = SceneState()
scene.add(SceneObject(id="bottom", object_type="cube", color="red"))
scene.add(SceneObject(id="top", object_type="cube", color="blue", on_top_of="bottom"))
below = scene.get_object_below("top")
assert below is not None
assert below.id == "bottom"
def test_get_objects_above(self):
"""Test getting objects stacked on top."""
scene = SceneState()
scene.add(SceneObject(id="base", object_type="cube", color="red"))
scene.add(SceneObject(id="middle", object_type="cube", color="blue", on_top_of="base"))
scene.add(SceneObject(id="side", object_type="cube", color="yellow", on_top_of="base"))
above = scene.get_objects_above("base")
assert len(above) == 2
ids = {o.id for o in above}
assert ids == {"middle", "side"}
def test_effective_height_no_stack(self):
"""Test effective height for single object."""
scene = SceneState()
scene.add(SceneObject(id="a", object_type="cube", color="red", height_mm=30.0))
assert scene.effective_height("a") == 30.0
def test_effective_height_stacked(self):
"""Test effective height for stacked objects."""
scene = SceneState()
scene.add(SceneObject(id="bottom", object_type="cube", color="red", height_mm=30.0))
scene.add(SceneObject(id="top", object_type="cube", color="blue", height_mm=25.0, on_top_of="bottom"))
assert scene.effective_height("bottom") == 30.0
assert scene.effective_height("top") == 55.0 # 30 + 25
def test_get_stack(self):
"""Test getting full stack."""
scene = SceneState()
scene.add(SceneObject(id="bottom", object_type="cube", color="red"))
scene.add(SceneObject(id="middle", object_type="cube", color="blue", on_top_of="bottom"))
scene.add(SceneObject(id="top", object_type="cube", color="yellow", on_top_of="middle"))
stack = scene.get_stack("middle")
assert len(stack) == 3
assert stack[0].id == "bottom"
assert stack[1].id == "middle"
assert stack[2].id == "top"
def test_delete_clears_relationships(self):
"""Test that deleting object clears relationships pointing to it."""
scene = SceneState()
scene.add(SceneObject(id="a", object_type="cube", color="red"))
scene.add(SceneObject(id="b", object_type="cube", color="blue", on_top_of="a"))
assert scene.get("b").on_top_of == "a"
scene.delete("a")
obj_b = scene.get("b")
assert obj_b.on_top_of is None # Relationship cleared
class TestSceneStateStackingInference:
"""Test automatic stacking inference from positions."""
def test_infer_stacking_simple(self):
"""Test basic stacking inference."""
scene = SceneState()
# Bottom cube at z=30
scene.add(SceneObject(
id="bottom",
object_type="cube",
color="red",
position_mm=[0.0, 0.0, 30.0],
height_mm=30.0,
bbox_mm=[-15.0, -15.0, 15.0, 15.0],
))
# Top cube at z=60 (30 + 30 = 60, sitting on bottom)
scene.add(SceneObject(
id="top",
object_type="cube",
color="blue",
position_mm=[0.0, 0.0, 60.0],
height_mm=30.0,
bbox_mm=[-15.0, -15.0, 15.0, 15.0],
))
scene.infer_relationships()
top = scene.get("top")
assert top.on_top_of == "bottom"
def test_infer_stacking_no_overlap(self):
"""Test that objects without horizontal overlap are not stacked."""
scene = SceneState()
# Object at x=0
scene.add(SceneObject(
id="a",
object_type="cube",
color="red",
position_mm=[0.0, 0.0, 30.0],
height_mm=30.0,
bbox_mm=[-15.0, -15.0, 15.0, 15.0],
))
# Object at x=100 (no overlap), same z would indicate stack
scene.add(SceneObject(
id="b",
object_type="cube",
color="blue",
position_mm=[100.0, 0.0, 60.0],
height_mm=30.0,
bbox_mm=[85.0, -15.0, 115.0, 15.0],
))
scene.infer_relationships()
b = scene.get("b")
assert b.on_top_of is None # No stacking - no overlap
def test_infer_stacking_via_replace_detected(self):
"""Test stacking is inferred after replace_detected."""
scene = SceneState()
scene.replace_detected([
{
"object_type": "cube",
"color": "red",
"position_mm": [0.0, 0.0, 30.0],
"height_mm": 30.0,
"bbox_mm": [-15.0, -15.0, 15.0, 15.0],
},
{
"object_type": "cube",
"color": "blue",
"position_mm": [0.0, 0.0, 60.0], # z = 30 + 30
"height_mm": 30.0,
"bbox_mm": [-15.0, -15.0, 15.0, 15.0],
},
])
# Should have auto-inferred the stacking
blues = scene.query(color="blue")
assert len(blues) == 1
assert blues[0].on_top_of is not None
class TestSceneStateDetectorIntegration:
"""Test detector integration methods."""
def test_replace_detected_basic(self):
"""Test replacing detected objects."""
scene = SceneState()
scene.replace_detected([
{"object_type": "cube", "color": "red", "position_mm": [1.0, 2.0, 3.0]},
{"object_type": "cylinder", "color": "blue"},
])
assert scene.count() == 2
all_objs = scene.get_all()
assert all(o.source == "detected" for o in all_objs)
def test_replace_detected_keeps_config(self):
"""Test that replace_detected keeps config objects."""
scene = SceneState()
# Add a config object
scene.add(SceneObject(
id="config_box_1",
object_type="box",
color="yellow",
source="config",
))
# Replace detected
scene.replace_detected([
{"object_type": "cube", "color": "red"},
])
# Config object should still be there
assert scene.get("config_box_1") is not None
config_objs = scene.query(source="config")
assert len(config_objs) == 1
# Detected object should be added
detected = scene.query(source="detected")
assert len(detected) == 1
def test_replace_detected_replaces_old_detected(self):
"""Test that replace_detected replaces previous detected objects."""
scene = SceneState()
# First detection
scene.replace_detected([
{"object_type": "cube", "color": "red"},
{"object_type": "cube", "color": "blue"},
])
assert len(scene.query(source="detected")) == 2
# Second detection
scene.replace_detected([
{"object_type": "cylinder", "color": "green"},
])
detected = scene.query(source="detected")
assert len(detected) == 1
assert detected[0].object_type == "cylinder"
def test_replace_detected_generates_ids(self):
"""Test that replace_detected generates unique IDs."""
scene = SceneState()
scene.replace_detected([
{"object_type": "cube", "color": "red"},
{"object_type": "cube", "color": "red"},
])
all_objs = scene.get_all()
ids = [o.id for o in all_objs]
assert len(ids) == len(set(ids)) # All IDs should be unique
def test_clear_detected(self):
"""Test clearing only detected objects."""
scene = SceneState()
scene.add(SceneObject(id="config1", object_type="box", color="green", source="config"))
scene.replace_detected([
{"object_type": "cube", "color": "red"},
])
assert scene.count() == 2
scene.clear_detected()
assert scene.count() == 1
assert scene.get("config1") is not None
def test_to_list(self):
"""Test serializing all objects to list."""
scene = SceneState()
scene.add(SceneObject(id="a", object_type="cube", color="red"))
scene.add(SceneObject(id="b", object_type="cube", color="blue"))
lst = scene.to_list()
assert len(lst) == 2
assert all(isinstance(d, dict) for d in lst)
ids = {d["id"] for d in lst}
assert ids == {"a", "b"}
class TestSceneStateThreadSafety:
"""Test thread safety of SceneState."""
def test_concurrent_add_and_query(self):
"""Test concurrent add and query operations."""
import threading
scene = SceneState()
errors = []
def adder():
try:
for i in range(100):
scene.add(SceneObject(
id=f"obj_{threading.current_thread().name}_{i}",
object_type="cube",
color="red",
))
except Exception as e:
errors.append(e)
def querier():
try:
for _ in range(100):
scene.query(object_type="cube")
except Exception as e:
errors.append(e)
threads = [
threading.Thread(target=adder, name="adder1"),
threading.Thread(target=adder, name="adder2"),
threading.Thread(target=querier, name="querier"),
]
for t in threads:
t.start()
for t in threads:
t.join()
assert not errors, f"Thread errors: {errors}"
class TestFindByCriteria:
"""Test find_by_criteria method (voice command matching)."""
def test_find_by_type_and_color(self):
"""Test finding object by type and color."""
scene = SceneState()
scene.add(SceneObject(id="a", object_type="cube", color="red"))
scene.add(SceneObject(id="b", object_type="cube", color="blue"))
result = scene.find_by_criteria("cube", "red", "no especificado")
assert result is not None
assert result.id == "a"
def test_find_with_class_map(self):
"""Test finding with class_map translation."""
scene = SceneState()
scene.add(SceneObject(id="a", object_type="cube", color="red"))
# Spanish to English mapping
class_map = {"cubo": "cube", "rojo": "red"}
result = scene.find_by_criteria("cubo", "rojo", "no especificado", class_map)
assert result is not None
assert result.id == "a"
def test_find_unspecified_returns_none(self):
"""Test that 'no especificado' object type returns None."""
scene = SceneState()
scene.add(SceneObject(id="a", object_type="cube", color="red"))
result = scene.find_by_criteria("no especificado", "red", "no especificado")
assert result is None
def test_find_no_match(self):
"""Test finding with no matching object."""
scene = SceneState()
scene.add(SceneObject(id="a", object_type="cube", color="red"))
result = scene.find_by_criteria("cylinder", "blue", "no especificado")
assert result is None
def test_find_with_size(self):
"""Test finding with size criteria."""
scene = SceneState()
scene.add(SceneObject(id="a", object_type="cube", color="red", size="big"))
scene.add(SceneObject(id="b", object_type="cube", color="red", size="small"))
result = scene.find_by_criteria("cube", "red", "small")
assert result is not None
assert result.id == "b"
class TestLoadFromConfig:
"""Test load_from_config method."""
def test_load_nonexistent_file(self):
"""Test loading from nonexistent file returns 0."""
scene = SceneState()
count = scene.load_from_config("/nonexistent/path.toml")
assert count == 0
assert scene.count() == 0
def test_load_from_config_with_objects(self, tmp_path):
"""Test loading objects from config file."""
config_file = tmp_path / "config.toml"
config_file.write_text("""
[object_parameters]
normal_height = 100.0
[[static_objects]]
type = "box"
color = "blue"
position = [150.0, -200.0]
size = "big"
[[static_objects]]
type = "box"
color = "red"
position = [250.0, -200.0]
""")
scene = SceneState()
count = scene.load_from_config(str(config_file))
assert count == 2
assert scene.count() == 2
blues = scene.query(color="blue")
assert len(blues) == 1
assert blues[0].object_type == "box"
assert blues[0].position_mm[2] == 100.0 # normal_height
assert blues[0].source == "config"
def test_load_empty_config(self, tmp_path):
"""Test loading from empty config file."""
config_file = tmp_path / "empty.toml"
config_file.write_text("")
scene = SceneState()
count = scene.load_from_config(str(config_file))
assert count == 0