this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

persisters now handle tombstone events (#370)

authored by

Whyrusleeping and committed by
GitHub
29ee1cee c28f9098

+82 -3
+6
carstore/bs.go
··· 1228 1228 return nil, err 1229 1229 } 1230 1230 1231 + span.SetAttributes(attribute.Int("shards", len(shards))) 1232 + 1231 1233 var shardIds []uint 1232 1234 for _, s := range shards { 1233 1235 shardIds = append(shardIds, s.ID) ··· 1243 1245 return nil, fmt.Errorf("getting block refs failed: %w", err) 1244 1246 } 1245 1247 1248 + span.SetAttributes(attribute.Int("blockRefs", len(brefs))) 1249 + 1246 1250 var staleRefs []staleRef 1247 1251 if err := cs.meta.WithContext(ctx).Find(&staleRefs, "usr = ?", user).Error; err != nil { 1248 1252 return nil, err 1249 1253 } 1254 + 1255 + span.SetAttributes(attribute.Int("staleRefs", len(staleRefs))) 1250 1256 1251 1257 stale := make(map[cid.Cid]bool) 1252 1258 for _, br := range staleRefs {
+14
cmd/gosky/main.go
··· 1145 1145 1146 1146 return nil 1147 1147 }, 1148 + RepoTombstone: func(tomb *comatproto.SyncSubscribeRepos_Tombstone) error { 1149 + if jsonfmt { 1150 + b, err := json.Marshal(tomb) 1151 + if err != nil { 1152 + return err 1153 + } 1154 + fmt.Println(string(b)) 1155 + } else { 1156 + fmt.Printf("(%d) Tombstone: %s\n", tomb.Seq, tomb.Did) 1157 + } 1158 + 1159 + return nil 1160 + 1161 + }, 1148 1162 // TODO: all the other event types 1149 1163 Error: func(errf *events.ErrorFrame) error { 1150 1164 return fmt.Errorf("error frame: %s: %s", errf.Error, errf.Message)
+41
events/dbpersist.go
··· 165 165 e.RepoCommit.Seq = int64(item.Seq) 166 166 case e.RepoHandle != nil: 167 167 e.RepoHandle.Seq = int64(item.Seq) 168 + case e.RepoTombstone != nil: 169 + e.RepoTombstone.Seq = int64(item.Seq) 168 170 default: 169 171 return fmt.Errorf("unknown event type") 170 172 } ··· 209 211 if err != nil { 210 212 return err 211 213 } 214 + case e.RepoTombstone != nil: 215 + rer, err = p.RecordFromTombstone(ctx, e.RepoTombstone) 216 + if err != nil { 217 + return err 218 + } 212 219 default: 213 220 return nil 214 221 } ··· 239 246 }, nil 240 247 } 241 248 249 + func (p *DbPersistence) RecordFromTombstone(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Tombstone) (*RepoEventRecord, error) { 250 + t, err := time.Parse(util.ISO8601, evt.Time) 251 + if err != nil { 252 + return nil, err 253 + } 254 + 255 + uid, err := p.uidForDid(ctx, evt.Did) 256 + if err != nil { 257 + return nil, err 258 + } 259 + 260 + return &RepoEventRecord{ 261 + Repo: uid, 262 + Type: "repo_tombstone", 263 + Time: t, 264 + }, nil 265 + } 266 + 242 267 func (p *DbPersistence) RecordFromRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) (*RepoEventRecord, error) { 243 268 // TODO: hack hack hack 244 269 if len(evt.Ops) > 8192 { ··· 371 396 streamEvent, err = p.hydrateCommit(ctx, record) 372 397 case record.NewHandle != nil: 373 398 streamEvent, err = p.hydrateHandleChange(ctx, record) 399 + case record.Type == "repo_tombstone": 400 + streamEvent, err = p.hydrateTombstone(ctx, record) 374 401 default: 375 402 err = fmt.Errorf("unknown event type: %s", record.Type) 376 403 } ··· 448 475 Did: did, 449 476 Handle: *rer.NewHandle, 450 477 Time: rer.Time.Format(util.ISO8601), 478 + }, 479 + }, nil 480 + } 481 + 482 + func (p *DbPersistence) hydrateTombstone(ctx context.Context, rer *RepoEventRecord) (*XRPCStreamEvent, error) { 483 + did, err := p.didForUid(ctx, rer.Repo) 484 + if err != nil { 485 + return nil, err 486 + } 487 + 488 + return &XRPCStreamEvent{ 489 + RepoTombstone: &comatproto.SyncSubscribeRepos_Tombstone{ 490 + Did: did, 491 + Time: rer.Time.Format(util.ISO8601), 451 492 }, 452 493 }, nil 453 494 }
+21 -3
events/diskpersist.go
··· 273 273 } 274 274 275 275 const ( 276 - evtKindCommit = 1 277 - evtKindHandle = 2 276 + evtKindCommit = 1 277 + evtKindHandle = 2 278 + evtKindTombstone = 3 278 279 ) 279 280 280 281 var emptyHeader = make([]byte, headerSize) ··· 450 451 e.RepoCommit.Seq = seq 451 452 case e.RepoHandle != nil: 452 453 e.RepoHandle.Seq = seq 454 + case e.RepoTombstone != nil: 455 + e.RepoTombstone.Seq = seq 453 456 default: 454 - // only those two get peristed right now 457 + // only those three get peristed right now 455 458 // we shouldnt actually ever get here... 456 459 return nil 457 460 } ··· 502 505 if err := e.RepoHandle.MarshalCBOR(cw); err != nil { 503 506 return fmt.Errorf("failed to marshal: %w", err) 504 507 } 508 + case e.RepoTombstone != nil: 509 + evtKind = evtKindTombstone 510 + did = e.RepoTombstone.Did 511 + if err := e.RepoTombstone.MarshalCBOR(cw); err != nil { 512 + return fmt.Errorf("failed to marshal: %w", err) 513 + } 505 514 default: 506 515 return nil 507 516 // only those two get peristed right now ··· 689 698 } 690 699 evt.Seq = h.Seq 691 700 if err := cb(&XRPCStreamEvent{RepoHandle: &evt}); err != nil { 701 + return err 702 + } 703 + case evtKindTombstone: 704 + var evt atproto.SyncSubscribeRepos_Tombstone 705 + if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil { 706 + return err 707 + } 708 + evt.Seq = h.Seq 709 + if err := cb(&XRPCStreamEvent{RepoTombstone: &evt}); err != nil { 692 710 return err 693 711 } 694 712 default: