this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

wip

dholms a2294269 0f22fe8b

+372
+91
nexus/firehose.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + "net/http" 8 + "net/url" 9 + "strings" 10 + 11 + comatproto "github.com/bluesky-social/indigo/api/atproto" 12 + "github.com/bluesky-social/indigo/events" 13 + "github.com/bluesky-social/indigo/events/schedulers/parallel" 14 + "github.com/gorilla/websocket" 15 + ) 16 + 17 + func (nexus *Nexus) SubscribeFirehose(ctx context.Context) error { 18 + relayHost := "https://bsky.network" 19 + 20 + dialer := websocket.DefaultDialer 21 + u, err := url.Parse(relayHost) 22 + if err != nil { 23 + return fmt.Errorf("invalid relayHost URI: %w", err) 24 + } 25 + switch u.Scheme { 26 + case "http": 27 + u.Scheme = "ws" 28 + case "https": 29 + u.Scheme = "wss" 30 + } 31 + u.Path = "xrpc/com.atproto.sync.subscribeRepos" 32 + // if cmd.IsSet("cursor") { 33 + // u.RawQuery = fmt.Sprintf("cursor=%d", cmd.Int("cursor")) 34 + // } 35 + urlString := u.String() 36 + con, _, err := dialer.Dial(urlString, http.Header{ 37 + // "User-Agent": []string{*userAgent()}, 38 + }) 39 + if err != nil { 40 + return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 41 + } 42 + 43 + rsc := &events.RepoStreamCallbacks{ 44 + RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 45 + return nexus.handleCommitEvent(ctx, evt) 46 + }, 47 + RepoSync: func(evt *comatproto.SyncSubscribeRepos_Sync) error { 48 + return nil 49 + }, 50 + RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 51 + return nil 52 + }, 53 + RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error { 54 + return nil 55 + }, 56 + } 57 + 58 + scheduler := parallel.NewScheduler( 59 + 1, 60 + 100, 61 + relayHost, 62 + rsc.EventHandler, 63 + ) 64 + slog.Info("starting firehose consumer", "relayHost", relayHost) 65 + err = events.HandleRepoStream(ctx, con, scheduler, nil) 66 + 67 + if err != nil { 68 + return err 69 + } 70 + return events.HandleRepoStream(ctx, con, scheduler, nil) 71 + } 72 + 73 + func (nexus *Nexus) handleCommitEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 74 + if _, exists := nexus.filterDids[evt.Repo]; !exists { 75 + return nil 76 + } 77 + for _, op := range evt.Ops { 78 + parts := strings.Split(op.Path, "/") 79 + collection := parts[0] 80 + rkey := parts[1] 81 + err := nexus.outbox.Send(&Op{ 82 + DID: evt.Repo, 83 + Collection: collection, 84 + Rkey: rkey, 85 + }) 86 + if err != nil { 87 + return err 88 + } 89 + } 90 + return nil 91 + }
+75
nexus/handlers.go
··· 1 + package main 2 + 3 + import ( 4 + "net/http" 5 + 6 + "github.com/bluesky-social/indigo/nexus/models" 7 + "github.com/gorilla/websocket" 8 + "github.com/labstack/echo/v4" 9 + "gorm.io/gorm/clause" 10 + ) 11 + 12 + var upgrader = websocket.Upgrader{ 13 + CheckOrigin: func(r *http.Request) bool { 14 + return true 15 + }, 16 + } 17 + 18 + func (n *Nexus) registerRoutes() { 19 + n.echo.GET("/health", n.handleHealthcheck) 20 + n.echo.GET("/listen", n.handleListen) 21 + n.echo.POST("/add-dids", n.handleAddDids) 22 + } 23 + 24 + func (n *Nexus) handleHealthcheck(c echo.Context) error { 25 + return c.JSON(200, map[string]string{ 26 + "status": "ok", 27 + }) 28 + } 29 + 30 + func (n *Nexus) handleListen(c echo.Context) error { 31 + // Upgrade to WebSocket 32 + ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil) 33 + if err != nil { 34 + return err 35 + } 36 + defer ws.Close() 37 + 38 + n.logger.Info("websocket connected") 39 + 40 + for op := range n.outbox.outCh { 41 + if err := ws.WriteJSON(op); err != nil { 42 + n.logger.Info("websocket write error", "error", err) 43 + return nil 44 + } 45 + } 46 + return nil 47 + } 48 + 49 + type DidPayload struct { 50 + DIDs []string `json:"dids"` 51 + } 52 + 53 + func (n *Nexus) handleAddDids(c echo.Context) error { 54 + var payload DidPayload 55 + if err := c.Bind(&payload); err != nil { 56 + return err 57 + } 58 + 59 + rows := make([]models.FilterDid, 0, len(payload.DIDs)) 60 + for _, did := range payload.DIDs { 61 + rows = append(rows, models.FilterDid{Did: did}) 62 + } 63 + 64 + err := n.db.Clauses(clause.OnConflict{DoNothing: true}).Create(&rows).Error 65 + if err != nil { 66 + n.logger.Error("failed to insert dids", "error", err) 67 + return echo.NewHTTPError(http.StatusInternalServerError) 68 + } 69 + 70 + for _, did := range payload.DIDs { 71 + n.filterDids[did] = true 72 + } 73 + 74 + return c.NoContent(http.StatusOK) 75 + }
+53
nexus/main.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "log" 6 + "os" 7 + "os/signal" 8 + "syscall" 9 + "time" 10 + ) 11 + 12 + func main() { 13 + nexus, err := NewNexus(NexusConfig{ 14 + DBPath: "./nexus.db", 15 + }) 16 + if err != nil { 17 + log.Fatal(err) 18 + } 19 + 20 + fhCtx, fhCancel := context.WithCancel(context.Background()) 21 + go func() { 22 + err := nexus.SubscribeFirehose(fhCtx) 23 + if err != nil { 24 + log.Printf("Firehose error: %v", err) 25 + } 26 + }() 27 + 28 + // Start server in goroutine 29 + go func() { 30 + if err := nexus.Start(context.Background(), ":8080"); err != nil { 31 + log.Printf("Server error: %v", err) 32 + } 33 + }() 34 + 35 + // Wait for interrupt signal 36 + quit := make(chan os.Signal, 1) 37 + signal.Notify(quit, os.Interrupt, syscall.SIGTERM) 38 + <-quit 39 + 40 + log.Println("Shutting down server...") 41 + 42 + fhCancel() 43 + 44 + // Graceful shutdown with timeout 45 + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 46 + defer cancel() 47 + 48 + if err := nexus.Shutdown(ctx); err != nil { 49 + log.Fatal(err) 50 + } 51 + 52 + log.Println("Server stopped") 53 + }
+15
nexus/models/models.go
··· 1 + package models 2 + 3 + type FilterDid struct { 4 + Did string `gorm:"primaryKey"` 5 + } 6 + 7 + type FilterCollection struct { 8 + Collection string `gorm:"primaryKey"` 9 + } 10 + 11 + type BufferedEvt struct { 12 + ID uint `gorm:"primaryKey"` 13 + Did string `gorm:"not null;index"` 14 + Evt map[string]interface{} `gorm:"serializer:json"` 15 + }
+106
nexus/nexus.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "log/slog" 6 + 7 + "github.com/bluesky-social/indigo/nexus/models" 8 + "github.com/labstack/echo/v4" 9 + "gorm.io/driver/sqlite" 10 + "gorm.io/gorm" 11 + ) 12 + 13 + type Nexus struct { 14 + db *gorm.DB 15 + echo *echo.Echo 16 + logger *slog.Logger 17 + 18 + filterDids map[string]bool 19 + 20 + outbox *Outbox 21 + } 22 + 23 + type Op struct { 24 + DID string `json:"did"` 25 + Collection string `json:"collection"` 26 + Rkey string `json:"rkey"` 27 + } 28 + 29 + type NexusConfig struct { 30 + DBPath string 31 + } 32 + 33 + func NewNexus(config NexusConfig) (*Nexus, error) { 34 + // Open SQLite DB with GORM 35 + db, err := gorm.Open(sqlite.Open(config.DBPath), &gorm.Config{}) 36 + if err != nil { 37 + return nil, err 38 + } 39 + 40 + // Auto-migrate the schema 41 + if err := db.AutoMigrate(&models.BufferedEvt{}, &models.FilterCollection{}, &models.FilterDid{}); err != nil { 42 + return nil, err 43 + } 44 + 45 + // Create Echo instance 46 + e := echo.New() 47 + e.HideBanner = true 48 + 49 + n := &Nexus{ 50 + db: db, 51 + echo: e, 52 + logger: slog.Default().With("system", "nexus"), 53 + 54 + filterDids: make(map[string]bool), 55 + 56 + outbox: NewOutbox(db), 57 + } 58 + 59 + err = n.LoadFilters() 60 + if err != nil { 61 + return nil, err 62 + } 63 + 64 + // Register routes 65 + n.registerRoutes() 66 + 67 + return n, nil 68 + } 69 + 70 + func (n *Nexus) Start(ctx context.Context, addr string) error { 71 + n.logger.Info("starting nexus server", "addr", addr) 72 + return n.echo.Start(addr) 73 + } 74 + 75 + func (n *Nexus) Shutdown(ctx context.Context) error { 76 + n.logger.Info("shutting down nexus server") 77 + if err := n.echo.Shutdown(ctx); err != nil { 78 + n.logger.Error("error shutting down echo", "error", err) 79 + } 80 + 81 + sqlDB, err := n.db.DB() 82 + if err != nil { 83 + n.logger.Error("error getting sql db", "error", err) 84 + return err 85 + } 86 + 87 + if err := sqlDB.Close(); err != nil { 88 + n.logger.Error("error closing sqlite db", "error", err) 89 + return err 90 + } 91 + 92 + return nil 93 + } 94 + 95 + func (n *Nexus) LoadFilters() error { 96 + var filterDids []models.FilterDid 97 + if err := n.db.Find(&filterDids).Error; err != nil { 98 + return err 99 + } 100 + 101 + for _, f := range filterDids { 102 + n.filterDids[f.Did] = true 103 + } 104 + 105 + return nil 106 + }
+32
nexus/outbox.go
··· 1 + package main 2 + 3 + import "gorm.io/gorm" 4 + 5 + type Outbox struct { 6 + db *gorm.DB 7 + outCh chan *Op 8 + } 9 + 10 + func NewOutbox(db *gorm.DB) *Outbox { 11 + return &Outbox{ 12 + db: db, 13 + outCh: make(chan *Op), 14 + } 15 + } 16 + 17 + func (o *Outbox) Send(op *Op) error { 18 + o.outCh <- op 19 + return nil 20 + } 21 + 22 + // func (nexus *Nexus) handleOp(op *Op) error { 23 + // select { 24 + // case nexus.evtCh <- op: 25 + // default: 26 + // err := nexus.db.Create(&BufferedEvt{Did: op.DID, Evt: *op}).Error 27 + // if err != nil { 28 + // return err 29 + // } 30 + // } 31 + // return nil 32 + // }