this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

some refactoring, and add a dedicated 'indexer' binary called bgs

+544 -20
+149
cmd/bgs/fedmgr.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "errors" 7 + "fmt" 8 + "log" 9 + "math/rand" 10 + "net" 11 + "sync" 12 + "time" 13 + 14 + "github.com/gorilla/websocket" 15 + "github.com/whyrusleeping/gosky/events" 16 + "gorm.io/gorm" 17 + ) 18 + 19 + type IndexCallback func(context.Context, *PDS, *events.Event) error 20 + 21 + // TODO: rename me 22 + type Slurper struct { 23 + cb IndexCallback 24 + 25 + db *gorm.DB 26 + 27 + lk sync.Mutex 28 + active map[string]*PDS 29 + } 30 + 31 + func NewSlurper(db *gorm.DB, cb IndexCallback) *Slurper { 32 + return &Slurper{ 33 + cb: cb, 34 + db: db, 35 + active: make(map[string]*PDS), 36 + } 37 + } 38 + 39 + func (s *Slurper) SubscribeToPds(ctx context.Context, host string) error { 40 + // TODO: for performance, lock on the hostname instead of global 41 + s.lk.Lock() 42 + defer s.lk.Unlock() 43 + 44 + _, ok := s.active[host] 45 + if ok { 46 + return nil 47 + } 48 + 49 + var peering PDS 50 + if err := s.db.First(&peering, "host = ?", host).Error; err != nil { 51 + if !errors.Is(err, gorm.ErrRecordNotFound) { 52 + return err 53 + } 54 + 55 + // New PDS! 56 + npds := PDS{ 57 + Host: host, 58 + } 59 + if err := s.db.Create(&npds).Error; err != nil { 60 + return err 61 + } 62 + 63 + peering = npds 64 + } 65 + 66 + s.active[host] = &peering 67 + 68 + go s.subscribeWithRedialer(&peering) 69 + 70 + return nil 71 + } 72 + 73 + func (s *Slurper) subscribeWithRedialer(host *PDS) { 74 + defer func() { 75 + s.lk.Lock() 76 + defer s.lk.Unlock() 77 + 78 + delete(s.active, host.Host) 79 + }() 80 + 81 + d := websocket.Dialer{} 82 + 83 + var backoff int 84 + for { 85 + 86 + con, res, err := d.Dial("ws://"+host.Host+"/events", nil) 87 + if err != nil { 88 + fmt.Printf("dialing %q failed: %s", host.Host, err) 89 + time.Sleep(sleepForBackoff(backoff)) 90 + backoff++ 91 + continue 92 + } 93 + 94 + fmt.Println("event subscription response code: ", res.StatusCode) 95 + 96 + if err := s.handleConnection(host, con); err != nil { 97 + if errors.Is(err, ErrTimeoutShutdown) { 98 + log.Printf("shutting down pds subscription to %s, no activity after %s", host.Host, EventsTimeout) 99 + return 100 + } 101 + log.Printf("connection to %q failed: %s", host.Host, err) 102 + } 103 + } 104 + } 105 + 106 + func sleepForBackoff(b int) time.Duration { 107 + if b == 0 { 108 + return 0 109 + } 110 + 111 + if b < 10 { 112 + return (time.Duration(b) * 2) + (time.Millisecond * time.Duration(rand.Intn(1000))) 113 + } 114 + 115 + return time.Second * 30 116 + } 117 + 118 + var ErrTimeoutShutdown = fmt.Errorf("timed out waiting for new events") 119 + 120 + var EventsTimeout = time.Minute 121 + 122 + func (s *Slurper) handleConnection(host *PDS, con *websocket.Conn) error { 123 + for { 124 + if err := con.SetReadDeadline(time.Now().Add(EventsTimeout)); err != nil { 125 + return fmt.Errorf("failed to set read deadline: %w", err) 126 + } 127 + 128 + mt, data, err := con.ReadMessage() 129 + if err != nil { 130 + if nerr, ok := err.(net.Error); ok && nerr.Timeout() { 131 + return ErrTimeoutShutdown 132 + } 133 + 134 + return err 135 + } 136 + 137 + _ = mt 138 + 139 + var ev events.Event 140 + if err := json.Unmarshal(data, &ev); err != nil { 141 + return fmt.Errorf("failed to unmarshal event: %w", err) 142 + } 143 + 144 + fmt.Println("got event: ", host.Host, ev.Kind) 145 + if err := s.cb(context.TODO(), host, &ev); err != nil { 146 + log.Printf("failed to index event from %q: %s", host.Host, err) 147 + } 148 + } 149 + }
+318
cmd/bgs/main.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "fmt" 7 + "log" 8 + "net/url" 9 + 10 + "github.com/gorilla/websocket" 11 + "github.com/labstack/echo/v4" 12 + "github.com/urfave/cli/v2" 13 + "github.com/whyrusleeping/gosky/api" 14 + bsky "github.com/whyrusleeping/gosky/api/bsky" 15 + "github.com/whyrusleeping/gosky/carstore" 16 + cliutil "github.com/whyrusleeping/gosky/cmd/gosky/util" 17 + "github.com/whyrusleeping/gosky/events" 18 + "github.com/whyrusleeping/gosky/indexer" 19 + "github.com/whyrusleeping/gosky/notifs" 20 + "github.com/whyrusleeping/gosky/plc" 21 + "github.com/whyrusleeping/gosky/repomgr" 22 + "github.com/whyrusleeping/gosky/types" 23 + "github.com/whyrusleeping/gosky/xrpc" 24 + "go.opentelemetry.io/otel" 25 + "go.opentelemetry.io/otel/attribute" 26 + "go.opentelemetry.io/otel/exporters/jaeger" 27 + "go.opentelemetry.io/otel/sdk/resource" 28 + tracesdk "go.opentelemetry.io/otel/sdk/trace" 29 + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" 30 + "gorm.io/gorm" 31 + "gorm.io/plugin/opentelemetry/tracing" 32 + ) 33 + 34 + func main() { 35 + app := cli.NewApp() 36 + 37 + app.Flags = []cli.Flag{ 38 + &cli.BoolFlag{ 39 + Name: "jaeger", 40 + }, 41 + &cli.StringFlag{ 42 + Name: "db", 43 + Value: "sqlite=bgs.db", 44 + }, 45 + &cli.StringFlag{ 46 + Name: "carstoredb", 47 + Value: "sqlite=carstore.db", 48 + }, 49 + &cli.StringFlag{ 50 + Name: "carstore", 51 + Value: "bgscarstore", 52 + }, 53 + &cli.BoolFlag{ 54 + Name: "dbtracing", 55 + }, 56 + &cli.StringFlag{ 57 + Name: "plc", 58 + Usage: "hostname of the plc server", 59 + Value: "https://plc.directory", 60 + }, 61 + } 62 + 63 + app.Action = func(cctx *cli.Context) error { 64 + 65 + if cctx.Bool("jaeger") { 66 + url := "http://localhost:14268/api/traces" 67 + exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url))) 68 + if err != nil { 69 + return err 70 + } 71 + tp := tracesdk.NewTracerProvider( 72 + // Always be sure to batch in production. 73 + tracesdk.WithBatcher(exp), 74 + // Record information about this application in a Resource. 75 + tracesdk.WithResource(resource.NewWithAttributes( 76 + semconv.SchemaURL, 77 + semconv.ServiceNameKey.String("bgs"), 78 + attribute.String("environment", "test"), 79 + attribute.Int64("ID", 1), 80 + )), 81 + ) 82 + 83 + otel.SetTracerProvider(tp) 84 + } 85 + 86 + dbstr := cctx.String("db") 87 + 88 + db, err := cliutil.SetupDatabase(dbstr) 89 + if err != nil { 90 + return err 91 + } 92 + 93 + if cctx.Bool("dbtracing") { 94 + if err := db.Use(tracing.NewPlugin()); err != nil { 95 + return err 96 + } 97 + } 98 + 99 + cardb, err := cliutil.SetupDatabase(cctx.String("carstoredb")) 100 + if err != nil { 101 + return err 102 + } 103 + 104 + csdir := cctx.String("carstore") 105 + cstore, err := carstore.NewCarStore(cardb, csdir) 106 + if err != nil { 107 + return err 108 + } 109 + 110 + repoman := repomgr.NewRepoManager(db, cstore) 111 + 112 + evtman := events.NewEventManager() 113 + 114 + notifman := notifs.NewNotificationManager(db, repoman.GetRecord) 115 + 116 + didr := &api.PLCServer{Host: cctx.String("plc")} 117 + 118 + ix, err := indexer.NewIndexer(db, notifman, evtman, didr) 119 + if err != nil { 120 + return err 121 + } 122 + 123 + bgs := &BGS{ 124 + index: ix, 125 + db: db, 126 + 127 + repoman: repoman, 128 + events: evtman, 129 + didr: didr, 130 + } 131 + bgs.slurper = NewSlurper(db, bgs.handleFedEvent) 132 + 133 + return bgs.Start(":2470") 134 + } 135 + 136 + app.RunAndExitOnError() 137 + } 138 + 139 + type BGS struct { 140 + index *indexer.Indexer 141 + db *gorm.DB 142 + slurper *Slurper 143 + events *events.EventManager 144 + didr plc.PLCClient 145 + 146 + repoman *repomgr.RepoManager 147 + } 148 + 149 + func (bgs *BGS) Start(listen string) error { 150 + e := echo.New() 151 + 152 + // TODO: this API is temporary until we formalize what we want here 153 + e.POST("/add-target", bgs.handleAddTarget) 154 + 155 + e.GET("/events", bgs.EventsHandler) 156 + 157 + return e.Start(listen) 158 + } 159 + 160 + type PDS struct { 161 + gorm.Model 162 + 163 + Host string 164 + } 165 + 166 + type User struct { 167 + gorm.Model 168 + Handle string `gorm:"uniqueIndex"` 169 + Did string `gorm:"uniqueIndex"` 170 + PDS uint 171 + } 172 + 173 + type addTargetBody struct { 174 + Host string 175 + } 176 + 177 + // the ding-dong api 178 + func (bgs *BGS) handleAddTarget(c echo.Context) error { 179 + var body addTargetBody 180 + if err := c.Bind(&body); err != nil { 181 + return err 182 + } 183 + 184 + return bgs.slurper.SubscribeToPds(c.Request().Context(), body.Host) 185 + } 186 + 187 + func (bgs *BGS) EventsHandler(c echo.Context) error { 188 + did := c.Request().Header.Get("DID") 189 + conn, err := websocket.Upgrade(c.Response().Writer, c.Request(), c.Response().Header(), 1<<10, 1<<10) 190 + if err != nil { 191 + return err 192 + } 193 + 194 + var peering PDS 195 + if err := bgs.db.First(&peering, "did = ?", did).Error; err != nil { 196 + return err 197 + } 198 + 199 + evts, cancel, err := bgs.events.Subscribe(func(evt *events.Event) bool { return true }) 200 + if err != nil { 201 + return err 202 + } 203 + defer cancel() 204 + 205 + for evt := range evts { 206 + if err := conn.WriteJSON(evt); err != nil { 207 + return err 208 + } 209 + } 210 + 211 + return nil 212 + } 213 + 214 + func (bgs *BGS) lookupUserByDid(ctx context.Context, did string) (*User, error) { 215 + var u User 216 + if err := bgs.db.First(&u, "did = ?", did).Error; err != nil { 217 + return nil, err 218 + } 219 + 220 + return &u, nil 221 + } 222 + 223 + func (bgs *BGS) handleFedEvent(ctx context.Context, host *PDS, evt *events.Event) error { 224 + log.Printf("got fed event from %q: %s\n", host.Host, evt.Kind) 225 + switch evt.Kind { 226 + case events.EvtKindRepoChange: 227 + u, err := bgs.lookupUserByDid(ctx, evt.User) 228 + if err != nil { 229 + if !errors.Is(err, gorm.ErrRecordNotFound) { 230 + return fmt.Errorf("looking up event user: %w", err) 231 + } 232 + 233 + subj, err := bgs.createExternalUser(ctx, evt.User) 234 + if err != nil { 235 + return err 236 + } 237 + 238 + u = new(User) 239 + u.ID = subj.Uid 240 + } 241 + 242 + return bgs.repoman.HandleExternalUserEvent(ctx, host.ID, repomgr.EvtKindCreateRecord, u.ID, evt.RepoOps, evt.CarSlice) 243 + default: 244 + return fmt.Errorf("unrecognized fed event kind: %q", evt.Kind) 245 + } 246 + return nil 247 + } 248 + 249 + func (s *BGS) createExternalUser(ctx context.Context, did string) (*types.ActorInfo, error) { 250 + doc, err := s.didr.GetDocument(ctx, did) 251 + if err != nil { 252 + return nil, fmt.Errorf("could not locate DID document for followed user: %s", err) 253 + } 254 + 255 + if len(doc.Service) == 0 { 256 + return nil, fmt.Errorf("external followed user %s had no services in did document", did) 257 + } 258 + 259 + svc := doc.Service[0] 260 + durl, err := url.Parse(svc.ServiceEndpoint) 261 + if err != nil { 262 + return nil, err 263 + } 264 + 265 + // TODO: the PDS's DID should also be in the service, we could use that to look up? 266 + var peering PDS 267 + if err := s.db.First(&peering, "host = ?", durl.Host).Error; err != nil { 268 + return nil, err 269 + } 270 + 271 + var handle string 272 + if len(doc.AlsoKnownAs) > 0 { 273 + hurl, err := url.Parse(doc.AlsoKnownAs[0]) 274 + if err != nil { 275 + return nil, err 276 + } 277 + 278 + handle = hurl.Host 279 + } 280 + 281 + c := &xrpc.Client{Host: svc.ServiceEndpoint} 282 + profile, err := bsky.ActorGetProfile(ctx, c, did) 283 + if err != nil { 284 + return nil, err 285 + } 286 + 287 + if handle != profile.Handle { 288 + return nil, fmt.Errorf("mismatch in handle between did document and pds profile (%s != %s)", handle, profile.Handle) 289 + } 290 + 291 + // TODO: request this users info from their server to fill out our data... 292 + u := User{ 293 + Handle: handle, 294 + Did: did, 295 + PDS: peering.ID, 296 + } 297 + 298 + if err := s.db.Create(&u).Error; err != nil { 299 + return nil, fmt.Errorf("failed to create other pds user: %w", err) 300 + } 301 + 302 + // okay cool, its a user on a server we are peered with 303 + // lets make a local record of that user for the future 304 + subj := &types.ActorInfo{ 305 + Uid: u.ID, 306 + Handle: handle, 307 + DisplayName: *profile.DisplayName, 308 + Did: did, 309 + DeclRefCid: profile.Declaration.Cid, 310 + Type: "", 311 + PDS: peering.ID, 312 + } 313 + if err := s.db.Create(subj).Error; err != nil { 314 + return nil, err 315 + } 316 + 317 + return subj, nil 318 + }
+39
cmd/gosky/util/util.go
··· 7 7 "net/http" 8 8 "os" 9 9 "path/filepath" 10 + "strings" 10 11 "time" 11 12 12 13 homedir "github.com/mitchellh/go-homedir" 13 14 "github.com/urfave/cli/v2" 14 15 "github.com/whyrusleeping/gosky/api" 15 16 "github.com/whyrusleeping/gosky/xrpc" 17 + "gorm.io/driver/postgres" 18 + "gorm.io/driver/sqlite" 19 + "gorm.io/gorm" 16 20 ) 17 21 18 22 func GetPLCClient(cctx *cli.Context) *api.PLCServer { ··· 158 162 159 163 return &auth, nil 160 164 } 165 + 166 + func SetupDatabase(dbval string) (*gorm.DB, error) { 167 + parts := strings.SplitN(dbval, "=", 2) 168 + if len(parts) == 1 { 169 + return nil, fmt.Errorf("format for database string is 'DBTYPE=PARAMS'") 170 + } 171 + 172 + var dial gorm.Dialector 173 + switch parts[0] { 174 + case "sqlite": 175 + dial = sqlite.Open(parts[1]) 176 + case "postgres": 177 + dial = postgres.Open(parts[1]) 178 + default: 179 + return nil, fmt.Errorf("unsupported or unrecognized db type: %s", parts[0]) 180 + } 181 + 182 + db, err := gorm.Open(dial, &gorm.Config{ 183 + SkipDefaultTransaction: true, 184 + }) 185 + if err != nil { 186 + return nil, err 187 + } 188 + 189 + sqldb, err := db.DB() 190 + if err != nil { 191 + return nil, err 192 + } 193 + 194 + sqldb.SetMaxIdleConns(80) 195 + sqldb.SetMaxOpenConns(99) 196 + sqldb.SetConnMaxIdleTime(time.Hour) 197 + 198 + return db, nil 199 + }
+4 -4
indexer/indexer.go
··· 181 181 return nil, err 182 182 } 183 183 184 - act, err := ix.lookupUserByDid(ctx, puri.Did) 184 + act, err := ix.LookupUserByDid(ctx, puri.Did) 185 185 if err != nil { 186 186 return nil, err 187 187 } ··· 216 216 } 217 217 218 218 case *bsky.GraphFollow: 219 - subj, err := ix.lookupUserByDid(ctx, rec.Subject.Did) 219 + subj, err := ix.LookupUserByDid(ctx, rec.Subject.Did) 220 220 if err != nil { 221 221 if !errors.Is(err, gorm.ErrRecordNotFound) { 222 222 return nil, fmt.Errorf("failed to lookup user: %w", err) ··· 279 279 return &ai, nil 280 280 } 281 281 282 - func (ix *Indexer) lookupUserByDid(ctx context.Context, did string) (*types.ActorInfo, error) { 282 + func (ix *Indexer) LookupUserByDid(ctx context.Context, did string) (*types.ActorInfo, error) { 283 283 var ai types.ActorInfo 284 284 if err := ix.db.First(&ai, "did = ?", did).Error; err != nil { 285 285 return nil, err ··· 313 313 for _, e := range post.Entities { 314 314 switch e.Type { 315 315 case "mention": 316 - mentioned, err := ix.lookupUserByDid(ctx, e.Value) 316 + mentioned, err := ix.LookupUserByDid(ctx, e.Value) 317 317 if err != nil { 318 318 return fmt.Errorf("mentioned user does not exist: %w", err) 319 319 }
+10 -9
notifs/notifs.go
··· 6 6 "time" 7 7 8 8 "github.com/ipfs/go-cid" 9 + cbg "github.com/whyrusleeping/cbor-gen" 9 10 appbskytypes "github.com/whyrusleeping/gosky/api/bsky" 10 11 "github.com/whyrusleeping/gosky/lex/util" 11 - "github.com/whyrusleeping/gosky/repomgr" 12 12 "github.com/whyrusleeping/gosky/types" 13 13 "gorm.io/gorm" 14 14 "gorm.io/gorm/clause" ··· 17 17 type NotificationManager struct { 18 18 db *gorm.DB 19 19 20 - repoman *repomgr.RepoManager 20 + getRecord GetRecord 21 21 } 22 + type GetRecord func(ctx context.Context, user uint, collection string, rkey string, maybeCid cid.Cid) (cid.Cid, cbg.CBORMarshaler, error) 22 23 23 - func NewNotificationManager(db *gorm.DB, rmgr *repomgr.RepoManager) *NotificationManager { 24 + func NewNotificationManager(db *gorm.DB, getrec GetRecord) *NotificationManager { 24 25 db.AutoMigrate(&NotifRecord{}) 25 26 db.AutoMigrate(&NotifSeen{}) 26 27 27 28 return &NotificationManager{ 28 - db: db, 29 - repoman: rmgr, 29 + db: db, 30 + getRecord: getrec, 30 31 } 31 32 } 32 33 ··· 146 147 return nil, err 147 148 } 148 149 149 - _, rec, err := nm.repoman.GetRecord(ctx, voter.ID, "app.bsky.feed.vote", vote.Rkey, cid.Undef) 150 + _, rec, err := nm.getRecord(ctx, voter.ID, "app.bsky.feed.vote", vote.Rkey, cid.Undef) 150 151 if err != nil { 151 152 return nil, fmt.Errorf("getting vote: %w", err) 152 153 } ··· 186 187 return nil, err 187 188 } 188 189 189 - _, rec, err := nm.repoman.GetRecord(ctx, reposter.ID, "app.bsky.feed.repost", repost.Rkey, cid.Undef) 190 + _, rec, err := nm.getRecord(ctx, reposter.ID, "app.bsky.feed.repost", repost.Rkey, cid.Undef) 190 191 if err != nil { 191 192 return nil, fmt.Errorf("getting repost: %w", err) 192 193 } ··· 231 232 return nil, err 232 233 } 233 234 234 - _, rec, err := nm.repoman.GetRecord(ctx, author.ID, "app.bsky.feed.post", fp.Rkey, cid.Undef) 235 + _, rec, err := nm.getRecord(ctx, author.ID, "app.bsky.feed.post", fp.Rkey, cid.Undef) 235 236 if err != nil { 236 237 return nil, err 237 238 } ··· 261 262 return nil, err 262 263 } 263 264 264 - _, rec, err := nm.repoman.GetRecord(ctx, follower.ID, "app.bsky.graph.follow", frec.Rkey, cid.Undef) 265 + _, rec, err := nm.getRecord(ctx, follower.ID, "app.bsky.graph.follow", frec.Rkey, cid.Undef) 265 266 if err != nil { 266 267 return nil, err 267 268 }
+15 -5
server/fedmgr.go
··· 11 11 12 12 "github.com/gorilla/websocket" 13 13 "github.com/whyrusleeping/gosky/events" 14 + "github.com/whyrusleeping/gosky/key" 15 + "gorm.io/gorm" 14 16 ) 15 17 16 - type IndexCallback func(context.Context, string, *events.Event) error 18 + type IndexCallback func(context.Context, *Peering, *events.Event) error 17 19 18 - func (s *Server) SubscribeToPds(ctx context.Context, host string) error { 20 + // TODO: rename me 21 + type Slurper struct { 22 + cb IndexCallback 23 + 24 + db *gorm.DB 25 + signingKey *key.Key 26 + } 27 + 28 + func (s *Slurper) SubscribeToPds(ctx context.Context, host string) error { 19 29 var peering Peering 20 30 if err := s.db.First(&peering, "host = ?", host).Error; err != nil { 21 31 return err ··· 26 36 return nil 27 37 } 28 38 29 - func (s *Server) subscribeWithRedialer(host *Peering) { 39 + func (s *Slurper) subscribeWithRedialer(host *Peering) { 30 40 d := websocket.Dialer{} 31 41 32 42 var backoff int ··· 63 73 return time.Second * 30 64 74 } 65 75 66 - func (s *Server) handleConnection(host *Peering, con *websocket.Conn) error { 76 + func (s *Slurper) handleConnection(host *Peering, con *websocket.Conn) error { 67 77 for { 68 78 mt, data, err := con.ReadMessage() 69 79 if err != nil { ··· 78 88 } 79 89 80 90 fmt.Println("got event: ", host.Host, ev.Kind) 81 - if err := s.handleFedEvent(context.TODO(), host, &ev); err != nil { 91 + if err := s.cb(context.TODO(), host, &ev); err != nil { 82 92 log.Printf("failed to index event from %q: %s", host.Host, err) 83 93 } 84 94 }
+9 -2
server/server.go
··· 44 44 notifman *notifs.NotificationManager 45 45 indexer *indexer.Indexer 46 46 events *events.EventManager 47 + slurper *Slurper 47 48 signingKey *key.Key 48 49 echo *echo.Echo 49 50 jwtSigningKey []byte ··· 70 71 evtman := events.NewEventManager() 71 72 72 73 repoman := repomgr.NewRepoManager(db, cs) 73 - notifman := notifs.NewNotificationManager(db, repoman) 74 + notifman := notifs.NewNotificationManager(db, repoman.GetRecord) 74 75 75 76 ix, err := indexer.NewIndexer(db, notifman, evtman, didr) 76 77 if err != nil { ··· 89 90 handleSuffix: handleSuffix, 90 91 serviceUrl: serviceUrl, 91 92 jwtSigningKey: jwtkey, 93 + } 94 + 95 + s.slurper = &Slurper{ 96 + cb: s.handleFedEvent, 97 + db: db, 98 + signingKey: serkey, 92 99 } 93 100 94 101 repoman.SetEventHandler(func(ctx context.Context, evt *repomgr.RepoEvent) { ··· 560 567 return err 561 568 } 562 569 563 - if err := s.SubscribeToPds(context.TODO(), host); err != nil { 570 + if err := s.slurper.SubscribeToPds(context.TODO(), host); err != nil { 564 571 return err 565 572 } 566 573