personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add raw ArUco marker detection and preserve it in frame output

Refactor ArUco detection to expose raw marker data (IDs and corners)
alongside the computed bounding polygon. When markers are detected, frame
JSONL output now includes an "aruco" field containing the detected markers
and a "masked" flag indicating whether the Convey region was masked.

- Add detect_markers() returning raw {markers, polygon} dict
- Remove detect_convey_region() wrapper (unused after refactor)
- Thread aruco metadata through VideoProcessor to JSONL output
- Update mask_convey_region() docstring to drop the reference to the
  removed detect_convey_region()

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+126 -89
+25 -18
observe/aruco.py
··· 41 41 return _detector 42 42 43 43 44 - def detect_convey_region(image: Image.Image) -> Optional[list[tuple[float, float]]]: 44 + def detect_markers(image: Image.Image) -> Optional[dict]: 45 45 """ 46 - Detect Convey UI region by finding all 4 corner fiducial tags. 46 + Detect ArUco markers in an image and return raw detection data. 47 47 48 48 Parameters 49 49 ---------- ··· 52 52 53 53 Returns 54 54 ------- 55 - Optional[list[tuple[float, float]]] 56 - Polygon coordinates [(x,y), ...] in order [TL, TR, BR, BL] if all 4 57 - corner tags are detected, None otherwise. 55 + Optional[dict] 56 + Detection result with keys: 57 + - markers: list of {id: int, corners: [[x,y], ...]} for each detected marker 58 + - polygon: [[x,y], ...] bounding polygon if all 4 corner tags found, else None 59 + Returns None if no markers detected. 58 60 """ 59 61 # Convert PIL to numpy array 60 62 img_array = np.array(image) ··· 72 74 if ids is None: 73 75 return None 74 76 75 - # Build map of detected tag ID -> corner points 77 + # Build raw markers list 78 + markers = [] 76 79 id_to_corners = {} 77 80 for tag_id, pts in zip(ids.flatten().tolist(), corners): 78 81 id_to_corners[tag_id] = pts 82 + # Convert corners to list of [x, y] pairs 83 + corner_list = pts.reshape(4, 2).tolist() 84 + markers.append({"id": tag_id, "corners": corner_list}) 79 85 80 - # Check if all 4 corner tags are present 81 - if not CORNER_TAG_IDS.issubset(id_to_corners.keys()): 82 - return None 86 + result: dict = {"markers": markers, "polygon": None} 83 87 84 - # Extract outer corners from each tag to form the bounding polygon 85 - # ArUco corner order within each marker: [TL, TR, BR, BL] 86 - tl = id_to_corners[6].reshape(4, 2)[0] # TL tag, TL corner 87 - tr = id_to_corners[7].reshape(4, 2)[1] # TR tag, TR corner 88 - br = id_to_corners[2].reshape(4, 2)[2] # BR tag, BR corner 89 - bl = id_to_corners[4].reshape(4, 2)[3] # BL tag, BL corner 88 + # Check if all 4 corner tags are present for bounding polygon 89 + 
if CORNER_TAG_IDS.issubset(id_to_corners.keys()): 90 + # Extract outer corners from each tag to form the bounding polygon 91 + # ArUco corner order within each marker: [TL, TR, BR, BL] 92 + tl = id_to_corners[6].reshape(4, 2)[0] # TL tag, TL corner 93 + tr = id_to_corners[7].reshape(4, 2)[1] # TR tag, TR corner 94 + br = id_to_corners[2].reshape(4, 2)[2] # BR tag, BR corner 95 + bl = id_to_corners[4].reshape(4, 2)[3] # BL tag, BL corner 96 + result["polygon"] = [tl.tolist(), tr.tolist(), br.tolist(), bl.tolist()] 90 97 91 - return [tuple(tl), tuple(tr), tuple(br), tuple(bl)] 98 + return result 92 99 93 100 94 101 def mask_convey_region(image: Image.Image, polygon: list[tuple[float, float]]) -> None: ··· 102 109 image : Image.Image 103 110 PIL Image to mask (modified in place) 104 111 polygon : list[tuple[float, float]] 105 - Polygon coordinates from detect_convey_region() 112 + Polygon coordinates [(x,y), ...] defining the region to mask 106 113 """ 107 114 draw = ImageDraw.Draw(image) 108 115 draw.polygon(polygon, fill=(0, 0, 0)) ··· 136 143 137 144 __all__ = [ 138 145 "CORNER_TAG_IDS", 139 - "detect_convey_region", 146 + "detect_markers", 140 147 "mask_convey_region", 141 148 "polygon_area", 142 149 ]
+34 -23
observe/describe.py
··· 28 28 import av 29 29 from PIL import Image, ImageChops, ImageStat 30 30 31 - from observe.aruco import detect_convey_region, mask_convey_region, polygon_area 31 + from observe.aruco import detect_markers, mask_convey_region, polygon_area 32 32 from observe.utils import get_segment_key 33 33 from think.callosum import callosum_send 34 34 from think.utils import setup_cli ··· 211 211 pil_img = Image.fromarray(arr_rgb) 212 212 del arr_rgb 213 213 214 - # Detect and mask Convey UI region (fiducial corner tags) 215 - convey_polygon = detect_convey_region(pil_img) 216 - if convey_polygon is not None: 217 - # Check if Convey covers most of the frame 218 - mask_area = polygon_area(convey_polygon) 214 + # Detect ArUco markers (fiducial corner tags) 215 + aruco_result = detect_markers(pil_img) 216 + aruco_masked = False 217 + if aruco_result is not None and aruco_result["polygon"] is not None: 218 + # All 4 corner tags detected - check coverage 219 + polygon = [tuple(pt) for pt in aruco_result["polygon"]] 220 + mask_area = polygon_area(polygon) 219 221 frame_area = pil_img.width * pil_img.height 220 222 if mask_area / frame_area > self.MASK_SKIP_THRESHOLD: 221 223 # Skip frame entirely - Convey UI dominates ··· 226 228 ) 227 229 continue 228 230 # Mask the Convey region with black 229 - mask_convey_region(pil_img, convey_polygon) 231 + mask_convey_region(pil_img, polygon) 232 + aruco_masked = True 230 233 231 234 # Downsample for comparison 232 235 current_small = self._downsample(pil_img) 236 + 237 + # Build frame data dict 238 + frame_data: dict = { 239 + "frame_id": frame_count, 240 + "timestamp": timestamp, 241 + } 242 + # Include aruco detection result if markers were found 243 + if aruco_result is not None: 244 + frame_data["aruco"] = { 245 + "markers": aruco_result["markers"], 246 + "masked": aruco_masked, 247 + } 233 248 234 249 # First frame: always qualify (RMS vs nothing = 100% different) 235 250 if last_qualified_small is None: 236 - frame_bytes = 
self._frame_to_bytes(pil_img) 251 + frame_data["frame_bytes"] = self._frame_to_bytes(pil_img) 237 252 pil_img.close() 238 253 239 - self.qualified_frames.append( 240 - { 241 - "frame_id": frame_count, 242 - "timestamp": timestamp, 243 - "frame_bytes": frame_bytes, 244 - } 245 - ) 254 + self.qualified_frames.append(frame_data) 246 255 247 256 last_qualified_small = current_small 248 257 logger.debug(f"First frame at {timestamp:.2f}s") ··· 258 267 continue 259 268 260 269 # Qualified - convert full frame to bytes 261 - frame_bytes = self._frame_to_bytes(pil_img) 270 + frame_data["frame_bytes"] = self._frame_to_bytes(pil_img) 262 271 pil_img.close() 263 272 264 - self.qualified_frames.append( 265 - { 266 - "frame_id": frame_count, 267 - "timestamp": timestamp, 268 - "frame_bytes": frame_bytes, 269 - } 270 - ) 273 + self.qualified_frames.append(frame_data) 271 274 272 275 # Update cached downsampled frame 273 276 last_qualified_small.close() ··· 426 429 req.timestamp = frame_data["timestamp"] 427 430 req.retry_count = 0 428 431 req.frame_bytes = frame_data["frame_bytes"] # Store bytes for reuse 432 + req.aruco = frame_data.get("aruco") # ArUco detection result (may be None) 429 433 req.request_type = RequestType.DESCRIBE 430 434 req.json_analysis = None # Will store the JSON analysis result 431 435 req.category_results = {} # Will store category-specific results ··· 563 567 follow_req.frame_id = req.frame_id 564 568 follow_req.timestamp = req.timestamp 565 569 follow_req.frame_bytes = req.frame_bytes 570 + follow_req.aruco = req.aruco 566 571 follow_req.json_analysis = req.json_analysis 567 572 follow_req.category_results = req.category_results 568 573 follow_req.requests = req.requests ··· 608 613 "analysis": req.json_analysis, 609 614 "pending": req.pending_follow_ups, 610 615 } 616 + if req.aruco: 617 + frame_results[req.frame_id]["aruco"] = req.aruco 611 618 if has_error: 612 619 frame_results[req.frame_id]["error"] = error_msg 613 620 ··· 658 665 "timestamp": 
req.timestamp, 659 666 "requests": req.requests, 660 667 } 668 + 669 + # Add aruco detection result if present 670 + if req.aruco: 671 + result["aruco"] = req.aruco 661 672 662 673 # Add error at top level if any request failed 663 674 if has_error:
+67 -48
tests/test_aruco.py
··· 9 9 10 10 from observe.aruco import ( 11 11 CORNER_TAG_IDS, 12 - detect_convey_region, 12 + detect_markers, 13 13 mask_convey_region, 14 14 polygon_area, 15 15 ) ··· 41 41 assert polygon_area([(0, 0), (1, 1)]) == 0.0 42 42 43 43 44 - def test_detect_convey_region_no_markers(): 45 - """Test detection returns None when no markers are present.""" 46 - # Plain white image - no markers 44 + def test_detect_markers_no_markers(): 45 + """Test detect_markers returns None when no markers are present.""" 47 46 img = Image.new("RGB", (640, 480), color="white") 48 - result = detect_convey_region(img) 47 + result = detect_markers(img) 49 48 assert result is None 50 49 51 50 52 - def test_detect_convey_region_grayscale(): 53 - """Test detection works with grayscale input.""" 54 - # Grayscale image - should handle conversion 51 + def test_detect_markers_grayscale(): 52 + """Test detect_markers works with grayscale input.""" 55 53 img = Image.new("L", (640, 480), color=128) 56 - result = detect_convey_region(img) 54 + result = detect_markers(img) 57 55 assert result is None # No markers, but shouldn't crash 58 56 59 57 ··· 89 87 assert img.getpixel((95, 5)) == (255, 255, 255) 90 88 91 89 92 - def test_detect_convey_region_with_real_markers(): 93 - """Test detection with actual ArUco markers rendered into image.""" 90 + def test_detect_markers_with_all_corners(): 91 + """Test detect_markers returns full result with all 4 corner markers.""" 94 92 # Create a test image 95 93 img_array = np.ones((480, 640, 3), dtype=np.uint8) * 255 96 94 97 95 # Generate and place the 4 corner markers 98 96 dictionary = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50) 99 97 marker_size = 50 98 + pad = 20 100 99 101 - # Generate markers 102 - markers = {} 100 + # Generate and place markers 103 101 for tag_id in [6, 7, 4, 2]: 104 102 marker = cv2.aruco.generateImageMarker(dictionary, tag_id, marker_size) 105 - # Convert to 3-channel 106 - markers[tag_id] = cv2.cvtColor(marker, 
cv2.COLOR_GRAY2RGB) 103 + marker_rgb = cv2.cvtColor(marker, cv2.COLOR_GRAY2RGB) 107 104 108 - # Place markers at corners (with some padding) 109 - pad = 20 110 - # TL - tag 6 111 - img_array[pad : pad + marker_size, pad : pad + marker_size] = markers[6] 112 - # TR - tag 7 113 - img_array[pad : pad + marker_size, 640 - pad - marker_size : 640 - pad] = markers[7] 114 - # BL - tag 4 115 - img_array[480 - pad - marker_size : 480 - pad, pad : pad + marker_size] = markers[4] 116 - # BR - tag 2 117 - img_array[ 118 - 480 - pad - marker_size : 480 - pad, 640 - pad - marker_size : 640 - pad 119 - ] = markers[2] 105 + if tag_id == 6: # TL 106 + img_array[pad : pad + marker_size, pad : pad + marker_size] = marker_rgb 107 + elif tag_id == 7: # TR 108 + img_array[pad : pad + marker_size, 640 - pad - marker_size : 640 - pad] = ( 109 + marker_rgb 110 + ) 111 + elif tag_id == 4: # BL 112 + img_array[480 - pad - marker_size : 480 - pad, pad : pad + marker_size] = ( 113 + marker_rgb 114 + ) 115 + elif tag_id == 2: # BR 116 + img_array[ 117 + 480 - pad - marker_size : 480 - pad, 118 + 640 - pad - marker_size : 640 - pad, 119 + ] = marker_rgb 120 120 121 - # Convert to PIL 122 121 pil_img = Image.fromarray(img_array) 123 122 124 - # Detect 125 - result = detect_convey_region(pil_img) 123 + result = detect_markers(pil_img) 126 124 127 - # Should find all 4 markers and return polygon 125 + # Should return dict with markers and polygon 128 126 assert result is not None 129 - assert len(result) == 4 127 + assert "markers" in result 128 + assert "polygon" in result 129 + 130 + # Should have 4 markers 131 + assert len(result["markers"]) == 4 132 + 133 + # Each marker should have id and corners 134 + marker_ids = {m["id"] for m in result["markers"]} 135 + assert marker_ids == {2, 4, 6, 7} 136 + 137 + for marker in result["markers"]: 138 + assert "id" in marker 139 + assert "corners" in marker 140 + assert len(marker["corners"]) == 4 141 + for corner in marker["corners"]: 142 + assert 
len(corner) == 2 143 + assert isinstance(corner[0], (int, float)) 144 + assert isinstance(corner[1], (int, float)) 130 145 131 - # Polygon should roughly bound the marker positions 132 - # Each point should be a tuple of numeric values 133 - for point in result: 134 - assert len(point) == 2 135 - assert np.issubdtype(type(point[0]), np.number) or isinstance( 136 - point[0], (int, float) 137 - ) 138 - assert np.issubdtype(type(point[1]), np.number) or isinstance( 139 - point[1], (int, float) 140 - ) 146 + # Polygon should be present (all 4 corners detected) 147 + assert result["polygon"] is not None 148 + assert len(result["polygon"]) == 4 141 149 142 150 143 - def test_detect_convey_region_partial_markers(): 144 - """Test detection returns None when only some markers present.""" 151 + def test_detect_markers_partial(): 152 + """Test detect_markers returns markers but no polygon with partial detection.""" 145 153 # Create a test image 146 154 img_array = np.ones((480, 640, 3), dtype=np.uint8) * 255 147 155 ··· 150 158 marker_size = 50 151 159 pad = 20 152 160 153 - # Only place TL and TR markers 161 + # Only place TL (6) and TR (7) markers 154 162 for tag_id, pos in [(6, (pad, pad)), (7, (pad, 640 - pad - marker_size))]: 155 163 marker = cv2.aruco.generateImageMarker(dictionary, tag_id, marker_size) 156 164 marker_rgb = cv2.cvtColor(marker, cv2.COLOR_GRAY2RGB) ··· 159 167 160 168 pil_img = Image.fromarray(img_array) 161 169 162 - # Should return None - only 2 of 4 markers found 163 - result = detect_convey_region(pil_img) 164 - assert result is None 170 + result = detect_markers(pil_img) 171 + 172 + # Should return dict with markers but no polygon 173 + assert result is not None 174 + assert "markers" in result 175 + assert "polygon" in result 176 + 177 + # Should have 2 markers 178 + assert len(result["markers"]) == 2 179 + marker_ids = {m["id"] for m in result["markers"]} 180 + assert marker_ids == {6, 7} 181 + 182 + # Polygon should be None (only 2 of 4 corners) 
183 + assert result["polygon"] is None