this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor out core pds components

+690 -683
+2 -1
cmd/pds/main.go
··· 11 11 "github.com/lestrrat-go/jwx/v2/jwk" 12 12 "github.com/urfave/cli/v2" 13 13 "github.com/whyrusleeping/gosky/carstore" 14 + "github.com/whyrusleeping/gosky/plc" 14 15 server "github.com/whyrusleeping/gosky/server" 15 16 "gorm.io/driver/postgres" 16 17 "gorm.io/driver/sqlite" ··· 118 119 } 119 120 120 121 pdshost := cctx.String("pdshost") 121 - srv, err := server.NewServer(db, cs, "server.key", ".pdstest", pdshost, server.NewFakeDid(db), []byte("jwtsecretplaceholder")) 122 + srv, err := server.NewServer(db, cs, "server.key", ".pdstest", pdshost, plc.NewFakeDid(db), []byte("jwtsecretplaceholder")) 122 123 if err != nil { 123 124 return err 124 125 }
+378
indexer/indexer.go
··· 1 + package indexer 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "fmt" 7 + "log" 8 + "strings" 9 + 10 + bsky "github.com/whyrusleeping/gosky/api/bsky" 11 + "github.com/whyrusleeping/gosky/events" 12 + "github.com/whyrusleeping/gosky/notifs" 13 + "github.com/whyrusleeping/gosky/plc" 14 + "github.com/whyrusleeping/gosky/repomgr" 15 + "github.com/whyrusleeping/gosky/types" 16 + "go.opentelemetry.io/otel" 17 + "gorm.io/gorm" 18 + ) 19 + 20 + type Indexer struct { 21 + db *gorm.DB 22 + 23 + notifman *notifs.NotificationManager 24 + events *events.EventManager 25 + didr plc.PLCClient 26 + 27 + SendRemoteFollow func(context.Context, string, uint) error 28 + CreateExternalUser func(context.Context, string) (*types.ActorInfo, error) 29 + } 30 + 31 + func NewIndexer(db *gorm.DB, notifman *notifs.NotificationManager, evtman *events.EventManager, didr plc.PLCClient) (*Indexer, error) { 32 + db.AutoMigrate(&types.FeedPost{}) 33 + db.AutoMigrate(&types.ActorInfo{}) 34 + db.AutoMigrate(&types.FollowRecord{}) 35 + db.AutoMigrate(&types.VoteRecord{}) 36 + db.AutoMigrate(&types.RepostRecord{}) 37 + 38 + return &Indexer{ 39 + db: db, 40 + notifman: notifman, 41 + events: evtman, 42 + didr: didr, 43 + }, nil 44 + } 45 + 46 + func (ix *Indexer) catchup(ctx context.Context, evt *repomgr.RepoEvent) error { 47 + // TODO: catch up on events that happened since this event (in the event of a crash or downtime) 48 + return nil 49 + } 50 + 51 + func (ix *Indexer) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent) { 52 + ctx, span := otel.Tracer("indexer").Start(ctx, "HandleRepoEvent") 53 + defer span.End() 54 + 55 + if err := ix.catchup(ctx, evt); err != nil { 56 + log.Println("failed to catch up on user repo changes, processing events off base: ", err) 57 + } 58 + 59 + fmt.Println("Handling Event!", evt.Kind) 60 + 61 + switch evt.Kind { 62 + case repomgr.EvtKindCreateRecord: 63 + if err := ix.handleRecordCreate(ctx, evt, true); err != nil { 64 + log.Println("handle recordCreate: ", err) 65 + } 66 + case repomgr.EvtKindInitActor: 67 + if err := ix.handleInitActor(ctx, evt); err != nil { 68 + log.Println("handle initActor: ", err) 69 + } 70 + default: 71 + log.Println("unrecognized repo event type: ", evt.Kind) 72 + } 73 + } 74 + 75 + func (ix *Indexer) handleRecordCreate(ctx context.Context, evt *repomgr.RepoEvent, local bool) error { 76 + fmt.Println("record create event", evt.Collection) 77 + var relevantPds []uint 78 + switch rec := evt.Record.(type) { 79 + case *bsky.FeedPost: 80 + var replyid uint 81 + if rec.Reply != nil { 82 + replyto, err := ix.GetPost(ctx, rec.Reply.Parent.Uri) 83 + if err != nil { 84 + return err 85 + } 86 + 87 + replyid = replyto.ID 88 + } 89 + 90 + fp := types.FeedPost{ 91 + Rkey: evt.Rkey, 92 + Cid: evt.RecCid.String(), 93 + Author: evt.User, 94 + ReplyTo: replyid, 95 + } 96 + if err := ix.db.Create(&fp).Error; err != nil { 97 + return err 98 + } 99 + 100 + if err := ix.addNewPostNotification(ctx, rec, &fp); err != nil { 101 + return err 102 + } 103 + 104 + case *bsky.FeedRepost: 105 + fp, err := ix.GetPost(ctx, rec.Subject.Uri) 106 + if err != nil { 107 + return err 108 + } 109 + 110 + author, err := ix.lookupUser(ctx, fp.Author) 111 + if err != nil { 112 + return err 113 + } 114 + 115 + relevantPds = append(relevantPds, author.PDS) 116 + 117 + rr := types.RepostRecord{ 118 + RecCreated: rec.CreatedAt, 119 + Post: fp.ID, 120 + Reposter: evt.User, 121 + Author: fp.Author, 122 + RecCid: evt.RecCid.String(), 123 + Rkey: evt.Rkey, 124 + } 125 + if err := ix.db.Create(&rr).Error; err != nil { 126 + return err 127 + } 128 + 129 + if err := ix.notifman.AddRepost(ctx, fp.Author, rr.ID, evt.User); err != nil { 130 + return err 131 + } 132 + 133 + case *bsky.FeedVote: 134 + var val int 135 + var dbdir types.VoteDir 136 + switch rec.Direction { 137 + case "up": 138 + val = 1 139 + dbdir = types.VoteDirUp 140 + case "down": 141 + val = -1 142 + dbdir = types.VoteDirDown 143 + default: 144 + return fmt.Errorf("invalid vote direction: %q", rec.Direction) 145 + } 146 + 147 + puri, err := parseAtUri(rec.Subject.Uri) 148 + if err != nil { 149 + return err 150 + } 151 + 152 + act, err := ix.lookupUserByDid(ctx, puri.Did) 153 + if err != nil { 154 + return err 155 + } 156 + 157 + relevantPds = append(relevantPds, act.PDS) 158 + 159 + var post types.FeedPost 160 + if err := ix.db.First(&post, "rkey = ? AND author = ?", puri.Rkey, act.Uid).Error; err != nil { 161 + return err 162 + } 163 + 164 + vr := types.VoteRecord{ 165 + Dir: dbdir, 166 + Voter: evt.User, 167 + Post: post.ID, 168 + Created: rec.CreatedAt, 169 + Rkey: evt.Rkey, 170 + Cid: evt.RecCid.String(), 171 + } 172 + if err := ix.db.Create(&vr).Error; err != nil { 173 + return err 174 + } 175 + 176 + if err := ix.db.Model(types.FeedPost{}).Where("id = ?", post.ID).Update("up_count", gorm.Expr("up_count + ?", val)).Error; err != nil { 177 + return err 178 + } 179 + 180 + if rec.Direction == "up" { 181 + if err := ix.addNewVoteNotification(ctx, act.ID, &vr); err != nil { 182 + return err 183 + } 184 + } 185 + 186 + case *bsky.GraphFollow: 187 + subj, err := ix.lookupUserByDid(ctx, rec.Subject.Did) 188 + if err != nil { 189 + if !errors.Is(err, gorm.ErrRecordNotFound) { 190 + return err 191 + } 192 + nu, err := ix.CreateExternalUser(ctx, rec.Subject.Did) 193 + if err != nil { 194 + return err 195 + } 196 + 197 + subj = nu 198 + } 199 + 200 + if subj.PDS != 0 { 201 + relevantPds = append(relevantPds, subj.PDS) 202 + } 203 + 204 + // 'follower' followed 'target' 205 + fr := types.FollowRecord{ 206 + Follower: evt.User, 207 + Target: subj.ID, 208 + Rkey: evt.Rkey, 209 + Cid: evt.RecCid.String(), 210 + } 211 + if err := ix.db.Create(&fr).Error; err != nil { 212 + return err 213 + } 214 + 215 + if err := ix.notifman.AddFollow(ctx, fr.Follower, fr.Target, fr.ID); err != nil { 216 + return err 217 + } 218 + 219 + if local && subj.PDS != 0 { 220 + if err := ix.SendRemoteFollow(ctx, subj.Did, subj.PDS); err != nil { 221 + log.Println("failed to issue remote follow directive: ", err) 222 + } 223 + } 224 + 225 + default: 226 + return fmt.Errorf("unrecognized record type: %T", rec) 227 + } 228 + 229 + did, err := ix.DidForUser(ctx, evt.User) 230 + if err != nil { 231 + return err 232 + } 233 + 234 + fmt.Println("Sending event: ", evt.Collection, relevantPds) 235 + if err := ix.events.AddEvent(&events.Event{ 236 + CarSlice: evt.RepoSlice, 237 + Kind: events.EvtKindCreateRecord, 238 + PrivUid: evt.User, 239 + User: did, 240 + Collection: evt.Collection, 241 + Rkey: evt.Rkey, 242 + PrivRelevantPds: relevantPds, 243 + }); err != nil { 244 + log.Println("failed to push event: ", err) 245 + } 246 + 247 + return nil 248 + } 249 + 250 + func (ix *Indexer) DidForUser(ctx context.Context, uid uint) (string, error) { 251 + var ai types.ActorInfo 252 + if err := ix.db.First(&ai, "id = ?", uid).Error; err != nil { 253 + return "", err 254 + } 255 + 256 + return ai.Did, nil 257 + } 258 + 259 + func (ix *Indexer) lookupUser(ctx context.Context, id uint) (*types.ActorInfo, error) { 260 + var ai types.ActorInfo 261 + if err := ix.db.First(&ai, "id = ?", id).Error; err != nil { 262 + return nil, err 263 + } 264 + 265 + return &ai, nil 266 + } 267 + 268 + func (ix *Indexer) lookupUserByDid(ctx context.Context, did string) (*types.ActorInfo, error) { 269 + var ai types.ActorInfo 270 + if err := ix.db.First(&ai, "did = ?", did).Error; err != nil { 271 + return nil, err 272 + } 273 + 274 + return &ai, nil 275 + } 276 + 277 + func (ix *Indexer) lookupUserByHandle(ctx context.Context, handle string) (*types.ActorInfo, error) { 278 + var ai types.ActorInfo 279 + if err := ix.db.First(&ai, "handle = ?", handle).Error; err != nil { 280 + return nil, err 281 + } 282 + 283 + return &ai, nil 284 + } 285 + 286 + func (ix *Indexer) addNewPostNotification(ctx context.Context, post *bsky.FeedPost, fp *types.FeedPost) error { 287 + if post.Reply != nil { 288 + replyto, err := ix.GetPost(ctx, post.Reply.Parent.Uri) 289 + if err != nil { 290 + fmt.Println("probably shouldnt error when processing a reply to a not-found post") 291 + return err 292 + } 293 + 294 + if err := ix.notifman.AddReplyTo(ctx, fp.Author, fp.ID, replyto); err != nil { 295 + return err 296 + } 297 + } 298 + 299 + for _, e := range post.Entities { 300 + switch e.Type { 301 + case "mention": 302 + mentioned, err := ix.lookupUserByDid(ctx, e.Value) 303 + if err != nil { 304 + return fmt.Errorf("mentioned user does not exist: %w", err) 305 + } 306 + 307 + if err := ix.notifman.AddMention(ctx, fp.Author, fp.ID, mentioned.ID); err != nil { 308 + return err 309 + } 310 + } 311 + } 312 + return nil 313 + } 314 + 315 + func (ix *Indexer) addNewVoteNotification(ctx context.Context, postauthor uint, vr *types.VoteRecord) error { 316 + return ix.notifman.AddUpVote(ctx, vr.Voter, vr.Post, vr.ID, postauthor) 317 + } 318 + 319 + func (ix *Indexer) handleInitActor(ctx context.Context, evt *repomgr.RepoEvent) error { 320 + ai := evt.ActorInfo 321 + if err := ix.db.Create(&types.ActorInfo{ 322 + Uid: evt.User, 323 + Handle: ai.Handle, 324 + Did: ai.Did, 325 + DisplayName: ai.DisplayName, 326 + DeclRefCid: ai.DeclRefCid, 327 + Type: ai.Type, 328 + }).Error; err != nil { 329 + return err 330 + } 331 + 332 + if err := ix.db.Create(&types.FollowRecord{ 333 + Follower: evt.User, 334 + Target: evt.User, 335 + }).Error; err != nil { 336 + return err 337 + } 338 + 339 + return nil 340 + } 341 + 342 + func (ix *Indexer) GetPost(ctx context.Context, uri string) (*types.FeedPost, error) { 343 + puri, err := parseAtUri(uri) 344 + if err != nil { 345 + return nil, err 346 + } 347 + 348 + var post types.FeedPost 349 + if err := ix.db.First(&post, "rkey = ? AND author = (?)", puri.Rkey, ix.db.Model(types.ActorInfo{}).Where("did = ?", puri.Did).Select("id")).Error; err != nil { 350 + return nil, err 351 + } 352 + 353 + return &post, nil 354 + } 355 + 356 + type parsedUri struct { 357 + Did string 358 + Collection string 359 + Rkey string 360 + } 361 + 362 + func parseAtUri(uri string) (*parsedUri, error) { 363 + if !strings.HasPrefix(uri, "at://") { 364 + return nil, fmt.Errorf("AT uris must be prefixed with 'at://'") 365 + } 366 + 367 + trimmed := strings.TrimPrefix(uri, "at://") 368 + parts := strings.Split(trimmed, "/") 369 + if len(parts) != 3 { 370 + return nil, fmt.Errorf("AT uris must have three parts: did, collection, tid") 371 + } 372 + 373 + return &parsedUri{ 374 + Did: parts[0], 375 + Collection: parts[1], 376 + Rkey: parts[2], 377 + }, nil 378 + }
+13
plc/plc.go
··· 1 + package plc 2 + 3 + import ( 4 + "context" 5 + 6 + "github.com/whyrusleeping/go-did" 7 + "github.com/whyrusleeping/gosky/key" 8 + ) 9 + 10 + type PLCClient interface { 11 + GetDocument(ctx context.Context, didstr string) (*did.Document, error) 12 + CreateDID(ctx context.Context, sigkey *key.Key, recovery string, handle string, service string) (string, error) 13 + }
+4 -50
server/events.go events/events.go
··· 1 - package schemagen 1 + package events 2 2 3 3 import ( 4 4 "fmt" 5 - "log" 6 - 7 - "github.com/gorilla/websocket" 8 - "github.com/labstack/echo/v4" 9 5 ) 10 6 11 7 type EventManager struct { ··· 89 85 CarSlice []byte 90 86 91 87 // some private fields for processing metadata 92 - uid uint 93 - pdsid uint 94 - relevantPds []uint 88 + PrivUid uint `json:"-"` 89 + PrivPdsId uint `json:"-"` 90 + PrivRelevantPds []uint `json:"-"` 95 91 } 96 92 97 93 func (em *EventManager) AddEvent(ev *Event) error { ··· 104 100 case <-em.closed: 105 101 return fmt.Errorf("event manager shut down") 106 102 } 107 - } 108 - 109 - func (s *Server) EventsHandler(c echo.Context) error { 110 - did := c.Request().Header.Get("DID") 111 - conn, err := websocket.Upgrade(c.Response().Writer, c.Request(), c.Response().Header(), 1<<10, 1<<10) 112 - if err != nil { 113 - return err 114 - } 115 - ctx := c.Request().Context() 116 - 117 - var peering Peering 118 - if err := s.db.First(&peering, "did = ?", did).Error; err != nil { 119 - return err 120 - } 121 - 122 - evts, cancel, err := s.events.Subscribe(func(evt *Event) bool { 123 - for _, pid := range evt.relevantPds { 124 - if pid == peering.ID { 125 - return true 126 - } 127 - } 128 - 129 - has, err := s.peerHasFollow(ctx, peering.ID, evt.uid) 130 - if err != nil { 131 - log.Println("error checking peer follow relationship: ", err) 132 - return false 133 - } 134 - 135 - return has 136 - }) 137 - if err != nil { 138 - return err 139 - } 140 - defer cancel() 141 - 142 - for evt := range evts { 143 - if err := conn.WriteJSON(evt); err != nil { 144 - return err 145 - } 146 - } 147 - 148 - return nil 149 103 } 150 104 151 105 func (em *EventManager) Subscribe(filter func(*Event) bool) (<-chan *Event, func(), error) {
+1 -1
server/fakedid.go plc/fakedid.go
··· 1 - package schemagen 1 + package plc 2 2 3 3 import ( 4 4 "context"
+6 -8
server/federation_test.go
··· 19 19 atproto "github.com/whyrusleeping/gosky/api/atproto" 20 20 bsky "github.com/whyrusleeping/gosky/api/bsky" 21 21 "github.com/whyrusleeping/gosky/carstore" 22 + "github.com/whyrusleeping/gosky/plc" 22 23 "github.com/whyrusleeping/gosky/xrpc" 23 24 "gorm.io/driver/sqlite" 24 25 "gorm.io/gorm" ··· 71 72 } 72 73 } 73 74 74 - func setupPDS(t *testing.T, host, suffix string, plc PLCClient) *testPDS { 75 + func setupPDS(t *testing.T, host, suffix string, plc plc.PLCClient) *testPDS { 75 76 dir, err := ioutil.TempDir("", "fedtest") 76 77 if err != nil { 77 78 t.Fatal(err) ··· 270 271 return resp.Notifications 271 272 } 272 273 273 - func testPLC(t *testing.T) *FakeDid { 274 + func testPLC(t *testing.T) *plc.FakeDid { 274 275 // TODO: just do in memory... 275 276 tdir, err := ioutil.TempDir("", "plcserv") 276 277 if err != nil { ··· 281 282 if err != nil { 282 283 t.Fatal(err) 283 284 } 284 - return NewFakeDid(db) 285 - 285 + return plc.NewFakeDid(db) 286 286 } 287 287 288 288 func TestBasicFederation(t *testing.T) { ··· 321 321 322 322 fmt.Println("laura notifications:") 323 323 lnot := laura.GetNotifs(t) 324 - for _, n := range lnot { 325 - fmt.Println(n) 324 + if len(lnot) != 1 { 325 + t.Fatal("wrong number of notifications") 326 326 } 327 - 328 - select {} 329 327 }
+3 -2
server/fedmgr.go
··· 10 10 "time" 11 11 12 12 "github.com/gorilla/websocket" 13 + "github.com/whyrusleeping/gosky/events" 13 14 ) 14 15 15 - type IndexCallback func(context.Context, string, *Event) error 16 + type IndexCallback func(context.Context, string, *events.Event) error 16 17 17 18 func (s *Server) SubscribeToPds(ctx context.Context, host string) error { 18 19 var peering Peering ··· 71 72 72 73 _ = mt 73 74 74 - var ev Event 75 + var ev events.Event 75 76 if err := json.Unmarshal(data, &ev); err != nil { 76 77 return fmt.Errorf("failed to unmarshal event: %w", err) 77 78 }
+30 -64
server/feedgen.go
··· 10 10 11 11 "github.com/ipfs/go-cid" 12 12 bsky "github.com/whyrusleeping/gosky/api/bsky" 13 + "github.com/whyrusleeping/gosky/indexer" 14 + "github.com/whyrusleeping/gosky/types" 13 15 "go.opentelemetry.io/otel" 14 16 "gorm.io/gorm" 15 17 ) 16 18 17 19 type FeedGenerator struct { 18 20 db *gorm.DB 19 - ix *Indexer 21 + ix *indexer.Indexer 20 22 21 23 readRecord ReadRecordFunc 22 24 } 23 25 24 - func NewFeedGenerator(db *gorm.DB, ix *Indexer, readRecord ReadRecordFunc) (*FeedGenerator, error) { 26 + func NewFeedGenerator(db *gorm.DB, ix *indexer.Indexer, readRecord ReadRecordFunc) (*FeedGenerator, error) { 25 27 return &FeedGenerator{ 26 28 db: db, 27 29 ix: ix, ··· 49 51 } 50 52 */ 51 53 52 - func (fg *FeedGenerator) hydrateFeed(ctx context.Context, items []*FeedPost, reposts []*RepostRecord) ([]*bsky.FeedFeedViewPost, error) { 54 + func (fg *FeedGenerator) hydrateFeed(ctx context.Context, items []*types.FeedPost, reposts []*types.RepostRecord) ([]*bsky.FeedFeedViewPost, error) { 53 55 out := make([]*bsky.FeedFeedViewPost, 0, len(items)) 54 56 for _, it := range items { 55 57 hit, err := fg.hydrateItem(ctx, it) ··· 62 64 63 65 if len(reposts) > 0 { 64 66 for _, rp := range reposts { 65 - var fp FeedPost 67 + var fp types.FeedPost 66 68 if err := fg.db.First(&fp, "id = ?", rp.Post).Error; err != nil { 67 69 return nil, err 68 70 } ··· 93 95 94 96 func (fg *FeedGenerator) didForUser(ctx context.Context, user uint) (string, error) { 95 97 // TODO: cache the shit out of this 96 - var ai ActorInfo 98 + var ai types.ActorInfo 97 99 if err := fg.db.First(&ai, "uid = ?", user).Error; err != nil { 98 100 return "", err 99 101 } ··· 103 105 104 106 func (fg *FeedGenerator) getActorRefInfo(ctx context.Context, user uint) (*bsky.ActorRef_WithInfo, error) { 105 107 // TODO: cache the shit out of this too 106 - var ai ActorInfo 108 + var ai types.ActorInfo 107 109 if err := fg.db.First(&ai, "uid = ?", user).Error; err != nil { 108 110 return nil, err 109 111 } ··· 111 113 return ai.ActorRef(), nil 112 114 } 113 115 114 - func (ai *ActorInfo) ActorRef() *bsky.ActorRef_WithInfo { 115 - return &bsky.ActorRef_WithInfo{ 116 - Did: ai.Did, 117 - Declaration: &bsky.SystemDeclRef{ 118 - Cid: ai.DeclRefCid, 119 - ActorType: ai.Type, 120 - }, 121 - Handle: ai.Handle, 122 - DisplayName: &ai.DisplayName, 123 - } 124 - } 125 - 126 - func (fg *FeedGenerator) hydrateItem(ctx context.Context, item *FeedPost) (*bsky.FeedFeedViewPost, error) { 116 + func (fg *FeedGenerator) hydrateItem(ctx context.Context, item *types.FeedPost) (*bsky.FeedFeedViewPost, error) { 127 117 authorDid, err := fg.didForUser(ctx, item.Author) 128 118 if err != nil { 129 119 return nil, err ··· 166 156 func (fg *FeedGenerator) getPostViewerState(ctx context.Context, item uint, viewer uint, viewerDid string) (*bsky.FeedPost_ViewerState, error) { 167 157 var out bsky.FeedPost_ViewerState 168 158 169 - var vote VoteRecord 159 + var vote types.VoteRecord 170 160 if err := fg.db.Find(&vote, "post = ? AND voter = ?", item, viewer).Error; err != nil { 171 161 return nil, err 172 162 } ··· 174 164 if vote.ID != 0 { 175 165 vuri := fmt.Sprintf("at://%s/app.bsky.feed.vote/%s", viewerDid, vote.Rkey) 176 166 switch vote.Dir { 177 - case VoteDirUp: 167 + case types.VoteDirUp: 178 168 out.Upvote = &vuri 179 - case VoteDirDown: 169 + case types.VoteDirDown: 180 170 out.Downvote = &vuri 181 171 } 182 172 } 183 173 184 - var rep RepostRecord 174 + var rep types.RepostRecord 185 175 if err := fg.db.Find(&rep, "post = ? AND reposter = ?", item, viewer).Error; err != nil { 186 176 return nil, err 187 177 } ··· 199 189 defer span.End() 200 190 201 191 // TODO: this query is just a temporary hack... 202 - var feed []*FeedPost 192 + var feed []*types.FeedPost 203 193 if err := fg.db.Debug().Find(&feed, "author in (?)", 204 - fg.db.Model(FollowRecord{}).Where("follower = ?", user.ID).Select("target"), 194 + fg.db.Model(types.FollowRecord{}).Where("follower = ?", user.ID).Select("target"), 205 195 ).Error; err != nil { 206 196 return nil, err 207 197 } 208 198 209 - var rps []*RepostRecord 199 + var rps []*types.RepostRecord 210 200 if err := fg.db.Debug().Find(&rps, "reposter in (?)", 211 - fg.db.Model(FollowRecord{}).Where("follower = ?", user.ID).Select("target"), 201 + fg.db.Model(types.FollowRecord{}).Where("follower = ?", user.ID).Select("target"), 212 202 ).Error; err != nil { 213 203 return nil, err 214 204 } ··· 260 250 // for memory efficiency, should probably return the actual type that goes out to the user... 261 251 // bsky.FeedGetAuthorFeed_FeedItem 262 252 263 - var feed []*FeedPost 253 + var feed []*types.FeedPost 264 254 if err := fg.db.Find(&feed, "author = ?", user.ID).Error; err != nil { 265 255 return nil, err 266 256 } 267 257 268 - var reposts []*RepostRecord 258 + var reposts []*types.RepostRecord 269 259 if err := fg.db.Find(&reposts, "reposter = ?", user.ID).Error; err != nil { 270 260 return nil, err 271 261 } ··· 278 268 return fg.personalizeFeed(ctx, fout, user) 279 269 } 280 270 281 - type parsedUri struct { 282 - Did string 283 - Collection string 284 - Rkey string 285 - } 286 - 287 - func parseAtUri(uri string) (*parsedUri, error) { 288 - if !strings.HasPrefix(uri, "at://") { 289 - return nil, fmt.Errorf("AT uris must be prefixed with 'at://'") 290 - } 291 - 292 - trimmed := strings.TrimPrefix(uri, "at://") 293 - parts := strings.Split(trimmed, "/") 294 - if len(parts) != 3 { 295 - return nil, fmt.Errorf("AT uris must have three parts: did, collection, tid") 296 - } 297 - 298 - return &parsedUri{ 299 - Did: parts[0], 300 - Collection: parts[1], 301 - Rkey: parts[2], 302 - }, nil 303 - } 304 - 305 - func (fg *FeedGenerator) GetActorProfileByID(ctx context.Context, actor uint) (*ActorInfo, error) { 306 - var ai ActorInfo 271 + func (fg *FeedGenerator) GetActorProfileByID(ctx context.Context, actor uint) (*types.ActorInfo, error) { 272 + var ai types.ActorInfo 307 273 if err := fg.db.First(&ai, "id = ?", actor).Error; err != nil { 308 274 return nil, fmt.Errorf("getActorProfileByID: %w", err) 309 275 } 310 276 311 277 return &ai, nil 312 278 } 313 - func (fg *FeedGenerator) GetActorProfile(ctx context.Context, actor string) (*ActorInfo, error) { 279 + func (fg *FeedGenerator) GetActorProfile(ctx context.Context, actor string) (*types.ActorInfo, error) { 314 280 fmt.Println("get actor profile: ", actor) 315 - var ai ActorInfo 281 + var ai types.ActorInfo 316 282 if strings.HasPrefix(actor, "did:") { 317 283 if err := fg.db.First(&ai, "did = ?", actor).Error; err != nil { 318 284 return nil, err ··· 378 344 CreatedAt string 379 345 } 380 346 381 - func (fg *FeedGenerator) hydrateVote(ctx context.Context, v *VoteRecord) (*HydratedVote, error) { 347 + func (fg *FeedGenerator) hydrateVote(ctx context.Context, v *types.VoteRecord) (*HydratedVote, error) { 382 348 aref, err := fg.getActorRefInfo(ctx, v.Voter) 383 349 if err != nil { 384 350 return nil, err ··· 406 372 return nil, fmt.Errorf("listing likes of old post versions not supported") 407 373 } 408 374 409 - var dbdir VoteDir 375 + var dbdir types.VoteDir 410 376 switch dir { 411 377 case "up": 412 - dbdir = VoteDirUp 378 + dbdir = types.VoteDirUp 413 379 case "down": 414 - dbdir = VoteDirDown 380 + dbdir = types.VoteDirDown 415 381 default: 416 382 return nil, fmt.Errorf("there are only two directions, up or down") 417 383 } 418 384 419 - var voterecs []VoteRecord 385 + var voterecs []types.VoteRecord 420 386 if err := fg.db.Limit(limit).Find(&voterecs, "dir = ? AND post = ?", dbdir, p.ID).Error; err != nil { 421 387 return nil, err 422 388 } ··· 441 407 } 442 408 443 409 func (fg *FeedGenerator) GetFollows(ctx context.Context, user string, limit int, before string) ([]*FollowInfo, error) { 444 - var follows []FollowRecord 445 - if err := fg.db.Limit(limit).Find(&follows, "follower = (?)", fg.db.Model(ActorInfo{}).Where("did = ? or handle = ?", user, user).Select("uid")).Error; err != nil { 410 + var follows []types.FollowRecord 411 + if err := fg.db.Limit(limit).Find(&follows, "follower = (?)", fg.db.Model(types.ActorInfo{}).Where("did = ? or handle = ?", user, user).Select("uid")).Error; err != nil { 446 412 return nil, err 447 413 } 448 414
+3 -8
server/handlers.go
··· 21 21 return nil, err 22 22 } 23 23 24 - scinfo, err := s.CreateScene(ctx, u, input.Handle, input.RecoveryKey) 25 - if err != nil { 26 - return nil, err 27 - } 28 - 29 - _ = scinfo 24 + _ = u 30 25 panic("nyi") 31 26 32 27 /* ··· 312 307 } 313 308 314 309 var out appbskytypes.GraphGetFollows_Output 315 - out.Subject = infoToActorRef(ai) 310 + out.Subject = ai.ActorRef() 316 311 317 312 out.Follows = []*appbskytypes.GraphGetFollows_Follow{} 318 313 for _, f := range follows { ··· 340 335 } 341 336 342 337 return &appbskytypes.GraphGetMemberships_Output{ 343 - Subject: infoToActorRef(ai), 338 + Subject: ai.ActorRef(), 344 339 Memberships: []*appbskytypes.GraphGetMemberships_Membership{}, 345 340 }, nil 346 341 }
-493
server/indexer.go
··· 1 - package schemagen 2 - 3 - import ( 4 - "context" 5 - "errors" 6 - "fmt" 7 - "log" 8 - "net/url" 9 - "time" 10 - 11 - bsky "github.com/whyrusleeping/gosky/api/bsky" 12 - "github.com/whyrusleeping/gosky/repomgr" 13 - "github.com/whyrusleeping/gosky/xrpc" 14 - "go.opentelemetry.io/otel" 15 - "gorm.io/gorm" 16 - ) 17 - 18 - type Indexer struct { 19 - db *gorm.DB 20 - 21 - notifman *NotificationManager 22 - events *EventManager 23 - didr PLCClient 24 - 25 - sendRemoteFollow func(context.Context, string, uint) error 26 - } 27 - 28 - func NewIndexer(db *gorm.DB, notifman *NotificationManager, evtman *EventManager, didr PLCClient) (*Indexer, error) { 29 - db.AutoMigrate(&FeedPost{}) 30 - db.AutoMigrate(&ActorInfo{}) 31 - db.AutoMigrate(&FollowRecord{}) 32 - db.AutoMigrate(&VoteRecord{}) 33 - db.AutoMigrate(&RepostRecord{}) 34 - 35 - return &Indexer{ 36 - db: db, 37 - notifman: notifman, 38 - events: evtman, 39 - didr: didr, 40 - }, nil 41 - } 42 - 43 - type FeedPost struct { 44 - gorm.Model 45 - Author uint 46 - Rkey string 47 - Cid string 48 - UpCount int64 49 - ReplyCount int64 50 - RepostCount int64 51 - ReplyTo uint 52 - } 53 - 54 - type RepostRecord struct { 55 - ID uint `gorm:"primarykey"` 56 - CreatedAt time.Time 57 - RecCreated string 58 - Post uint 59 - Reposter uint 60 - Author uint 61 - RecCid string 62 - Rkey string 63 - } 64 - 65 - type ActorInfo struct { 66 - gorm.Model 67 - Uid uint `gorm:"index"` 68 - Handle string 69 - DisplayName string 70 - Did string 71 - Following int64 72 - Followers int64 73 - Posts int64 74 - DeclRefCid string 75 - Type string 76 - PDS uint 77 - } 78 - 79 - type VoteDir int 80 - 81 - func (vd VoteDir) String() string { 82 - switch vd { 83 - case VoteDirUp: 84 - return "up" 85 - case VoteDirDown: 86 - return "down" 87 - default: 88 - return "<unknown>" 89 - } 90 - } 91 - 92 - const ( 93 - VoteDirUp = VoteDir(1) 94 - VoteDirDown = VoteDir(2) 95 - ) 96 - 97 - type VoteRecord struct { 98 - gorm.Model 99 - Dir VoteDir 100 - Voter uint 101 - Post uint 102 - Created string 103 - Rkey string 104 - Cid string 105 - } 106 - 107 - type FollowRecord struct { 108 - gorm.Model 109 - Follower uint 110 - Target uint 111 - Rkey string 112 - Cid string 113 - } 114 - 115 - func (ix *Indexer) catchup(ctx context.Context, evt *repomgr.RepoEvent) error { 116 - // TODO: catch up on events that happened since this event (in the event of a crash or downtime) 117 - return nil 118 - } 119 - 120 - func (ix *Indexer) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent) { 121 - ctx, span := otel.Tracer("indexer").Start(ctx, "HandleRepoEvent") 122 - defer span.End() 123 - 124 - if err := ix.catchup(ctx, evt); err != nil { 125 - log.Println("failed to catch up on user repo changes, processing events off base: ", err) 126 - } 127 - 128 - fmt.Println("Handling Event!", evt.Kind) 129 - 130 - switch evt.Kind { 131 - case repomgr.EvtKindCreateRecord: 132 - if err := ix.handleRecordCreate(ctx, evt, true); err != nil { 133 - log.Println("handle recordCreate: ", err) 134 - } 135 - case repomgr.EvtKindInitActor: 136 - if err := ix.handleInitActor(ctx, evt); err != nil { 137 - log.Println("handle initActor: ", err) 138 - } 139 - default: 140 - log.Println("unrecognized repo event type: ", evt.Kind) 141 - } 142 - } 143 - 144 - func (ix *Indexer) handleRecordCreate(ctx context.Context, evt *repomgr.RepoEvent, local bool) error { 145 - fmt.Println("record create event", evt.Collection) 146 - var relevantPds []uint 147 - switch rec := evt.Record.(type) { 148 - case *bsky.FeedPost: 149 - var replyid uint 150 - if rec.Reply != nil { 151 - replyto, err := ix.GetPost(ctx, rec.Reply.Parent.Uri) 152 - if err != nil { 153 - return err 154 - } 155 - 156 - replyid = replyto.ID 157 - } 158 - 159 - fp := FeedPost{ 160 - Rkey: evt.Rkey, 161 - Cid: evt.RecCid.String(), 162 - Author: evt.User, 163 - ReplyTo: replyid, 164 - } 165 - if err := ix.db.Create(&fp).Error; err != nil { 166 - return err 167 - } 168 - 169 - if err := ix.addNewPostNotification(ctx, rec, &fp); err != nil { 170 - return err 171 - } 172 - 173 - case *bsky.FeedRepost: 174 - fp, err := ix.GetPost(ctx, rec.Subject.Uri) 175 - if err != nil { 176 - return err 177 - } 178 - 179 - author, err := ix.lookupUser(ctx, fp.Author) 180 - if err != nil { 181 - return err 182 - } 183 - 184 - relevantPds = append(relevantPds, author.PDS) 185 - 186 - rr := RepostRecord{ 187 - RecCreated: rec.CreatedAt, 188 - Post: fp.ID, 189 - Reposter: evt.User, 190 - Author: fp.Author, 191 - RecCid: evt.RecCid.String(), 192 - Rkey: evt.Rkey, 193 - } 194 - if err := ix.db.Create(&rr).Error; err != nil { 195 - return err 196 - } 197 - 198 - if err := ix.notifman.AddRepost(ctx, fp.Author, rr.ID, evt.User); err != nil { 199 - return err 200 - } 201 - 202 - case *bsky.FeedVote: 203 - var val int 204 - var dbdir VoteDir 205 - switch rec.Direction { 206 - case "up": 207 - val = 1 208 - dbdir = VoteDirUp 209 - case "down": 210 - val = -1 211 - dbdir = VoteDirDown 212 - default: 213 - return fmt.Errorf("invalid vote direction: %q", rec.Direction) 214 - } 215 - 216 - puri, err := parseAtUri(rec.Subject.Uri) 217 - if err != nil { 218 - return err 219 - } 220 - 221 - act, err := ix.lookupUserByDid(ctx, puri.Did) 222 - if err != nil { 223 - return err 224 - } 225 - 226 - relevantPds = append(relevantPds, act.PDS) 227 - 228 - var post FeedPost 229 - if err := ix.db.First(&post, "rkey = ? AND author = ?", puri.Rkey, act.Uid).Error; err != nil { 230 - return err 231 - } 232 - 233 - vr := VoteRecord{ 234 - Dir: dbdir, 235 - Voter: evt.User, 236 - Post: post.ID, 237 - Created: rec.CreatedAt, 238 - Rkey: evt.Rkey, 239 - Cid: evt.RecCid.String(), 240 - } 241 - if err := ix.db.Create(&vr).Error; err != nil { 242 - return err 243 - } 244 - 245 - if err := ix.db.Model(FeedPost{}).Where("id = ?", post.ID).Update("up_count", gorm.Expr("up_count + ?", val)).Error; err != nil { 246 - return err 247 - } 248 - 249 - if rec.Direction == "up" { 250 - if err := ix.addNewVoteNotification(ctx, act.ID, &vr); err != nil { 251 - return err 252 - } 253 - } 254 - 255 - case *bsky.GraphFollow: 256 - subj, err := ix.lookupUserByDid(ctx, rec.Subject.Did) 257 - if err != nil { 258 - if !errors.Is(err, gorm.ErrRecordNotFound) { 259 - return err 260 - } 261 - nu, err := ix.createExternalUser(ctx, rec.Subject.Did) 262 - if err != nil { 263 - return err 264 - } 265 - 266 - subj = nu 267 - } 268 - 269 - if subj.PDS != 0 { 270 - relevantPds = append(relevantPds, subj.PDS) 271 - } 272 - 273 - // 'follower' followed 'target' 274 - fr := FollowRecord{ 275 - Follower: evt.User, 276 - Target: subj.ID, 277 - Rkey: evt.Rkey, 278 - Cid: evt.RecCid.String(), 279 - } 280 - if err := ix.db.Create(&fr).Error; err != nil { 281 - return err 282 - } 283 - 284 - if err := ix.notifman.AddFollow(ctx, fr.Follower, fr.Target, fr.ID); err != nil { 285 - return err 286 - } 287 - 288 - if local && subj.PDS != 0 { 289 - if err := ix.sendRemoteFollow(ctx, subj.Did, subj.PDS); err != nil { 290 - log.Println("failed to issue remote follow directive: ", err) 291 - } 292 - } 293 - 294 - default: 295 - return fmt.Errorf("unrecognized record type: %T", rec) 296 - } 297 - 298 - did, err := ix.didForUser(ctx, evt.User) 299 - if err != nil { 300 - return err 301 - } 302 - 303 - fmt.Println("Sending event: ", evt.Collection, relevantPds) 304 - if err := ix.events.AddEvent(&Event{ 305 - CarSlice: evt.RepoSlice, 306 - Kind: EvtKindCreateRecord, 307 - uid: evt.User, 308 - User: did, 309 - Collection: evt.Collection, 310 - Rkey: evt.Rkey, 311 - relevantPds: relevantPds, 312 - }); err != nil { 313 - log.Println("failed to push event: ", err) 314 - } 315 - 316 - return nil 317 - } 318 - func (ix *Indexer) createExternalUser(ctx context.Context, did string) (*ActorInfo, error) { 319 - doc, err := ix.didr.GetDocument(ctx, did) 320 - if err != nil { 321 - return nil, fmt.Errorf("could not locate DID document for followed user: %s", err) 322 - } 323 - 324 - if len(doc.Service) == 0 { 325 - return nil, fmt.Errorf("external followed user %s had no services in did document", did) 326 - } 327 - 328 - svc := doc.Service[0] 329 - durl, err := url.Parse(svc.ServiceEndpoint) 330 - if err != nil { 331 - return nil, err 332 - } 333 - 334 - // TODO: the PDS's DID should also be in the service, we could use that to look up? 335 - var peering Peering 336 - if err := ix.db.First(&peering, "host = ?", durl.Host).Error; err != nil { 337 - return nil, err 338 - } 339 - 340 - var handle string 341 - if len(doc.AlsoKnownAs) > 0 { 342 - hurl, err := url.Parse(doc.AlsoKnownAs[0]) 343 - if err != nil { 344 - return nil, err 345 - } 346 - 347 - handle = hurl.Host 348 - } 349 - 350 - c := &xrpc.Client{Host: svc.ServiceEndpoint} 351 - profile, err := bsky.ActorGetProfile(ctx, c, did) 352 - if err != nil { 353 - return nil, err 354 - } 355 - 356 - if handle != profile.Handle { 357 - return nil, fmt.Errorf("mismatch in handle between did document and pds profile (%s != %s)", handle, profile.Handle) 358 - } 359 - 360 - // TODO: request this users info from their server to fill out our data... 361 - u := User{ 362 - Handle: handle, 363 - Did: did, 364 - PDS: peering.ID, 365 - } 366 - 367 - if err := ix.db.Create(&u).Error; err != nil { 368 - return nil, fmt.Errorf("failed to create other pds user: %w", err) 369 - } 370 - 371 - // okay cool, its a user on a server we are peered with 372 - // lets make a local record of that user for the future 373 - subj := &ActorInfo{ 374 - Uid: u.ID, 375 - Handle: handle, 376 - DisplayName: *profile.DisplayName, 377 - Did: did, 378 - DeclRefCid: profile.Declaration.Cid, 379 - Type: "", 380 - PDS: peering.ID, 381 - } 382 - if err := ix.db.Create(subj).Error; err != nil { 383 - return nil, err 384 - } 385 - 386 - return subj, nil 387 - } 388 - 389 - func (ix *Indexer) didForUser(ctx context.Context, uid uint) (string, error) { 390 - var ai ActorInfo 391 - if err := ix.db.First(&ai, "id = ?", uid).Error; err != nil { 392 - return "", err 393 - } 394 - 395 - return ai.Did, nil 396 - } 397 - 398 - func (ix *Indexer) lookupUser(ctx context.Context, id uint) (*ActorInfo, error) { 399 - var ai ActorInfo 400 - if err := ix.db.First(&ai, "id = ?", id).Error; err != nil { 401 - return nil, err 402 - } 403 - 404 - return &ai, nil 405 - } 406 - 407 - func (ix *Indexer) lookupUserByDid(ctx context.Context, did string) (*ActorInfo, error) { 408 - var ai ActorInfo 409 - if err := ix.db.First(&ai, "did = ?", did).Error; err != nil { 410 - return nil, err 411 - } 412 - 413 - return &ai, nil 414 - } 415 - 416 - func (ix *Indexer) lookupUserByHandle(ctx context.Context, handle string) (*ActorInfo, error) { 417 - var ai ActorInfo 418 - if err := ix.db.First(&ai, "handle = ?", handle).Error; err != nil { 419 - return nil, err 420 - } 421 - 422 - return &ai, nil 423 - } 424 - 425 - func (ix *Indexer) addNewPostNotification(ctx context.Context, post *bsky.FeedPost, fp *FeedPost) error { 426 - if post.Reply != nil { 427 - replyto, err := ix.GetPost(ctx, post.Reply.Parent.Uri) 428 - if err != nil { 429 - fmt.Println("probably shouldnt error when processing a reply to a not-found post") 430 - return err 431 - } 432 - 433 - if err := ix.notifman.AddReplyTo(ctx, fp.Author, fp.ID, replyto); err != nil { 434 - return err 435 - } 436 - } 437 - 438 - for _, e := range post.Entities { 439 - switch e.Type { 440 - case "mention": 441 - mentioned, err := ix.lookupUserByDid(ctx, e.Value) 442 - if err != nil { 443 - return fmt.Errorf("mentioned user does not exist: %w", err) 444 - } 445 - 446 - if err := ix.notifman.AddMention(ctx, fp.Author, fp.ID, mentioned.ID); err != nil { 447 - return err 448 - } 449 - } 450 - } 451 - return nil 452 - } 453 - 454 - func (ix *Indexer) addNewVoteNotification(ctx context.Context, postauthor uint, vr *VoteRecord) error { 455 - return ix.notifman.AddUpVote(ctx, vr.Voter, vr.Post, vr.ID, postauthor) 456 - } 457 - 458 - func (ix *Indexer) handleInitActor(ctx context.Context, evt *repomgr.RepoEvent) error { 459 - ai := evt.ActorInfo 460 - if err := ix.db.Create(&ActorInfo{ 461 - Uid: evt.User, 462 - Handle: ai.Handle, 463 - Did: ai.Did, 464 - DisplayName: ai.DisplayName, 465 - DeclRefCid: ai.DeclRefCid, 466 - Type: ai.Type, 467 - }).Error; err != nil { 468 - return err 469 - } 470 - 471 - if err := ix.db.Create(&FollowRecord{ 472 - Follower: evt.User, 473 - Target: evt.User, 474 - }).Error; err != nil { 475 - return err 476 - } 477 - 478 - return nil 479 - } 480 - 481 - func (ix *Indexer) GetPost(ctx context.Context, uri string) (*FeedPost, error) { 482 - puri, err := parseAtUri(uri) 483 - if err != nil { 484 - return nil, err 485 - } 486 - 487 - var post FeedPost 488 - if err := ix.db.First(&post, "rkey = ? AND author = (?)", puri.Rkey, ix.db.Model(ActorInfo{}).Where("did = ?", puri.Did).Select("id")).Error; err != nil { 489 - return nil, err 490 - } 491 - 492 - return &post, nil 493 - }
+15 -14
server/notifs.go notifs/notifs.go
··· 1 - package schemagen 1 + package notifs 2 2 3 3 import ( 4 4 "context" ··· 8 8 "github.com/ipfs/go-cid" 9 9 appbskytypes "github.com/whyrusleeping/gosky/api/bsky" 10 10 "github.com/whyrusleeping/gosky/repomgr" 11 + "github.com/whyrusleeping/gosky/types" 11 12 "gorm.io/gorm" 12 13 "gorm.io/gorm/clause" 13 14 ) ··· 119 120 return nil, fmt.Errorf("attempted to hydrate unknown notif kind: %d", nrec.Kind) 120 121 } 121 122 } 122 - func (nm *NotificationManager) getActor(ctx context.Context, act uint) (*ActorInfo, error) { 123 - var ai ActorInfo 123 + func (nm *NotificationManager) getActor(ctx context.Context, act uint) (*types.ActorInfo, error) { 124 + var ai types.ActorInfo 124 125 if err := nm.db.First(&ai, "id = ?", act).Error; err != nil { 125 126 return nil, err 126 127 } ··· 129 130 } 130 131 131 132 func (nm *NotificationManager) hydrateNotificationUpVote(ctx context.Context, nrec *NotifRecord, lastSeen time.Time) (*appbskytypes.NotificationList_Notification, error) { 132 - var votedOn FeedPost 133 + var votedOn types.FeedPost 133 134 if err := nm.db.First(&votedOn, "id = ?", nrec.Record).Error; err != nil { 134 135 return nil, err 135 136 } ··· 139 140 return nil, err 140 141 } 141 142 142 - var vote VoteRecord 143 + var vote types.VoteRecord 143 144 if err := nm.db.First(&vote, "id = ?", nrec.Record).Error; err != nil { 144 145 return nil, err 145 146 } ··· 169 170 } 170 171 171 172 func (nm *NotificationManager) hydrateNotificationRepost(ctx context.Context, nrec *NotifRecord, lastSeen time.Time) (*appbskytypes.NotificationList_Notification, error) { 172 - var reposted FeedPost 173 + var reposted types.FeedPost 173 174 if err := nm.db.First(&reposted, "id = ?", nrec.Record).Error; err != nil { 174 175 return nil, err 175 176 } ··· 179 180 return nil, err 180 181 } 181 182 182 - var repost RepostRecord 183 + var repost types.RepostRecord 183 184 if err := nm.db.First(&repost, "id = ?", nrec.Record).Error; err != nil { 184 185 return nil, err 185 186 } ··· 209 210 } 210 211 211 212 func (nm *NotificationManager) hydrateNotificationReply(ctx context.Context, nrec *NotifRecord, lastSeen time.Time) (*appbskytypes.NotificationList_Notification, error) { 212 - var fp FeedPost 213 + var fp types.FeedPost 213 214 if err := nm.db.First(&fp, "id = ?", nrec.Record).Error; err != nil { 214 215 return nil, err 215 216 } 216 217 217 - var replyTo FeedPost 218 + var replyTo types.FeedPost 218 219 if err := nm.db.First(&replyTo, "id = ?", nrec.ReplyTo).Error; err != nil { 219 220 return nil, err 220 221 } 221 222 222 - var author ActorInfo 223 + var author types.ActorInfo 223 224 if err := nm.db.First(&author, "id = ?", fp.Author).Error; err != nil { 224 225 return nil, err 225 226 } 226 227 227 - var opAuthor ActorInfo 228 + var opAuthor types.ActorInfo 228 229 if err := nm.db.First(&opAuthor, "id = ?", replyTo.Author).Error; err != nil { 229 230 return nil, err 230 231 } ··· 249 250 } 250 251 251 252 func (nm *NotificationManager) hydrateNotificationFollow(ctx context.Context, nrec *NotifRecord, lastSeen time.Time) (*appbskytypes.NotificationList_Notification, error) { 252 - var frec FollowRecord 253 + var frec types.FollowRecord 253 254 if err := nm.db.First(&frec, "id = ?", nrec.Record).Error; err != nil { 254 255 return nil, err 255 256 } 256 257 257 - var follower ActorInfo 258 + var follower types.ActorInfo 258 259 if err := nm.db.First(&follower, "id = ?", nrec.Who).Error; err != nil { 259 260 return nil, err 260 261 } ··· 306 307 return nil 307 308 } 308 309 309 - func (nm *NotificationManager) AddReplyTo(ctx context.Context, user uint, replyid uint, replyto *FeedPost) error { 310 + func (nm *NotificationManager) AddReplyTo(ctx context.Context, user uint, replyid uint, replyto *types.FeedPost) error { 310 311 return nm.db.Create(&NotifRecord{ 311 312 Kind: NotifKindReply, 312 313 For: replyto.Author,
+142 -42
server/server.go
··· 6 6 "encoding/json" 7 7 "errors" 8 8 "fmt" 9 + "log" 9 10 "net/mail" 11 + "net/url" 10 12 "os" 11 13 "strings" 12 14 "time" 13 15 14 16 gojwt "github.com/golang-jwt/jwt" 17 + "github.com/gorilla/websocket" 15 18 "github.com/ipfs/go-cid" 16 19 "github.com/labstack/echo/v4" 17 20 "github.com/labstack/echo/v4/middleware" 18 21 "github.com/lestrrat-go/jwx/jwa" 19 22 jwk "github.com/lestrrat-go/jwx/jwk" 20 23 jwt "github.com/lestrrat-go/jwx/jwt" 21 - "github.com/whyrusleeping/go-did" 22 24 comatprototypes "github.com/whyrusleeping/gosky/api/atproto" 23 - appbskytypes "github.com/whyrusleeping/gosky/api/bsky" 25 + bsky "github.com/whyrusleeping/gosky/api/bsky" 24 26 "github.com/whyrusleeping/gosky/carstore" 27 + "github.com/whyrusleeping/gosky/events" 28 + "github.com/whyrusleeping/gosky/indexer" 25 29 "github.com/whyrusleeping/gosky/key" 26 30 "github.com/whyrusleeping/gosky/lex/util" 31 + "github.com/whyrusleeping/gosky/notifs" 32 + "github.com/whyrusleeping/gosky/plc" 27 33 "github.com/whyrusleeping/gosky/repomgr" 34 + "github.com/whyrusleeping/gosky/types" 28 35 "github.com/whyrusleeping/gosky/xrpc" 29 36 "gorm.io/gorm" 30 37 ) ··· 34 41 cs *carstore.CarStore 35 42 repoman *repomgr.RepoManager 36 43 feedgen *FeedGenerator 37 - notifman *NotificationManager 38 - indexer *Indexer 39 - events *EventManager 44 + notifman *notifs.NotificationManager 45 + indexer *indexer.Indexer 46 + events *events.EventManager 40 47 signingKey *key.Key 41 48 echo *echo.Echo 42 49 jwtSigningKey []byte ··· 44 51 handleSuffix string 45 52 serviceUrl string 46 53 47 - plc PLCClient 54 + plc plc.PLCClient 48 55 } 49 56 50 57 const UserActorDeclCid = "bafyreid27zk7lbis4zw5fz4podbvbs4fc5ivwji3dmrwa6zggnj4bnd57u" 51 58 const UserActorDeclType = "app.bsky.system.actorUser" 52 59 53 - type PLCClient interface { 54 - GetDocument(ctx context.Context, didstr string) (*did.Document, error) 55 - CreateDID(ctx context.Context, sigkey *key.Key, recovery string, handle string, service string) (string, error) 56 - } 57 - 58 - func NewServer(db *gorm.DB, cs *carstore.CarStore, kfile string, handleSuffix, serviceUrl string, didr PLCClient, jwtkey []byte) (*Server, error) { 60 + func NewServer(db *gorm.DB, cs *carstore.CarStore, kfile string, handleSuffix, serviceUrl string, didr plc.PLCClient, jwtkey []byte) (*Server, error) { 59 61 db.AutoMigrate(&User{}) 60 62 db.AutoMigrate(&Peering{}) 61 63 db.AutoMigrate(&ExternalFollow{}) ··· 65 67 return nil, err 66 68 } 67 69 68 - evtman := NewEventManager() 70 + evtman := events.NewEventManager() 69 71 70 72 repoman := repomgr.NewRepoManager(db, cs) 71 - notifman := NewNotificationManager(db, repoman) 73 + notifman := notifs.NewNotificationManager(db, repoman) 72 74 73 - ix, err := NewIndexer(db, notifman, evtman, didr) 75 + ix, err := indexer.NewIndexer(db, notifman, evtman, didr) 74 76 if err != nil { 75 77 return nil, err 76 78 } ··· 106 108 */ 107 109 }) 108 110 109 - ix.sendRemoteFollow = s.sendRemoteFollow 111 + ix.SendRemoteFollow = s.sendRemoteFollow 112 + ix.CreateExternalUser = s.createExternalUser 110 113 111 114 feedgen, err := NewFeedGenerator(db, ix, s.readRecordFunc) 112 115 if err != nil { ··· 120 123 return s, nil 121 124 } 122 125 123 - func (s *Server) handleFedEvent(ctx context.Context, host *Peering, evt *Event) error { 126 + func (s *Server) handleFedEvent(ctx context.Context, host *Peering, evt *events.Event) error { 124 127 fmt.Printf("[%s] got fed event from %q: %s\n", s.serviceUrl, host.Host, evt.Kind) 125 128 switch evt.Kind { 126 - case EvtKindCreateRecord: 129 + case events.EvtKindCreateRecord: 127 130 u, err := s.lookupUserByDid(ctx, evt.User) 128 131 if err != nil { 129 132 if !errors.Is(err, gorm.ErrRecordNotFound) { 130 133 return fmt.Errorf("looking up event user: %w", err) 131 134 } 132 135 133 - subj, err := s.indexer.createExternalUser(ctx, evt.User) 136 + subj, err := s.createExternalUser(ctx, evt.User) 134 137 if err != nil { 135 138 return err 136 139 } ··· 140 143 } 141 144 142 145 return s.repoman.HandleExternalUserEvent(ctx, host.ID, repomgr.EvtKindCreateRecord, u.ID, evt.Collection, evt.Rkey, evt.CarSlice) 143 - case EvtKindUpdateRecord: 146 + case events.EvtKindUpdateRecord: 144 147 default: 145 148 return fmt.Errorf("unrecognized fed event kind: %q", evt.Kind) 146 149 } 147 150 return nil 148 151 } 149 152 150 - func (s *Server) repoEventToFedEvent(ctx context.Context, evt *repomgr.RepoEvent) (*Event, error) { 151 - out := &Event{ 153 + func (s *Server) createExternalUser(ctx context.Context, did string) (*types.ActorInfo, error) { 154 + doc, err := s.plc.GetDocument(ctx, did) 155 + if err != nil { 156 + return nil, fmt.Errorf("could not locate DID document for followed user: %s", err) 157 + } 158 + 159 + if len(doc.Service) == 0 { 160 + return nil, fmt.Errorf("external followed user %s had no services in did document", did) 161 + } 162 + 163 + svc := doc.Service[0] 164 + durl, err := url.Parse(svc.ServiceEndpoint) 165 + if err != nil { 166 + return nil, err 167 + } 168 + 169 + // TODO: the PDS's DID should also be in the service, we could use that to look up? 170 + var peering Peering 171 + if err := s.db.First(&peering, "host = ?", durl.Host).Error; err != nil { 172 + return nil, err 173 + } 174 + 175 + var handle string 176 + if len(doc.AlsoKnownAs) > 0 { 177 + hurl, err := url.Parse(doc.AlsoKnownAs[0]) 178 + if err != nil { 179 + return nil, err 180 + } 181 + 182 + handle = hurl.Host 183 + } 184 + 185 + c := &xrpc.Client{Host: svc.ServiceEndpoint} 186 + profile, err := bsky.ActorGetProfile(ctx, c, did) 187 + if err != nil { 188 + return nil, err 189 + } 190 + 191 + if handle != profile.Handle { 192 + return nil, fmt.Errorf("mismatch in handle between did document and pds profile (%s != %s)", handle, profile.Handle) 193 + } 194 + 195 + // TODO: request this users info from their server to fill out our data... 196 + u := User{ 197 + Handle: handle, 198 + Did: did, 199 + PDS: peering.ID, 200 + } 201 + 202 + if err := s.db.Create(&u).Error; err != nil { 203 + return nil, fmt.Errorf("failed to create other pds user: %w", err) 204 + } 205 + 206 + // okay cool, its a user on a server we are peered with 207 + // lets make a local record of that user for the future 208 + subj := &types.ActorInfo{ 209 + Uid: u.ID, 210 + Handle: handle, 211 + DisplayName: *profile.DisplayName, 212 + Did: did, 213 + DeclRefCid: profile.Declaration.Cid, 214 + Type: "", 215 + PDS: peering.ID, 216 + } 217 + if err := s.db.Create(subj).Error; err != nil { 218 + return nil, err 219 + } 220 + 221 + return subj, nil 222 + } 223 + 224 + func (s *Server) repoEventToFedEvent(ctx context.Context, evt *repomgr.RepoEvent) (*events.Event, error) { 225 + out := &events.Event{ 152 226 CarSlice: evt.RepoSlice, 153 227 } 154 228 155 229 switch evt.Kind { 156 230 case repomgr.EvtKindCreateRecord: 157 - out.Kind = EvtKindCreateRecord 231 + out.Kind = events.EvtKindCreateRecord 158 232 case repomgr.EvtKindUpdateRecord: 159 - out.Kind = EvtKindUpdateRecord 233 + out.Kind = events.EvtKindUpdateRecord 160 234 case repomgr.EvtKindInitActor: 161 235 return nil, nil 162 236 default: 163 237 return nil, fmt.Errorf("unrecognized repo event kind: %q", evt.Kind) 164 238 } 165 239 166 - did, err := s.indexer.didForUser(ctx, evt.User) 240 + did, err := s.indexer.DidForUser(ctx, evt.User) 167 241 if err != nil { 168 242 return nil, err 169 243 } 170 244 171 - out.uid = evt.User 245 + out.PrivUid = evt.User 172 246 out.User = did 173 247 out.Collection = evt.Collection 174 248 out.Rkey = evt.Rkey ··· 455 529 return nil 456 530 } 457 531 458 - func infoToActorRef(ai *ActorInfo) *appbskytypes.ActorRef_WithInfo { 459 - return &appbskytypes.ActorRef_WithInfo{ 460 - Declaration: &appbskytypes.SystemDeclRef{ 461 - Cid: ai.DeclRefCid, 462 - ActorType: ai.Type, 463 - }, 464 - Handle: ai.Handle, 465 - DisplayName: &ai.DisplayName, 466 - Did: ai.Did, 467 - } 468 - } 469 - 470 532 func (s *Server) invalidateToken(ctx context.Context, u *User, tok *jwt.Token) error { 471 - panic("nyi") 472 - } 473 - 474 - func (s *Server) CreateScene(ctx context.Context, u *User, handle string, recovery *string) (interface{}, error) { 475 533 panic("nyi") 476 534 } 477 535 ··· 561 619 562 620 return extfollow.ID != 0, nil 563 621 } 622 + 623 + func (s *Server) EventsHandler(c echo.Context) error { 624 + did := c.Request().Header.Get("DID") 625 + conn, err := websocket.Upgrade(c.Response().Writer, c.Request(), c.Response().Header(), 1<<10, 1<<10) 626 + if err != nil { 627 + return err 628 + } 629 + ctx := c.Request().Context() 630 + 631 + var peering Peering 632 + if err := s.db.First(&peering, "did = ?", did).Error; err != nil { 633 + return err 634 + } 635 + 636 + evts, cancel, err := s.events.Subscribe(func(evt *events.Event) bool { 637 + for _, pid := range evt.PrivRelevantPds { 638 + if pid == peering.ID { 639 + return true 640 + } 641 + } 642 + 643 + has, err := s.peerHasFollow(ctx, peering.ID, evt.PrivUid) 644 + if err != nil { 645 + log.Println("error checking peer follow relationship: ", err) 646 + return false 647 + } 648 + 649 + return has 650 + }) 651 + if err != nil { 652 + return err 653 + } 654 + defer cancel() 655 + 656 + for evt := range evts { 657 + if err := conn.WriteJSON(evt); err != nil { 658 + return err 659 + } 660 + } 661 + 662 + return nil 663 + }
+93
types/types.go
··· 1 + package types 2 + 3 + import ( 4 + "time" 5 + 6 + "gorm.io/gorm" 7 + 8 + bsky "github.com/whyrusleeping/gosky/api/bsky" 9 + ) 10 + 11 + type FeedPost struct { 12 + gorm.Model 13 + Author uint 14 + Rkey string 15 + Cid string 16 + UpCount int64 17 + ReplyCount int64 18 + RepostCount int64 19 + ReplyTo uint 20 + } 21 + 22 + type RepostRecord struct { 23 + ID uint `gorm:"primarykey"` 24 + CreatedAt time.Time 25 + RecCreated string 26 + Post uint 27 + Reposter uint 28 + Author uint 29 + RecCid string 30 + Rkey string 31 + } 32 + 33 + type ActorInfo struct { 34 + gorm.Model 35 + Uid uint `gorm:"index"` 36 + Handle string 37 + DisplayName string 38 + Did string 39 + Following int64 40 + Followers int64 41 + Posts int64 42 + DeclRefCid string 43 + Type string 44 + PDS uint 45 + } 46 + 47 + func (ai *ActorInfo) ActorRef() *bsky.ActorRef_WithInfo { 48 + return &bsky.ActorRef_WithInfo{ 49 + Did: ai.Did, 50 + Declaration: &bsky.SystemDeclRef{ 51 + Cid: ai.DeclRefCid, 52 + ActorType: ai.Type, 53 + }, 54 + Handle: ai.Handle, 55 + DisplayName: &ai.DisplayName, 56 + } 57 + } 58 + 59 + type VoteDir int 60 + 61 + func (vd VoteDir) String() string { 62 + switch vd { 63 + case VoteDirUp: 64 + return "up" 65 + case VoteDirDown: 66 + return "down" 67 + default: 68 + return "<unknown>" 69 + } 70 + } 71 + 72 + const ( 73 + VoteDirUp = VoteDir(1) 74 + VoteDirDown = VoteDir(2) 75 + ) 76 + 77 + type VoteRecord struct { 78 + gorm.Model 79 + Dir VoteDir 80 + Voter uint 81 + Post uint 82 + Created string 83 + Rkey string 84 + Cid string 85 + } 86 + 87 + type FollowRecord struct { 88 + gorm.Model 89 + Follower uint 90 + Target uint 91 + Rkey string 92 + Cid string 93 + }