this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

add repomgr code

why 0a53da36 81b2cecc

+232
+232
repomgr/repomgr.go
··· 1 + package repomgr 2 + 3 + import ( 4 + "context" 5 + "io" 6 + "sync" 7 + 8 + "github.com/ipfs/go-cid" 9 + cbg "github.com/whyrusleeping/cbor-gen" 10 + "github.com/whyrusleeping/gosky/carstore" 11 + "github.com/whyrusleeping/gosky/repo" 12 + "gorm.io/gorm" 13 + ) 14 + 15 + func NewRepoManager(db *gorm.DB, cs *carstore.CarStore, cb func(*RepoEvent)) *RepoManager { 16 + db.AutoMigrate(RepoHead{}) 17 + 18 + return &RepoManager{ 19 + db: db, 20 + cs: cs, 21 + events: cb, 22 + userLocks: make(map[uint]*userLock), 23 + } 24 + } 25 + 26 + type RepoManager struct { 27 + cs *carstore.CarStore 28 + db *gorm.DB 29 + 30 + lklk sync.Mutex 31 + userLocks map[uint]*userLock 32 + 33 + events func(*RepoEvent) 34 + } 35 + 36 + type ActorInfo struct { 37 + Did string 38 + Handle string 39 + DisplayName string 40 + DeclRefCid string 41 + Type string 42 + } 43 + 44 + type RepoEvent struct { 45 + Kind string 46 + User uint 47 + OldRoot cid.Cid 48 + NewRoot cid.Cid 49 + Collection string 50 + Rkey string 51 + RecCid cid.Cid 52 + Record any 53 + ActorInfo *ActorInfo 54 + } 55 + 56 + type RepoHead struct { 57 + gorm.Model 58 + User uint 59 + Root string 60 + } 61 + 62 + type userLock struct { 63 + lk sync.Mutex 64 + count int 65 + } 66 + 67 + func (rm *RepoManager) lockUser(user uint) func() { 68 + rm.lklk.Lock() 69 + 70 + ulk, ok := rm.userLocks[user] 71 + if !ok { 72 + ulk = &userLock{} 73 + rm.userLocks[user] = ulk 74 + } 75 + 76 + ulk.count++ 77 + 78 + rm.lklk.Unlock() 79 + 80 + ulk.lk.Lock() 81 + 82 + return func() { 83 + rm.lklk.Lock() 84 + 85 + ulk.lk.Unlock() 86 + ulk.count-- 87 + 88 + if ulk.count == 0 { 89 + delete(rm.userLocks, user) 90 + } 91 + rm.lklk.Unlock() 92 + } 93 + } 94 + 95 + func (rm *RepoManager) getUserRepoHead(ctx context.Context, user uint) (cid.Cid, error) { 96 + var headrec RepoHead 97 + if err := rm.db.First(&headrec, "user = ?", user).Error; err != nil { 98 + return cid.Undef, err 99 + } 100 + 101 + cc, err := cid.Decode(headrec.Root) 102 + if err != nil { 103 + return cid.Undef, err 104 + } 105 + 106 + return cc, nil 107 + } 108 + 109 + func (rm *RepoManager) updateUserRepoHead(ctx context.Context, user uint, root cid.Cid) error { 110 + if err := rm.db.Model(RepoHead{}).Where("user = ?", user).Update("root", root.String()).Error; err != nil { 111 + return err 112 + } 113 + 114 + return nil 115 + } 116 + 117 + func (rm *RepoManager) CreateRecord(ctx context.Context, user uint, collection string, rec cbg.CBORMarshaler) (string, cid.Cid, error) { 118 + ntid := repo.NextTID() 119 + 120 + unlock := rm.lockUser(user) 121 + defer unlock() 122 + 123 + rkey := collection + "/" + ntid 124 + 125 + head, err := rm.getUserRepoHead(ctx, user) 126 + if err != nil { 127 + return "", cid.Undef, err 128 + } 129 + 130 + ds, err := rm.cs.NewDeltaSession(user, head) 131 + if err != nil { 132 + return "", cid.Undef, err 133 + } 134 + 135 + r, err := repo.OpenRepo(ctx, ds, head) 136 + if err != nil { 137 + return "", cid.Undef, err 138 + } 139 + 140 + cc, err := r.CreateRecord(ctx, rkey, rec) 141 + if err != nil { 142 + return "", cid.Undef, err 143 + } 144 + 145 + nroot, err := r.Commit(ctx) 146 + if err != nil { 147 + return "", cid.Undef, err 148 + } 149 + 150 + if err := ds.CloseWithRoot(ctx, nroot); err != nil { 151 + return "", cid.Undef, err 152 + } 153 + 154 + // TODO: what happens if this update fails? 155 + if err := rm.updateUserRepoHead(ctx, user, nroot); err != nil { 156 + return "", cid.Undef, err 157 + } 158 + 159 + if rm.events != nil { 160 + rm.events(&RepoEvent{ 161 + Kind: "createRecord", 162 + User: user, 163 + OldRoot: head, 164 + NewRoot: nroot, 165 + Collection: collection, 166 + Rkey: rkey, 167 + Record: rec, 168 + RecCid: cc, 169 + }) 170 + } 171 + 172 + return rkey, cc, nil 173 + } 174 + 175 + func (rm *RepoManager) InitNewActor(ctx context.Context, user uint, handle, did, displayname string, declcid, actortype string) error { 176 + unlock := rm.lockUser(user) 177 + defer unlock() 178 + 179 + ds, err := rm.cs.NewDeltaSession(user, cid.Undef) 180 + if err != nil { 181 + return err 182 + } 183 + 184 + r := repo.NewRepo(ctx, ds) 185 + 186 + // TODO: set displayname? 187 + // TODO: set declaration? 188 + 189 + root, err := r.Commit(ctx) 190 + if err != nil { 191 + return err 192 + } 193 + 194 + if err := ds.CloseWithRoot(ctx, root); err != nil { 195 + return err 196 + } 197 + 198 + if err := rm.db.Create(&RepoHead{ 199 + User: user, 200 + Root: root.String(), 201 + }).Error; err != nil { 202 + return err 203 + } 204 + 205 + if rm.events != nil { 206 + rm.events(&RepoEvent{ 207 + Kind: "initActor", 208 + User: user, 209 + NewRoot: root, 210 + ActorInfo: &ActorInfo{ 211 + Did: did, 212 + Handle: handle, 213 + DisplayName: displayname, 214 + DeclRefCid: declcid, 215 + Type: actortype, 216 + }, 217 + }) 218 + } 219 + 220 + return nil 221 + } 222 + 223 + func (rm *RepoManager) GetRepoRoot(ctx context.Context, user uint) (cid.Cid, error) { 224 + unlock := rm.lockUser(user) 225 + defer unlock() 226 + 227 + return rm.getUserRepoHead(ctx, user) 228 + } 229 + 230 + func (rm *RepoManager) ReadRepo(ctx context.Context, user uint, fromcid cid.Cid, w io.Writer) error { 231 + return rm.cs.ReadUserCar(ctx, user, fromcid, true, w) 232 + }