this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

do batch buffering of writes out to disk

+126 -50
+122 -50
events/diskpersist.go
··· 11 11 "os" 12 12 "path/filepath" 13 13 "sync" 14 + "time" 14 15 15 16 "github.com/bluesky-social/indigo/api/atproto" 16 17 "github.com/bluesky-social/indigo/models" ··· 23 24 archiveDir string 24 25 eventsPerFile int64 25 26 26 - evts chan *persistJob 27 + evts chan *persistJob 28 + flush chan chan struct{} 27 29 28 30 meta *gorm.DB 29 31 ··· 38 40 39 41 buffers *sync.Pool 40 42 scratch []byte 43 + 44 + outbuf *bytes.Buffer 45 + evtbuf []*persistJob 41 46 } 42 47 43 48 type persistJob struct { 44 49 Buf []byte 45 50 Evt *XRPCStreamEvent 46 - Done chan jobResult 51 + Done chan error 47 52 } 48 53 49 54 type jobResult struct { ··· 100 105 evts: make(chan *persistJob, 1024), 101 106 eventsPerFile: opts.EventsPerFile, 102 107 scratch: make([]byte, headerSize), 108 + outbuf: new(bytes.Buffer), 109 + flush: make(chan chan struct{}), 103 110 } 104 111 105 112 if err := dp.resumeLog(); err != nil { ··· 244 251 var emptyHeader = make([]byte, headerSize) 245 252 246 253 func (p *DiskPersistence) persistWorker() { 254 + t := time.NewTimer(time.Hour) 255 + t.Stop() 256 + 257 + var tick <-chan time.Time 247 258 for { 248 259 select { 249 260 case job, ok := <-p.evts: ··· 251 262 return 252 263 } 253 264 254 - seq, err := p.doPersist(job.Buf, job.Evt) 255 - if err != nil { 256 - job.Done <- jobResult{Err: err} 265 + if err := p.doPersist(job); err != nil { 266 + job.Done <- err 267 + continue 257 268 } else { 258 - job.Done <- jobResult{Seq: seq} 269 + job.Done <- nil 259 270 } 260 - } 261 - } 262 - } 263 271 264 - func (p *DiskPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error { 265 - buffer := p.buffers.Get().(*bytes.Buffer) 266 - defer p.buffers.Put(buffer) 272 + if len(p.evtbuf) > 50 { 273 + if err := p.flushLog(context.TODO()); err != nil { 274 + log.Errorf("failed to flush disk log: %s", err) 275 + } 276 + t.Stop() 277 + tick = nil 278 + } 267 279 268 - buffer.Truncate(0) 280 + if len(p.evtbuf) > 0 && tick == nil { 281 + t.Reset(time.Millisecond * 50) 282 + tick = t.C 283 + } 284 + case <-tick: 285 + if err := p.flushLog(context.TODO()); err != nil { 286 + log.Errorf("failed to flush disk log: %s", err) 287 + } 269 288 270 - buffer.Write(emptyHeader) 289 + tick = nil 290 + case freq := <-p.flush: 291 + // ensure that flushing waits until all pending events get through 292 + // if theres a huge stream of events coming in this might never happen though... 293 + if len(p.evts) > 0 { 294 + go func() { 295 + time.Sleep(time.Millisecond * 50) 296 + p.flush <- freq 297 + }() 298 + } else { 299 + if len(p.evtbuf) > 0 { 300 + if err := p.flushLog(context.TODO()); err != nil { 301 + log.Errorf("failed to flush disk log: %s", err) 302 + } 303 + t.Stop() 304 + tick = nil 305 + } 271 306 272 - var evtKind uint32 273 - switch { 274 - case e.RepoCommit != nil: 275 - evtKind = evtKindCommit 276 - if err := e.RepoCommit.MarshalCBOR(buffer); err != nil { 277 - return fmt.Errorf("failed to marshal: %w", err) 278 - } 279 - case e.RepoHandle != nil: 280 - evtKind = evtKindHandle 281 - if err := e.RepoHandle.MarshalCBOR(buffer); err != nil { 282 - return fmt.Errorf("failed to marshal: %w", err) 307 + freq <- struct{}{} 308 + } 283 309 } 284 - default: 285 - return nil 286 - // only those two get peristed right now 287 310 } 288 - 289 - b := buffer.Bytes() 311 + } 290 312 291 - binary.LittleEndian.PutUint32(b, 0) 292 - binary.LittleEndian.PutUint32(b[4:], evtKind) 293 - binary.LittleEndian.PutUint32(b[8:], uint32(len(b)-headerSize)) 313 + func (p *DiskPersistence) flushLog(ctx context.Context) error { 314 + if len(p.evtbuf) == 0 { 315 + fmt.Println("attempted a double flush") 316 + return nil 317 + } 294 318 295 - done := make(chan jobResult, 1) 296 - p.evts <- &persistJob{ 297 - Buf: b, 298 - Evt: e, 299 - Done: done, 319 + _, err := io.Copy(p.logfi, p.outbuf) 320 + if err != nil { 321 + return err 300 322 } 301 323 302 - resp := <-done 324 + p.outbuf.Truncate(0) 303 325 304 - if resp.Err != nil { 305 - return resp.Err 326 + for _, ej := range p.evtbuf { 327 + p.broadcast(ej.Evt) 306 328 } 329 + 330 + p.evtbuf = p.evtbuf[:0] 307 331 308 332 return nil 309 333 } 310 334 311 - func (p *DiskPersistence) doPersist(b []byte, e *XRPCStreamEvent) (int64, error) { 335 + func (p *DiskPersistence) doPersist(j *persistJob) error { 336 + b := j.Buf 337 + e := j.Evt 312 338 seq := p.curSeq 313 339 p.curSeq++ 314 340 ··· 322 348 default: 323 349 // only those two get peristed right now 324 350 // we shouldnt actually ever get here... 325 - return 0, nil 351 + return nil 326 352 } 327 353 328 354 // TODO: does this guarantee a full write? 329 - _, err := p.logfi.Write(b) 355 + _, err := p.outbuf.Write(b) 330 356 if err != nil { 331 - return 0, err 357 + return err 332 358 } 333 359 334 - p.broadcast(e) 360 + p.evtbuf = append(p.evtbuf, j) 335 361 336 362 if seq%p.eventsPerFile == 0 { 363 + if err := p.flushLog(context.TODO()); err != nil { 364 + return err 365 + } 366 + 337 367 // time to roll the log file 338 368 if err := p.swapLog(context.TODO()); err != nil { 339 - return 0, err 369 + return err 340 370 } 341 371 } 342 372 343 - return seq, nil 373 + return nil 374 + } 375 + 376 + func (p *DiskPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error { 377 + buffer := p.buffers.Get().(*bytes.Buffer) 378 + defer p.buffers.Put(buffer) 379 + 380 + buffer.Truncate(0) 381 + 382 + buffer.Write(emptyHeader) 383 + 384 + var evtKind uint32 385 + switch { 386 + case e.RepoCommit != nil: 387 + evtKind = evtKindCommit 388 + if err := e.RepoCommit.MarshalCBOR(buffer); err != nil { 389 + return fmt.Errorf("failed to marshal: %w", err) 390 + } 391 + case e.RepoHandle != nil: 392 + evtKind = evtKindHandle 393 + if err := e.RepoHandle.MarshalCBOR(buffer); err != nil { 394 + return fmt.Errorf("failed to marshal: %w", err) 395 + } 396 + default: 397 + return nil 398 + // only those two get peristed right now 399 + } 400 + 401 + b := buffer.Bytes() 402 + 403 + binary.LittleEndian.PutUint32(b, 0) 404 + binary.LittleEndian.PutUint32(b[4:], evtKind) 405 + binary.LittleEndian.PutUint32(b[8:], uint32(len(b)-headerSize)) 406 + 407 + done := make(chan error, 1) 408 + p.evts <- &persistJob{ 409 + Buf: b, 410 + Evt: e, 411 + Done: done, 412 + } 413 + 414 + return <-done // NB: getting rid of this channel 'wait for things to be done' thing makes it go a good deal faster 344 415 } 345 416 346 417 type evtHeader struct { ··· 416 487 } 417 488 418 489 for _, lf := range logs { 419 - fmt.Println("LOGS", lf) 420 490 if err := p.readEventsFrom(ctx, since, filepath.Join(p.primaryDir, lf.Path), cb); err != nil { 421 - fmt.Println("EVENT ERROR: ", err) 422 491 return err 423 492 } 424 493 since = 0 ··· 488 557 } 489 558 490 559 func (p *DiskPersistence) Flush(ctx context.Context) error { 560 + req := make(chan struct{}) 561 + p.flush <- req 562 + <-req 491 563 return nil 492 564 } 493 565
+4
events/diskpersist_test.go
··· 103 103 } 104 104 } 105 105 106 + if err := dp.Flush(ctx); err != nil { 107 + t.Fatal(err) 108 + } 109 + 106 110 outEvtCount := 0 107 111 expectedEvtCount := n 108 112