Add voice control, working but need more work

This commit is contained in:
cristhian aguilera
2026-01-31 11:41:50 -03:00
parent 380c466170
commit b9798a2f46
21 changed files with 3101 additions and 0 deletions

178
dora_iobridge/README.md Normal file
View File

@@ -0,0 +1,178 @@
# Dora IOBridge Node
A WebSocket server that bridges web clients with the Dora dataflow for real-time voice commands and scene updates.
## Inputs/Outputs
| Input | Type | Description |
|----------------|--------|---------------------------------------|
| `voice_out` | JSON | Response from voice control node |
| `scene_update` | JSON | Scene objects from voice control |
| Output | Type | Description |
|----------------|--------|---------------------------------------|
| `voice_in` | string | Voice commands forwarded to Dora |
## Environment Variables
```bash
VOICE_HOST=0.0.0.0 # Bind address
VOICE_PORT=8765 # Listen port
```
## Installation
```bash
cd dora_iobridge
pip install -e .
```
## Testing
### Test with WebSocket (wscat)
```bash
# Install wscat
npm install -g wscat
# Connect to the server
wscat -c ws://localhost:8765
```
### Test with curl (websocat)
```bash
# Install websocat
# Ubuntu: sudo apt install websocat
# macOS: brew install websocat
# Send a ping
echo '{"type": "ping"}' | websocat ws://localhost:8765
# Response: {"type": "pong"}
# Send a voice command
echo '{"type": "command", "text": "sube"}' | websocat ws://localhost:8765
# Request scene refresh
echo '{"type": "scene_refresh"}' | websocat ws://localhost:8765
```
### Test with Python
```python
import asyncio
import websockets
import json
async def test_iobridge():
uri = "ws://localhost:8765"
async with websockets.connect(uri) as ws:
# Test ping
await ws.send(json.dumps({"type": "ping"}))
response = await ws.recv()
print(f"Ping response: {response}")
# Send command
await ws.send(json.dumps({
"type": "command",
"text": "agarra el cubo rojo"
}))
# Listen for responses
async for message in ws:
data = json.loads(message)
print(f"Received: {data}")
asyncio.run(test_iobridge())
```
### Test with curl (HTTP upgrade not supported directly)
Since WebSocket requires an upgrade handshake, use this shell script:
```bash
#!/bin/bash
# test_iobridge.sh
# Using websocat for interactive testing
websocat ws://localhost:8765 <<EOF
{"type": "ping"}
{"type": "command", "text": "sube"}
{"type": "scene_refresh"}
EOF
```
## WebSocket Message Types
### Client -> Server
**Command (voice input)**
```json
{"type": "command", "text": "agarra el cubo rojo"}
```
**Ping (health check)**
```json
{"type": "ping"}
```
Response: `{"type": "pong"}`
**Scene Refresh**
```json
{"type": "scene_refresh"}
```
### Server -> Client (Broadcasts)
**Command Response**
```json
{
"type": "response",
"text": "Ok, voy a tomar",
"status": "ok"
}
```
**Scene Update**
```json
{
"type": "scene_updated",
"objects": [
{
"object_type": "cubo",
"color": "rojo",
"size": "big",
"position_mm": [150.0, 200.0, 280.0],
"source": "detection"
}
]
}
```
## Dora Dataflow Configuration
```yaml
nodes:
- id: iobridge
build: pip install -e ./dora_iobridge
path: dora_iobridge
inputs:
voice_out: voice_control/voice_out
scene_update: voice_control/scene_update
outputs:
- voice_in
env:
VOICE_HOST: "0.0.0.0"
VOICE_PORT: "8765"
```
```bash
dora up
dora start dataflow.yml
```
## Dependencies
- dora-rs >= 0.3.9
- pyarrow >= 12.0.0
- websockets >= 12.0

View File

@@ -0,0 +1 @@
"""Dora IO bridge node package."""

View File

