A terminal-only Bluesky / AT Protocol client written in Fortran, with a asm/Rust native firehose decoder for the relay-raw stream. DM slide support. Dither image composer. Yes, that Fortran www.patreon.com/FormerLab
rust atproto fun fortran assembly
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fortransky v1.2 — Assemblersky integration, relay-raw live decode via Python cbor2

FormerLab 27806a23 13c052f7

+145 -17
+2 -1
.gitignore
··· 5 5 *.o 6 6 *.mod 7 7 *.out 8 - ~/.fortransky/ 8 + ~/.fortransky/ 9 + bridge/assemblersky/bin/assemblersky_cli
bridge/assemblersky/bin/.gitkeep

This is a binary file and will not be displayed.

+23
scripts/check_assemblersky.sh
··· 1 + #!/usr/bin/env bash 2 + set -euo pipefail 3 + 4 + ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" 5 + 6 + echo "[fortransky] checking Assemblersky CLI..." 7 + if [[ -x "$ROOT/bridge/assemblersky/bin/assemblersky_cli" ]]; then 8 + echo "[fortransky] found bundled Assemblersky CLI" 9 + exit 0 10 + fi 11 + 12 + if command -v assemblersky_cli >/dev/null 2>&1; then 13 + echo "[fortransky] found Assemblersky CLI on PATH" 14 + exit 0 15 + fi 16 + 17 + echo "[fortransky] Assemblersky CLI not found." 18 + echo "[fortransky] Expected one of:" 19 + echo " $ROOT/bridge/assemblersky/bin/assemblersky_cli" 20 + echo " assemblersky_cli on PATH" 21 + echo 22 + echo "[fortransky] You can still run relay_raw_tail.py --fixture while preparing the native decoder." 23 + exit 0
+120 -16
scripts/relay_raw_tail.py
··· 24 24 25 25 26 26 def detect_native_decoder(root: Path) -> Optional[str]: 27 - env = os.environ.get('FORTRANSKY_FIREHOSE_DECODER', '').strip() 28 - candidates = [] 29 - if env: 30 - candidates.append(env) 31 - candidates.extend([ 27 + # Detection order: 28 + # 1. FORTRANSKY_RELAY_DECODER — explicit override 29 + # 2. FORTRANSKY_ASSEMBLERSKY_DECODER — explicit Assemblersky path 30 + # 3. bridge/assemblersky/bin/assemblersky_cli — bundled Assemblersky 31 + # 4. assemblersky_cli on PATH 32 + # 5. FORTRANSKY_FIREHOSE_DECODER — explicit Rust bridge path 33 + # 6. bridge/firehose-bridge/target/release/firehose_bridge_cli — bundled Rust 34 + # 7. bridge/firehose-bridge/target/debug/firehose_bridge_cli 35 + # 8. firehose_bridge_cli on PATH 36 + candidates = [ 37 + os.environ.get('FORTRANSKY_RELAY_DECODER', '').strip() or None, 38 + os.environ.get('FORTRANSKY_ASSEMBLERSKY_DECODER', '').strip() or None, 39 + str(root / 'bridge' / 'assemblersky' / 'bin' / 'assemblersky_cli'), 40 + shutil.which('assemblersky_cli'), 41 + os.environ.get('FORTRANSKY_FIREHOSE_DECODER', '').strip() or None, 32 42 str(root / 'bridge' / 'firehose-bridge' / 'target' / 'release' / 'firehose_bridge_cli'), 33 43 str(root / 'bridge' / 'firehose-bridge' / 'target' / 'debug' / 'firehose_bridge_cli'), 34 - ]) 44 + shutil.which('firehose_bridge_cli'), 45 + ] 35 46 for candidate in candidates: 36 47 if candidate and Path(candidate).exists() and os.access(candidate, os.X_OK): 37 48 return candidate 38 - return shutil.which('firehose_bridge_cli') 49 + return None 39 50 40 51 41 52 def decode_frame_native(raw: bytes, decoder: str) -> list[dict]: 42 - proc = subprocess.run([decoder], input=raw, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False) 53 + # assemblersky_cli uses --input FILE; firehose_bridge_cli reads stdin directly 54 + # Write frame to a temp file for assemblersky_cli 55 + import tempfile 56 + if 'assemblersky' in Path(decoder).name: 57 + with tempfile.NamedTemporaryFile(delete=False, suffix='.bin') as tf: 58 + tf.write(raw) 59 + tmp_path = tf.name 60 + cmd = [decoder, '--input', tmp_path] 61 + else: 62 + tmp_path = None 63 + cmd = [decoder] 64 + proc = subprocess.run( 65 + cmd, 66 + input=raw if tmp_path is None else None, 67 + stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False 68 + ) 69 + if tmp_path: 70 + try: 71 + import os; os.unlink(tmp_path) 72 + except OSError: 73 + pass 43 74 if proc.returncode != 0: 75 + # rc=1 from assemblersky_cli means no matching op in this frame — not an error 76 + if tmp_path is not None and proc.returncode == 1: 77 + return [] 44 78 raise RuntimeError(proc.stderr.decode('utf-8', errors='replace').strip() or 'native decoder failed') 45 79 events = [] 46 - for line in proc.stdout.decode('utf-8', errors='replace').splitlines(): 80 + output = proc.stdout.decode('utf-8', errors='replace').strip() 81 + if not output: 82 + return events 83 + # Assemblersky emits pretty-printed JSON objects separated by blank lines. 84 + # firehose_bridge_cli emits compact NDJSON (one object per line). 85 + # Handle both: try splitting on blank lines first, then fall back to line-by-line. 86 + def normalise_asb(obj): 87 + if 'repo' in obj and 'did' not in obj: 88 + record = obj.get('record', {}) if isinstance(obj.get('record'), dict) else {} 89 + obj['did'] = obj.get('repo', '') 90 + obj['handle'] = '' 91 + obj['text'] = record.get('text', '') 92 + obj['time_us'] = str(obj.get('seq', '')) 93 + obj['record_type'] = obj.get('collection', '') 94 + obj['source'] = 'relay-raw-native' 95 + obj['error'] = '' 96 + obj['uri'] = (f"at://{obj['did']}/{obj.get('collection','')}" 97 + f"/{obj.get('rkey','')}" ) 98 + obj['record_json'] = record 99 + return obj 100 + 101 + # Try parsing as a single JSON object (pretty-printed Assemblersky output) 102 + try: 103 + obj = json.loads(output) 104 + events.append(normalise_asb(obj)) 105 + return events 106 + except json.JSONDecodeError: 107 + pass 108 + 109 + # Fall back to NDJSON line-by-line (firehose_bridge_cli compact output) 110 + for line in output.splitlines(): 47 111 line = line.strip() 48 112 if not line: 49 113 continue 50 - events.append(json.loads(line)) 114 + try: 115 + obj = json.loads(line) 116 + events.append(normalise_asb(obj)) 117 + except Exception: 118 + continue 51 119 return events 52 120 53 121 ··· 114 182 115 183 116 184 def decode_frame_python(raw: bytes) -> list[dict]: 117 - decoder = cbor2.CBORDecoder(io.BytesIO(raw)) 118 - header = decoder.decode() 119 - body = decoder.decode() 120 - if not isinstance(header, dict) or not isinstance(body, dict): 185 + # cbor2's streaming decoder has a read-ahead buffer that overshoots the first 186 + # item. Find the header boundary by trying incremental slices. 187 + header = None 188 + header_len = 0 189 + for n in range(5, min(30, len(raw))): 190 + try: 191 + h = cbor2.loads(raw[:n]) 192 + if isinstance(h, dict) and 'op' in h: 193 + header = h 194 + header_len = n 195 + break 196 + except Exception: 197 + continue 198 + if header is None: 199 + return [] 200 + try: 201 + body = cbor2.loads(raw[header_len:]) 202 + except Exception: 203 + return [] 204 + if not isinstance(body, dict): 121 205 return [] 122 206 if header.get('t') != '#commit': 123 207 return [] ··· 138 222 if not path.startswith('app.bsky.feed.post/'): 139 223 continue 140 224 cid = op.get('cid', b'') 225 + # cbor2 decodes CIDs as CBORTag(42, bytes) — unwrap the tag value 226 + if hasattr(cid, 'value'): 227 + cid = cid.value 228 + # CID bytes have a leading 0x00 multibase prefix in CBOR — strip it 229 + if isinstance(cid, (bytes, bytearray)) and len(cid) > 0 and cid[0] == 0: 230 + cid = cid[1:] 141 231 if not isinstance(cid, (bytes, bytearray)): 142 232 continue 143 233 block = blocks.get(bytes(cid)) ··· 153 243 154 244 155 245 def decode_frame(raw: bytes, root: Path) -> list[dict]: 246 + # For live streaming use Python decoder — spawning a subprocess per frame 247 + # is too slow at firehose rates. Native decoder is used for fixture/single frames. 248 + return decode_frame_python(raw) 249 + 250 + 251 + def decode_frame_single(raw: bytes, root: Path) -> list[dict]: 252 + # For single frame decode (fixture path) prefer native decoder if available 156 253 decoder = detect_native_decoder(root) 157 254 if decoder: 158 255 return decode_frame_native(raw, decoder) ··· 160 257 161 258 162 259 def load_fixture_events(frame_file: Path, root: Path) -> list[dict]: 163 - return decode_frame(frame_file.read_bytes(), root) 260 + return decode_frame_single(frame_file.read_bytes(), root) 164 261 165 262 166 263 async def load_live_events(url: str, limit: int, root: Path) -> list[dict]: 167 264 collected: list[dict] = [] 265 + frames_seen = 0 266 + # Cap total frames scanned to avoid hanging when decoder filters most frames 267 + max_frames = limit * 500 168 268 async with websockets.connect(url, max_size=2**22, ping_interval=20, ping_timeout=20) as ws: 169 269 async for raw in ws: 170 270 if isinstance(raw, str): 171 271 continue 272 + frames_seen += 1 172 273 try: 173 274 events = decode_frame(raw, root) 174 - except Exception: 275 + except Exception as _frame_exc: 276 + sys.stderr.write('frame decode error: ' + str(_frame_exc) + chr(10)) 175 277 events = [] 176 278 for ev in events: 177 279 collected.append(ev) 178 280 if len(collected) >= limit: 179 281 return collected 282 + if frames_seen >= max_frames: 283 + break 180 284 return collected 181 285 182 286