An atproto PDS written in Go
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

allow for a push based event emitter

Signed-off-by: Will Andrews <will7989@hotmail.com>

+137 -18
+14 -4
cmd/cocoon/main.go
··· 169 169 }, 170 170 telemetry.CLIFlagDebug, 171 171 telemetry.CLIFlagMetricsListenAddress, 172 + &cli.StringFlag{ 173 + Name: "subscribe-repos-service-url", 174 + EnvVars: []string{"SUBSCRIBE_REPOS_SERVICE_URL"}, 175 + }, 176 + &cli.BoolFlag{ 177 + Name: "push-based-events", 178 + EnvVars: []string{"PUSH_BASED_EVENTS"}, 179 + }, 172 180 }, 173 181 Commands: []*cli.Command{ 174 182 runServe, ··· 249 257 SecretKey: cmd.String("s3-secret-key"), 250 258 CDNUrl: cmd.String("s3-cdn-url"), 251 259 }, 252 - SessionSecret: cmd.String("session-secret"), 253 - SessionCookieKey: cmd.String("session-cookie-key"), 254 - BlockstoreVariant: server.MustReturnBlockstoreVariant(cmd.String("blockstore-variant")), 255 - FallbackProxy: cmd.String("fallback-proxy"), 260 + SessionSecret: cmd.String("session-secret"), 261 + SessionCookieKey: cmd.String("session-cookie-key"), 262 + BlockstoreVariant: server.MustReturnBlockstoreVariant(cmd.String("blockstore-variant")), 263 + FallbackProxy: cmd.String("fallback-proxy"), 264 + PushBasedEvents: cmd.Bool("push-based-events"), 265 + SubscribeReposServiceURL: cmd.String("subscribe-repos-service-url"), 256 266 }) 257 267 if err != nil { 258 268 fmt.Printf("error creating cocoon: %v", err)
+92
server/event_emmiter.go
··· 1 + package server 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "net/http" 7 + "time" 8 + 9 + "github.com/bluesky-social/indigo/events" 10 + "github.com/bluesky-social/indigo/lex/util" 11 + ) 12 + 13 + func (s *Server) emmitEvents(ctx context.Context) error { 14 + ctx, cancel := context.WithCancel(ctx) 15 + defer cancel() 16 + 17 + logger := s.logger.With("component", "event-emmiter") 18 + ident := "self" 19 + var since *int64 20 + // TODO: track since 21 + 22 + evts, evtManCancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { 23 + return true 24 + }, since) 25 + if err != nil { 26 + return err 27 + } 28 + defer evtManCancel() 29 + 30 + header := events.EventHeader{Op: events.EvtKindMessage} 31 + for evt := range evts { 32 + func() { 33 + if ctx.Err() != nil { 34 + logger.Error("context error", "err", err) 35 + return 36 + } 37 + 38 + var obj util.CBOR 39 + switch { 40 + case evt.Error != nil: 41 + header.Op = events.EvtKindErrorFrame 42 + obj = evt.Error 43 + case evt.RepoCommit != nil: 44 + header.MsgType = "#commit" 45 + obj = evt.RepoCommit 46 + case evt.RepoIdentity != nil: 47 + header.MsgType = "#identity" 48 + obj = evt.RepoIdentity 49 + case evt.RepoAccount != nil: 50 + header.MsgType = "#account" 51 + obj = evt.RepoAccount 52 + case evt.RepoInfo != nil: 53 + header.MsgType = "#info" 54 + obj = evt.RepoInfo 55 + default: 56 + logger.Warn("unrecognized event kind") 57 + return 58 + } 59 + 60 + buf := new(bytes.Buffer) 61 + 62 + if err := header.MarshalCBOR(buf); err != nil { 63 + logger.Error("failed to marshal header to buffer", "err", err) 64 + return 65 + } 66 + 67 + if err := obj.MarshalCBOR(buf); err != nil { 68 + logger.Error("failed to marshal event to buffer", "err", err) 69 + return 70 + } 71 + 72 + // TODO: use a HTTP client here not the default 73 + _, err := http.Post(s.config.SubscribeReposServiceURL, "", buf) 74 + if err != nil { 75 + logger.Error("posting to web server", "error", err) 76 + return 77 + } 78 + }() 79 + } 80 + 81 + // we should tell the relay to request a new crawl at this point if we got disconnected 82 + // use a new context since the old one might be cancelled at this point 83 + go func() { 84 + retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second) 85 + defer retryCancel() 86 + if err := s.requestCrawl(retryCtx); err != nil { 87 + logger.Error("error requesting crawls", "err", err) 88 + } 89 + }() 90 + 91 + return nil 92 + }
+31 -14
server/server.go
··· 124 124 125 125 BlockstoreVariant BlockstoreVariant 126 126 FallbackProxy string 127 + 128 + PushBasedEvents bool 129 + SubscribeReposServiceURL string 127 130 } 128 131 129 132 type config struct { ··· 141 144 SessionCookieKey string 142 145 BlockstoreVariant BlockstoreVariant 143 146 FallbackProxy string 147 + 148 + PushBasedEvents bool 149 + SubscribeReposServiceURL string 144 150 } 145 151 146 152 type CustomValidator struct { ··· 435 441 plcClient: plcClient, 436 442 privateKey: &pkey, 437 443 config: &config{ 438 - LogLevel: args.LogLevel, 439 - Version: args.Version, 440 - Did: args.Did, 441 - Hostname: args.Hostname, 442 - ContactEmail: args.ContactEmail, 443 - EnforcePeering: false, 444 - Relays: args.Relays, 445 - AdminPassword: args.AdminPassword, 446 - RequireInvite: args.RequireInvite, 447 - SmtpName: args.SmtpName, 448 - SmtpEmail: args.SmtpEmail, 449 - SessionCookieKey: args.SessionCookieKey, 450 - BlockstoreVariant: args.BlockstoreVariant, 451 - FallbackProxy: args.FallbackProxy, 444 + LogLevel: args.LogLevel, 445 + Version: args.Version, 446 + Did: args.Did, 447 + Hostname: args.Hostname, 448 + ContactEmail: args.ContactEmail, 449 + EnforcePeering: false, 450 + Relays: args.Relays, 451 + AdminPassword: args.AdminPassword, 452 + RequireInvite: args.RequireInvite, 453 + SmtpName: args.SmtpName, 454 + SmtpEmail: args.SmtpEmail, 455 + SessionCookieKey: args.SessionCookieKey, 456 + BlockstoreVariant: args.BlockstoreVariant, 457 + FallbackProxy: args.FallbackProxy, 458 + PushBasedEvents: args.PushBasedEvents, 459 + SubscribeReposServiceURL: args.SubscribeReposServiceURL, 452 460 }, 453 461 evtman: events.NewEventManager(evtPersister), 454 462 passport: identity.NewPassport(h, identity.NewMemCache(10_000)), ··· 634 642 logger.Error("error requesting crawls", "err", err) 635 643 } 636 644 }() 645 + 646 + if s.config.PushBasedEvents { 647 + slog.Info("pushed based events enabled") 648 + go func() { 649 + if err := s.emmitEvents(ctx); err != nil { 650 + logger.Error("error emitting events", "err", err) 651 + } 652 + }() 653 + } 637 654 638 655 <-ctx.Done() 639 656