this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

correct file modes on reopen (#271)

authored by

Whyrusleeping and committed by
GitHub
af95b9fd e5f34b53

+84 -2
+7 -1
bgs/bgs.go
··· 317 317 } 318 318 319 319 func (bgs *BGS) Shutdown() []error { 320 - return bgs.slurper.Shutdown() 320 + errs := bgs.slurper.Shutdown() 321 + 322 + if err := bgs.events.Shutdown(context.TODO()); err != nil { 323 + errs = append(errs, err) 324 + } 325 + 326 + return errs 321 327 } 322 328 323 329 type HealthStatus struct {
+4
events/dbpersist.go
··· 540 540 // a little weird that this is the same action as a takedown 541 541 return p.deleteAllEventsForUser(ctx, usr) 542 542 } 543 + 544 + func (p *DbPersistence) Shutdown(context.Context) error { 545 + return nil 546 + }
+17 -1
events/diskpersist.go
··· 44 44 outbuf *bytes.Buffer 45 45 evtbuf []persistJob 46 46 47 + shutdown chan struct{} 48 + 47 49 lk sync.Mutex 48 50 } 49 51 ··· 116 118 scratch: make([]byte, headerSize), 117 119 outbuf: new(bytes.Buffer), 118 120 writeBufferSize: opts.WriteBufferSize, 121 + shutdown: make(chan struct{}), 119 122 } 120 123 121 124 if err := dp.resumeLog(); err != nil { ··· 145 148 return dp.initLogFile() 146 149 } 147 150 148 - fi, err := os.Open(filepath.Join(dp.primaryDir, lfr.Path)) 151 + // 0 for the mode is fine since thats only used if O_CREAT is passed 152 + fi, err := os.OpenFile(filepath.Join(dp.primaryDir, lfr.Path), os.O_RDWR, 0) 149 153 if err != nil { 150 154 return err 151 155 } ··· 284 288 285 289 for { 286 290 select { 291 + case <-p.shutdown: 292 + return 287 293 case <-t.C: 288 294 p.lk.Lock() 289 295 if err := p.flushLog(context.TODO()); err != nil { ··· 705 711 if len(p.evtbuf) > 0 { 706 712 return p.flushLog(ctx) 707 713 } 714 + return nil 715 + } 716 + 717 + func (p *DiskPersistence) Shutdown(ctx context.Context) error { 718 + close(p.shutdown) 719 + if err := p.Flush(ctx); err != nil { 720 + return err 721 + } 722 + 723 + p.logfi.Close() 708 724 return nil 709 725 } 710 726
+43
events/diskpersist_test.go
··· 119 119 if outEvtCount != expectedEvtCount { 120 120 t.Fatalf("expected %d events, got %d", expectedEvtCount, outEvtCount) 121 121 } 122 + 123 + dp.Shutdown(ctx) 124 + 125 + time.Sleep(time.Millisecond * 100) 126 + 127 + dp2, err := events.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &events.DiskPersistOptions{ 128 + EventsPerFile: 10, 129 + UIDCacheSize: 100000, 130 + DIDCacheSize: 100000, 131 + }) 132 + if err != nil { 133 + t.Fatal(err) 134 + } 135 + 136 + evtman2 := events.NewEventManager(dp2) 137 + 138 + inEvts = make([]*events.XRPCStreamEvent, n) 139 + for i := 0; i < n; i++ { 140 + cidLink := lexutil.LexLink(cid) 141 + headLink := lexutil.LexLink(userRepoHead) 142 + inEvts[i] = &events.XRPCStreamEvent{ 143 + RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 144 + Repo: "did:example:123", 145 + Commit: headLink, 146 + Ops: []*atproto.SyncSubscribeRepos_RepoOp{ 147 + { 148 + Action: "add", 149 + Cid: &cidLink, 150 + Path: "path1", 151 + }, 152 + }, 153 + Time: time.Now().Format(util.ISO8601), 154 + }, 155 + } 156 + } 157 + 158 + for i := 0; i < n; i++ { 159 + err = evtman2.AddEvent(ctx, inEvts[i]) 160 + if err != nil { 161 + t.Fatal(err) 162 + } 163 + } 122 164 } 123 165 124 166 func BenchmarkDiskPersist(b *testing.B) { ··· 141 183 } 142 184 143 185 runPersisterBenchmark(b, cs, db, dp) 186 + 144 187 } 145 188 146 189 func runPersisterBenchmark(b *testing.B, cs *carstore.CarStore, db *gorm.DB, p events.EventPersistence) {
+4
events/events.go
··· 54 54 evt *XRPCStreamEvent 55 55 } 56 56 57 + func (em *EventManager) Shutdown(ctx context.Context) error { 58 + return em.persister.Shutdown(ctx) 59 + } 60 + 57 61 func (em *EventManager) broadcastEvent(evt *XRPCStreamEvent) { 58 62 em.subsLk.Lock() 59 63 defer em.subsLk.Unlock()
+5
events/persist.go
··· 15 15 TakeDownRepo(ctx context.Context, usr models.Uid) error 16 16 RebaseRepoEvents(ctx context.Context, usr models.Uid) error 17 17 Flush(context.Context) error 18 + Shutdown(context.Context) error 18 19 19 20 SetEventBroadcaster(func(*XRPCStreamEvent)) 20 21 } ··· 93 94 func (mp *MemPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent)) { 94 95 mp.broadcast = brc 95 96 } 97 + 98 + func (mp *MemPersister) Shutdown(context.Context) error { 99 + return nil 100 + }
+4
events/yolopersist.go
··· 63 63 func (yp *YoloPersister) Flush(ctx context.Context) error { 64 64 return nil 65 65 } 66 + 67 + func (yp *YoloPersister) Shutdown(ctx context.Context) error { 68 + return nil 69 + }