this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

implement mst delete

+390 -75
+42
cmd/gosky/main.go
··· 1 1 package main 2 2 3 3 import ( 4 + "bufio" 4 5 "bytes" 5 6 "context" 6 7 "crypto/ecdsa" ··· 18 19 "github.com/polydawn/refmt/shared" 19 20 cli "github.com/urfave/cli/v2" 20 21 api "github.com/whyrusleeping/gosky/api" 22 + atproto "github.com/whyrusleeping/gosky/api/atproto" 21 23 apibsky "github.com/whyrusleeping/gosky/api/bsky" 22 24 cliutil "github.com/whyrusleeping/gosky/cmd/gosky/util" 23 25 "github.com/whyrusleeping/gosky/key" ··· 52 54 deletePostCmd, 53 55 getNotificationsCmd, 54 56 followsCmd, 57 + resetPasswordCmd, 55 58 } 56 59 57 60 app.RunAndExitOnError() ··· 712 715 713 716 return buf.Bytes(), nil 714 717 } 718 + 719 + var resetPasswordCmd = &cli.Command{ 720 + Name: "resetPassword", 721 + Action: func(cctx *cli.Context) error { 722 + ctx := context.TODO() 723 + 724 + atp, err := cliutil.GetATPClient(cctx, false) 725 + if err != nil { 726 + return err 727 + } 728 + 729 + email := cctx.Args().Get(0) 730 + 731 + err = atproto.AccountRequestPasswordReset(ctx, atp.C, &atproto.AccountRequestPasswordReset_Input{ 732 + Email: email, 733 + }) 734 + if err != nil { 735 + return err 736 + } 737 + 738 + inp := bufio.NewScanner(os.Stdin) 739 + fmt.Println("Enter recovery code from email:") 740 + inp.Scan() 741 + code := inp.Text() 742 + 743 + fmt.Println("Enter new password:") 744 + inp.Scan() 745 + npass := inp.Text() 746 + 747 + if err := atproto.AccountResetPassword(ctx, atp.C, &atproto.AccountResetPassword_Input{ 748 + Password: npass, 749 + Token: code, 750 + }); err != nil { 751 + return err 752 + } 753 + 754 + return nil 755 + }, 756 + }
+104
mst/mst.go
··· 282 282 } 283 283 } 284 284 285 + func (mst *MerkleSearchTree) Delete(ctx context.Context, k string) (*MerkleSearchTree, error) { 286 + ix, err := mst.findGtOrEqualLeafIndex(ctx, k) 287 + if err != nil { 288 + return nil, err 289 + } 290 + 291 + ne, err := mst.atIndex(ix) 292 + if err != nil { 293 + return nil, err 294 + } 295 + 296 + entries, err := mst.getEntries(ctx) 297 + if err != nil { 298 + return nil, err 299 + } 300 + 301 + if ne.isLeaf() && ne.Key == k { 302 + prev, err := mst.atIndex(ix - 1) 303 + if err != nil { 304 + return nil, err 305 + } 306 + 307 + next, err := mst.atIndex(ix + 1) 308 + if err != nil { 309 + return nil, err 310 + } 311 + 312 + if prev.isTree() && next.isTree() { 313 + merged, err := prev.Tree.appendMerge(ctx, next.Tree) 314 + if err != nil { 315 + return nil, err 316 + } 317 + return mst.newTree(append(append(entries[:ix-1], treeEntry(merged)), entries[ix+1:]...)), nil 318 + } else { 319 + return mst.removeEntry(ctx, ix) 320 + } 321 + } 322 + 323 + prev, err := mst.atIndex(ix - 1) 324 + if err != nil { 325 + return nil, err 326 + } 327 + 328 + if prev.isTree() { 329 + subtree, err := prev.Tree.Delete(ctx, k) 330 + if err != nil { 331 + return nil, err 332 + } 333 + 334 + subtreeEntries, err := subtree.getEntries(ctx) 335 + if err != nil { 336 + return nil, err 337 + } 338 + 339 + if len(subtreeEntries) == 0 { 340 + return mst.removeEntry(ctx, ix-1) 341 + } else { 342 + return mst.updateEntry(ctx, ix-1, treeEntry(subtree)) 343 + } 344 + } else { 345 + return nil, fmt.Errorf("could not find record with key: %s", k) 346 + } 347 + } 348 + 349 + func (mst *MerkleSearchTree) appendMerge(ctx context.Context, omst *MerkleSearchTree) (*MerkleSearchTree, error) { 350 + mylayer, err := mst.getLayer(ctx) 351 + if err != nil { 352 + return nil, err 353 + } 354 + 355 + olayer, err := omst.getLayer(ctx) 356 + if err != nil { 357 + return nil, err 358 + } 359 + 360 + if mylayer != olayer { 361 + return nil, fmt.Errorf("trying to merge two nodes from different layers") 362 + } 363 + 364 + entries, err := mst.getEntries(ctx) 365 + if err != nil { 366 + return nil, err 367 + } 368 + 369 + tomergeEnts, err := omst.getEntries(ctx) 370 + if err != nil { 371 + return nil, err 372 + } 373 + 374 + lastInLeft := entries[len(entries)-1] 375 + firstInRight := entries[0] 376 + 377 + if lastInLeft.isTree() && firstInRight.isTree() { 378 + merged, err := lastInLeft.Tree.appendMerge(ctx, firstInRight.Tree) 379 + if err != nil { 380 + return nil, err 381 + } 382 + 383 + return mst.newTree(append(append(entries[:len(entries)-1], treeEntry(merged)), tomergeEnts[1:]...)), nil 384 + } else { 385 + return mst.newTree(append(entries, tomergeEnts...)), nil 386 + } 387 + } 388 + 285 389 var ErrNotFound = fmt.Errorf("mst: not found") 286 390 287 391 func (mst *MerkleSearchTree) Get(ctx context.Context, k string) (cid.Cid, error) {
+39 -1
mst/mst_test.go
··· 30 30 } 31 31 32 32 func TestBasicMst(t *testing.T) { 33 + rand.Seed(6123123) 34 + 33 35 ctx := context.Background() 34 36 cst := cbor.NewCborStore(blockstore.NewBlockstore(datastore.NewMapDatastore())) 35 37 mst := NewMST(cst, 16, cid.Undef, []NodeEntry{}, -1) ··· 53 55 t.Fatal(err) 54 56 } 55 57 56 - fmt.Println(ncid) 58 + if ncid.String() != "bafy2bzaceahzk2fp24tqze7bfzfoigkvvx3323p7pgeytmov3yypupjdjao74" { 59 + t.Fatal("mst generation changed") 60 + } 61 + 62 + nmst, err := mst.Delete(ctx, "dogs") 63 + if err != nil { 64 + t.Fatal(err) 65 + } 66 + delete(vals, "dogs") 67 + 68 + assertValues(t, nmst, vals) 69 + } 70 + 71 + func assertValues(t *testing.T, mst *MerkleSearchTree, vals map[string]cid.Cid) { 72 + out := make(map[string]cid.Cid) 73 + if err := mst.WalkLeavesFrom(context.TODO(), "", func(ne NodeEntry) error { 74 + out[ne.Key] = ne.Val 75 + return nil 76 + }); err != nil { 77 + t.Fatal(err) 78 + } 79 + 80 + if len(vals) == len(out) { 81 + for k, v := range vals { 82 + ov, ok := out[k] 83 + if !ok { 84 + t.Fatalf("expected key %s to be present", k) 85 + } 86 + if ov != v { 87 + t.Fatalf("value mismatch on %s", k) 88 + } 89 + } 90 + } else { 91 + t.Fatal("different number of values than expected: %d != %d", len(vals), len(out)) 92 + } 57 93 } 58 94 59 95 func TestEdgeCase(t *testing.T) { ··· 104 140 return nil 105 141 } 106 142 143 + /* 107 144 func TestDiff(t *testing.T) { 108 145 to := mustCid(t, "bafyreie5cvv4h45feadgeuwhbcutmh6t2ceseocckahdoe6uat64zmz454") 109 146 from := mustCid(t, "bafyreigv5er7vcxlbikkwedmtd7b3kp7wrcyffep5ogcuxosloxfox5reu") ··· 121 158 } 122 159 _ = ops 123 160 } 161 + */ 124 162 125 163 func randStr(s int64) string { 126 164 buf := make([]byte, 6)
+19
repo/repo.go
··· 167 167 return k, nil 168 168 } 169 169 170 + func (r *Repo) DeleteRecord(ctx context.Context, rpath string) error { 171 + ctx, span := otel.Tracer("repo").Start(ctx, "DeleteRecord") 172 + defer span.End() 173 + 174 + r.dirty = true 175 + t, err := r.getMst(ctx) 176 + if err != nil { 177 + return fmt.Errorf("failed to get mst: %w", err) 178 + } 179 + 180 + nmst, err := t.Delete(ctx, rpath) 181 + if err != nil { 182 + return fmt.Errorf("mst.Add failed: %w", err) 183 + } 184 + 185 + r.mst = nmst 186 + return nil 187 + } 188 + 170 189 func (r *Repo) Commit(ctx context.Context) (cid.Cid, error) { 171 190 ctx, span := otel.Tracer("repo").Start(ctx, "Commit") 172 191 defer span.End()
+59 -2
repomgr/repomgr.go
··· 262 262 return cc, nil 263 263 } 264 264 265 + func (rm *RepoManager) DeleteRecord(ctx context.Context, user uint, collection, rkey string) error { 266 + ctx, span := otel.Tracer("repoman").Start(ctx, "DeleteRecord") 267 + defer span.End() 268 + 269 + unlock := rm.lockUser(ctx, user) 270 + defer unlock() 271 + 272 + head, err := rm.getUserRepoHead(ctx, user) 273 + if err != nil { 274 + return err 275 + } 276 + 277 + ds, err := rm.cs.NewDeltaSession(ctx, user, &head) 278 + if err != nil { 279 + return err 280 + } 281 + 282 + r, err := repo.OpenRepo(ctx, ds, head) 283 + if err != nil { 284 + return err 285 + } 286 + 287 + rpath := collection + "/" + rkey 288 + cc, err := r.DeleteRecord(ctx, rpath) 289 + if err != nil { 290 + return err 291 + } 292 + 293 + nroot, err := r.Commit(ctx) 294 + if err != nil { 295 + return err 296 + } 297 + 298 + rslice, err := ds.CloseWithRoot(ctx, nroot) 299 + if err != nil { 300 + return fmt.Errorf("close with root: %w", err) 301 + } 302 + 303 + // TODO: what happens if this update fails? 304 + if err := rm.updateUserRepoHead(ctx, user, nroot); err != nil { 305 + return fmt.Errorf("updating user head: %w", err) 306 + } 307 + 308 + if rm.events != nil { 309 + rm.events(ctx, &RepoEvent{ 310 + Kind: EvtKindDeleteRecord, 311 + User: user, 312 + OldRoot: head, 313 + NewRoot: nroot, 314 + Collection: collection, 315 + Rkey: rkey, 316 + RepoSlice: rslice, 317 + }) 318 + } 319 + 320 + return nil 321 + 322 + } 323 + 265 324 func (rm *RepoManager) InitNewActor(ctx context.Context, user uint, handle, did, displayname string, declcid, actortype string) error { 266 325 unlock := rm.lockUser(ctx, user) 267 326 defer unlock() ··· 421 480 422 481 switch kind { 423 482 case EvtKindCreateRecord: 424 - fmt.Println("path: ", collection, rkey) 425 483 recid, rec, err := r.GetRecord(ctx, collection+"/"+rkey) 426 484 if err != nil { 427 485 return fmt.Errorf("reading changed record from car slice: %w", err) ··· 438 496 } 439 497 440 498 if rm.events != nil { 441 - fmt.Println("sending off external create record event") 442 499 rm.events(ctx, &RepoEvent{ 443 500 Kind: EvtKindCreateRecord, 444 501 User: uid,
-1
server/events.go
··· 52 52 case opSend: 53 53 for _, s := range em.subs { 54 54 if s.filter(op.evt) { 55 - fmt.Println("outgoing event: ", op.evt) 56 55 select { 57 56 case s.outgoing <- op.evt: 58 57 default:
+27 -6
server/handlers.go
··· 373 373 return nil, err 374 374 } 375 375 376 - fmt.Println("notifs: ", u.Handle, len(notifs)) 377 376 return &appbskytypes.NotificationList_Output{ 378 377 Notifications: notifs, 379 378 }, nil 380 379 } 381 380 382 381 func (s *Server) handleAppBskyNotificationUpdateSeen(ctx context.Context, input *appbskytypes.NotificationUpdateSeen_Input) error { 383 - fmt.Println("notifications not yet implemented update seen!") 382 + u, err := s.getUser(ctx) 383 + if err != nil { 384 + return err 385 + } 386 + 387 + seen, err := time.Parse(time.RFC3339, input.SeenAt) 388 + if err != nil { 389 + return fmt.Errorf("invalid time format for 'seenAt': %w", err) 390 + } 384 391 385 - return nil 392 + return s.notifman.UpdateSeen(ctx, u.ID, seen) 386 393 } 387 394 388 395 func (s *Server) handleComAtprotoAccountCreate(ctx context.Context, input *comatprototypes.AccountCreate_Input) (*comatprototypes.AccountCreate_Output, error) { ··· 448 455 } 449 456 450 457 func (s *Server) handleComAtprotoAccountCreateInviteCode(ctx context.Context, input *comatprototypes.AccountCreateInviteCode_Input) (*comatprototypes.AccountCreateInviteCode_Output, error) { 451 - panic("not yet implemented") 458 + u, err := s.getUser(ctx) 459 + if err != nil { 460 + return nil, err 461 + } 462 + 463 + return nil, fmt.Errorf("invite codes not currently supported") 452 464 } 453 465 454 466 func (s *Server) handleComAtprotoAccountDelete(ctx context.Context) error { ··· 456 468 } 457 469 458 470 func (s *Server) handleComAtprotoAccountGet(ctx context.Context) error { 459 - panic("not yet implemented") 471 + return nil 460 472 } 461 473 462 474 func (s *Server) handleComAtprotoAccountRequestPasswordReset(ctx context.Context, input *comatprototypes.AccountRequestPasswordReset_Input) error { ··· 518 530 } 519 531 520 532 func (s *Server) handleComAtprotoRepoDeleteRecord(ctx context.Context, input *comatprototypes.RepoDeleteRecord_Input) error { 521 - panic("not yet implemented") 533 + u, err := s.getUser(ctx) 534 + if err != nil { 535 + return err 536 + } 537 + 538 + if u.Did != input.Did { 539 + return fmt.Errorf("specified DID did not match authed user") 540 + } 541 + 542 + return s.repoman.DeleteRecord(ctx, u.ID, input.Collection, input.Rkey) 522 543 } 523 544 524 545 func (s *Server) handleComAtprotoRepoDescribe(ctx context.Context, user string) (*comatprototypes.RepoDescribe_Output, error) {
+72 -63
server/indexer.go
··· 265 265 if !errors.Is(err, gorm.ErrRecordNotFound) { 266 266 return err 267 267 } 268 - 269 - doc, err := ix.didr.GetDocument(ctx, rec.Subject.Did) 270 - if err != nil { 271 - return fmt.Errorf("could not locate DID document for followed user: %s", err) 272 - } 273 - 274 - if len(doc.Service) == 0 { 275 - return fmt.Errorf("external followed user %s had no services in did document", rec.Subject.Did) 276 - } 277 - 278 - svc := doc.Service[0] 279 - durl, err := url.Parse(svc.ServiceEndpoint) 280 - if err != nil { 281 - return err 282 - } 283 - 284 - // TODO: the PDS's DID should also be in the service, we could use that to look up? 285 - var peering Peering 286 - if err := ix.db.First(&peering, "host = ?", durl.Host).Error; err != nil { 287 - return err 288 - } 289 - 290 - var handle string 291 - if len(doc.AlsoKnownAs) > 0 { 292 - hurl, err := url.Parse(doc.AlsoKnownAs[0]) 293 - if err != nil { 294 - return err 295 - } 296 - 297 - handle = hurl.Host 298 - } 299 - 300 - c := &xrpc.Client{Host: svc.ServiceEndpoint} 301 - profile, err := bsky.ActorGetProfile(ctx, c, rec.Subject.Did) 268 + nu, err := ix.createExternalUser(ctx, rec.Subject.Did) 302 269 if err != nil { 303 270 return err 304 271 } 305 272 306 - if handle != profile.Handle { 307 - return fmt.Errorf("mismatch in handle between did document and pds profile (%s != %s)", handle, profile.Handle) 308 - } 309 - 310 - // TODO: request this users info from their server to fill out our data... 311 - u := User{ 312 - Handle: handle, 313 - Did: rec.Subject.Did, 314 - PDS: peering.ID, 315 - } 316 - 317 - if err := ix.db.Create(&u).Error; err != nil { 318 - return fmt.Errorf("failed to create other pds user: %w", err) 319 - } 320 - 321 - // okay cool, its a user on a server we are peered with 322 - // lets make a local record of that user for the future 323 - subj = &ActorInfo{ 324 - Uid: u.ID, 325 - Handle: handle, 326 - DisplayName: *profile.DisplayName, 327 - Did: rec.Subject.Did, 328 - DeclRefCid: rec.Subject.DeclarationCid, // TODO: should verify this? 329 - Type: "", 330 - PDS: peering.ID, 331 - } 332 - if err := ix.db.Create(subj).Error; err != nil { 333 - return err 334 - } 273 + subj = nu 335 274 } 336 275 337 276 if subj.PDS != 0 { ··· 382 321 } 383 322 384 323 return nil 324 + } 325 + func (ix *Indexer) createExternalUser(ctx context.Context, did string) (*ActorInfo, error) { 326 + doc, err := ix.didr.GetDocument(ctx, did) 327 + if err != nil { 328 + return nil, fmt.Errorf("could not locate DID document for followed user: %s", err) 329 + } 330 + 331 + if len(doc.Service) == 0 { 332 + return nil, fmt.Errorf("external followed user %s had no services in did document", did) 333 + } 334 + 335 + svc := doc.Service[0] 336 + durl, err := url.Parse(svc.ServiceEndpoint) 337 + if err != nil { 338 + return nil, err 339 + } 340 + 341 + // TODO: the PDS's DID should also be in the service, we could use that to look up? 342 + var peering Peering 343 + if err := ix.db.First(&peering, "host = ?", durl.Host).Error; err != nil { 344 + return nil, err 345 + } 346 + 347 + var handle string 348 + if len(doc.AlsoKnownAs) > 0 { 349 + hurl, err := url.Parse(doc.AlsoKnownAs[0]) 350 + if err != nil { 351 + return nil, err 352 + } 353 + 354 + handle = hurl.Host 355 + } 356 + 357 + c := &xrpc.Client{Host: svc.ServiceEndpoint} 358 + profile, err := bsky.ActorGetProfile(ctx, c, did) 359 + if err != nil { 360 + return nil, err 361 + } 362 + 363 + if handle != profile.Handle { 364 + return nil, fmt.Errorf("mismatch in handle between did document and pds profile (%s != %s)", handle, profile.Handle) 365 + } 366 + 367 + // TODO: request this users info from their server to fill out our data... 368 + u := User{ 369 + Handle: handle, 370 + Did: did, 371 + PDS: peering.ID, 372 + } 373 + 374 + if err := ix.db.Create(&u).Error; err != nil { 375 + return nil, fmt.Errorf("failed to create other pds user: %w", err) 376 + } 377 + 378 + // okay cool, its a user on a server we are peered with 379 + // lets make a local record of that user for the future 380 + subj := &ActorInfo{ 381 + Uid: u.ID, 382 + Handle: handle, 383 + DisplayName: *profile.DisplayName, 384 + Did: did, 385 + DeclRefCid: profile.Declaration.Cid, 386 + Type: "", 387 + PDS: peering.ID, 388 + } 389 + if err := ix.db.Create(subj).Error; err != nil { 390 + return nil, err 391 + } 392 + 393 + return subj, nil 385 394 } 386 395 387 396 func (ix *Indexer) didForUser(ctx context.Context, uid uint) (string, error) {
+16 -1
server/notifs.go
··· 9 9 appbskytypes "github.com/whyrusleeping/gosky/api/bsky" 10 10 "github.com/whyrusleeping/gosky/repomgr" 11 11 "gorm.io/gorm" 12 + "gorm.io/gorm/clause" 12 13 ) 13 14 14 15 type NotificationManager struct { ··· 46 47 47 48 type NotifSeen struct { 48 49 ID uint `gorm:"primarykey"` 49 - Usr uint 50 + Usr uint `gorm:"uniqueIndex"` 50 51 LastSeen time.Time 51 52 } 52 53 ··· 289 290 } 290 291 291 292 return c, nil 293 + } 294 + 295 + func (nm *NotificationManager) UpdateSeen(ctx context.Context, usr uint, seen time.Time) error { 296 + if err := nm.db.Clauses(clause.OnConflict{ 297 + Columns: []clause.Column{{Name: "usr"}}, 298 + DoUpdates: clause.AssignmentColumns([]string{"last_seen"}), 299 + }).Create(NotifSeen{ 300 + Usr: usr, 301 + LastSeen: seen, 302 + }).Error; err != nil { 303 + return err 304 + } 305 + 306 + return nil 292 307 } 293 308 294 309 func (nm *NotificationManager) AddReplyTo(ctx context.Context, user uint, replyid uint, replyto *FeedPost) error {
+12 -1
server/server.go
··· 4 4 "context" 5 5 "crypto/ecdsa" 6 6 "encoding/json" 7 + "errors" 7 8 "fmt" 8 9 "net/mail" 9 10 "os" ··· 124 125 case EvtKindCreateRecord: 125 126 u, err := s.lookupUserByDid(ctx, evt.User) 126 127 if err != nil { 127 - return err 128 + if !errors.Is(err, gorm.ErrRecordNotFound) { 129 + return fmt.Errorf("looking up event user: %w", err) 130 + } 131 + 132 + subj, err := s.indexer.createExternalUser(ctx, evt.User) 133 + if err != nil { 134 + return err 135 + } 136 + 137 + u = new(User) 138 + u.ID = subj.Uid 128 139 } 129 140 130 141 return s.repoman.HandleExternalUserEvent(ctx, host.ID, repomgr.EvtKindCreateRecord, u.ID, evt.Collection, evt.Rkey, evt.CarSlice)