this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

delete dead code

-141
-141
events/diskpersist_test.go
··· 169 169 testPersister(t, factory) 170 170 } 171 171 172 - func XTestDiskPersist(t *testing.T) { 173 - ctx := context.Background() 174 - 175 - db, _, cs, tempPath, err := setupDBs(t) 176 - if err != nil { 177 - t.Fatal(err) 178 - } 179 - 180 - db.AutoMigrate(&pds.User{}) 181 - db.AutoMigrate(&pds.Peering{}) 182 - db.AutoMigrate(&models.ActorInfo{}) 183 - 184 - db.Create(&models.ActorInfo{ 185 - Uid: 1, 186 - Did: "did:example:123", 187 - }) 188 - 189 - mgr := repomgr.NewRepoManager(cs, &util.FakeKeyManager{}) 190 - 191 - err = mgr.InitNewActor(ctx, 1, "alice", "did:example:123", "Alice", "", "") 192 - if err != nil { 193 - t.Fatal(err) 194 - } 195 - 196 - _, cid, err := mgr.CreateRecord(ctx, 1, "app.bsky.feed.post", &bsky.FeedPost{ 197 - Text: "hello world", 198 - CreatedAt: time.Now().Format(util.ISO8601), 199 - }) 200 - if err != nil { 201 - t.Fatal(err) 202 - } 203 - 204 - defer os.RemoveAll(tempPath) 205 - 206 - // Initialize a DBPersister 207 - 208 - dp, err := NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &DiskPersistOptions{ 209 - EventsPerFile: 10, 210 - UIDCacheSize: 100000, 211 - DIDCacheSize: 100000, 212 - }) 213 - if err != nil { 214 - t.Fatal(err) 215 - } 216 - 217 - // Create a bunch of events 218 - evtman := NewEventManager(dp) 219 - 220 - userRepoHead, err := mgr.GetRepoRoot(ctx, 1) 221 - if err != nil { 222 - t.Fatal(err) 223 - } 224 - 225 - n := 100 226 - inEvts := make([]*XRPCStreamEvent, n) 227 - for i := 0; i < n; i++ { 228 - cidLink := lexutil.LexLink(cid) 229 - headLink := lexutil.LexLink(userRepoHead) 230 - inEvts[i] = &XRPCStreamEvent{ 231 - RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 232 - Repo: "did:example:123", 233 - Commit: headLink, 234 - Ops: []*atproto.SyncSubscribeRepos_RepoOp{ 235 - { 236 - Action: "add", 237 - Cid: &cidLink, 238 - Path: "path1", 239 - }, 240 - }, 241 - Time: time.Now().Format(util.ISO8601), 242 - }, 243 - } 244 - } 245 - 246 - // Add events in parallel 247 - for i := 0; i < n; i++ { 248 - err = evtman.AddEvent(ctx, inEvts[i]) 249 - if err != nil { 250 - t.Fatal(err) 251 - } 252 - } 253 - 254 - if err := dp.Flush(ctx); err != nil { 255 - t.Fatal(err) 256 - } 257 - 258 - outEvtCount := 0 259 - expectedEvtCount := n 260 - 261 - dp.Playback(ctx, 0, func(evt *XRPCStreamEvent) error { 262 - outEvtCount++ 263 - return nil 264 - }) 265 - 266 - if outEvtCount != expectedEvtCount { 267 - t.Fatalf("expected %d events, got %d", expectedEvtCount, outEvtCount) 268 - } 269 - 270 - dp.Shutdown(ctx) 271 - 272 - time.Sleep(time.Millisecond * 100) 273 - 274 - dp2, err := NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &DiskPersistOptions{ 275 - EventsPerFile: 10, 276 - UIDCacheSize: 100000, 277 - DIDCacheSize: 100000, 278 - }) 279 - if err != nil { 280 - t.Fatal(err) 281 - } 282 - 283 - evtman2 := NewEventManager(dp2) 284 - 285 - inEvts = make([]*XRPCStreamEvent, n) 286 - for i := 0; i < n; i++ { 287 - cidLink := lexutil.LexLink(cid) 288 - headLink := lexutil.LexLink(userRepoHead) 289 - inEvts[i] = &XRPCStreamEvent{ 290 - RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 291 - Repo: "did:example:123", 292 - Commit: headLink, 293 - Ops: []*atproto.SyncSubscribeRepos_RepoOp{ 294 - { 295 - Action: "add", 296 - Cid: &cidLink, 297 - Path: "path1", 298 - }, 299 - }, 300 - Time: time.Now().Format(util.ISO8601), 301 - }, 302 - } 303 - } 304 - 305 - for i := 0; i < n; i++ { 306 - err = evtman2.AddEvent(ctx, inEvts[i]) 307 - if err != nil { 308 - t.Fatal(err) 309 - } 310 - } 311 - } 312 - 313 172 func BenchmarkDiskPersist(b *testing.B) { 314 173 db, _, cs, tempPath, err := setupDBs(b) 315 174 if err != nil {