this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

less goroutines

+60 -80
+59 -79
events/diskpersist.go
··· 20 20 ) 21 21 22 22 type DiskPersistence struct { 23 - primaryDir string 24 - archiveDir string 25 - eventsPerFile int64 23 + primaryDir string 24 + archiveDir string 25 + eventsPerFile int64 26 + writeBufferSize int 26 27 27 - evts chan *persistJob 28 - flush chan chan struct{} 28 + evts chan *persistJob 29 29 30 30 meta *gorm.DB 31 31 ··· 42 42 scratch []byte 43 43 44 44 outbuf *bytes.Buffer 45 - evtbuf []*persistJob 45 + evtbuf []persistJob 46 + 47 + lk sync.Mutex 46 48 } 47 49 48 50 type persistJob struct { ··· 59 61 var _ (EventPersistence) = (*DiskPersistence)(nil) 60 62 61 63 type DiskPersistOptions struct { 62 - UIDCacheSize int 63 - DIDCacheSize int 64 - EventsPerFile int64 64 + UIDCacheSize int 65 + DIDCacheSize int 66 + EventsPerFile int64 67 + WriteBufferSize int 65 68 } 66 69 67 70 func DefaultDiskPersistOptions() *DiskPersistOptions { 68 71 return &DiskPersistOptions{ 69 - EventsPerFile: 10000, 70 - UIDCacheSize: 100000, 71 - DIDCacheSize: 100000, 72 + EventsPerFile: 10000, 73 + UIDCacheSize: 100000, 74 + DIDCacheSize: 100000, 75 + WriteBufferSize: 50, 72 76 } 73 77 } 74 78 ··· 96 100 } 97 101 98 102 dp := &DiskPersistence{ 99 - meta: db, 100 - primaryDir: primaryDir, 101 - archiveDir: archiveDir, 102 - buffers: bufpool, 103 - uidCache: uidCache, 104 - didCache: didCache, 105 - evts: make(chan *persistJob, 1024), 106 - eventsPerFile: opts.EventsPerFile, 107 - scratch: make([]byte, headerSize), 108 - outbuf: new(bytes.Buffer), 109 - flush: make(chan chan struct{}), 103 + meta: db, 104 + primaryDir: primaryDir, 105 + archiveDir: archiveDir, 106 + buffers: bufpool, 107 + uidCache: uidCache, 108 + didCache: didCache, 109 + evts: make(chan *persistJob, 1024), 110 + eventsPerFile: opts.EventsPerFile, 111 + scratch: make([]byte, headerSize), 112 + outbuf: new(bytes.Buffer), 113 + writeBufferSize: opts.WriteBufferSize, 110 114 } 111 115 112 116 if err := dp.resumeLog(); err != nil { 113 117 return nil, err 114 118 } 115 119 116 - go dp.persistWorker() 120 + go dp.flushRoutine() 117 121 118 122 return dp, nil 119 123 } ··· 250 254 251 255 var emptyHeader = make([]byte, headerSize) 252 256 253 - func (p *DiskPersistence) persistWorker() { 254 - t := time.NewTimer(time.Hour) 255 - t.Stop() 257 + func (p *DiskPersistence) addJobToQueue(job persistJob) error { 258 + p.lk.Lock() 259 + defer p.lk.Unlock() 256 260 257 - var tick <-chan time.Time 258 - for { 259 - select { 260 - case job, ok := <-p.evts: 261 - if !ok { 262 - return 263 - } 261 + if err := p.doPersist(job); err != nil { 262 + return err 263 + } 264 264 265 - if err := p.doPersist(job); err != nil { 266 - log.Error(err) 267 - } 265 + // TODO: for some reason replacing this constant with p.writeBufferSize dramatically reduces perf... 266 + if len(p.evtbuf) > 400 { 267 + if err := p.flushLog(context.TODO()); err != nil { 268 + return fmt.Errorf("failed to flush disk log: %w", err) 269 + } 270 + } 268 271 269 - if len(p.evtbuf) > 50 { 270 - if err := p.flushLog(context.TODO()); err != nil { 271 - log.Errorf("failed to flush disk log: %s", err) 272 - } 273 - t.Stop() 274 - tick = nil 275 - } 272 + return nil 273 + } 274 + 275 + func (p *DiskPersistence) flushRoutine() { 276 + t := time.NewTicker(time.Millisecond * 100) 276 277 277 - if len(p.evtbuf) > 0 && tick == nil { 278 - t.Reset(time.Millisecond * 50) 279 - tick = t.C 280 - } 281 - case <-tick: 278 + for { 279 + select { 280 + case <-t.C: 281 + p.lk.Lock() 282 282 if err := p.flushLog(context.TODO()); err != nil { 283 + // TODO: this happening is quite bad. Need a recovery strategy 283 284 log.Errorf("failed to flush disk log: %s", err) 284 285 } 285 - 286 - tick = nil 287 - case freq := <-p.flush: 288 - // ensure that flushing waits until all pending events get through 289 - // if theres a huge stream of events coming in this might never happen though... 290 - if len(p.evts) > 0 { 291 - go func() { 292 - time.Sleep(time.Millisecond * 50) 293 - p.flush <- freq 294 - }() 295 - } else { 296 - if len(p.evtbuf) > 0 { 297 - if err := p.flushLog(context.TODO()); err != nil { 298 - log.Errorf("failed to flush disk log: %s", err) 299 - } 300 - t.Stop() 301 - tick = nil 302 - } 303 - 304 - freq <- struct{}{} 305 - } 286 + p.lk.Unlock() 306 287 } 307 288 } 308 289 } 309 290 310 291 func (p *DiskPersistence) flushLog(ctx context.Context) error { 311 292 if len(p.evtbuf) == 0 { 312 - fmt.Println("attempted a double flush") 313 293 return nil 314 294 } 315 295 ··· 330 310 return nil 331 311 } 332 312 333 - func (p *DiskPersistence) doPersist(j *persistJob) error { 313 + func (p *DiskPersistence) doPersist(j persistJob) error { 334 314 b := j.Buf 335 315 e := j.Evt 336 316 seq := p.curSeq ··· 401 381 binary.LittleEndian.PutUint32(b[4:], evtKind) 402 382 binary.LittleEndian.PutUint32(b[8:], uint32(len(b)-headerSize)) 403 383 404 - p.evts <- &persistJob{ 384 + return p.addJobToQueue(persistJob{ 405 385 Buf: b, 406 386 Evt: e, 407 387 Buffer: buffer, 408 - } 409 - 410 - return nil 388 + }) 411 389 } 412 390 413 391 type evtHeader struct { ··· 553 531 } 554 532 555 533 func (p *DiskPersistence) Flush(ctx context.Context) error { 556 - req := make(chan struct{}) 557 - p.flush <- req 558 - <-req 534 + p.lk.Lock() 535 + defer p.lk.Unlock() 536 + if len(p.evtbuf) > 0 { 537 + return p.flushLog(ctx) 538 + } 559 539 return nil 560 540 } 561 541
+1 -1
events/diskpersist_test.go
··· 197 197 } 198 198 } 199 199 200 - numRoutines := 5 200 + numRoutines := 4 201 201 wg := sync.WaitGroup{} 202 202 203 203 b.ResetTimer()