this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

add those missing files

+255
+198
events/dbpersist.go
··· 1 + package events 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "time" 9 + 10 + "github.com/bluesky-social/indigo/carstore" 11 + "github.com/bluesky-social/indigo/models" 12 + "github.com/bluesky-social/indigo/util" 13 + cid "github.com/ipfs/go-cid" 14 + "gorm.io/gorm" 15 + ) 16 + 17 + type DbPersistence struct { 18 + db *gorm.DB 19 + 20 + cs *carstore.CarStore 21 + } 22 + 23 + type RepoEventRecord struct { 24 + Seq uint `gorm:"primarykey"` 25 + Commit util.DbCID 26 + Prev *util.DbCID 27 + 28 + Time time.Time 29 + Blobs []byte 30 + Repo uint 31 + Event string 32 + } 33 + 34 + func NewDbPersistence(db *gorm.DB, cs *carstore.CarStore) (*DbPersistence, error) { 35 + if err := db.AutoMigrate(&RepoEventRecord{}); err != nil { 36 + return nil, err 37 + } 38 + 39 + return &DbPersistence{ 40 + db: db, 41 + cs: cs, 42 + }, nil 43 + } 44 + 45 + func (p *DbPersistence) Persist(ctx context.Context, e *RepoStreamEvent) error { 46 + if e.Append == nil { 47 + return nil 48 + } 49 + 50 + evt := e.Append 51 + 52 + uid, err := p.uidForDid(ctx, evt.Repo) 53 + if err != nil { 54 + return err 55 + } 56 + 57 + var prev *util.DbCID 58 + if evt.Prev != nil { 59 + c, err := cid.Decode(*evt.Prev) 60 + if err != nil { 61 + return fmt.Errorf("decoding prev cid (%q): %w", *evt.Prev, err) 62 + } 63 + 64 + prev = &util.DbCID{c} 65 + } 66 + 67 + com, err := cid.Decode(evt.Commit) 68 + if err != nil { 69 + return err 70 + } 71 + 72 + var blobs []byte 73 + if len(evt.Blobs) > 0 { 74 + b, err := json.Marshal(evt.Blobs) 75 + if err != nil { 76 + return err 77 + } 78 + blobs = b 79 + } 80 + 81 + t, err := time.Parse(util.ISO8601, evt.Time) 82 + if err != nil { 83 + return err 84 + } 85 + 86 + rer := RepoEventRecord{ 87 + Commit: util.DbCID{com}, 88 + Prev: prev, 89 + Repo: uid, 90 + Event: evt.Event, 91 + Blobs: blobs, 92 + Time: t, 93 + } 94 + if err := p.db.Create(&rer).Error; err != nil { 95 + return err 96 + } 97 + 98 + e.Append.Seq = int64(rer.Seq) 99 + 100 + return nil 101 + } 102 + 103 + func (p *DbPersistence) Playback(ctx context.Context, since int64, cb func(*RepoStreamEvent) error) error { 104 + rows, err := p.db.Model(RepoEventRecord{}).Where("seq > ?", since).Order("seq asc").Rows() 105 + if err != nil { 106 + return err 107 + } 108 + 109 + for rows.Next() { 110 + var evt RepoEventRecord 111 + if err := p.db.ScanRows(rows, &evt); err != nil { 112 + return err 113 + } 114 + 115 + ra, err := p.hydrate(ctx, &evt) 116 + if err != nil { 117 + return err 118 + } 119 + 120 + if err := cb(&RepoStreamEvent{Append: ra}); err != nil { 121 + return err 122 + } 123 + } 124 + 125 + return nil 126 + } 127 + 128 + func (p *DbPersistence) uidForDid(ctx context.Context, did string) (uint, error) { 129 + var u models.ActorInfo 130 + if err := p.db.First(&u, "did = ?", did).Error; err != nil { 131 + return 0, err 132 + } 133 + 134 + return u.Uid, nil 135 + } 136 + 137 + func (p *DbPersistence) didForUid(ctx context.Context, uid uint) (string, error) { 138 + var u models.ActorInfo 139 + if err := p.db.First(&u, "uid = ?", uid).Error; err != nil { 140 + return "", err 141 + } 142 + 143 + return u.Did, nil 144 + } 145 + 146 + func (p *DbPersistence) hydrate(ctx context.Context, rer *RepoEventRecord) (*RepoAppend, error) { 147 + var blobs []string 148 + if len(rer.Blobs) > 0 { 149 + if err := json.Unmarshal(rer.Blobs, &blobs); err != nil { 150 + return nil, err 151 + } 152 + } 153 + 154 + did, err := p.didForUid(ctx, rer.Repo) 155 + if err != nil { 156 + return nil, err 157 + } 158 + 159 + var prev *string 160 + if rer.Prev != nil { 161 + s := rer.Prev.CID.String() 162 + prev = &s 163 + } 164 + 165 + out := &RepoAppend{ 166 + Seq: int64(rer.Seq), 167 + Repo: did, 168 + Commit: rer.Commit.CID.String(), 169 + Prev: prev, 170 + Time: rer.Time.Format(util.ISO8601), 171 + Blobs: blobs, 172 + Event: rer.Event, 173 + } 174 + 175 + cs, err := p.readCarSlice(ctx, rer) 176 + if err != nil { 177 + return nil, fmt.Errorf("read car slice: %w", err) 178 + } 179 + 180 + out.Blocks = cs 181 + 182 + return out, nil 183 + } 184 + 185 + func (p *DbPersistence) readCarSlice(ctx context.Context, rer *RepoEventRecord) ([]byte, error) { 186 + 187 + var early cid.Cid 188 + if rer.Prev != nil { 189 + early = rer.Prev.CID 190 + } 191 + 192 + buf := new(bytes.Buffer) 193 + if err := p.cs.ReadUserCar(ctx, rer.Repo, early, rer.Commit.CID, true, buf); err != nil { 194 + return nil, err 195 + } 196 + 197 + return buf.Bytes(), nil 198 + }
+57
util/dbcid.go
··· 1 + package util 2 + 3 + import ( 4 + "database/sql/driver" 5 + "encoding/json" 6 + "fmt" 7 + 8 + "github.com/ipfs/go-cid" 9 + ) 10 + 11 + // NB: copied from my estuary code 12 + 13 + type DbCID struct { 14 + CID cid.Cid 15 + } 16 + 17 + func (dbc *DbCID) Scan(v interface{}) error { 18 + b, ok := v.([]byte) 19 + if !ok { 20 + return fmt.Errorf("dbcids must get bytes!") 21 + } 22 + 23 + if len(b) == 0 { 24 + return nil 25 + } 26 + 27 + c, err := cid.Cast(b) 28 + if err != nil { 29 + return err 30 + } 31 + 32 + dbc.CID = c 33 + return nil 34 + } 35 + 36 + func (dbc DbCID) Value() (driver.Value, error) { 37 + return dbc.CID.Bytes(), nil 38 + } 39 + 40 + func (dbc DbCID) MarshalJSON() ([]byte, error) { 41 + return json.Marshal(dbc.CID.String()) 42 + } 43 + 44 + func (dbc *DbCID) UnmarshalJSON(b []byte) error { 45 + var s string 46 + if err := json.Unmarshal(b, &s); err != nil { 47 + return err 48 + } 49 + 50 + c, err := cid.Decode(s) 51 + if err != nil { 52 + return err 53 + } 54 + 55 + dbc.CID = c 56 + return nil 57 + }