this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

use channels for synchronization

+61 -11
+61 -11
events/diskpersist.go
··· 23 23 archiveDir string 24 24 eventsPerFile int64 25 25 26 + evts chan *persistJob 27 + 26 28 meta *gorm.DB 27 29 28 30 broadcast func(*XRPCStreamEvent) ··· 30 32 logfi *os.File 31 33 32 34 curSeq int64 33 - seqLk sync.Mutex 34 35 35 36 uidCache *lru.ARCCache 36 37 didCache *lru.ARCCache 37 38 38 39 buffers *sync.Pool 39 40 scratch []byte 41 + } 42 + 43 + type persistJob struct { 44 + Buf []byte 45 + Evt *XRPCStreamEvent 46 + Done chan jobResult 47 + } 48 + 49 + type jobResult struct { 50 + Err error 51 + Seq int64 40 52 } 41 53 42 54 var _ (EventPersistence) = (*DiskPersistence)(nil) ··· 85 97 buffers: bufpool, 86 98 uidCache: uidCache, 87 99 didCache: didCache, 100 + evts: make(chan *persistJob, 1024), 88 101 eventsPerFile: opts.EventsPerFile, 89 102 scratch: make([]byte, headerSize), 90 103 } ··· 92 105 if err := dp.resumeLog(); err != nil { 93 106 return nil, err 94 107 } 108 + 109 + go dp.persistWorker() 95 110 96 111 return dp, nil 97 112 } ··· 228 243 229 244 var emptyHeader = make([]byte, headerSize) 230 245 246 + func (p *DiskPersistence) persistWorker() { 247 + for { 248 + select { 249 + case job, ok := <-p.evts: 250 + if !ok { 251 + return 252 + } 253 + 254 + seq, err := p.doPersist(job.Buf, job.Evt) 255 + if err != nil { 256 + job.Done <- jobResult{Err: err} 257 + } else { 258 + job.Done <- jobResult{Seq: seq} 259 + } 260 + } 261 + } 262 + } 263 + 231 264 func (p *DiskPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error { 232 265 buffer := p.buffers.Get().(*bytes.Buffer) 233 266 defer p.buffers.Put(buffer) ··· 259 292 binary.LittleEndian.PutUint32(b[4:], evtKind) 260 293 binary.LittleEndian.PutUint32(b[8:], uint32(len(b)-headerSize)) 261 294 262 - p.seqLk.Lock() 263 - defer p.seqLk.Unlock() 295 + done := make(chan jobResult, 1) 296 + p.evts <- &persistJob{ 297 + Buf: b, 298 + Evt: e, 299 + Done: done, 300 + } 301 + 302 + resp := <-done 264 303 304 + if resp.Err != nil { 305 + return resp.Err 306 + } 307 + 308 + return nil 309 + } 310 + 311 + func (p *DiskPersistence) doPersist(b []byte, e *XRPCStreamEvent) (int64, error) { 265 312 seq := p.curSeq 266 313 p.curSeq++ 267 314 315 + binary.LittleEndian.PutUint64(b[12:], uint64(seq)) 316 + 268 317 switch { 269 318 case e.RepoCommit != nil: 270 319 e.RepoCommit.Seq = seq 271 320 case e.RepoHandle != nil: 272 321 e.RepoHandle.Seq = seq 273 322 default: 274 - return nil 275 323 // only those two get peristed right now 324 + // we shouldnt actually ever get here... 325 + return 0, nil 276 326 } 277 327 278 - binary.LittleEndian.PutUint64(b[12:], uint64(seq)) 279 - 280 - if _, err := io.Copy(p.logfi, buffer); err != nil { 281 - return err 328 + // TODO: does this guarantee a full write? 329 + _, err := p.logfi.Write(b) 330 + if err != nil { 331 + return 0, err 282 332 } 283 333 284 334 p.broadcast(e) 285 335 286 336 if seq%p.eventsPerFile == 0 { 287 337 // time to roll the log file 288 - if err := p.swapLog(ctx); err != nil { 289 - return err 338 + if err := p.swapLog(context.TODO()); err != nil { 339 + return 0, err 290 340 } 291 341 } 292 342 293 - return nil 343 + return seq, nil 294 344 } 295 345 296 346 type evtHeader struct {