this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Refactor to have event persisters do writethrough broadcast (#187)

authored by

Whyrusleeping and committed by
GitHub
03c8d951 a206cdf4

+559 -141
+1 -1
cmd/bigsky/main.go
··· 173 173 174 174 repoman := repomgr.NewRepoManager(db, cstore, kmgr) 175 175 176 - dbp, err := events.NewDbPersistence(db, cstore) 176 + dbp, err := events.NewDbPersistence(db, cstore, nil) 177 177 if err != nil { 178 178 return fmt.Errorf("setting up db event persistence: %w", err) 179 179 }
+251 -31
events/dbpersist.go
··· 5 5 "context" 6 6 "encoding/json" 7 7 "fmt" 8 + "sync" 8 9 "time" 9 10 10 11 comatproto "github.com/bluesky-social/indigo/api/atproto" ··· 12 13 lexutil "github.com/bluesky-social/indigo/lex/util" 13 14 "github.com/bluesky-social/indigo/models" 14 15 "github.com/bluesky-social/indigo/util" 16 + lru "github.com/hashicorp/golang-lru" 15 17 16 18 cid "github.com/ipfs/go-cid" 17 19 "gorm.io/gorm" 18 20 ) 19 21 22 + type PersistenceBatchItem struct { 23 + Record *RepoEventRecord 24 + Event *XRPCStreamEvent 25 + } 26 + 27 + type Options struct { 28 + MaxBatchSize int 29 + MinBatchSize int 30 + MaxTimeBetweenFlush time.Duration 31 + CheckBatchInterval time.Duration 32 + UIDCacheSize int 33 + DIDCacheSize int 34 + } 35 + 36 + func DefaultOptions() *Options { 37 + return &Options{ 38 + MaxBatchSize: 200, 39 + MinBatchSize: 10, 40 + MaxTimeBetweenFlush: 500 * time.Millisecond, 41 + CheckBatchInterval: 100 * time.Millisecond, 42 + UIDCacheSize: 10000, 43 + DIDCacheSize: 10000, 44 + } 45 + } 46 + 20 47 type DbPersistence struct { 21 48 db *gorm.DB 22 49 23 50 cs *carstore.CarStore 51 + 52 + lk sync.Mutex 53 + 54 + broadcast func(*XRPCStreamEvent) 55 + 56 + batch []*PersistenceBatchItem 57 + batchOptions Options 58 + lastFlush time.Time 59 + 60 + uidCache *lru.ARCCache 61 + didCache *lru.ARCCache 24 62 } 25 63 26 64 type RepoEventRecord struct { 27 - Seq uint `gorm:"primarykey"` 28 - Commit util.DbCID 29 - Prev *util.DbCID 65 + Seq uint `gorm:"primarykey"` 66 + Commit *util.DbCID 67 + Prev *util.DbCID 68 + NewHandle *string // NewHandle is only set if this is a handle change event 30 69 31 70 Time time.Time 32 71 Blobs []byte 33 72 Repo util.Uid 34 - Event string 73 + Type string 35 74 Rebase bool 36 75 37 76 Ops []byte 38 77 } 39 78 40 - func NewDbPersistence(db *gorm.DB, cs *carstore.CarStore) (*DbPersistence, error) { 79 + func NewDbPersistence(db *gorm.DB, cs *carstore.CarStore, options *Options) (*DbPersistence, error) { 41 80 if err := db.AutoMigrate(&RepoEventRecord{}); err != nil { 42 81 return nil, err 43 82 } 44 83 45 - return &DbPersistence{ 46 - db: db, 47 - cs: cs, 48 - }, nil 84 + if options == nil { 85 + options = DefaultOptions() 86 + } 87 + 88 + uidCache, err := lru.NewARC(options.UIDCacheSize) 89 + if err != nil { 90 + return nil, fmt.Errorf("failed to create uid cache: %w", err) 91 + } 92 + 93 + didCache, err := lru.NewARC(options.DIDCacheSize) 94 + if err != nil { 95 + return nil, fmt.Errorf("failed to create did cache: %w", err) 96 + } 97 + 98 + p := DbPersistence{ 99 + db: db, 100 + cs: cs, 101 + batchOptions: *options, 102 + batch: []*PersistenceBatchItem{}, 103 + uidCache: uidCache, 104 + didCache: didCache, 105 + } 106 + 107 + go p.batchFlusher() 108 + 109 + return &p, nil 110 + } 111 + 112 + func (p *DbPersistence) batchFlusher() { 113 + for { 114 + time.Sleep(p.batchOptions.CheckBatchInterval) 115 + 116 + p.lk.Lock() 117 + needsFlush := len(p.batch) > 0 && 118 + (len(p.batch) >= p.batchOptions.MinBatchSize || 119 + time.Since(p.lastFlush) >= p.batchOptions.MaxTimeBetweenFlush) 120 + p.lk.Unlock() 121 + 122 + if needsFlush { 123 + if err := p.FlushBatch(context.Background()); err != nil { 124 + log.Errorf("failed to flush batch: %s", err) 125 + } 126 + } 127 + } 128 + 129 + } 130 + 131 + func (p *DbPersistence) SetEventBroadcaster(brc func(*XRPCStreamEvent)) { 132 + p.broadcast = brc 133 + } 134 + 135 + func (p *DbPersistence) FlushBatch(ctx context.Context) error { 136 + p.lk.Lock() 137 + defer p.lk.Unlock() 138 + 139 + return p.flushBatchLocked(ctx) 140 + } 141 + 142 + func (p *DbPersistence) flushBatchLocked(ctx context.Context) error { 143 + // TODO: we technically don't need to hold the lock through the database 144 + // operation, all we need to do is swap the batch out, and ensure nobody 145 + // else tries to enter this function to flush another batch while we are 146 + // flushing. I'll leave that for a later optimization 147 + 148 + records := make([]*RepoEventRecord, len(p.batch)) 149 + for i, item := range p.batch { 150 + records[i] = item.Record 151 + } 152 + 153 + if err := p.db.CreateInBatches(records, 50).Error; err != nil { 154 + return fmt.Errorf("failed to create records: %w", err) 155 + } 156 + 157 + for i, item := range records { 158 + e := p.batch[i].Event 159 + switch { 160 + case e.RepoCommit != nil: 161 + e.RepoCommit.Seq = int64(item.Seq) 162 + case e.RepoHandle != nil: 163 + e.RepoHandle.Seq = int64(item.Seq) 164 + default: 165 + return fmt.Errorf("unknown event type") 166 + } 167 + p.broadcast(e) 168 + } 169 + 170 + p.batch = []*PersistenceBatchItem{} 171 + p.lastFlush = time.Now() 172 + 173 + return nil 174 + } 175 + 176 + func (p *DbPersistence) AddItemToBatch(ctx context.Context, rec *RepoEventRecord, evt *XRPCStreamEvent) error { 177 + p.lk.Lock() 178 + defer p.lk.Unlock() 179 + p.batch = append(p.batch, &PersistenceBatchItem{ 180 + Record: rec, 181 + Event: evt, 182 + }) 183 + 184 + if len(p.batch) >= p.batchOptions.MaxBatchSize { 185 + if err := p.flushBatchLocked(ctx); err != nil { 186 + return fmt.Errorf("failed to flush batch at max size: %w", err) 187 + } 188 + } 189 + 190 + return nil 49 191 } 50 192 51 193 func (p *DbPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error { 52 - if e.RepoCommit == nil { 194 + var rer *RepoEventRecord 195 + var err error 196 + 197 + switch { 198 + case e.RepoCommit != nil: 199 + rer, err = p.RecordFromRepoCommit(ctx, e.RepoCommit) 200 + if err != nil { 201 + return err 202 + } 203 + case e.RepoHandle != nil: 204 + rer, err = p.RecordFromHandleChange(ctx, e.RepoHandle) 205 + if err != nil { 206 + return err 207 + } 208 + default: 53 209 return nil 54 210 } 55 211 56 - evt := e.RepoCommit 212 + if err := p.AddItemToBatch(ctx, rer, e); err != nil { 213 + return err 214 + } 215 + 216 + return nil 217 + } 218 + 219 + func (p *DbPersistence) RecordFromHandleChange(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Handle) (*RepoEventRecord, error) { 220 + t, err := time.Parse(util.ISO8601, evt.Time) 221 + if err != nil { 222 + return nil, err 223 + } 224 + 225 + uid, err := p.uidForDid(ctx, evt.Did) 226 + if err != nil { 227 + return nil, err 228 + } 229 + 230 + return &RepoEventRecord{ 231 + Repo: uid, 232 + Type: "repo_handle", 233 + Time: t, 234 + NewHandle: &evt.Handle, 235 + }, nil 236 + } 57 237 238 + func (p *DbPersistence) RecordFromRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) (*RepoEventRecord, error) { 58 239 // TODO: hack hack hack 59 240 if len(evt.Ops) > 8192 { 60 241 log.Errorf("(VERY BAD) truncating ops field in outgoing event (len = %d)", len(evt.Ops)) ··· 63 244 64 245 uid, err := p.uidForDid(ctx, evt.Repo) 65 246 if err != nil { 66 - return err 247 + return nil, err 67 248 } 68 249 69 250 var prev *util.DbCID ··· 75 256 if len(evt.Blobs) > 0 { 76 257 b, err := json.Marshal(evt.Blobs) 77 258 if err != nil { 78 - return err 259 + return nil, err 79 260 } 80 261 blobs = b 81 262 } 82 263 83 264 t, err := time.Parse(util.ISO8601, evt.Time) 84 265 if err != nil { 85 - return err 266 + return nil, err 86 267 } 87 268 88 269 rer := RepoEventRecord{ 89 - Commit: util.DbCID{cid.Cid(evt.Commit)}, 270 + Commit: &util.DbCID{cid.Cid(evt.Commit)}, 90 271 Prev: prev, 91 272 Repo: uid, 92 - Event: "repo_append", // TODO: refactor to "#commit"? can "rebase" come through this path? 273 + Type: "repo_append", // TODO: refactor to "#commit"? can "rebase" come through this path? 93 274 Blobs: blobs, 94 275 Time: t, 95 276 Rebase: evt.Rebase, ··· 97 278 98 279 opsb, err := json.Marshal(evt.Ops) 99 280 if err != nil { 100 - return err 281 + return nil, err 101 282 } 102 283 rer.Ops = opsb 103 284 104 - if err := p.db.Create(&rer).Error; err != nil { 105 - return err 106 - } 107 - 108 - e.RepoCommit.Seq = int64(rer.Seq) 109 - 110 - return nil 285 + return &rer, nil 111 286 } 112 287 113 288 func (p *DbPersistence) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error { ··· 123 298 return err 124 299 } 125 300 126 - ra, err := p.hydrateRepoEvent(ctx, &evt) 127 - if err != nil { 128 - return fmt.Errorf("hydrating event: %w", err) 301 + var streamEvent *XRPCStreamEvent 302 + switch { 303 + case evt.Commit != nil: 304 + streamEvent, err = p.hydrateCommit(ctx, &evt) 305 + if err != nil { 306 + return fmt.Errorf("failed to hydrate commit: %w", err) 307 + } 308 + case evt.NewHandle != nil: 309 + streamEvent, err = p.hydrateHandleChange(ctx, &evt) 310 + if err != nil { 311 + return fmt.Errorf("failed to hydrate handle change: %w", err) 312 + } 313 + default: 314 + return fmt.Errorf("unknown event type: %s", evt.Type) 129 315 } 130 316 131 - if err := cb(&XRPCStreamEvent{RepoCommit: ra}); err != nil { 317 + if err := cb(streamEvent); err != nil { 132 318 return err 133 319 } 134 320 } ··· 137 323 } 138 324 139 325 func (p *DbPersistence) uidForDid(ctx context.Context, did string) (util.Uid, error) { 326 + if uid, ok := p.didCache.Get(did); ok { 327 + return uid.(util.Uid), nil 328 + } 329 + 140 330 var u models.ActorInfo 141 331 if err := p.db.First(&u, "did = ?", did).Error; err != nil { 142 332 return 0, err 143 333 } 144 334 335 + p.didCache.Add(did, u.Uid) 336 + 145 337 return u.Uid, nil 146 338 } 147 339 148 340 func (p *DbPersistence) didForUid(ctx context.Context, uid util.Uid) (string, error) { 341 + if did, ok := p.uidCache.Get(uid); ok { 342 + return did.(string), nil 343 + } 344 + 149 345 var u models.ActorInfo 150 346 if err := p.db.First(&u, "uid = ?", uid).Error; err != nil { 151 347 return "", err 152 348 } 153 349 350 + p.uidCache.Add(uid, u.Did) 351 + 154 352 return u.Did, nil 155 353 } 156 354 157 - func (p *DbPersistence) hydrateRepoEvent(ctx context.Context, rer *RepoEventRecord) (*comatproto.SyncSubscribeRepos_Commit, error) { 355 + func (p *DbPersistence) hydrateHandleChange(ctx context.Context, rer *RepoEventRecord) (*XRPCStreamEvent, error) { 356 + if rer.NewHandle == nil { 357 + return nil, fmt.Errorf("NewHandle is nil") 358 + } 359 + 360 + did, err := p.didForUid(ctx, rer.Repo) 361 + if err != nil { 362 + return nil, err 363 + } 364 + 365 + return &XRPCStreamEvent{ 366 + RepoHandle: &comatproto.SyncSubscribeRepos_Handle{ 367 + Did: did, 368 + Handle: *rer.NewHandle, 369 + Time: rer.Time.Format(util.ISO8601), 370 + }, 371 + }, nil 372 + } 373 + 374 + func (p *DbPersistence) hydrateCommit(ctx context.Context, rer *RepoEventRecord) (*XRPCStreamEvent, error) { 375 + if rer.Commit == nil { 376 + return nil, fmt.Errorf("commit is nil") 377 + } 378 + 158 379 var blobs []string 159 380 if len(rer.Blobs) > 0 { 160 381 if err := json.Unmarshal(rer.Blobs, &blobs); err != nil { ··· 195 416 Blobs: blobCIDs, 196 417 Rebase: rer.Rebase, 197 418 Ops: ops, 198 - // TODO: there was previously an Event field here. are these all Commit, or are some other events? 199 419 } 200 420 201 421 cs, err := p.readCarSlice(ctx, rer) ··· 209 429 out.Blocks = cs 210 430 } 211 431 212 - return out, nil 432 + return &XRPCStreamEvent{RepoCommit: out}, nil 213 433 } 214 434 215 435 func (p *DbPersistence) readCarSlice(ctx context.Context, rer *RepoEventRecord) ([]byte, error) {
+183
events/dbpersist_test.go
··· 1 + package events_test 2 + 3 + import ( 4 + "context" 5 + "os" 6 + "path/filepath" 7 + "sync" 8 + "testing" 9 + "time" 10 + 11 + atproto "github.com/bluesky-social/indigo/api/atproto" 12 + "github.com/bluesky-social/indigo/api/bsky" 13 + "github.com/bluesky-social/indigo/carstore" 14 + "github.com/bluesky-social/indigo/events" 15 + lexutil "github.com/bluesky-social/indigo/lex/util" 16 + "github.com/bluesky-social/indigo/models" 17 + "github.com/bluesky-social/indigo/pds" 18 + "github.com/bluesky-social/indigo/repomgr" 19 + "github.com/bluesky-social/indigo/util" 20 + "github.com/ipfs/go-log/v2" 21 + "gorm.io/driver/sqlite" 22 + "gorm.io/gorm" 23 + ) 24 + 25 + func init() { 26 + log.SetAllLoggers(log.LevelDebug) 27 + } 28 + 29 + func BenchmarkDBPersist(b *testing.B) { 30 + ctx := context.Background() 31 + 32 + db, _, cs, tempPath, err := setupDBs(b) 33 + if err != nil { 34 + b.Fatal(err) 35 + } 36 + 37 + db.AutoMigrate(&pds.User{}) 38 + db.AutoMigrate(&pds.Peering{}) 39 + db.AutoMigrate(&models.ActorInfo{}) 40 + 41 + db.Create(&models.ActorInfo{ 42 + Uid: 1, 43 + Did: "did:example:123", 44 + }) 45 + 46 + mgr := repomgr.NewRepoManager(db, cs, &util.FakeKeyManager{}) 47 + 48 + err = mgr.InitNewActor(ctx, 1, "alice", "did:example:123", "Alice", "", "") 49 + if err != nil { 50 + b.Fatal(err) 51 + } 52 + 53 + _, cid, err := mgr.CreateRecord(ctx, 1, "app.bsky.feed.post", &bsky.FeedPost{ 54 + Text: "hello world", 55 + CreatedAt: time.Now().Format(util.ISO8601), 56 + }) 57 + if err != nil { 58 + b.Fatal(err) 59 + } 60 + 61 + defer os.RemoveAll(tempPath) 62 + 63 + // Initialize a DBPersister 64 + dbp, err := events.NewDbPersistence(db, cs, nil) 65 + if err != nil { 66 + b.Fatal(err) 67 + } 68 + 69 + // Create a bunch of events 70 + evtman := events.NewEventManager(dbp) 71 + 72 + userRepoHead, err := mgr.GetRepoRoot(ctx, 1) 73 + if err != nil { 74 + b.Fatal(err) 75 + } 76 + 77 + inEvts := make([]*events.XRPCStreamEvent, b.N) 78 + for i := 0; i < b.N; i++ { 79 + cidLink := lexutil.LexLink(cid) 80 + headLink := lexutil.LexLink(userRepoHead) 81 + inEvts[i] = &events.XRPCStreamEvent{ 82 + RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 83 + Repo: "did:example:123", 84 + Commit: headLink, 85 + Ops: []*atproto.SyncSubscribeRepos_RepoOp{ 86 + { 87 + Action: "add", 88 + Cid: &cidLink, 89 + Path: "path1", 90 + }, 91 + }, 92 + Time: time.Now().Format(util.ISO8601), 93 + }, 94 + } 95 + } 96 + 97 + numRoutines := 5 98 + wg := sync.WaitGroup{} 99 + 100 + b.ResetTimer() 101 + 102 + errChan := make(chan error, numRoutines) 103 + 104 + // Add events in parallel 105 + for i := 0; i < numRoutines; i++ { 106 + wg.Add(1) 107 + go func() { 108 + defer wg.Done() 109 + for i := 0; i < b.N; i++ { 110 + err = evtman.AddEvent(ctx, inEvts[i]) 111 + if err != nil { 112 + errChan <- err 113 + } 114 + } 115 + }() 116 + } 117 + 118 + wg.Wait() 119 + close(errChan) 120 + 121 + // Check for errors 122 + for err := range errChan { 123 + if err != nil { 124 + b.Fatal(err) 125 + } 126 + } 127 + 128 + outEvtCount := 0 129 + expectedEvtCount := b.N * numRoutines 130 + 131 + // Flush manually 132 + err = dbp.FlushBatch(ctx) 133 + if err != nil { 134 + b.Fatal(err) 135 + } 136 + 137 + b.StopTimer() 138 + 139 + dbp.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { 140 + outEvtCount++ 141 + return nil 142 + }) 143 + 144 + if outEvtCount != expectedEvtCount { 145 + b.Fatalf("expected %d events, got %d", expectedEvtCount, outEvtCount) 146 + } 147 + } 148 + 149 + func setupDBs(t testing.TB) (*gorm.DB, *gorm.DB, *carstore.CarStore, string, error) { 150 + dir, err := os.MkdirTemp("", "integtest") 151 + if err != nil { 152 + return nil, nil, nil, "", err 153 + } 154 + 155 + maindb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "test.sqlite?cache=shared&mode=rwc"))) 156 + if err != nil { 157 + return nil, nil, nil, "", err 158 + } 159 + 160 + tx := maindb.Exec("PRAGMA journal_mode=WAL;") 161 + if tx.Error != nil { 162 + return nil, nil, nil, "", tx.Error 163 + } 164 + 165 + tx.Commit() 166 + 167 + cardb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "car.sqlite"))) 168 + if err != nil { 169 + return nil, nil, nil, "", err 170 + } 171 + 172 + cspath := filepath.Join(dir, "carstore") 173 + if err := os.Mkdir(cspath, 0775); err != nil { 174 + return nil, nil, nil, "", err 175 + } 176 + 177 + cs, err := carstore.NewCarStore(cardb, cspath) 178 + if err != nil { 179 + return nil, nil, nil, "", err 180 + } 181 + 182 + return maindb, cardb, cs, "", nil 183 + }
+8 -8
events/events.go
··· 26 26 } 27 27 28 28 func NewEventManager(persister EventPersistence) *EventManager { 29 - return &EventManager{ 29 + em := &EventManager{ 30 30 bufferSize: 1024, 31 31 persister: persister, 32 32 } 33 + 34 + persister.SetEventBroadcaster(em.broadcastEvent) 35 + 36 + return em 33 37 } 34 38 35 39 const ( ··· 45 49 } 46 50 47 51 func (em *EventManager) broadcastEvent(evt *XRPCStreamEvent) { 48 - // NOTE: Assumes subsLk is held 52 + em.subsLk.Lock() 53 + defer em.subsLk.Unlock() 49 54 50 55 // TODO: for a larger fanout we should probably have dedicated goroutines 51 56 // for subsets of the subscriber set, and tiered channels to distribute ··· 68 73 } 69 74 70 75 func (em *EventManager) persistAndSendEvent(ctx context.Context, evt *XRPCStreamEvent) { 71 - em.subsLk.Lock() 72 - defer em.subsLk.Unlock() 73 - 74 - if err := em.persister.Persist(context.TODO(), evt); err != nil { 76 + if err := em.persister.Persist(ctx, evt); err != nil { 75 77 log.Errorf("failed to persist outbound event: %s", err) 76 78 } 77 - 78 - em.broadcastEvent(evt) 79 79 } 80 80 81 81 type Subscriber struct {
+10
events/persist.go
··· 14 14 Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error 15 15 TakeDownRepo(ctx context.Context, usr util.Uid) error 16 16 RebaseRepoEvents(ctx context.Context, usr util.Uid) error 17 + 18 + SetEventBroadcaster(func(*XRPCStreamEvent)) 17 19 } 18 20 19 21 // MemPersister is the most naive implementation of event persistence ··· 23 25 buf []*XRPCStreamEvent 24 26 lk sync.Mutex 25 27 seq int64 28 + 29 + broadcast func(*XRPCStreamEvent) 26 30 } 27 31 28 32 func NewMemPersister() *MemPersister { ··· 49 53 } 50 54 mp.buf = append(mp.buf, e) 51 55 56 + mp.broadcast(e) 57 + 52 58 return nil 53 59 } 54 60 ··· 78 84 func (mp *MemPersister) RebaseRepoEvents(ctx context.Context, usr util.Uid) error { 79 85 return fmt.Errorf("repo rebases not currently supported by memory persister, test usage only") 80 86 } 87 + 88 + func (mp *MemPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent)) { 89 + mp.broadcast = brc 90 + }
+30 -30
testing/integ_test.go
··· 25 25 t.Skip("skipping BGS test in 'short' test mode") 26 26 } 27 27 assert := assert.New(t) 28 - didr := testPLC(t) 29 - p1 := mustSetupPDS(t, "localhost:5155", ".tpds", didr) 28 + didr := TestPLC(t) 29 + p1 := MustSetupPDS(t, "localhost:5155", ".tpds", didr) 30 30 p1.Run(t) 31 31 32 - b1 := mustSetupBGS(t, "localhost:8231", didr) 32 + b1 := MustSetupBGS(t, "localhost:8231", didr) 33 33 b1.Run(t) 34 34 35 35 p1.RequestScraping(t, b1) ··· 37 37 time.Sleep(time.Millisecond * 50) 38 38 39 39 evts := b1.Events(t, -1) 40 - defer evts.cancel() 40 + defer evts.Cancel() 41 41 42 42 bob := p1.MustNewUser(t, "bob.tpds") 43 43 alice := p1.MustNewUser(t, "alice.tpds") ··· 73 73 74 74 // playback 75 75 pbevts := b1.Events(t, 2) 76 - defer pbevts.cancel() 76 + defer pbevts.Cancel() 77 77 78 78 fmt.Println("event 5") 79 79 pbe1 := pbevts.Next() 80 80 assert.Equal(*e3, *pbe1) 81 81 } 82 82 83 - func randomFollows(t *testing.T, users []*testUser) { 83 + func randomFollows(t *testing.T, users []*TestUser) { 84 84 for n := 0; n < 3; n++ { 85 85 for i, u := range users { 86 86 oi := rand.Intn(len(users)) ··· 93 93 } 94 94 } 95 95 96 - func socialSim(t *testing.T, users []*testUser, postiter, likeiter int) []*atproto.RepoStrongRef { 96 + func socialSim(t *testing.T, users []*TestUser, postiter, likeiter int) []*atproto.RepoStrongRef { 97 97 var posts []*atproto.RepoStrongRef 98 98 for i := 0; i < postiter; i++ { 99 99 for _, u := range users { 100 - posts = append(posts, u.Post(t, makeRandomPost())) 100 + posts = append(posts, u.Post(t, MakeRandomPost())) 101 101 } 102 102 } 103 103 ··· 118 118 119 119 assert := assert.New(t) 120 120 _ = assert 121 - didr := testPLC(t) 122 - p1 := mustSetupPDS(t, "localhost:5185", ".pdsuno", didr) 121 + didr := TestPLC(t) 122 + p1 := MustSetupPDS(t, "localhost:5185", ".pdsuno", didr) 123 123 p1.Run(t) 124 124 125 - p2 := mustSetupPDS(t, "localhost:5186", ".pdsdos", didr) 125 + p2 := MustSetupPDS(t, "localhost:5186", ".pdsdos", didr) 126 126 p2.Run(t) 127 127 128 - b1 := mustSetupBGS(t, "localhost:8281", didr) 128 + b1 := MustSetupBGS(t, "localhost:8281", didr) 129 129 b1.Run(t) 130 130 131 131 p1.RequestScraping(t, b1) 132 132 time.Sleep(time.Millisecond * 100) 133 133 134 - var users []*testUser 134 + var users []*TestUser 135 135 for i := 0; i < 5; i++ { 136 136 users = append(users, p1.MustNewUser(t, usernames[i]+".pdsuno")) 137 137 } ··· 139 139 randomFollows(t, users) 140 140 socialSim(t, users, 10, 10) 141 141 142 - var users2 []*testUser 142 + var users2 []*TestUser 143 143 for i := 0; i < 5; i++ { 144 144 users2 = append(users2, p2.MustNewUser(t, usernames[i+5]+".pdsdos")) 145 145 } ··· 182 182 //t.Skip("test too sleepy to run in CI for now") 183 183 assert := assert.New(t) 184 184 _ = assert 185 - didr := testPLC(t) 186 - p1 := mustSetupPDS(t, "localhost:5195", ".pdsuno", didr) 185 + didr := TestPLC(t) 186 + p1 := MustSetupPDS(t, "localhost:5195", ".pdsuno", didr) 187 187 p1.Run(t) 188 188 189 - p2 := mustSetupPDS(t, "localhost:5196", ".pdsdos", didr) 189 + p2 := MustSetupPDS(t, "localhost:5196", ".pdsdos", didr) 190 190 p2.Run(t) 191 191 192 - b1 := mustSetupBGS(t, "localhost:8291", didr) 192 + b1 := MustSetupBGS(t, "localhost:8291", didr) 193 193 b1.Run(t) 194 194 195 195 p1.RequestScraping(t, b1) 196 196 time.Sleep(time.Millisecond * 50) 197 197 198 - users := []*testUser{p1.MustNewUser(t, usernames[0]+".pdsuno")} 198 + users := []*TestUser{p1.MustNewUser(t, usernames[0]+".pdsuno")} 199 199 200 200 socialSim(t, users, 10, 0) 201 201 202 - users2 := []*testUser{p2.MustNewUser(t, usernames[1]+".pdsdos")} 202 + users2 := []*TestUser{p2.MustNewUser(t, usernames[1]+".pdsdos")} 203 203 204 204 p2posts := socialSim(t, users2, 10, 0) 205 205 ··· 239 239 //t.Skip("test too sleepy to run in CI for now") 240 240 assert := assert.New(t) 241 241 _ = assert 242 - didr := testPLC(t) 243 - p1 := mustSetupPDS(t, "localhost:5385", ".pdsuno", didr) 242 + didr := TestPLC(t) 243 + p1 := MustSetupPDS(t, "localhost:5385", ".pdsuno", didr) 244 244 p1.Run(t) 245 245 246 - b1 := mustSetupBGS(t, "localhost:8391", didr) 246 + b1 := MustSetupBGS(t, "localhost:8391", didr) 247 247 b1.Run(t) 248 248 249 249 p1.RequestScraping(t, b1) ··· 272 272 assert := assert.New(t) 273 273 _ = assert 274 274 275 - didr := testPLC(t) 276 - p1 := mustSetupPDS(t, "localhost:5151", ".tpds", didr) 275 + didr := TestPLC(t) 276 + p1 := MustSetupPDS(t, "localhost:5151", ".tpds", didr) 277 277 p1.Run(t) 278 278 279 - b1 := mustSetupBGS(t, "localhost:3231", didr) 279 + b1 := MustSetupBGS(t, "localhost:3231", didr) 280 280 b1.Run(t) 281 281 282 282 p1.RequestScraping(t, b1) ··· 323 323 t.Skip("skipping BGS test in 'short' test mode") 324 324 } 325 325 assert := assert.New(t) 326 - didr := testPLC(t) 327 - p1 := mustSetupPDS(t, "localhost:9155", ".tpds", didr) 326 + didr := TestPLC(t) 327 + p1 := MustSetupPDS(t, "localhost:9155", ".tpds", didr) 328 328 p1.Run(t) 329 329 330 - b1 := mustSetupBGS(t, "localhost:1531", didr) 330 + b1 := MustSetupBGS(t, "localhost:1531", didr) 331 331 b1.Run(t) 332 332 333 333 p1.RequestScraping(t, b1) ··· 344 344 time.Sleep(time.Millisecond * 100) 345 345 346 346 evts1 := b1.Events(t, 0) 347 - defer evts1.cancel() 347 + defer evts1.Cancel() 348 348 349 349 preRebaseEvts := evts1.WaitFor(5) 350 350 fmt.Println(preRebaseEvts)
+10 -10
testing/labelmaker_fakedata_test.go
··· 69 69 return lm 70 70 } 71 71 72 - func labelEvents(t *testing.T, lm *labeler.Server, since int64) *eventStream { 72 + func labelEvents(t *testing.T, lm *labeler.Server, since int64) *EventStream { 73 73 d := websocket.Dialer{} 74 74 h := http.Header{} 75 75 bgsHost := "localhost:1234" ··· 90 90 91 91 ctx, cancel := context.WithCancel(context.Background()) 92 92 93 - es := &eventStream{ 94 - cancel: cancel, 93 + es := &EventStream{ 94 + Cancel: cancel, 95 95 } 96 96 97 97 go func() { ··· 103 103 rsc := &events.RepoStreamCallbacks{ 104 104 LabelLabels: func(evt *label.SubscribeLabels_Labels) error { 105 105 fmt.Println("received event: ", evt.Seq) 106 - es.lk.Lock() 107 - es.events = append(es.events, &events.XRPCStreamEvent{LabelLabels: evt}) 108 - es.lk.Unlock() 106 + es.Lk.Lock() 107 + es.Events = append(es.Events, &events.XRPCStreamEvent{LabelLabels: evt}) 108 + es.Lk.Unlock() 109 109 return nil 110 110 }, 111 111 } ··· 128 128 assert := assert.New(t) 129 129 _ = assert 130 130 ctx := context.TODO() 131 - didr := testPLC(t) 132 - p1 := mustSetupPDS(t, "localhost:5115", ".tpds", didr) 131 + didr := TestPLC(t) 132 + p1 := MustSetupPDS(t, "localhost:5115", ".tpds", didr) 133 133 p1.Run(t) 134 134 135 - b1 := mustSetupBGS(t, "localhost:8322", didr) 135 + b1 := MustSetupBGS(t, "localhost:8322", didr) 136 136 b1.Run(t) 137 137 138 138 p1.RequestScraping(t, b1) ··· 145 145 time.Sleep(time.Millisecond * 50) 146 146 147 147 evts := b1.Events(t, -1) 148 - defer evts.cancel() 148 + defer evts.Cancel() 149 149 150 150 bob := p1.MustNewUser(t, "bob.tpds") 151 151 alice := p1.MustNewUser(t, "alice.tpds")
+2 -2
testing/pds_fakedata_test.go
··· 59 59 t.Skip("skipping PDS+fakedata test in 'short' test mode") 60 60 } 61 61 assert := assert.New(t) 62 - plcc := testPLC(t) 63 - pds := mustSetupPDS(t, "localhost:5159", ".test", plcc) 62 + plcc := TestPLC(t) 63 + pds := MustSetupPDS(t, "localhost:5159", ".test", plcc) 64 64 pds.Run(t) 65 65 66 66 time.Sleep(time.Millisecond * 50)
+64 -59
testing/utils.go
··· 40 40 "gorm.io/gorm" 41 41 ) 42 42 43 - type testPDS struct { 43 + type TestPDS struct { 44 44 dir string 45 45 server *pds.Server 46 46 plc *api.PLCServer ··· 50 50 shutdown func() 51 51 } 52 52 53 - func (tp *testPDS) Cleanup() { 53 + func (tp *TestPDS) Cleanup() { 54 54 if tp.shutdown != nil { 55 55 tp.shutdown() 56 56 } ··· 60 60 } 61 61 } 62 62 63 - func mustSetupPDS(t *testing.T, host, suffix string, plc plc.PLCClient) *testPDS { 63 + func MustSetupPDS(t *testing.T, host, suffix string, plc plc.PLCClient) *TestPDS { 64 64 t.Helper() 65 65 66 66 tpds, err := SetupPDS(host, suffix, plc) ··· 71 71 return tpds 72 72 } 73 73 74 - func SetupPDS(host, suffix string, plc plc.PLCClient) (*testPDS, error) { 74 + func SetupPDS(host, suffix string, plc plc.PLCClient) (*TestPDS, error) { 75 75 dir, err := os.MkdirTemp("", "integtest") 76 76 if err != nil { 77 77 return nil, err 78 78 } 79 79 80 - maindb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "test.sqlite"))) 80 + maindb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "test.sqlite?cache=shared&mode=rwc"))) 81 81 if err != nil { 82 82 return nil, err 83 + } 84 + 85 + tx := maindb.Exec("PRAGMA journal_mode=WAL;") 86 + if tx.Error != nil { 87 + return nil, tx.Error 83 88 } 84 89 85 90 cardb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "car.sqlite"))) ··· 111 116 return nil, err 112 117 } 113 118 114 - return &testPDS{ 119 + return &TestPDS{ 115 120 dir: dir, 116 121 server: srv, 117 122 host: host, 118 123 }, nil 119 124 } 120 125 121 - func (tp *testPDS) Run(t *testing.T) { 126 + func (tp *TestPDS) Run(t *testing.T) { 122 127 // TODO: rig this up so it t.Fatals if the RunAPI call fails immediately 123 128 go func() { 124 129 if err := tp.server.RunAPI(tp.host); err != nil { ··· 132 137 } 133 138 } 134 139 135 - func (tp *testPDS) RequestScraping(t *testing.T, b *testBGS) { 140 + func (tp *TestPDS) RequestScraping(t *testing.T, b *TestBGS) { 136 141 t.Helper() 137 142 138 143 c := &xrpc.Client{Host: "http://" + b.host} ··· 141 146 } 142 147 } 143 148 144 - type testUser struct { 149 + type TestUser struct { 145 150 handle string 146 - pds *testPDS 151 + pds *TestPDS 147 152 did string 148 153 149 154 client *xrpc.Client 150 155 } 151 156 152 - func (tp *testPDS) MustNewUser(t *testing.T, handle string) *testUser { 157 + func (tp *TestPDS) MustNewUser(t *testing.T, handle string) *TestUser { 153 158 t.Helper() 154 159 155 160 u, err := tp.NewUser(handle) ··· 160 165 return u 161 166 } 162 167 163 - func (tp *testPDS) NewUser(handle string) (*testUser, error) { 168 + func (tp *TestPDS) NewUser(handle string) (*TestUser, error) { 164 169 ctx := context.TODO() 165 170 166 171 c := &xrpc.Client{ ··· 184 189 Did: out.Did, 185 190 } 186 191 187 - return &testUser{ 192 + return &TestUser{ 188 193 pds: tp, 189 194 handle: out.Handle, 190 195 client: c, ··· 192 197 }, nil 193 198 } 194 199 195 - func (u *testUser) Reply(t *testing.T, replyto, root *atproto.RepoStrongRef, body string) string { 200 + func (u *TestUser) Reply(t *testing.T, replyto, root *atproto.RepoStrongRef, body string) string { 196 201 t.Helper() 197 202 198 203 ctx := context.TODO() ··· 215 220 return resp.Uri 216 221 } 217 222 218 - func (u *testUser) DID() string { 223 + func (u *TestUser) DID() string { 219 224 return u.did 220 225 } 221 226 222 - func (u *testUser) Post(t *testing.T, body string) *atproto.RepoStrongRef { 227 + func (u *TestUser) Post(t *testing.T, body string) *atproto.RepoStrongRef { 223 228 t.Helper() 224 229 225 230 ctx := context.TODO() ··· 242 247 } 243 248 } 244 249 245 - func (u *testUser) Like(t *testing.T, post *atproto.RepoStrongRef) { 250 + func (u *TestUser) Like(t *testing.T, post *atproto.RepoStrongRef) { 246 251 t.Helper() 247 252 248 253 ctx := context.TODO() ··· 261 266 262 267 } 263 268 264 - func (u *testUser) Follow(t *testing.T, did string) string { 269 + func (u *TestUser) Follow(t *testing.T, did string) string { 265 270 t.Helper() 266 271 267 272 ctx := context.TODO() ··· 281 286 return resp.Uri 282 287 } 283 288 284 - func (u *testUser) GetFeed(t *testing.T) []*bsky.FeedDefs_FeedViewPost { 289 + func (u *TestUser) GetFeed(t *testing.T) []*bsky.FeedDefs_FeedViewPost { 285 290 t.Helper() 286 291 287 292 ctx := context.TODO() ··· 293 298 return resp.Feed 294 299 } 295 300 296 - func (u *testUser) GetNotifs(t *testing.T) []*bsky.NotificationListNotifications_Notification { 301 + func (u *TestUser) GetNotifs(t *testing.T) []*bsky.NotificationListNotifications_Notification { 297 302 t.Helper() 298 303 299 304 ctx := context.TODO() ··· 305 310 return resp.Notifications 306 311 } 307 312 308 - func (u *testUser) ChangeHandle(t *testing.T, nhandle string) { 313 + func (u *TestUser) ChangeHandle(t *testing.T, nhandle string) { 309 314 t.Helper() 310 315 311 316 ctx := context.TODO() ··· 316 321 } 317 322 } 318 323 319 - func (u *testUser) DoRebase(t *testing.T) { 324 + func (u *TestUser) DoRebase(t *testing.T) { 320 325 t.Helper() 321 326 322 327 ctx := context.TODO() ··· 328 333 } 329 334 } 330 335 331 - func testPLC(t *testing.T) *plc.FakeDid { 336 + func TestPLC(t *testing.T) *plc.FakeDid { 332 337 // TODO: just do in memory... 333 338 tdir, err := os.MkdirTemp("", "plcserv") 334 339 if err != nil { ··· 342 347 return plc.NewFakeDid(db) 343 348 } 344 349 345 - type testBGS struct { 350 + type TestBGS struct { 346 351 bgs *bgs.BGS 347 352 host string 348 353 } 349 354 350 - func mustSetupBGS(t *testing.T, host string, didr plc.PLCClient) *testBGS { 355 + func MustSetupBGS(t *testing.T, host string, didr plc.PLCClient) *TestBGS { 351 356 tbgs, err := SetupBGS(host, didr) 352 357 if err != nil { 353 358 t.Fatal(err) ··· 356 361 return tbgs 357 362 } 358 363 359 - func SetupBGS(host string, didr plc.PLCClient) (*testBGS, error) { 364 + func SetupBGS(host string, didr plc.PLCClient) (*TestBGS, error) { 360 365 dir, err := os.MkdirTemp("", "integtest") 361 366 if err != nil { 362 367 return nil, err ··· 389 394 390 395 notifman := notifs.NewNotificationManager(maindb, repoman.GetRecord) 391 396 392 - dbpersist, err := events.NewDbPersistence(maindb, cs) 397 + dbpersist, err := events.NewDbPersistence(maindb, cs, nil) 393 398 if err != nil { 394 399 return nil, err 395 400 } ··· 412 417 return nil, err 413 418 } 414 419 415 - return &testBGS{ 420 + return &TestBGS{ 416 421 bgs: b, 417 422 host: host, 418 423 }, nil 419 424 } 420 425 421 - func (b *testBGS) Run(t *testing.T) { 426 + func (b *TestBGS) Run(t *testing.T) { 422 427 go func() { 423 428 if err := b.bgs.Start(b.host); err != nil { 424 429 fmt.Println(err) ··· 427 432 time.Sleep(time.Millisecond * 10) 428 433 } 429 434 430 - type eventStream struct { 431 - lk sync.Mutex 432 - events []*events.XRPCStreamEvent 433 - cancel func() 435 + type EventStream struct { 436 + Lk sync.Mutex 437 + Events []*events.XRPCStreamEvent 438 + Cancel func() 434 439 435 - cur int 440 + Cur int 436 441 } 437 442 438 - func (b *testBGS) Events(t *testing.T, since int64) *eventStream { 443 + func (b *TestBGS) Events(t *testing.T, since int64) *EventStream { 439 444 d := websocket.Dialer{} 440 445 h := http.Header{} 441 446 ··· 455 460 456 461 ctx, cancel := context.WithCancel(context.Background()) 457 462 458 - es := &eventStream{ 459 - cancel: cancel, 463 + es := &EventStream{ 464 + Cancel: cancel, 460 465 } 461 466 462 467 go func() { ··· 468 473 rsc := &events.RepoStreamCallbacks{ 469 474 RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { 470 475 fmt.Println("received event: ", evt.Seq, evt.Repo) 471 - es.lk.Lock() 472 - es.events = append(es.events, &events.XRPCStreamEvent{RepoCommit: evt}) 473 - es.lk.Unlock() 476 + es.Lk.Lock() 477 + es.Events = append(es.Events, &events.XRPCStreamEvent{RepoCommit: evt}) 478 + es.Lk.Unlock() 474 479 return nil 475 480 }, 476 481 RepoHandle: func(evt *atproto.SyncSubscribeRepos_Handle) error { 477 482 fmt.Println("received handle event: ", evt.Seq, evt.Did) 478 - es.lk.Lock() 479 - es.events = append(es.events, &events.XRPCStreamEvent{RepoHandle: evt}) 480 - es.lk.Unlock() 483 + es.Lk.Lock() 484 + es.Events = append(es.Events, &events.XRPCStreamEvent{RepoHandle: evt}) 485 + es.Lk.Unlock() 481 486 return nil 482 487 }, 483 488 } ··· 489 494 return es 490 495 } 491 496 492 - func (es *eventStream) Next() *events.XRPCStreamEvent { 493 - defer es.lk.Unlock() 497 + func (es *EventStream) Next() *events.XRPCStreamEvent { 498 + defer es.Lk.Unlock() 494 499 for { 495 - es.lk.Lock() 496 - if len(es.events) > es.cur { 497 - es.cur++ 498 - return es.events[es.cur-1] 500 + es.Lk.Lock() 501 + if len(es.Events) > es.Cur { 502 + es.Cur++ 503 + return es.Events[es.Cur-1] 499 504 } 500 - es.lk.Unlock() 505 + es.Lk.Unlock() 501 506 time.Sleep(time.Millisecond * 10) 502 507 } 503 508 } 504 509 505 - func (es *eventStream) All() []*events.XRPCStreamEvent { 506 - es.lk.Lock() 507 - defer es.lk.Unlock() 508 - out := make([]*events.XRPCStreamEvent, len(es.events)) 509 - for i, e := range es.events { 510 + func (es *EventStream) All() []*events.XRPCStreamEvent { 511 + es.Lk.Lock() 512 + defer es.Lk.Unlock() 513 + out := make([]*events.XRPCStreamEvent, len(es.Events)) 514 + for i, e := range es.Events { 510 515 out[i] = e 511 516 } 512 517 513 518 return out 514 519 } 515 520 516 - func (es *eventStream) WaitFor(n int) []*events.XRPCStreamEvent { 521 + func (es *EventStream) WaitFor(n int) []*events.XRPCStreamEvent { 517 522 var out []*events.XRPCStreamEvent 518 523 for i := 0; i < n; i++ { 519 524 out = append(out, es.Next()) ··· 588 593 "parrot", 589 594 } 590 595 591 - func makeRandomPost() string { 596 + func MakeRandomPost() string { 592 597 var out []string 593 598 for i := 0; i < 20; i++ { 594 599 out = append(out, words[mathrand.Intn(len(words))]) ··· 673 678 return fmt.Sprintf("at://did:plc:%s/%s/%s", did, collection, rkey) 674 679 } 675 680 676 - func randAction() string { 681 + func RandAction() string { 677 682 v := mathrand.Intn(100) 678 683 if v < 40 { 679 684 return "post" ··· 696 701 697 702 var root cid.Cid 698 703 for i := 0; i < size; i++ { 699 - switch randAction() { 704 + switch RandAction() { 700 705 case "post": 701 706 _, _, err := r.CreateRecord(ctx, "app.bsky.feed.post", &bsky.FeedPost{ 702 707 CreatedAt: time.Now().Format(bsutil.ISO8601),