···11+# USING CODEGEN
I don't really understand how indigo lexgen works, but these two commands work.
Run these commands from the project root.
66+77+### struct gen
You really only need to do this when the lexicon changes.
99+```bash
1010+go run github.com/bluesky-social/indigo/cmd/lexgen/ \
1111+--package labelmerge_lex --outdir ./labelmerge/lex \
1212+--external-lexicons ./labelmerge/lex/generation/external/com.atproto.label.defs.json \
1313+--build-file ./labelmerge/lex/generation/lexgen.json \
1414+./labelmerge/lex/generation/defs
1515+```
1616+1717+### server gen
This only needs to be done once per project, never again.
It is written here for future reference.
2121+```bash
2222+go run github.com/bluesky-social/indigo/cmd/lexgen/ \
2323+--gen-server \
2424+--gen-handlers \
2525+--package main \
2626+--outdir ./cmd/labelmerge/ \
2727+--external-lexicons ./labelmerge/lex/generation/external/ \
2828+--types-import app.reddwarf.labelmerge:tangled.org/whey.party/red-dwarf-server/labelmerge/lex \
2929+--types-import com.atproto:github.com/bluesky-social/indigo/api/atproto \
3030+--build-file ./labelmerge/lex/generation/lexgen.json \
3131+./labelmerge/lex/defs/
3232+```
3333+3434+### typescript client gen
This uses a separate tool, but it's related, so it's documented here too.
3737+```bash
3838+npx @atproto/lex-cli gen-api ./your/typescript/project/path ./labelmerge/lex/generation/defs/app.reddwarf.labelmerge.queryLabels.json ./labelmerge/lex/generation/external/com.atproto.label.defs.json
3939+```
+45
labelmerge/lex/labelmergequeryLabels.go
···11+// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT.
22+33+// Lexicon schema: app.reddwarf.labelmerge.queryLabels
44+55+package labelmerge_lex
66+77+import (
88+ "context"
99+1010+ comatproto "github.com/bluesky-social/indigo/api/atproto"
1111+ lexutil "github.com/bluesky-social/indigo/lex/util"
1212+)
// QueryLabels_Error is a "error" in the app.reddwarf.labelmerge.queryLabels schema.
type QueryLabels_Error struct {
	// E is optional (omitted from JSON/CBOR when nil).
	// NOTE(review): field semantics are defined by the lexicon schema —
	// presumably the error message; confirm against
	// app.reddwarf.labelmerge.queryLabels.json.
	E *string `json:"e,omitempty" cborgen:"e,omitempty"`
	// S is required. Presumably the source (labeler DID) the error relates
	// to; confirm against the lexicon schema.
	S string `json:"s" cborgen:"s"`
}
// QueryLabels_Output is the output of a app.reddwarf.labelmerge.queryLabels call.
type QueryLabels_Output struct {
	// Error carries any per-query errors; omitted from JSON/CBOR when empty.
	Error []*QueryLabels_Error `json:"error,omitempty" cborgen:"error,omitempty"`
	// Labels is the list of merged labels returned by the query; the field
	// is always serialized (no omitempty).
	Labels []*comatproto.LabelDefs_Label `json:"labels" cborgen:"labels"`
}
2525+2626+// QueryLabels calls the XRPC method "app.reddwarf.labelmerge.queryLabels".
2727+//
2828+// l: List of label sources (labeler DIDs) to filter on.
2929+// s: List of label subjects (strings).
3030+// strict: If true then any errors will throw the entire query
3131+func QueryLabels(ctx context.Context, c lexutil.LexClient, l []string, s []string, strict bool) (*QueryLabels_Output, error) {
3232+ var out QueryLabels_Output
3333+3434+ params := map[string]interface{}{}
3535+ params["l"] = l
3636+ params["s"] = s
3737+ if strict {
3838+ params["strict"] = strict
3939+ }
4040+ if err := c.LexDo(ctx, lexutil.Query, "", "app.reddwarf.labelmerge.queryLabels", params, nil, &out); err != nil {
4141+ return nil, err
4242+ }
4343+4444+ return &out, nil
4545+}
+221
labelmerge/lru/lru.go
···11+package lru
22+33+import (
44+ "container/list"
55+ "fmt"
66+ "sync"
77+88+ badger "github.com/dgraph-io/badger/v4"
99+)
// Cache is a persistent LRU cache implementation: values are persisted in
// BadgerDB while recency ordering is tracked in memory.
type Cache[K comparable, V any] struct {
	db *badger.DB   // persistent backing store
	mu sync.RWMutex // guards cache and list

	// In-memory LRU tracking (Key -> List Element)
	cache map[K]*list.Element
	list  *list.List // front = most recently used; each Element.Value is *LRUItem[K, V]

	// Serialization functions converting values to/from the on-disk bytes.
	serialize   func(V) ([]byte, error)
	deserialize func([]byte) (V, error)

	// Hooks, invoked while the cache mutex is held.
	OnAdd    func(K) // called after a key enters the in-memory index
	OnRemove func(K) // called after a key leaves the in-memory index
}
2828+2929+// New creates a new persistent LRU cache
3030+func New[K comparable, V any](dbPath string, serialize func(V) ([]byte, error), deserialize func([]byte) (V, error)) (*Cache[K, V], error) {
3131+ opts := badger.DefaultOptions(dbPath).WithLogger(nil) // Suppress Badger logs
3232+ db, err := badger.Open(opts)
3333+ if err != nil {
3434+ return nil, fmt.Errorf("failed to open Badger DB: %w", err)
3535+ }
3636+3737+ c := &Cache[K, V]{
3838+ db: db,
3939+ cache: make(map[K]*list.Element),
4040+ list: list.New(),
4141+ serialize: serialize,
4242+ deserialize: deserialize,
4343+ // Default NOP hooks
4444+ OnAdd: func(K) {},
4545+ OnRemove: func(K) {},
4646+ }
4747+4848+ return c, nil
4949+}
// Close closes the underlying BadgerDB. The cache must not be used after
// Close returns.
func (c *Cache[K, V]) Close() error {
	return c.db.Close()
}
// LRUItem is used within the in-memory doubly linked list to track order.
// Each list.Element stored in Cache.cache holds a *LRUItem as its Value.
type LRUItem[K comparable, V any] struct {
	Key   K
	Value V
}
6161+6262+// Get retrieves a value from the cache
6363+func (c *Cache[K, V]) Get(key K) (V, bool) {
6464+ c.mu.Lock()
6565+ defer c.mu.Unlock()
6666+6767+ // Check in-memory cache first
6868+ if ele, ok := c.cache[key]; ok {
6969+ // Move to front (mark as recently used)
7070+ c.list.MoveToFront(ele)
7171+ return ele.Value.(*LRUItem[K, V]).Value, true
7272+ }
7373+7474+ // Not in memory, check BadgerDB
7575+ var zero V
7676+ err := c.db.View(func(txn *badger.Txn) error {
7777+ item, err := txn.Get([]byte(fmt.Sprintf("%v", key)))
7878+ if err == badger.ErrKeyNotFound {
7979+ return nil
8080+ }
8181+ if err != nil {
8282+ return err
8383+ }
8484+8585+ return item.Value(func(val []byte) error {
8686+ value, err := c.deserialize(val)
8787+ if err != nil {
8888+ return err
8989+ }
9090+9191+ // Add to LRU list
9292+ ele := c.list.PushFront(&LRUItem[K, V]{Key: key, Value: value})
9393+ c.cache[key] = ele
9494+ c.OnAdd(key)
9595+9696+ return nil
9797+ })
9898+ })
9999+100100+ if err != nil {
101101+ return zero, false
102102+ }
103103+104104+ return zero, false
105105+}
// Put stores a value in the cache: it is serialized and written to BadgerDB,
// then the in-memory LRU index is updated (existing keys are refreshed and
// moved to the front; new keys trigger OnAdd).
//
// NOTE(review): serialization and DB-write errors are silently swallowed —
// the caller gets no signal that the value was not stored. Consider
// surfacing an error.
func (c *Cache[K, V]) Put(key K, value V) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Serialize the value
	serializedValue, err := c.serialize(value)
	if err != nil {
		return // value dropped silently; see NOTE above
	}

	// Write to BadgerDB
	err = c.db.Update(func(txn *badger.Txn) error {
		// Keys are encoded via fmt.Sprintf("%v", key) — same scheme as Get/Remove.
		return txn.Set([]byte(fmt.Sprintf("%v", key)), serializedValue)
	})
	if err != nil {
		return // persisted write failed; in-memory state intentionally untouched
	}

	// Update LRU list
	if ele, ok := c.cache[key]; ok {
		// Update existing value
		ele.Value.(*LRUItem[K, V]).Value = value
		c.list.MoveToFront(ele)
	} else {
		// Add new item
		ele := c.list.PushFront(&LRUItem[K, V]{Key: key, Value: value})
		c.cache[key] = ele
		c.OnAdd(key)
	}
}
138138+139139+// Remove removes a value from the cache
140140+func (c *Cache[K, V]) Remove(key K) {
141141+ c.mu.Lock()
142142+ defer c.mu.Unlock()
143143+144144+ // Remove from BadgerDB
145145+ err := c.db.Update(func(txn *badger.Txn) error {
146146+ return txn.Delete([]byte(fmt.Sprintf("%v", key)))
147147+ })
148148+ if err != nil && err != badger.ErrKeyNotFound {
149149+ return
150150+ }
151151+152152+ // Remove from LRU list
153153+ if ele, ok := c.cache[key]; ok {
154154+ c.list.Remove(ele)
155155+ delete(c.cache, key)
156156+ c.OnRemove(key)
157157+ }
158158+}
159159+160160+// Len returns the number of items in the cache
161161+func (c *Cache[K, V]) Len() int {
162162+ c.mu.RLock()
163163+ defer c.mu.RUnlock()
164164+ return len(c.cache)
165165+}
166166+167167+// Keys returns all keys in the cache
168168+func (c *Cache[K, V]) Keys() []K {
169169+ c.mu.RLock()
170170+ defer c.mu.RUnlock()
171171+172172+ keys := make([]K, 0, len(c.cache))
173173+ for k := range c.cache {
174174+ keys = append(keys, k)
175175+ }
176176+ return keys
177177+}
178178+179179+// Clear removes all items from the cache
180180+func (c *Cache[K, V]) Clear() error {
181181+ c.mu.Lock()
182182+ defer c.mu.Unlock()
183183+184184+ // Clear in-memory cache
185185+ c.cache = make(map[K]*list.Element)
186186+ c.list.Init()
187187+188188+ // Clear BadgerDB
189189+ return c.db.DropAll()
190190+}
191191+192192+// SetCapacity sets the maximum number of items the cache can hold
193193+// When the capacity is exceeded, the least recently used items are evicted
194194+func (c *Cache[K, V]) SetCapacity(capacity int) {
195195+ c.mu.Lock()
196196+ defer c.mu.Unlock()
197197+198198+ for len(c.cache) > capacity {
199199+ // Get the least recently used item (back of the list)
200200+ ele := c.list.Back()
201201+ if ele == nil {
202202+ break
203203+ }
204204+205205+ item := ele.Value.(*LRUItem[K, V])
206206+ key := item.Key
207207+208208+ // Remove from BadgerDB
209209+ err := c.db.Update(func(txn *badger.Txn) error {
210210+ return txn.Delete([]byte(fmt.Sprintf("%v", key)))
211211+ })
212212+ if err != nil && err != badger.ErrKeyNotFound {
213213+ continue
214214+ }
215215+216216+ // Remove from LRU list
217217+ c.list.Remove(ele)
218218+ delete(c.cache, key)
219219+ c.OnRemove(key)
220220+ }
221221+}
+386
labelmerge/stream/stream.go
···11+package stream
22+33+import (
44+ "context"
55+ "fmt"
66+ "log/slog"
77+ "net/http"
88+ "net/url"
99+ "strconv"
1010+ "strings"
1111+ "sync"
1212+ "time"
1313+1414+ comatproto "github.com/bluesky-social/indigo/api/atproto"
1515+ "github.com/bluesky-social/indigo/atproto/identity"
1616+ "github.com/bluesky-social/indigo/atproto/syntax"
1717+ "github.com/bluesky-social/indigo/events"
1818+ "github.com/bluesky-social/indigo/events/schedulers/parallel"
1919+ "github.com/gorilla/websocket"
2020+)
// LabelSubscribePath is the XRPC path of the label firehose endpoint.
// It has no leading slash; dialAndStream joins it to the service URL with "/".
const LabelSubscribePath = "xrpc/com.atproto.label.subscribeLabels"

