this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

cleanup labelmaker server code

Refactoring things around a bit

+213 -174
+17 -5
cmd/labelmaker/main.go
··· 1 1 package main 2 2 3 3 import ( 4 + "context" 4 5 "os" 5 6 "path/filepath" 6 7 ··· 74 75 Name: "subscribe-insecure-ws", 75 76 Usage: "when connecting to BGS instance, use ws:// instead of wss://", 76 77 }, 78 + &cli.StringFlag{ 79 + Name: "repo-did", 80 + Usage: "DID for labelmaker repo", 81 + Value: "did:plc:FAKE", 82 + }, 83 + &cli.StringFlag{ 84 + Name: "repo-handle", 85 + Usage: "handle for labelmaker repo", 86 + Value: "labelmaker.test", 87 + }, 77 88 } 78 89 79 90 app.Action = func(cctx *cli.Context) error { ··· 82 93 datadir := cctx.String("data-dir") 83 94 csdir := filepath.Join(datadir, "carstore") 84 95 os.MkdirAll(datadir, os.ModePerm) 96 + repoKeyPath := filepath.Join(datadir, "labelmaker.key") 85 97 86 98 dburl := cctx.String("db-url") 87 99 db, err := cliutil.SetupDatabase(dburl) ··· 110 122 return err 111 123 } 112 124 113 - // TODO: additional config work to be done 114 - repoDid := "did:plc:FAKE" 115 - repoHandle := "labelmaker.test" 116 - repoKeyPath := "data/labelmaker/labelmaker.key" 117 125 bgsUrl := cctx.String("bgs-host") 118 126 plcUrl := cctx.String("plc-host") 127 + useWss := !cctx.Bool("subscribe-insecure-ws") 128 + repoDid := cctx.String("repo-did") 129 + repoHandle := cctx.String("repo-handle") 119 130 120 - srv, err := labeling.NewServer(db, cstore, repoKeyPath, repoDid, repoHandle, bgsUrl, plcUrl) 131 + srv, err := labeling.NewServer(db, cstore, repoKeyPath, repoDid, repoHandle, plcUrl, useWss) 121 132 if err != nil { 122 133 return err 123 134 } 124 135 136 + srv.SubscribeBGS(context.TODO(), bgsUrl, useWss) 125 137 return srv.RunAPI(":2210") 126 138 } 127 139
+130 -165
labeling/service.go
··· 3 3 import ( 4 4 "bytes" 5 5 "context" 6 - "crypto/ecdsa" 7 6 "fmt" 8 - "os" 9 7 "strings" 10 8 "time" 11 9 12 10 "github.com/bluesky-social/indigo/api" 13 - bsky "github.com/bluesky-social/indigo/api/bsky" 11 + appbsky "github.com/bluesky-social/indigo/api/bsky" 14 12 "github.com/bluesky-social/indigo/bgs" 15 13 "github.com/bluesky-social/indigo/carstore" 16 14 "github.com/bluesky-social/indigo/events" 17 15 "github.com/bluesky-social/indigo/indexer" 18 - lexutil "github.com/bluesky-social/indigo/lex/util" 19 16 "github.com/bluesky-social/indigo/models" 20 17 "github.com/bluesky-social/indigo/pds" 21 18 "github.com/bluesky-social/indigo/repo" 22 19 "github.com/bluesky-social/indigo/repomgr" 23 20 util "github.com/bluesky-social/indigo/util" 21 + cbg "github.com/whyrusleeping/cbor-gen" 24 22 25 23 "github.com/ipfs/go-cid" 26 24 logging "github.com/ipfs/go-log" 27 25 "github.com/labstack/echo/v4" 28 26 "github.com/labstack/echo/v4/middleware" 29 - "github.com/lestrrat-go/jwx/jwa" 30 - jwk "github.com/lestrrat-go/jwx/jwk" 31 27 "github.com/whyrusleeping/go-did" 32 28 "gorm.io/gorm" 33 29 ) ··· 52 48 userId util.Uid 53 49 } 54 50 55 - func NewServer(db *gorm.DB, cs *carstore.CarStore, keyFile, repoDid, repoHandle, bgsUrl, plcUrl string) (*Server, error) { 51 + // In addition to configuring the service, will connect to upstream BGS and start processing events. Won't handle HTTP or WebSocket endpoints until RunAPI() is called. 52 + // 'useWss' is a flag to use SSL for outbound WebSocket connections 53 + func NewServer(db *gorm.DB, cs *carstore.CarStore, keyFile, repoDid, repoHandle, plcUrl string, useWss bool) (*Server, error) { 56 54 57 55 serkey, err := loadKey(keyFile) 58 56 if err != nil { ··· 73 71 userId: 1, 74 72 } 75 73 76 - var kl = KeywordLabeler{value: "rude", keywords: []string{"🍆", "sex", "ab", "before", "yours", "the"}} 74 + var kl = KeywordLabeler{value: "meta", keywords: []string{"the", "bluesky", "atproto"}} 77 75 78 76 s := &Server{ 79 77 db: db, ··· 85 83 } 86 84 87 85 // ensure that local labelmaker repo exists 88 - // NOTE: doesn't need to have app.bsky profile and actor config, this is just expedient (reusing helper) 86 + // NOTE: doesn't need to have app.bsky profile and actor config, this is just expediant (reusing an existing helper function) 89 87 ctx := context.Background() 90 88 head, _ := s.repoman.GetRepoRoot(ctx, s.user.userId) 91 89 if head == cid.Undef { ··· 97 95 log.Infof("found labelmaker repo: %s", head) 98 96 } 99 97 100 - // TODO(bnewbold): enforce ssl (last boolean argument here) 101 - slurp := bgs.NewSlurper(db, s.handleBgsRepoEvent, false) 98 + slurp := bgs.NewSlurper(db, s.handleBgsRepoEvent, useWss) 102 99 s.bgsSlurper = slurp 103 100 104 - // subscribe our RepoEvent slurper to the BGS, to receive incoming records for labeler 105 - useWebsocketSSL := false 106 - log.Infof("subscribing to BGS: %s (SSL=%v)", bgsUrl, useWebsocketSSL) 107 - s.bgsSlurper.SubscribeToPds(ctx, bgsUrl, useWebsocketSSL) 108 - 109 - // NOTE: this is where outgoing RepoEvents could be generated 110 - // should skip indexing (we are not a PDS) and just ship out repo event stream 111 - /* 112 - repoman.SetEventHandler(func(ctx context.Context, evt *repomgr.RepoEvent) { 113 - if err := ix.HandleRepoEvent(ctx, evt); err != nil { 114 - log.Errorw("handle repo event failed", "user", evt.User, "err", err) 115 - } 116 - }) 117 - */ 118 - 119 101 go levtman.Run() 120 102 121 103 return s, nil 122 104 } 123 105 124 - func (s *Server) Shutdown(ctx context.Context) error { 125 - return s.echo.Shutdown(ctx) 106 + func (s *Server) SubscribeBGS(ctx context.Context, bgsUrl string, useWss bool) { 107 + // subscribe our RepoEvent slurper to the BGS, to receive incoming records for labeler 108 + log.Infof("subscribing to BGS: %s (SSL=%v)", bgsUrl, useWss) 109 + s.bgsSlurper.SubscribeToPds(ctx, bgsUrl, useWss) 126 110 } 127 111 128 - // incoming repo events 129 - func (s *Server) handleBgsRepoEvent(ctx context.Context, pds *models.PDS, evt *events.RepoStreamEvent) error { 130 - now := time.Now().Format(util.ISO8601) 131 - switch { 132 - case evt.Append != nil: 133 - // this is where we take incoming RepoEvents and label them 134 - // use an in-memory blockstore with repo wrapper to parse CAR 135 - // NOTE: could refactor to parse ops first, so we don't bother parsing the CAR if there are no posts/profiles to process (a common case, for likes/follows/reposts/etc) 136 - sliceRepo, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Append.Blocks)) 137 - if err != nil { 138 - log.Warnw("failed to parse CAR slice", "repoErr", err) 139 - return err 112 + // efficiency predicate to quickly discard events we know won't want to even parse 113 + func (s *Server) wantAnyRecords(ctx context.Context, ra *events.RepoAppend) bool { 114 + 115 + for _, op := range ra.Ops { 116 + if op.Action != "create" && op.Action != "update" { 117 + continue 140 118 } 141 - var labels []events.Label = []events.Label{} 142 - for _, op := range evt.Append.Ops { 143 - uri := "at://" + evt.Append.Repo + "/" + op.Path 144 - nsid := strings.SplitN(op.Path, "/", 2)[0] 145 - // filter to creation/update of ony post/profile records 146 - // TODO(bnewbold): how do I 'switch' on a tuple here in golang, instead of nested switch? 147 - switch op.Action { 148 - case "create", "update": 149 - log.Infof("labeling record: %v", uri) 150 - switch nsid { 151 - case "app.bsky.feed.post": 152 - cid, rec, err := sliceRepo.GetRecord(ctx, op.Path) 153 - if err != nil { 154 - return fmt.Errorf("record not in CAR slice: %s", uri) 155 - } 156 - cidStr := cid.String() 157 - post, suc := rec.(*bsky.FeedPost) 158 - if !suc { 159 - return fmt.Errorf("record failed to deserialize from CBOR: %s", rec) 160 - } 161 - // run through all the keyword labelers on posts, saving any resulting labels 162 - for _, labeler := range s.kwl { 163 - for _, val := range labeler.labelPost(*post) { 164 - labels = append(labels, events.Label{ 165 - SourceDid: s.user.did, 166 - SubjectUri: uri, 167 - SubjectCid: &cidStr, 168 - Value: val, 169 - Timestamp: now, 170 - }) 171 - } 172 - } 173 - case "app.bsky.actor.profile": 174 - // NOTE: copypasta from post above, could refactor to not duplicate 175 - cid, rec, err := sliceRepo.GetRecord(ctx, op.Path) 176 - if err != nil { 177 - return fmt.Errorf("record not in CAR slice: %s", uri) 178 - } 179 - cidStr := cid.String() 180 - profile, suc := rec.(*bsky.ActorProfile) 181 - if !suc { 182 - return fmt.Errorf("record failed to deserialize from CBOR: %s", rec) 183 - } 184 - // run through all the keyword labelers on profiles, saving any resulting labels 185 - for _, labeler := range s.kwl { 186 - for _, val := range labeler.labelActorProfile(*profile) { 187 - labels = append(labels, events.Label{ 188 - SourceDid: s.user.did, 189 - SubjectUri: uri, 190 - SubjectCid: &cidStr, 191 - Value: val, 192 - Timestamp: now, 193 - }) 194 - } 195 - } 196 - default: 197 - continue 198 - } 199 - default: 200 - continue 201 - } 119 + nsid := strings.SplitN(op.Path, "/", 2)[0] 120 + switch nsid { 121 + case "app.bsky.feed.post": 122 + return true 123 + case "app.bsky.actor.profile": 124 + return true 125 + default: 126 + continue 202 127 } 128 + } 129 + return false 130 + } 203 131 204 - // if any labels generated, persist them to repo... 205 - for i, l := range labels { 206 - path, _, err := s.repoman.CreateRecord(ctx, s.user.userId, "com.atproto.label.label", &l) 207 - if err != nil { 208 - return fmt.Errorf("failed to persist label in local repo: %w", err) 132 + func (s *Server) labelRecord(ctx context.Context, did, nsid, uri, cid string, rec cbg.CBORMarshaler) ([]string, error) { 133 + log.Infof("labeling record: %v", uri) 134 + var labelVals []string 135 + switch nsid { 136 + case "app.bsky.feed.post": 137 + post, suc := rec.(*appbsky.FeedPost) 138 + if !suc { 139 + return []string{}, fmt.Errorf("record failed to deserialize from CBOR: %s", rec) 140 + } 141 + // run through all the keyword labelers on posts, saving any resulting labels 142 + for _, labeler := range s.kwl { 143 + for _, val := range labeler.labelPost(*post) { 144 + labelVals = append(labelVals, val) 209 145 } 210 - labeluri := "at://" + s.user.did + "/" + path 211 - labels[i].LabelUri = &labeluri 212 - log.Infof("persisted label: %s", labeluri) 213 146 } 214 - 215 - // ... then re-publish as LabelStreamEvent 216 - log.Infof("%s", labels) 217 - if len(labels) > 0 { 218 - lev := events.LabelStreamEvent{ 219 - // XXX(bnewbold): what should sequence number be? do I need to maintain that? 220 - Batch: &events.LabelBatch{ 221 - Seq: 0, 222 - Labels: labels, 223 - }, 224 - } 225 - err = s.levents.AddEvent(&lev) 226 - if err != nil { 227 - return fmt.Errorf("failed to publish LabelStreamEvent: %w", err) 147 + case "app.bsky.actor.profile": 148 + profile, suc := rec.(*appbsky.ActorProfile) 149 + if !suc { 150 + return []string{}, fmt.Errorf("record failed to deserialize from CBOR: %s", rec) 151 + } 152 + // run through all the keyword labelers on posts, saving any resulting labels 153 + for _, labeler := range s.kwl { 154 + for _, val := range labeler.labelActorProfile(*profile) { 155 + labelVals = append(labelVals, val) 228 156 } 229 157 } 230 - // TODO: update state that we successfully processed the repo event (aka, persist "last" seq in database, or something like that) 231 - return nil 232 - default: 233 - return fmt.Errorf("invalid fed event") 234 158 } 159 + return labelVals, nil 235 160 } 236 161 237 - func (s *Server) readRecordFunc(ctx context.Context, user util.Uid, c cid.Cid) (lexutil.CBOR, error) { 238 - bs, err := s.cs.ReadOnlySession(user) 239 - if err != nil { 240 - return nil, err 241 - } 162 + // Process incoming repo events coming from BGS, which includes new and updated 163 + // records from any PDS. This function extracts records, handes them to the 164 + // labeling routine, and then persists and broadcasts any resulting labels 165 + func (s *Server) handleBgsRepoEvent(ctx context.Context, pds *models.PDS, evt *events.RepoStreamEvent) error { 242 166 243 - blk, err := bs.Get(ctx, c) 244 - if err != nil { 245 - return nil, err 167 + if evt.Append == nil { 168 + // TODO(bnewbold): is this really invalid? do we need to handle Info and Error events here? 169 + return fmt.Errorf("invalid repo append event") 246 170 } 247 171 248 - return lexutil.CborDecodeValue(blk.RawData()) 249 - } 172 + // quick check if we can skip processing the CAR slice entirely 173 + if !s.wantAnyRecords(ctx, evt.Append) { 174 + return nil 175 + } 250 176 251 - func loadKey(kfile string) (*did.PrivKey, error) { 252 - kb, err := os.ReadFile(kfile) 177 + // use an in-memory blockstore with repo wrapper to parse CAR slice 178 + sliceRepo, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Append.Blocks)) 253 179 if err != nil { 254 - return nil, err 180 + log.Warnw("failed to parse CAR slice", "repoErr", err) 181 + return err 255 182 } 256 183 257 - sk, err := jwk.ParseKey(kb) 258 - if err != nil { 259 - return nil, err 184 + now := time.Now().Format(util.ISO8601) 185 + labels := []events.Label{} 186 + 187 + for _, op := range evt.Append.Ops { 188 + uri := "at://" + evt.Append.Repo + "/" + op.Path 189 + nsid := strings.SplitN(op.Path, "/", 2)[0] 190 + 191 + if !(op.Action == "create" || op.Action == "update") { 192 + continue 193 + } 194 + 195 + cid, rec, err := sliceRepo.GetRecord(ctx, op.Path) 196 + if err != nil { 197 + return fmt.Errorf("record not in CAR slice: %s", uri) 198 + } 199 + cidStr := cid.String() 200 + labelVals, err := s.labelRecord(ctx, s.user.did, nsid, uri, cidStr, rec) 201 + if err != nil { 202 + return err 203 + } 204 + for _, val := range labelVals { 205 + labels = append(labels, events.Label{ 206 + SourceDid: s.user.did, 207 + SubjectUri: uri, 208 + SubjectCid: &cidStr, 209 + Value: val, 210 + Timestamp: now, 211 + }) 212 + } 260 213 } 261 214 262 - var spk ecdsa.PrivateKey 263 - if err := sk.Raw(&spk); err != nil { 264 - return nil, err 265 - } 266 - curve, ok := sk.Get("crv") 267 - if !ok { 268 - return nil, fmt.Errorf("need a curve set") 215 + // if any labels generated, persist them to repo... 216 + for i, l := range labels { 217 + path, _, err := s.repoman.CreateRecord(ctx, s.user.userId, "com.atproto.label.label", &l) 218 + if err != nil { 219 + return fmt.Errorf("failed to persist label in local repo: %w", err) 220 + } 221 + labeluri := "at://" + s.user.did + "/" + path 222 + labels[i].LabelUri = &labeluri 223 + log.Infof("persisted label: %s", labeluri) 269 224 } 270 225 271 - var out string 272 - kts := string(curve.(jwa.EllipticCurveAlgorithm)) 273 - switch kts { 274 - case "P-256": 275 - out = did.KeyTypeP256 276 - default: 277 - return nil, fmt.Errorf("unrecognized key type: %s", kts) 226 + // ... then re-publish as LabelStreamEvent 227 + log.Infof("%s", labels) 228 + if len(labels) > 0 { 229 + lev := events.LabelStreamEvent{ 230 + Batch: &events.LabelBatch{ 231 + // NOTE(bnewbold): seems like other code handles Seq field automatically 232 + Labels: labels, 233 + }, 234 + } 235 + err = s.levents.AddEvent(&lev) 236 + if err != nil { 237 + return fmt.Errorf("failed to publish LabelStreamEvent: %w", err) 238 + } 278 239 } 279 - 280 - return &did.PrivKey{ 281 - Raw: &spk, 282 - Type: out, 283 - }, nil 240 + // TODO(bnewbold): persist state that we successfully processed the repo event (aka, 241 + // persist "last" seq in database, or something like that). also above, at 242 + // the short-circuit 243 + return nil 284 244 } 285 245 286 246 func (s *Server) RunAPI(listen string) error { ··· 288 248 s.echo = e 289 249 e.HideBanner = true 290 250 e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{ 291 - Format: "method=${method}, uri=${uri}, status=${status} latency=${latency_human}\n", 251 + Format: "method=${method} uri=${uri} status=${status} latency=${latency_human}\n", 292 252 })) 293 253 294 254 e.HTTPErrorHandler = func(err error, ctx echo.Context) { 295 - fmt.Printf("HANDLER ERROR: (%s) %s\n", ctx.Path(), err) 255 + fmt.Printf("Error at path=%s: %v\n", ctx.Path(), err) 296 256 ctx.Response().WriteHeader(500) 297 257 } 298 258 299 259 s.RegisterHandlersComAtproto(e) 300 - e.GET("/events/v0/labels", s.EventsLabelsWebsocket) 260 + // TODO(bnewbold): this is a speculative endpoint name 261 + e.GET("/xrpc/com.atproto.label.subscribeAllLabels", s.EventsLabelsWebsocket) 301 262 302 263 return e.Start(listen) 303 264 } 265 + 266 + func (s *Server) Shutdown(ctx context.Context) error { 267 + return s.echo.Shutdown(ctx) 268 + }
+47
labeling/util.go
··· 1 + package labeling 2 + 3 + import ( 4 + "crypto/ecdsa" 5 + "fmt" 6 + "os" 7 + 8 + "github.com/lestrrat-go/jwx/jwa" 9 + jwk "github.com/lestrrat-go/jwx/jwk" 10 + "github.com/whyrusleeping/go-did" 11 + ) 12 + 13 + // TODO:(bnewbold): duplicates elsewhere; should refactor into cliutil 14 + func loadKey(kfile string) (*did.PrivKey, error) { 15 + kb, err := os.ReadFile(kfile) 16 + if err != nil { 17 + return nil, err 18 + } 19 + 20 + sk, err := jwk.ParseKey(kb) 21 + if err != nil { 22 + return nil, err 23 + } 24 + 25 + var spk ecdsa.PrivateKey 26 + if err := sk.Raw(&spk); err != nil { 27 + return nil, err 28 + } 29 + curve, ok := sk.Get("crv") 30 + if !ok { 31 + return nil, fmt.Errorf("need a curve set") 32 + } 33 + 34 + var out string 35 + kts := string(curve.(jwa.EllipticCurveAlgorithm)) 36 + switch kts { 37 + case "P-256": 38 + out = did.KeyTypeP256 39 + default: 40 + return nil, fmt.Errorf("unrecognized key type: %s", kts) 41 + } 42 + 43 + return &did.PrivKey{ 44 + Raw: &spk, 45 + Type: out, 46 + }, nil 47 + }
-1
labeling/ws_endpoints.go
··· 23 23 24 24 ctx := c.Request().Context() 25 25 26 - // TODO: authhhh 27 26 conn, err := websocket.Upgrade(c.Response().Writer, c.Request(), c.Response().Header(), 1<<10, 1<<10) 28 27 if err != nil { 29 28 return fmt.Errorf("upgrading websocket: %w", err)
+15 -3
labeling/xrpc_endpoints.go
··· 17 17 e.GET("/xrpc/com.atproto.repo.getRecord", s.HandleComAtprotoRepoGetRecord) 18 18 e.GET("/xrpc/com.atproto.repo.listRecords", s.HandleComAtprotoRepoListRecords) 19 19 e.GET("/xrpc/com.atproto.server.getAccountsConfig", s.HandleComAtprotoServerGetAccountsConfig) 20 - e.GET("/xrpc/com.atproto.sync.getRepo", s.HandleComAtprotoSyncGetRepo) 21 - // TODO(bnewbold): after lexicons updated 22 - //e.GET("/xrpc/com.atproto.sync.getHead", s.HandleComAtprotoSyncGetRoot) 20 + e.GET("/xrpc/com.atproto.sync.getHead", s.HandleComAtprotoSyncGetHead) 23 21 return nil 24 22 } 25 23 ··· 145 143 } 146 144 return c.Stream(200, "application/vnd.ipld.car", out) 147 145 } 146 + 147 + func (s *Server) HandleComAtprotoSyncGetHead(c echo.Context) error { 148 + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetHead") 149 + defer span.End() 150 + did := c.QueryParam("did") 151 + var out *atproto.SyncGetHead_Output 152 + var handleErr error 153 + // func (s *Server) handleComAtprotoSyncGetHead(ctx context.Context,did string) (*comatprototypes.SyncGetHead_Output, error) 154 + out, handleErr = s.handleComAtprotoSyncGetHead(ctx, did) 155 + if handleErr != nil { 156 + return handleErr 157 + } 158 + return c.JSON(200, out) 159 + }
+4
labeling/xrpc_handlers.go
··· 103 103 Value: lexutil.LexiconTypeDecoder{rec}, 104 104 }, nil 105 105 } 106 + 107 + func (s *Server) handleComAtprotoSyncGetHead(ctx context.Context, did string) (*atproto.SyncGetHead_Output, error) { 108 + panic("not yet implemented") 109 + }