this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

do cbor encoding outside lock, use buffer pool

+35 -13
+35 -13
events/diskpersist.go
··· 35 35 uidCache *lru.ARCCache 36 36 didCache *lru.ARCCache 37 37 38 - buffer *bytes.Buffer 38 + buffers *sync.Pool 39 39 scratch []byte 40 40 } 41 41 ··· 72 72 73 73 db.AutoMigrate(&LogFileRef{}) 74 74 75 + bufpool := &sync.Pool{ 76 + New: func() any { 77 + return new(bytes.Buffer) 78 + }, 79 + } 80 + 75 81 dp := &DiskPersistence{ 76 82 meta: db, 77 83 primaryDir: primaryDir, 78 84 archiveDir: archiveDir, 79 - buffer: new(bytes.Buffer), 85 + buffers: bufpool, 80 86 uidCache: uidCache, 81 87 didCache: didCache, 82 88 eventsPerFile: opts.EventsPerFile, ··· 221 227 ) 222 228 223 229 func (p *DiskPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error { 224 - p.seqLk.Lock() 225 - defer p.seqLk.Unlock() 230 + buffer := p.buffers.Get().(*bytes.Buffer) 231 + defer p.buffers.Put(buffer) 226 232 227 - p.buffer.Truncate(0) 228 - seq := p.curSeq 229 - p.curSeq++ 233 + buffer.Truncate(0) 230 234 231 235 var evtKind uint32 232 236 switch { 233 237 case e.RepoCommit != nil: 234 238 evtKind = evtKindCommit 235 - e.RepoCommit.Seq = seq 236 - if err := e.RepoCommit.MarshalCBOR(p.buffer); err != nil { 239 + if err := e.RepoCommit.MarshalCBOR(buffer); err != nil { 237 240 return fmt.Errorf("failed to marshal: %w", err) 238 241 } 239 242 case e.RepoHandle != nil: 240 243 evtKind = evtKindHandle 241 - e.RepoHandle.Seq = seq 242 - if err := e.RepoHandle.MarshalCBOR(p.buffer); err != nil { 244 + if err := e.RepoHandle.MarshalCBOR(buffer); err != nil { 243 245 return fmt.Errorf("failed to marshal: %w", err) 244 246 } 245 247 default: ··· 247 249 // only those two get peristed right now 248 250 } 249 251 250 - if err := p.writeHeader(ctx, 0, evtKind, uint32(p.buffer.Len()), seq); err != nil { 252 + p.seqLk.Lock() 253 + defer p.seqLk.Unlock() 254 + 255 + seq := p.curSeq 256 + p.curSeq++ 257 + 258 + switch { 259 + case e.RepoCommit != nil: 260 + e.RepoCommit.Seq = seq 261 + case e.RepoHandle != nil: 262 + e.RepoHandle.Seq = seq 263 + default: 264 + return nil 265 + // only those two get peristed right now 266 + } 267 + 268 + if err := p.writeHeader(ctx, 0, evtKind, uint32(buffer.Len()), seq); err != nil { 251 269 return fmt.Errorf("failed to write header: %w", err) 252 270 } 253 271 254 - if _, err := io.Copy(p.logfi, p.buffer); err != nil { 272 + if _, err := io.Copy(p.logfi, buffer); err != nil { 255 273 return err 256 274 } 257 275 ··· 340 358 } 341 359 342 360 for _, lf := range logs { 361 + fmt.Println("LOGS", lf) 343 362 if err := p.readEventsFrom(ctx, since, filepath.Join(p.primaryDir, lf.Path), cb); err != nil { 363 + fmt.Println("EVENT ERROR: ", err) 344 364 return err 345 365 } 346 366 since = 0 ··· 381 401 if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len)); err != nil { 382 402 return err 383 403 } 404 + evt.Seq = h.Seq 384 405 if err := cb(&XRPCStreamEvent{RepoCommit: &evt}); err != nil { 385 406 return err 386 407 } ··· 389 410 if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len)); err != nil { 390 411 return err 391 412 } 413 + evt.Seq = h.Seq 392 414 if err := cb(&XRPCStreamEvent{RepoHandle: &evt}); err != nil { 393 415 return err 394 416 }