@@ -0,0 +1,145 @@
"""Dora node bridging WebSocket IO to Dora topics."""
from __future__ import annotations
import asyncio
import json
import os
import threading
import time
from typing import Any, Dict, Optional, Set
import pyarrow as pa
from dora import Node
from websockets.server import serve, WebSocketServerProtocol
class IoBridgeServer:
def __init__(self, host: str, port: int):
self.host = host
self.port = port
self.clients: Set[WebSocketServerProtocol] = set()
self.command_handler = None
self.scene_refresh_handler = None
async def handler(self, websocket: WebSocketServerProtocol):
self.clients.add(websocket)
try:
async for message in websocket:
try:
data = json.loads(message)
except json.JSONDecodeError:
await websocket.send(
json.dumps({"type": "error", "text": "Invalid JSON message"})
)
continue
response = await self._route_message(data, websocket)
if response:
await websocket.send(json.dumps(response))
finally:
self.clients.discard(websocket)
async def _route_message(
self, data: Dict[str, Any], websocket: WebSocketServerProtocol
) -> Optional[Dict[str, Any]]:
msg_type = data.get("type")
if msg_type == "command":
text = data.get("text", "")
if self.command_handler:
await self.command_handler(text)
return None
return {"type": "error", "text": "No command handler registered"}
if msg_type == "ping":
return {"type": "pong"}
if msg_type == "scene_refresh":
if self.scene_refresh_handler:
objects = await self.scene_refresh_handler()
return {"type": "scene_updated", "objects": objects}
return {"type": "error", "text": "No scene handler registered"}
return {"type": "error", "text": f"Unknown message type: {msg_type}"}
async def broadcast(self, message: Dict[str, Any]):
if not self.clients:
return
payload = json.dumps(message)
await asyncio.gather(
*[client.send(payload) for client in self.clients], return_exceptions=True
)
async def send(self, message: Dict[str, Any], websocket: WebSocketServerProtocol):
await websocket.send(json.dumps(message))
async def start(self):
async with serve(self.handler, self.host, self.port):
await asyncio.Future()
def main() -> None:
host = os.getenv("VOICE_HOST", "0.0.0.0")
port = int(os.getenv("VOICE_PORT", "8765"))
input_topic = os.getenv("VOICE_IN_OUTPUT", "voice_in")
response_input = os.getenv("VOICE_OUT_INPUT", "voice_out")
scene_input = os.getenv("SCENE_INPUT", "scene_update")
node = Node()
server = IoBridgeServer(host, port)
loop = asyncio.new_event_loop()
def push_command(text: str) -> None:
node.send_output(
input_topic,
pa.array([text]),
metadata={"encoding": "utf-8", "timestamp_ns": time.time_ns()},
)
async def handle_scene_refresh():
return []
def command_handler(text: str):
push_command(text)
return None
server.command_handler = command_handler
server.scene_refresh_handler = handle_scene_refresh
def run_server():
asyncio.set_event_loop(loop)
loop.run_until_complete(server.start())
threading.Thread(target=run_server, daemon=True).start()
for event in node:
if event["type"] != "INPUT":
continue
if event["id"] == response_input:
raw = event["value"][0].as_py() if len(event["value"]) else ""
if not raw:
continue
try:
payload = json.loads(raw)
message = {
"type": "response",
"text": payload.get("text", ""),
"status": payload.get("status", "ok"),
}
except Exception:
message = {"type": "response", "text": raw, "status": "ok"}
asyncio.run_coroutine_threadsafe(server.broadcast(message), loop)
continue
if event["id"] == scene_input:
raw = event["value"][0].as_py() if len(event["value"]) else ""
if not raw:
continue
try:
payload = json.loads(raw)
objects = payload.get("objects", [])
message = {"type": "scene_updated", "objects": objects}
except Exception:
message = {"type": "scene_updated", "objects": []}
asyncio.run_coroutine_threadsafe(server.broadcast(message), loop)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,17 @@
[project]
name = "dora-iobridge"
version = "0.1.0"
license = { file = "MIT" }
authors = [{ name = "Dora" }]
description = "Dora node bridging WebSocket IO to Dora topics"
requires-python = ">=3.8"
dependencies = [
"dora-rs >= 0.3.9",
"pyarrow >= 12.0.0",
"websockets >= 12.0",
]
[project.scripts]
dora-iobridge = "dora_iobridge.main:main"