audio streaming app plyr.fm
38
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: use ATProto SDK types instead of hand-rolled string parsing (#1076)

* refactor: use ATProto SDK types (AtUri, NSID) instead of hand-rolled string parsing

replace manual rsplit/split parsing with the SDK's proper NSID and AtUri
types, which provide validation and semantic clarity.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: update STATUS.md with SDK types refactor, fix deferred import

move AtUri import to module level in client.py. update STATUS.md to
document PR #1076 and correct the rsplit description.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>

authored by

nate nowack
Claude Opus 4.6
and committed by
GitHub
f74a09ee d8d183f2

+21 -19
+6 -5
STATUS.md
··· 47 47 48 48 ### March 2026 49 49 50 - #### Jetstream real-time ingestion — staging smoketest (PRs #1069-1074, Mar 10-12) 50 + #### Jetstream real-time ingestion — staging smoketest (PRs #1069-1074, #1076, Mar 10-12) 51 51 52 - shipped the Jetstream WebSocket consumer for real-time ATProto record ingestion, then ran a full staging smoketest that surfaced two bugs before production. 52 + shipped the Jetstream WebSocket consumer for real-time ATProto record ingestion, then ran a full staging smoketest that surfaced two bugs before production. follow-up refactor replaced hand-rolled string parsing with ATProto SDK types. 53 53 54 54 **what shipped:** 55 - - **Jetstream consumer** (PR #1070): perpetual docket task connects to `jetstream2.us-east.bsky.network`, filters for `fm.plyr.*` collections, and dispatches events to ingest tasks. `rsplit(".", 1)` extracts record type, so `fm.plyr.stg.track` → `track` works across all environments. docket's Redis lock ensures single-instance. exponential backoff on reconnect, cursor persistence in Redis. 55 + - **Jetstream consumer** (PR #1070): perpetual docket task connects to `jetstream2.us-east.bsky.network`, filters for `fm.plyr.*` collections, and dispatches events to ingest tasks. uses `NSID.from_str(collection).name` to extract record type (e.g. `fm.plyr.stg.track` → `track`) across all environments. docket's Redis lock ensures single-instance. exponential backoff on reconnect, cursor persistence in Redis. 56 56 - **lexicon validation + ingest task layer** (PR #1069): 12 ingest tasks covering track/like/comment/list CRUD and profile updates. `validate_record()` checks incoming records against lexicon JSON schemas before ingestion. 57 57 - **reserve-then-publish** (PR #1072): upload API reserves a DB row with `publish_state=pending` and the AT URI before writing to the PDS. when the Jetstream `track.create` event arrives, `ingest_track_create` finds the pending row and promotes it to `published` with the CID — no duplicate, no race. 58 + - **SDK types refactor** (PR #1076): replaced hand-rolled `rsplit(".", 1)` and `str.split("/")` parsing with `NSID.from_str()` and `AtUri.from_str()` from the ATProto SDK (`atproto_core`). these types provide proper validation and semantic field access (`.name`, `.host`, `.collection`, `.rkey`). cleaned up a dead `_parse_at_uri` re-export. 58 59 59 60 **staging smoketest (Mar 11-12):** 60 61 ··· 276 277 277 278 ### current focus 278 279 279 - Jetstream real-time ingestion enabled on staging — 24h soak in progress. staging smoketest validated reserve-then-publish reconciliation, direct PDS writes, and all 12 event types. two bugs fixed (concurrent delete deadlock, lexicon validation no-op in Docker). open questions on audit trail visibility and moderation for PDS-direct records before production enablement. 280 + Jetstream real-time ingestion enabled on staging — 24h soak in progress. staging smoketest validated reserve-then-publish reconciliation, direct PDS writes, and all 12 event types. two bugs fixed (concurrent delete deadlock, lexicon validation no-op in Docker). ATProto URI/NSID parsing now uses SDK types instead of hand-rolled string splits. open questions on audit trail visibility and moderation for PDS-direct records before production enablement. 280 281 281 282 ### known issues 282 283 - iOS PWA audio may hang on first play after backgrounding ··· 411 412 412 413 --- 413 414 414 - this is a living document. last updated 2026-03-12 (Jetstream staging smoketest). 415 + this is a living document. last updated 2026-03-12 (Jetstream smoketest + SDK types refactor). 415 416
+7 -7
backend/src/backend/_internal/atproto/client.py
··· 6 6 from datetime import UTC, datetime, timedelta 7 7 from typing import Any, BinaryIO 8 8 9 + from atproto import AtUri 9 10 from atproto_oauth.models import OAuthSession 10 11 from cachetools import LRUCache 11 12 ··· 322 323 323 324 324 325 def parse_at_uri(uri: str) -> tuple[str, str, str]: 325 - """parse an AT URI into (repo, collection, rkey).""" 326 - if not uri.startswith("at://"): 327 - raise ValueError(f"Invalid AT URI format: {uri}") 328 - parts = uri.replace("at://", "").split("/") 329 - if len(parts) != 3: 330 - raise ValueError(f"Invalid AT URI structure: {uri}") 331 - return parts[0], parts[1], parts[2] 326 + """parse an AT URI into (repo, collection, rkey). 327 + 328 + thin wrapper around the SDK's AtUri for call-site compatibility. 329 + """ 330 + parsed = AtUri.from_str(uri) 331 + return parsed.host, parsed.collection, parsed.rkey
-2
backend/src/backend/_internal/atproto/records/__init__.py
··· 27 27 from backend._internal.atproto.client import ( 28 28 _refresh_session_tokens, 29 29 make_pds_request as _make_pds_request, 30 - parse_at_uri as _parse_at_uri, 31 30 reconstruct_oauth_session as _reconstruct_oauth_session, 32 31 ) 33 32 34 33 __all__ = [ 35 34 "_make_pds_request", 36 - "_parse_at_uri", 37 35 "_reconstruct_oauth_session", 38 36 "_refresh_session_tokens", 39 37 "build_track_record",
+5 -4
backend/src/backend/_internal/jetstream.py
··· 18 18 import logfire 19 19 import orjson 20 20 import websockets 21 + from atproto_core.nsid import NSID 21 22 from docket import Perpetual 22 23 from sqlalchemy import select 23 24 from websockets.asyncio.client import ClientConnection ··· 168 169 """dispatch event to the appropriate ingest task via docket.""" 169 170 docket = get_docket() 170 171 171 - # determine which collection type this is (strip namespace prefix) 172 + # extract record type from the collection NSID 172 173 # e.g. "fm.plyr.track" or "fm.plyr.dev.track" → "track" 173 - parts = collection.rsplit(".", 1) 174 - if len(parts) != 2: 174 + try: 175 + record_type = NSID.from_str(collection).name 176 + except Exception: 175 177 return 176 - record_type = parts[1] 177 178 178 179 task_map: dict[tuple[str, str], Any] = { 179 180 ("track", "create"): ingest_track_create,
+3 -1
backend/src/backend/utilities/lexicon.py
··· 11 11 from pathlib import Path 12 12 from typing import Any 13 13 14 + from atproto_core.nsid import NSID 15 + 14 16 logger = logging.getLogger(__name__) 15 17 16 18 ··· 31 33 @lru_cache(maxsize=32) 32 34 def _load_lexicon(lexicon_id: str) -> dict[str, Any] | None: 33 35 """load a lexicon JSON by its NSID, using the last segment as filename.""" 34 - filename = lexicon_id.rsplit(".", 1)[-1] + ".json" 36 + filename = NSID.from_str(lexicon_id).name + ".json" 35 37 path = _LEXICONS_DIR / filename 36 38 if not path.exists(): 37 39 logger.debug("lexicon file not found: %s", path)