// MaxWorkers is the worker count handed to the parallel event scheduler.
const MaxWorkers = 4

// EventChannelCapacity bounds both the aggregate event channel and the
// scheduler queue; when the event channel is full, label events are dropped.
const EventChannelCapacity = 1000
// LabelEvent is the simplified, unified event object sent to the consumer.
type LabelEvent struct {
	SourceDid string                      // The DID of the labeler service that sent the event
	Cursor    int64                       // The sequence number (seq) of the event
	Value     *comatproto.LabelDefs_Label // The label payload as emitted by the labeler
}
// SubscriptionContext holds the state for a single labeler connection.
type SubscriptionContext struct {
	DID           string
	ServiceURL    string             // Resolved WSS URL ("" until resolution succeeds)
	CurrentCursor string             // Sequence string used for connection restarts
	WorkerCancel  context.CancelFunc // cancels WorkerCtx; derived from the manager context
	WorkerCtx     context.Context
	IsRunning     bool // guarded by mu; true while the worker goroutine is alive

	// mu guards CurrentCursor, ServiceURL, and IsRunning.
	// NOTE(review): some ServiceURL reads elsewhere happen without mu —
	// presumed safe because only the single worker mutates it between
	// streams; confirm.
	mu sync.Mutex
}
// LabelerSubscriptionManager manages connections to multiple ATProto labeler services.
type LabelerSubscriptionManager struct {
	log           *slog.Logger
	managerCtx    context.Context    // root context for all workers; cancelled by Stop
	managerCancel context.CancelFunc
	wg            sync.WaitGroup // tracks every worker goroutine so Stop can wait

	resolver identity.Directory // DID Resolver dependency

	subscriptions map[string]*SubscriptionContext // keyed by labeler DID
	subMu         sync.RWMutex                    // guards subscriptions

	eventCh chan LabelEvent // aggregate channel all workers publish into
}
5858+5959+// NewLabelerSubscriptionManager creates a new instance of the manager.
6060+func NewLabelerSubscriptionManager(log *slog.Logger) *LabelerSubscriptionManager {
6161+ ctx, cancel := context.WithCancel(context.Background())
6262+ dir := identity.DefaultDirectory() // Cached with 24hr TTL
6363+ return &LabelerSubscriptionManager{
6464+ log: log,
6565+ managerCtx: ctx,
6666+ managerCancel: cancel,
6767+ subscriptions: make(map[string]*SubscriptionContext),
6868+ eventCh: make(chan LabelEvent, EventChannelCapacity),
6969+ resolver: dir,
7070+ }
7171+}
// Events returns the read-only channel where all aggregated LabelEvents are sent.
// The channel is closed by Stop after every worker has exited.
func (m *LabelerSubscriptionManager) Events() <-chan LabelEvent {
	return m.eventCh
}
7777+7878+// Start initiates the subscription manager. It attempts to resolve the URL and start workers
7979+// for all currently added labelers.
8080+func (m *LabelerSubscriptionManager) Start() {
8181+ m.subMu.RLock()
8282+ defer m.subMu.RUnlock()
8383+8484+ m.log.Info("Starting Labeler Subscription Manager", "count", len(m.subscriptions))
8585+8686+ for _, sub := range m.subscriptions {
8787+ // Initial resolution check (the worker loop handles retries)
8888+ did, err1 := syntax.ParseDID(sub.DID)
8989+ if err1 != nil {
9090+ m.log.Warn(sub.DID + "ResolutionFailure invalid did")
9191+ } else {
9292+ ident, err2 := m.resolver.LookupDID(context.TODO(), did)
9393+ if err2 != nil {
9494+ m.log.Warn(sub.DID + "ResolutionFailure not reachable")
9595+ } else {
9696+ labelerURL := ident.GetServiceEndpoint("atproto_labeler")
9797+ if labelerURL == "" {
9898+ m.log.Warn(sub.DID + "ResolutionFailure no service endpoint")
9999+ } else {
100100+ sub.ServiceURL = labelerURL
101101+ m.log.Info("Initial DID resolved successfully", "did", sub.DID, "service_url", sub.ServiceURL)
102102+ }
103103+ }
104104+ }
105105+ m.startWorker(sub)
106106+ }
107107+}
108108+109109+// Stop gracefully shuts down all active connections and the manager.
110110+func (m *LabelerSubscriptionManager) Stop() {
111111+ m.managerCancel()
112112+ m.log.Info("Waiting for all labeler workers to shut down...")
113113+ m.wg.Wait()
114114+ m.log.Info("Manager stopped gracefully.")
115115+ close(m.eventCh)
116116+}
117117+118118+// AddLabeler registers a new labeler DID. URL resolution is handled lazily (in Start or in the worker loop).
119119+func (m *LabelerSubscriptionManager) AddLabeler(did string, cursor string) error {
120120+ if did == "" {
121121+ return fmt.Errorf("did cannot be empty")
122122+ }
123123+124124+ m.subMu.Lock()
125125+ defer m.subMu.Unlock()
126126+127127+ if _, exists := m.subscriptions[did]; exists {
128128+ m.log.Warn("Labeler already exists", "did", did)
129129+ return nil
130130+ }
131131+132132+ workerCtx, workerCancel := context.WithCancel(m.managerCtx)
133133+134134+ sub := &SubscriptionContext{
135135+ DID: did,
136136+ ServiceURL: "", // Must be resolved before use
137137+ CurrentCursor: cursor,
138138+ WorkerCtx: workerCtx,
139139+ WorkerCancel: workerCancel,
140140+ IsRunning: false,
141141+ }
142142+143143+ m.subscriptions[did] = sub
144144+145145+ // If the overall manager is running, try to resolve and start the worker immediately.
146146+ select {
147147+ case <-m.managerCtx.Done():
148148+ // Manager is shutting down, just register the sub but don't start
149149+ default:
150150+ // Attempt immediate resolution and start
151151+ // entry, err := m.resolver.Resolve(sub.DID)
152152+ // if err == nil {
153153+ // sub.ServiceURL = entry.Domain
154154+ // m.log.Info("Immediate DID resolution successful for new labeler", "did", sub.DID, "service_url", sub.ServiceURL)
155155+ // } else {
156156+ // m.log.Warn("Immediate DID resolution failed for new labeler. Worker will retry.", "did", sub.DID, "err", err)
157157+ // }
158158+ // m.startWorker(sub)
159159+ did, err1 := syntax.ParseDID(sub.DID)
160160+ if err1 != nil {
161161+ m.log.Warn(sub.DID + "ResolutionFailure invalid did")
162162+ } else {
163163+ ident, err2 := m.resolver.LookupDID(context.TODO(), did)
164164+ if err2 != nil {
165165+ m.log.Warn(sub.DID + "ResolutionFailure not reachable")
166166+ } else {
167167+ labelerURL := ident.GetServiceEndpoint("atproto_labeler")
168168+ if labelerURL == "" {
169169+ m.log.Warn(sub.DID + "ResolutionFailure no service endpoint")
170170+ } else {
171171+ sub.ServiceURL = labelerURL
172172+ m.log.Info("Initial DID resolved successfully", "did", sub.DID, "service_url", sub.ServiceURL)
173173+ }
174174+ }
175175+ }
176176+ m.startWorker(sub)
177177+ }
178178+179179+ m.log.Info("Labeler added", "did", did, "start_cursor", cursor)
180180+ return nil
181181+}
182182+183183+// RemoveLabeler stops the stream worker for the given DID and removes it from the manager.
184184+func (m *LabelerSubscriptionManager) RemoveLabeler(did string) {
185185+ m.subMu.Lock()
186186+ defer m.subMu.Unlock()
187187+188188+ sub, exists := m.subscriptions[did]
189189+ if !exists {
190190+ return
191191+ }
192192+193193+ m.log.Info("Removing labeler subscription", "did", did)
194194+ sub.WorkerCancel()
195195+ delete(m.subscriptions, did)
196196+}
197197+198198+// startWorker spins up the connection management goroutine for a single labeler.
199199+func (m *LabelerSubscriptionManager) startWorker(sub *SubscriptionContext) {
200200+ sub.mu.Lock()
201201+ if sub.IsRunning {
202202+ sub.mu.Unlock()
203203+ return
204204+ }
205205+ sub.IsRunning = true
206206+ sub.mu.Unlock()
207207+208208+ m.wg.Add(1)
209209+ go func() {
210210+ defer m.wg.Done()
211211+ defer func() {
212212+ sub.mu.Lock()
213213+ sub.IsRunning = false
214214+ sub.mu.Unlock()
215215+ }()
216216+ m.manageSubscription(sub)
217217+ }()
218218+}
219219+220220+// manageSubscription handles connection retries, cursors, and running the stream processor.
221221+func (m *LabelerSubscriptionManager) manageSubscription(sub *SubscriptionContext) {
222222+ didLog := m.log.With("did", sub.DID)
223223+ didLog.Info("Worker started")
224224+225225+ for {
226226+ select {
227227+ case <-sub.WorkerCtx.Done():
228228+ didLog.Info("Worker received stop signal, shutting down.")
229229+ return
230230+ default:
231231+ // Proceed
232232+ }
233233+ var err error
234234+235235+ // 1. Ensure ServiceURL is resolved
236236+ if sub.ServiceURL == "" {
237237+ didLog.Info("Service URL not resolved, attempting DID resolution.")
238238+ // entry, err := m.resolver.Resolve(sub.DID)
239239+ // if err != nil {
240240+ // didLog.Error("DID resolution failed, retrying...", "err", err)
241241+ // goto WaitAndRetry
242242+ // }
243243+ // sub.ServiceURL = entry.Domain
244244+ // didLog.Info("DID resolution successful", "service_url", sub.ServiceURL)
245245+ did, err1 := syntax.ParseDID(sub.DID)
246246+ if err1 != nil {
247247+ m.log.Warn(sub.DID + "ResolutionFailure invalid did")
248248+ } else {
249249+ ident, err2 := m.resolver.LookupDID(context.TODO(), did)
250250+ if err2 != nil {
251251+ m.log.Warn(sub.DID + "ResolutionFailure not reachable")
252252+ goto WaitAndRetry
253253+ } else {
254254+ labelerURL := ident.GetServiceEndpoint("atproto_labeler")
255255+ if labelerURL == "" {
256256+ m.log.Warn(sub.DID + "ResolutionFailure no service endpoint")
257257+ } else {
258258+ sub.ServiceURL = labelerURL
259259+ m.log.Info("Initial DID resolved successfully", "did", sub.DID, "service_url", sub.ServiceURL)
260260+ }
261261+ }
262262+ }
263263+ }
264264+265265+ // 2. Attempt to stream
266266+ err = m.dialAndStream(sub)
267267+268268+ if sub.WorkerCtx.Err() != nil {
269269+ return
270270+ }
271271+272272+ if err != nil {
273273+ didLog.Error("Stream failed, attempting restart", "err", err, "cursor", sub.CurrentCursor)
274274+ } else {
275275+ didLog.Info("Stream closed cleanly, attempting restart.")
276276+ }
277277+278278+ WaitAndRetry:
279279+ // Wait before retrying
280280+ didLog.Info("Waiting 5s before reconnecting/retrying resolution...")
281281+ select {
282282+ case <-time.After(5 * time.Second):
283283+ // Proceed with retry
284284+ case <-sub.WorkerCtx.Done():
285285+ return // Exit if canceled during wait
286286+ }
287287+ }
288288+}
// httpToWS rewrites an http(s) URL into the equivalent ws(s) URL.
// Strings carrying neither scheme prefix are returned unchanged.
func httpToWS(s string) string {
	switch {
	case strings.HasPrefix(s, "https://"):
		return "wss://" + strings.TrimPrefix(s, "https://")
	case strings.HasPrefix(s, "http://"):
		return "ws://" + strings.TrimPrefix(s, "http://")
	default:
		return s
	}
}
// dialAndStream establishes the WebSocket connection to the labeler's
// subscribeLabels endpoint and processes the stream until it closes or the
// worker context is cancelled. On any dial failure the cached ServiceURL is
// cleared so the worker loop re-resolves the DID before the next attempt.
//
// NOTE(review): sub.ServiceURL and sub.CurrentCursor are read here without
// holding sub.mu — presumed safe because only this worker goroutine mutates
// them between streams; confirm.
func (m *LabelerSubscriptionManager) dialAndStream(sub *SubscriptionContext) error {
	didLog := m.log.With("did", sub.DID)

	// Resume from the last seen sequence number, if any.
	fullURL := httpToWS(sub.ServiceURL) + "/" + LabelSubscribePath
	if sub.CurrentCursor != "" {
		fullURL = fmt.Sprintf("%s?cursor=%s", fullURL, sub.CurrentCursor)
	}

	u, err := url.Parse(fullURL)
	if err != nil {
		return fmt.Errorf("failed to parse URL: %w", err)
	}

	// 1. Establish WebSocket Connection
	dialer := websocket.DefaultDialer
	con, resp, err := dialer.Dial(u.String(), http.Header{
		"User-Agent": []string{"LabelerSubscriptionManager/1.0"},
	})

	if err != nil {
		if resp != nil {
			didLog.Error("WebSocket connection failed", "status", resp.StatusCode)
		}
		// If dial fails due to network/DNS/TLS error, clear the service URL to force re-resolution
		// on the next loop iteration.
		sub.mu.Lock()
		sub.ServiceURL = ""
		sub.mu.Unlock()

		return fmt.Errorf("failed to dial websocket to %s: %w", u.String(), err)
	}
	defer con.Close()
	didLog.Info("Successfully connected to Labeler firehose", "url", u.String())

	// 2. Define Event Callbacks
	rsc := &events.RepoStreamCallbacks{
		// LabelLabels receives each batch of labels from the firehose.
		LabelLabels: func(evt *comatproto.LabelSubscribeLabels_Labels) error {
			if evt.Seq == 0 || evt.Labels == nil {
				return nil
			}

			// Update the cursor immediately (before forwarding) so a restart
			// resumes after this event even if forwarding drops labels below.
			sub.mu.Lock()
			sub.CurrentCursor = strconv.FormatInt(evt.Seq, 10)
			sub.mu.Unlock()

			// Process and simplify each label event. The non-blocking send
			// means events are DROPPED when the aggregate channel is full.
			for _, label := range evt.Labels {
				select {
				case m.eventCh <- LabelEvent{
					SourceDid: sub.DID,
					Cursor:    evt.Seq,
					Value:     label,
				}:
					// Sent successfully
				default:
					didLog.Warn("Event channel full, dropping label event", "seq", evt.Seq, "uri", label.Uri)
				}
			}
			return nil
		},

		// LabelInfo handles informational frames; they are only logged.
		LabelInfo: func(evt *comatproto.LabelSubscribeLabels_Info) error {
			if evt.Message != nil {
				didLog.Info("Stream Info Message", "name", evt.Name, "message", *evt.Message)
			}
			return nil
		},

		// Error frames abort the stream by returning an error, which
		// surfaces through HandleRepoStream to the worker loop for retry.
		Error: func(evt *events.ErrorFrame) error {
			didLog.Error("Stream processing error frame", "error_type", evt.Error, "message", evt.Message)
			return fmt.Errorf("atproto stream error: %s", evt.Message)
		},
	}

	// 3. Create Scheduler and Start Processing
	scheduler := parallel.NewScheduler(
		MaxWorkers,
		EventChannelCapacity,
		fullURL,
		rsc.EventHandler,
	)

	// HandleRepoStream blocks until the connection closes or the context cancels.
	return events.HandleRepoStream(sub.WorkerCtx, con, scheduler, didLog)
}