this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

parallelize dbpersister event playback hydration (#209)

Credit to @ericvolp12 for most of the work here
I couldnt manage to easily rebase the last PR so i grabbed the diff and
just applied it on top.
I then made hydrateBatch send out events as it gets them instead of
waiting and collecting everything up.

authored by

Whyrusleeping and committed by
GitHub
fdb5f977 d2f8fc9a

+223 -30
+95 -29
events/dbpersist.go
··· 25 25 } 26 26 27 27 type Options struct { 28 - MaxBatchSize int 29 - MinBatchSize int 30 - MaxTimeBetweenFlush time.Duration 31 - CheckBatchInterval time.Duration 32 - UIDCacheSize int 33 - DIDCacheSize int 28 + MaxBatchSize int 29 + MinBatchSize int 30 + MaxTimeBetweenFlush time.Duration 31 + CheckBatchInterval time.Duration 32 + UIDCacheSize int 33 + DIDCacheSize int 34 + PlaybackBatchSize int 35 + HydrationConcurrency int 34 36 } 35 37 36 38 func DefaultOptions() *Options { 37 39 return &Options{ 38 - MaxBatchSize: 200, 39 - MinBatchSize: 10, 40 - MaxTimeBetweenFlush: 500 * time.Millisecond, 41 - CheckBatchInterval: 100 * time.Millisecond, 42 - UIDCacheSize: 10000, 43 - DIDCacheSize: 10000, 40 + MaxBatchSize: 200, 41 + MinBatchSize: 10, 42 + MaxTimeBetweenFlush: 500 * time.Millisecond, 43 + CheckBatchInterval: 100 * time.Millisecond, 44 + UIDCacheSize: 10000, 45 + DIDCacheSize: 10000, 46 + PlaybackBatchSize: 500, 47 + HydrationConcurrency: 10, 44 48 } 45 49 } 46 50 ··· 125 129 } 126 130 } 127 131 } 128 - 129 132 } 130 133 131 134 func (p *DbPersistence) SetEventBroadcaster(brc func(*XRPCStreamEvent)) { ··· 135 138 func (p *DbPersistence) FlushBatch(ctx context.Context) error { 136 139 p.lk.Lock() 137 140 defer p.lk.Unlock() 138 - 139 141 return p.flushBatchLocked(ctx) 140 142 } 141 143 ··· 286 288 } 287 289 288 290 func (p *DbPersistence) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error { 289 - rows, err := p.db.Model(RepoEventRecord{}).Where("seq > ?", since).Order("seq asc").Rows() 291 + rows, err := p.db.Model(&RepoEventRecord{}).Where("seq > ?", since).Order("seq asc").Rows() 290 292 if err != nil { 291 293 return err 292 294 } 293 295 defer rows.Close() 294 296 297 + // Batch events into groups of 100 and hydrate them in parallel. 298 + // Join the hydrated events back into a single stream in order and pass them to the callback. 299 + 300 + batch := make([]*RepoEventRecord, 0, p.batchOptions.PlaybackBatchSize) 295 301 for rows.Next() { 296 302 var evt RepoEventRecord 297 303 if err := p.db.ScanRows(rows, &evt); err != nil { 304 + // Handle error 298 305 return err 299 306 } 300 307 301 - var streamEvent *XRPCStreamEvent 302 - switch { 303 - case evt.Commit != nil: 304 - streamEvent, err = p.hydrateCommit(ctx, &evt) 305 - if err != nil { 306 - return fmt.Errorf("failed to hydrate commit: %w", err) 307 - } 308 - case evt.NewHandle != nil: 309 - streamEvent, err = p.hydrateHandleChange(ctx, &evt) 310 - if err != nil { 311 - return fmt.Errorf("failed to hydrate handle change: %w", err) 308 + batch = append(batch, &evt) 309 + 310 + if len(batch) >= p.batchOptions.PlaybackBatchSize { 311 + if err := p.hydrateBatch(ctx, batch, cb); err != nil { 312 + return err 312 313 } 313 - default: 314 - return fmt.Errorf("unknown event type: %s", evt.Type) 314 + 315 + batch = batch[:0] 315 316 } 317 + } 316 318 317 - if err := cb(streamEvent); err != nil { 319 + if len(batch) > 0 { 320 + if err := p.hydrateBatch(ctx, batch, cb); err != nil { 318 321 return err 322 + } 323 + } 324 + 325 + return nil 326 + } 327 + 328 + func (p *DbPersistence) hydrateBatch(ctx context.Context, batch []*RepoEventRecord, cb func(*XRPCStreamEvent) error) error { 329 + events := make([]*XRPCStreamEvent, len(batch)) 330 + 331 + type Result struct { 332 + Event *XRPCStreamEvent 333 + Index int 334 + Err error 335 + } 336 + 337 + resultChan := make(chan Result, len(batch)) 338 + 339 + // Semaphore pattern for limiting concurrent goroutines 340 + sem := make(chan struct{}, p.batchOptions.HydrationConcurrency) 341 + var wg sync.WaitGroup 342 + 343 + for i, record := range batch { 344 + wg.Add(1) 345 + go func(i int, record *RepoEventRecord) { 346 + defer wg.Done() 347 + sem <- struct{}{} 348 + // release the semaphore at the end of the goroutine 349 + defer func() { <-sem }() 350 + 351 + var streamEvent *XRPCStreamEvent 352 + var err error 353 + 354 + switch { 355 + case record.Commit != nil: 356 + streamEvent, err = p.hydrateCommit(ctx, record) 357 + case record.NewHandle != nil: 358 + streamEvent, err = p.hydrateHandleChange(ctx, record) 359 + default: 360 + err = fmt.Errorf("unknown event type: %s", record.Type) 361 + } 362 + 363 + resultChan <- Result{Event: streamEvent, Index: i, Err: err} 364 + 365 + }(i, record) 366 + } 367 + 368 + go func() { 369 + wg.Wait() 370 + close(resultChan) 371 + }() 372 + 373 + cur := 0 374 + for result := range resultChan { 375 + if result.Err != nil { 376 + return result.Err 377 + } 378 + 379 + events[result.Index] = result.Event 380 + 381 + for ; cur < len(events) && events[cur] != nil; cur++ { 382 + if err := cb(events[cur]); err != nil { 383 + return err 384 + } 319 385 } 320 386 } 321 387
+128 -1
events/dbpersist_test.go
··· 146 146 } 147 147 } 148 148 149 + func BenchmarkPlayback(b *testing.B) { 150 + ctx := context.Background() 151 + 152 + n := b.N 153 + 154 + db, _, cs, tempPath, err := setupDBs(b) 155 + if err != nil { 156 + b.Fatal(err) 157 + } 158 + 159 + db.AutoMigrate(&pds.User{}) 160 + db.AutoMigrate(&pds.Peering{}) 161 + db.AutoMigrate(&models.ActorInfo{}) 162 + 163 + db.Create(&models.ActorInfo{ 164 + Uid: 1, 165 + Did: "did:example:123", 166 + }) 167 + 168 + mgr := repomgr.NewRepoManager(db, cs, &util.FakeKeyManager{}) 169 + 170 + err = mgr.InitNewActor(ctx, 1, "alice", "did:example:123", "Alice", "", "") 171 + if err != nil { 172 + b.Fatal(err) 173 + } 174 + 175 + _, cid, err := mgr.CreateRecord(ctx, 1, "app.bsky.feed.post", &bsky.FeedPost{ 176 + Text: "hello world", 177 + CreatedAt: time.Now().Format(util.ISO8601), 178 + }) 179 + if err != nil { 180 + b.Fatal(err) 181 + } 182 + 183 + defer os.RemoveAll(tempPath) 184 + 185 + // Initialize a DBPersister 186 + dbp, err := events.NewDbPersistence(db, cs, nil) 187 + if err != nil { 188 + b.Fatal(err) 189 + } 190 + 191 + // Create a bunch of events 192 + evtman := events.NewEventManager(dbp) 193 + 194 + userRepoHead, err := mgr.GetRepoRoot(ctx, 1) 195 + if err != nil { 196 + b.Fatal(err) 197 + } 198 + 199 + inEvts := make([]*events.XRPCStreamEvent, n) 200 + for i := 0; i < n; i++ { 201 + cidLink := lexutil.LexLink(cid) 202 + headLink := lexutil.LexLink(userRepoHead) 203 + inEvts[i] = &events.XRPCStreamEvent{ 204 + RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 205 + Repo: "did:example:123", 206 + Commit: headLink, 207 + Ops: []*atproto.SyncSubscribeRepos_RepoOp{ 208 + { 209 + Action: "add", 210 + Cid: &cidLink, 211 + Path: "path1", 212 + }, 213 + }, 214 + Time: time.Now().Format(util.ISO8601), 215 + }, 216 + } 217 + } 218 + 219 + numRoutines := 5 220 + wg := sync.WaitGroup{} 221 + 222 + errChan := make(chan error, numRoutines) 223 + 224 + // Add events in parallel 225 + for i := 0; i < numRoutines; i++ { 226 + wg.Add(1) 227 + go func() { 228 + defer wg.Done() 229 + for i := 0; i < n; i++ { 230 + err = evtman.AddEvent(ctx, inEvts[i]) 231 + if err != nil { 232 + errChan <- err 233 + } 234 + } 235 + }() 236 + } 237 + 238 + wg.Wait() 239 + close(errChan) 240 + 241 + // Check for errors 242 + for err := range errChan { 243 + if err != nil { 244 + b.Fatal(err) 245 + } 246 + } 247 + 248 + outEvtCount := 0 249 + expectedEvtCount := n * numRoutines 250 + 251 + // Flush manually 252 + err = dbp.FlushBatch(ctx) 253 + if err != nil { 254 + b.Fatal(err) 255 + } 256 + 257 + b.ResetTimer() 258 + 259 + dbp.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { 260 + outEvtCount++ 261 + return nil 262 + }) 263 + 264 + b.StopTimer() 265 + 266 + if outEvtCount != expectedEvtCount { 267 + b.Fatalf("expected %d events, got %d", expectedEvtCount, outEvtCount) 268 + } 269 + } 270 + 149 271 func setupDBs(t testing.TB) (*gorm.DB, *gorm.DB, *carstore.CarStore, string, error) { 150 272 dir, err := os.MkdirTemp("", "integtest") 151 273 if err != nil { ··· 164 286 165 287 tx.Commit() 166 288 167 - cardb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "car.sqlite"))) 289 + cardb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "car.sqlite?cache=shared&mode=rwc"))) 168 290 if err != nil { 169 291 return nil, nil, nil, "", err 292 + } 293 + 294 + tx = cardb.Exec("PRAGMA journal_mode=WAL;") 295 + if tx.Error != nil { 296 + return nil, nil, nil, "", tx.Error 170 297 } 171 298 172 299 cspath := filepath.Join(dir, "carstore")