this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

remove deadcode (2025-03-03) (#972)

Used the `deadcode` tool to identify major chunks of unused code:
https://go.dev/blog/deadcode

Mostly PDS and BGS cruft, but also some entire util files.

authored by

bnewbold and committed by
GitHub
a5758cf2 c2959122

-855
-122
bgs/bgs.go
··· 3 3 import ( 4 4 "context" 5 5 "database/sql" 6 - "encoding/json" 7 6 "errors" 8 7 "fmt" 9 8 "log/slog" ··· 11 10 "net/http" 12 11 _ "net/http/pprof" 13 12 "net/url" 14 - "reflect" 15 13 "strconv" 16 14 "strings" 17 15 "sync" 18 16 "time" 19 17 20 - "contrib.go.opencensus.io/exporter/prometheus" 21 18 "github.com/bluesky-social/indigo/api" 22 19 atproto "github.com/bluesky-social/indigo/api/atproto" 23 20 comatproto "github.com/bluesky-social/indigo/api/atproto" ··· 25 22 "github.com/bluesky-social/indigo/did" 26 23 "github.com/bluesky-social/indigo/events" 27 24 "github.com/bluesky-social/indigo/indexer" 28 - lexutil "github.com/bluesky-social/indigo/lex/util" 29 25 "github.com/bluesky-social/indigo/models" 30 26 "github.com/bluesky-social/indigo/repomgr" 31 27 "github.com/bluesky-social/indigo/xrpc" ··· 34 30 "golang.org/x/time/rate" 35 31 36 32 "github.com/gorilla/websocket" 37 - "github.com/ipfs/go-cid" 38 33 ipld "github.com/ipfs/go-ipld-format" 39 34 "github.com/labstack/echo/v4" 40 35 "github.com/labstack/echo/v4/middleware" ··· 204 199 205 200 func (bgs *BGS) StartMetrics(listen string) error { 206 201 http.Handle("/metrics", promhttp.Handler()) 207 - return http.ListenAndServe(listen, nil) 208 - } 209 - 210 - // Disabled for now, maybe reimplement behind admin auth later 211 - func (bgs *BGS) StartDebug(listen string) error { 212 - http.HandleFunc("/repodbg/user", func(w http.ResponseWriter, r *http.Request) { 213 - ctx := r.Context() 214 - did := r.FormValue("did") 215 - 216 - u, err := bgs.Index.LookupUserByDid(ctx, did) 217 - if err != nil { 218 - http.Error(w, err.Error(), 400) 219 - return 220 - } 221 - 222 - root, err := bgs.repoman.GetRepoRoot(ctx, u.Uid) 223 - if err != nil { 224 - http.Error(w, err.Error(), 400) 225 - return 226 - } 227 - 228 - out := map[string]any{ 229 - "root": root.String(), 230 - "actorInfo": u, 231 - } 232 - 233 - if r.FormValue("carstore") != "" { 234 - stat, err := bgs.repoman.CarStore().Stat(ctx, u.Uid) 235 - if err != nil { 236 - http.Error(w, err.Error(), 400) 237 - return 238 - } 239 - out["carstore"] = stat 240 - } 241 - 242 - json.NewEncoder(w).Encode(out) 243 - }) 244 - http.HandleFunc("/repodbg/crawl", func(w http.ResponseWriter, r *http.Request) { 245 - ctx := r.Context() 246 - did := r.FormValue("did") 247 - 248 - act, err := bgs.Index.GetUserOrMissing(ctx, did) 249 - if err != nil { 250 - w.WriteHeader(500) 251 - bgs.log.Error("failed to get user", "err", err) 252 - return 253 - } 254 - 255 - if err := bgs.Index.Crawler.Crawl(ctx, act); err != nil { 256 - w.WriteHeader(500) 257 - bgs.log.Error("failed to add user to crawler", "err", err) 258 - return 259 - } 260 - }) 261 - http.HandleFunc("/repodbg/blocks", func(w http.ResponseWriter, r *http.Request) { 262 - ctx := r.Context() 263 - did := r.FormValue("did") 264 - c := r.FormValue("cid") 265 - 266 - bcid, err := cid.Decode(c) 267 - if err != nil { 268 - http.Error(w, err.Error(), 400) 269 - return 270 - } 271 - 272 - cs := bgs.repoman.CarStore() 273 - 274 - u, err := bgs.Index.LookupUserByDid(ctx, did) 275 - if err != nil { 276 - http.Error(w, err.Error(), 400) 277 - return 278 - } 279 - 280 - bs, err := cs.ReadOnlySession(u.Uid) 281 - if err != nil { 282 - http.Error(w, err.Error(), 400) 283 - return 284 - } 285 - 286 - blk, err := bs.Get(ctx, bcid) 287 - if err != nil { 288 - http.Error(w, err.Error(), 400) 289 - return 290 - } 291 - 292 - w.WriteHeader(200) 293 - w.Write(blk.RawData()) 294 - }) 295 - 296 202 return http.ListenAndServe(listen, nil) 297 203 } 298 204 ··· 761 667 } 762 668 } 763 669 764 - func prometheusHandler() http.Handler { 765 - // Prometheus globals are exposed as interfaces, but the prometheus 766 - // OpenCensus exporter expects a concrete *Registry. The concrete type of 767 - // the globals are actually *Registry, so we downcast them, staying 768 - // defensive in case things change under the hood. 769 - registry, ok := promclient.DefaultRegisterer.(*promclient.Registry) 770 - if !ok { 771 - slog.Warn("failed to export default prometheus registry; some metrics will be unavailable; unexpected type", "type", reflect.TypeOf(promclient.DefaultRegisterer)) 772 - } 773 - exporter, err := prometheus.NewExporter(prometheus.Options{ 774 - Registry: registry, 775 - Namespace: "bigsky", 776 - }) 777 - if err != nil { 778 - slog.Error("could not create the prometheus stats exporter", "err", err, "system", "bgs") 779 - } 780 - 781 - return exporter 782 - } 783 - 784 670 // domainIsBanned checks if the given host is banned, starting with the host 785 671 // itself, then checking every parent domain up to the tld 786 672 func (s *BGS) domainIsBanned(ctx context.Context, host string) (bool, error) { ··· 865 751 } 866 752 867 753 return &u, nil 868 - } 869 - 870 - func stringLink(lnk *lexutil.LexLink) string { 871 - if lnk == nil { 872 - return "<nil>" 873 - } 874 - 875 - return lnk.String() 876 754 } 877 755 878 756 func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *events.XRPCStreamEvent) error {
-15
bgs/stubs.go
··· 17 17 Message string `json:"message"` 18 18 } 19 19 20 - func (s *BGS) RegisterHandlersAppBsky(e *echo.Echo) error { 21 - return nil 22 - } 23 - 24 - func (s *BGS) RegisterHandlersComAtproto(e *echo.Echo) error { 25 - e.GET("/xrpc/com.atproto.sync.getBlocks", s.HandleComAtprotoSyncGetBlocks) 26 - e.GET("/xrpc/com.atproto.sync.getLatestCommit", s.HandleComAtprotoSyncGetLatestCommit) 27 - e.GET("/xrpc/com.atproto.sync.getRecord", s.HandleComAtprotoSyncGetRecord) 28 - e.GET("/xrpc/com.atproto.sync.getRepo", s.HandleComAtprotoSyncGetRepo) 29 - e.GET("/xrpc/com.atproto.sync.listRepos", s.HandleComAtprotoSyncListRepos) 30 - e.POST("/xrpc/com.atproto.sync.notifyOfUpdate", s.HandleComAtprotoSyncNotifyOfUpdate) 31 - e.POST("/xrpc/com.atproto.sync.requestCrawl", s.HandleComAtprotoSyncRequestCrawl) 32 - return nil 33 - } 34 - 35 20 func (s *BGS) HandleComAtprotoSyncGetBlocks(c echo.Context) error { 36 21 ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetBlocks") 37 22 defer span.End()
-60
events/repostream.go
··· 1 - package events 2 - 3 - import ( 4 - "context" 5 - 6 - "github.com/bluesky-social/indigo/repomgr" 7 - 8 - "github.com/gorilla/websocket" 9 - cid "github.com/ipfs/go-cid" 10 - ) 11 - 12 - type LiteStreamHandleFunc func(op repomgr.EventKind, seq int64, path string, did string, rcid *cid.Cid, rec any) error 13 - 14 - func ConsumeRepoStreamLite2(ctx context.Context, con *websocket.Conn, cb LiteStreamHandleFunc) error { 15 - /* 16 - return HandleRepoStream(ctx, con, &RepoStreamCallbacks{ 17 - RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 18 - if evt.TooBig { 19 - log.Errorf("skipping too big events for now: %d", evt.Seq) 20 - return nil 21 - } 22 - r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 23 - if err != nil { 24 - return fmt.Errorf("reading repo from car (seq: %d, len: %d): %w", evt.Seq, len(evt.Blocks), err) 25 - } 26 - 27 - for _, op := range evt.Ops { 28 - ek := repomgr.EventKind(op.Action) 29 - switch ek { 30 - case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 31 - rc, rec, err := r.GetRecord(ctx, op.Path) 32 - if err != nil { 33 - e := fmt.Errorf("getting record %s (%s) within seq %d for %s: %w", op.Path, *op.Cid, evt.Seq, evt.Repo, err) 34 - log.Error(e) 35 - continue 36 - } 37 - 38 - if lexutil.LexLink(rc) != *op.Cid { 39 - // TODO: do we even error here? 40 - return fmt.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid) 41 - } 42 - 43 - if err := cb(ek, evt.Seq, op.Path, evt.Repo, &rc, rec); err != nil { 44 - log.Errorf("event consumer callback (%s): %s", ek, err) 45 - continue 46 - } 47 - 48 - case repomgr.EvtKindDeleteRecord: 49 - if err := cb(ek, evt.Seq, op.Path, evt.Repo, nil, nil); err != nil { 50 - log.Errorf("event consumer callback (%s): %s", ek, err) 51 - continue 52 - } 53 - } 54 - } 55 - return nil 56 - }, 57 - }) 58 - */ 59 - return nil 60 - }
-20
pds/auth.go
··· 47 47 Did: did, 48 48 }, nil 49 49 } 50 - 51 - func (s *Server) createCrossServerAuthToken(ctx context.Context, otherpds string) (*xrpc.AuthInfo, error) { 52 - accessTok := makeToken(otherpds, "com.atproto.federation", time.Now().Add(24*time.Hour)) 53 - 54 - // setting this is a little weird, 55 - // since the token isn't signed by this key, we dont have a way to validate... 56 - accessTok.Set("pds", s.signingKey.Public().DID()) 57 - 58 - rval := make([]byte, 10) 59 - rand.Read(rval) 60 - 61 - accSig, err := jwt.Sign(accessTok, jwt.WithKey(jwa.HS256, s.jwtSigningKey)) 62 - if err != nil { 63 - return nil, err 64 - } 65 - 66 - return &xrpc.AuthInfo{ 67 - AccessJwt: string(accSig), 68 - }, nil 69 - }
-431
pds/feedgen.go
··· 1 - package pds 2 - 3 - import ( 4 - "context" 5 - "fmt" 6 - "log/slog" 7 - "sort" 8 - "strings" 9 - "time" 10 - 11 - bsky "github.com/bluesky-social/indigo/api/bsky" 12 - "github.com/bluesky-social/indigo/indexer" 13 - lexutil "github.com/bluesky-social/indigo/lex/util" 14 - "github.com/bluesky-social/indigo/models" 15 - 16 - "github.com/ipfs/go-cid" 17 - "go.opentelemetry.io/otel" 18 - "gorm.io/gorm" 19 - ) 20 - 21 - type FeedGenerator struct { 22 - db *gorm.DB 23 - ix *indexer.Indexer 24 - 25 - readRecord ReadRecordFunc 26 - 27 - log *slog.Logger 28 - } 29 - 30 - func NewFeedGenerator(db *gorm.DB, ix *indexer.Indexer, readRecord ReadRecordFunc, log *slog.Logger) (*FeedGenerator, error) { 31 - return &FeedGenerator{ 32 - db: db, 33 - ix: ix, 34 - readRecord: readRecord, 35 - log: log, 36 - }, nil 37 - } 38 - 39 - type ReadRecordFunc func(context.Context, models.Uid, cid.Cid) (lexutil.CBOR, error) 40 - 41 - /* 42 - type HydratedFeedItem struct { 43 - Uri string 44 - RepostedBy *bsky.ActorDefs_ProfileViewBasic 45 - Record any 46 - ReplyCount int64 47 - RepostCount int64 48 - UpvoteCount int64 49 - DownvoteCount int64 50 - MyState *bsky.FeedGetAuthorFeed_MyState 51 - Cid string 52 - Author *bsky.ActorDefs_ProfileViewBasic 53 - TrendedBy *bsky.ActorDefs_ProfileViewBasic 54 - Embed *bsky.FeedEmbed 55 - IndexedAt string 56 - } 57 - */ 58 - 59 - func (fg *FeedGenerator) hydrateFeed(ctx context.Context, items []*models.FeedPost, reposts []*models.RepostRecord) ([]*bsky.FeedDefs_FeedViewPost, error) { 60 - out := make([]*bsky.FeedDefs_FeedViewPost, 0, len(items)) 61 - for _, it := range items { 62 - hit, err := fg.hydrateItem(ctx, it) 63 - if err != nil { 64 - return nil, err 65 - } 66 - 67 - out = append(out, hit) 68 - } 69 - 70 - if len(reposts) > 0 { 71 - for _, rp := range reposts { 72 - var fp models.FeedPost 73 - if err := fg.db.First(&fp, "id = ?", rp.Post).Error; err != nil { 74 - return nil, err 75 - } 76 - 77 - fvp, err := fg.hydrateItem(ctx, &fp) 78 - if err != nil { 79 - return nil, err 80 - } 81 - 82 - reposter, err := fg.getActorRefInfo(ctx, rp.Reposter) 83 - if err != nil { 84 - return nil, err 85 - } 86 - 87 - fvp.Reason = &bsky.FeedDefs_FeedViewPost_Reason{ 88 - FeedDefs_ReasonRepost: &bsky.FeedDefs_ReasonRepost{ 89 - By: reposter, 90 - IndexedAt: rp.CreatedAt.Format(time.RFC3339), 91 - }, 92 - } 93 - 94 - out = append(out, fvp) 95 - } 96 - } 97 - 98 - return out, nil 99 - } 100 - 101 - func (fg *FeedGenerator) didForUser(ctx context.Context, user models.Uid) (string, error) { 102 - // TODO: cache the shit out of this 103 - var ai models.ActorInfo 104 - if err := fg.db.First(&ai, "uid = ?", user).Error; err != nil { 105 - return "", err 106 - } 107 - 108 - return ai.Did, nil 109 - } 110 - 111 - func (fg *FeedGenerator) getActorRefInfo(ctx context.Context, user models.Uid) (*bsky.ActorDefs_ProfileViewBasic, error) { 112 - // TODO: cache the shit out of this too 113 - var ai models.ActorInfo 114 - if err := fg.db.First(&ai, "uid = ?", user).Error; err != nil { 115 - return nil, err 116 - } 117 - 118 - return ai.ActorRef(), nil 119 - } 120 - 121 - func (fg *FeedGenerator) hydrateItem(ctx context.Context, item *models.FeedPost) (*bsky.FeedDefs_FeedViewPost, error) { 122 - authorDid, err := fg.didForUser(ctx, item.Author) 123 - if err != nil { 124 - return nil, err 125 - } 126 - 127 - out := &bsky.FeedDefs_FeedViewPost{} 128 - 129 - out.Post = &bsky.FeedDefs_PostView{ 130 - Uri: "at://" + authorDid + "/app.bsky.feed.post/" + item.Rkey, 131 - ReplyCount: &item.ReplyCount, 132 - RepostCount: &item.RepostCount, 133 - LikeCount: &item.UpCount, 134 - Cid: item.Cid, 135 - IndexedAt: item.UpdatedAt.Format(time.RFC3339), 136 - } 137 - 138 - author, err := fg.getActorRefInfo(ctx, item.Author) 139 - if err != nil { 140 - return nil, err 141 - } 142 - 143 - out.Post.Author = author 144 - 145 - reccid, err := cid.Decode(item.Cid) 146 - if err != nil { 147 - return nil, err 148 - } 149 - 150 - rec, err := fg.readRecord(ctx, item.Author, reccid) 151 - if err != nil { 152 - return nil, err 153 - } 154 - 155 - out.Post.Record = &lexutil.LexiconTypeDecoder{Val: rec} 156 - 157 - return out, nil 158 - } 159 - 160 - func (fg *FeedGenerator) getPostViewerState(ctx context.Context, item uint, viewer models.Uid, viewerDid string) (*bsky.FeedDefs_ViewerState, error) { 161 - var out bsky.FeedDefs_ViewerState 162 - 163 - var vote models.VoteRecord 164 - if err := fg.db.Find(&vote, "post = ? AND voter = ?", item, viewer).Error; err != nil { 165 - return nil, err 166 - } 167 - 168 - if vote.ID != 0 { 169 - vuri := fmt.Sprintf("at://%s/app.bsky.feed.vote/%s", viewerDid, vote.Rkey) 170 - out.Like = &vuri 171 - } 172 - 173 - var rep models.RepostRecord 174 - if err := fg.db.Find(&rep, "post = ? AND reposter = ?", item, viewer).Error; err != nil { 175 - return nil, err 176 - } 177 - 178 - if rep.ID != 0 { 179 - rpuri := fmt.Sprintf("at://%s/app.bsky.feed.repost/%s", viewerDid, rep.Rkey) 180 - out.Repost = &rpuri 181 - } 182 - 183 - return &out, nil 184 - } 185 - 186 - func (fg *FeedGenerator) GetTimeline(ctx context.Context, user *User, algo string, before string, limit int) ([]*bsky.FeedDefs_FeedViewPost, error) { 187 - ctx, span := otel.Tracer("feedgen").Start(context.Background(), "GetTimeline") 188 - defer span.End() 189 - 190 - // TODO: this query is just a temporary hack... 191 - var feed []*models.FeedPost 192 - if err := fg.db.Debug().Find(&feed, "author in (?)", 193 - fg.db.Model(models.FollowRecord{}).Where("follower = ?", user.ID).Select("target"), 194 - ).Error; err != nil { 195 - return nil, err 196 - } 197 - 198 - var rps []*models.RepostRecord 199 - if err := fg.db.Debug().Find(&rps, "reposter in (?)", 200 - fg.db.Model(models.FollowRecord{}).Where("follower = ?", user.ID).Select("target"), 201 - ).Error; err != nil { 202 - return nil, err 203 - } 204 - 205 - fout, err := fg.hydrateFeed(ctx, feed, rps) 206 - if err != nil { 207 - return nil, fmt.Errorf("hydrating feed: %w", err) 208 - } 209 - 210 - sort.Slice(fout, func(i, j int) bool { 211 - ti, _ := time.Parse(time.RFC3339, fout[i].Post.IndexedAt) 212 - tj, _ := time.Parse(time.RFC3339, fout[j].Post.IndexedAt) 213 - 214 - return tj.Before(ti) 215 - }) 216 - 217 - return fg.personalizeFeed(ctx, fout, user) 218 - } 219 - 220 - func (fg *FeedGenerator) personalizeFeed(ctx context.Context, feed []*bsky.FeedDefs_FeedViewPost, viewer *User) ([]*bsky.FeedDefs_FeedViewPost, error) { 221 - for _, p := range feed { 222 - 223 - // TODO: its inefficient to have to call 'GetPost' again here when we could instead be doing that inside the 'hydrateFeed' call earlier. 224 - // However, if we introduce per-user information into hydrateFeed, then 225 - // it cannot be effectively cached, so we separate the 'cacheable' 226 - // portion of feed generation from the 'per user' portion. An 227 - // optimization could be to hide the internal post IDs in the Post 228 - // structs for internal use (stripped out before sending to client) 229 - item, err := fg.ix.GetPost(ctx, p.Post.Uri) 230 - if err != nil { 231 - return nil, err 232 - } 233 - 234 - vs, err := fg.getPostViewerState(ctx, item.ID, viewer.ID, viewer.Did) 235 - if err != nil { 236 - return nil, fmt.Errorf("getting viewer state: %w", err) 237 - } 238 - 239 - p.Post.Viewer = vs 240 - } 241 - 242 - return feed, nil 243 - } 244 - 245 - func (fg *FeedGenerator) GetAuthorFeed(ctx context.Context, user *User, before string, limit int) ([]*bsky.FeedDefs_FeedViewPost, error) { 246 - ctx, span := otel.Tracer("feedgen").Start(context.Background(), "GetAuthorFeed") 247 - defer span.End() 248 - 249 - // for memory efficiency, should probably return the actual type that goes out to the user... 250 - // bsky.FeedGetAuthorFeed_FeedItem 251 - 252 - var feed []*models.FeedPost 253 - if err := fg.db.Find(&feed, "author = ?", user.ID).Error; err != nil { 254 - return nil, err 255 - } 256 - 257 - var reposts []*models.RepostRecord 258 - if err := fg.db.Find(&reposts, "reposter = ?", user.ID).Error; err != nil { 259 - return nil, err 260 - } 261 - 262 - fout, err := fg.hydrateFeed(ctx, feed, reposts) 263 - if err != nil { 264 - return nil, fmt.Errorf("hydrating feed: %w", err) 265 - } 266 - 267 - return fg.personalizeFeed(ctx, fout, user) 268 - } 269 - 270 - func (fg *FeedGenerator) GetActorProfileByID(ctx context.Context, actor uint) (*models.ActorInfo, error) { 271 - var ai models.ActorInfo 272 - if err := fg.db.First(&ai, "id = ?", actor).Error; err != nil { 273 - return nil, fmt.Errorf("getActorProfileByID: %w", err) 274 - } 275 - 276 - return &ai, nil 277 - } 278 - func (fg *FeedGenerator) GetActorProfile(ctx context.Context, actor string) (*models.ActorInfo, error) { 279 - fmt.Println("get actor profile: ", actor) 280 - var ai models.ActorInfo 281 - if strings.HasPrefix(actor, "did:") { 282 - if err := fg.db.First(&ai, "did = ?", actor).Error; err != nil { 283 - return nil, err 284 - } 285 - } else { 286 - if err := fg.db.First(&ai, "handle = ?", actor).Error; err != nil { 287 - return nil, err 288 - } 289 - } 290 - 291 - return &ai, nil 292 - } 293 - 294 - type ThreadPost struct { 295 - Post *bsky.FeedDefs_FeedViewPost 296 - PostID uint 297 - 298 - ParentUri string 299 - Parent *ThreadPost 300 - } 301 - 302 - func (fg *FeedGenerator) GetPostThread(ctx context.Context, uri string, depth int) (*ThreadPost, error) { 303 - post, err := fg.ix.GetPost(ctx, uri) 304 - if err != nil { 305 - return nil, fmt.Errorf("getting post for thread: %w", err) 306 - } 307 - 308 - hi, err := fg.hydrateItem(ctx, post) 309 - if err != nil { 310 - return nil, err 311 - } 312 - 313 - p, ok := hi.Post.Record.Val.(*bsky.FeedPost) 314 - if !ok { 315 - return nil, fmt.Errorf("getPostThread can only operate on app.bsky.feed.post records") 316 - } 317 - 318 - out := &ThreadPost{ 319 - Post: hi, 320 - PostID: post.ID, 321 - } 322 - 323 - if p.Reply != nil { 324 - out.ParentUri = p.Reply.Parent.Uri 325 - if depth > 0 { 326 - 327 - parent, err := fg.GetPostThread(ctx, p.Reply.Parent.Uri, depth-1) 328 - if err != nil { 329 - // TODO: check for and handle 'not found' 330 - return nil, err 331 - } 332 - out.Parent = parent 333 - } 334 - } 335 - 336 - return out, nil 337 - } 338 - 339 - type HydratedVote struct { 340 - Actor *bsky.ActorDefs_ProfileViewBasic 341 - Direction string 342 - IndexedAt time.Time 343 - CreatedAt string 344 - } 345 - 346 - func (fg *FeedGenerator) hydrateVote(ctx context.Context, v *models.VoteRecord) (*HydratedVote, error) { 347 - aref, err := fg.getActorRefInfo(ctx, v.Voter) 348 - if err != nil { 349 - return nil, err 350 - } 351 - 352 - return &HydratedVote{ 353 - Actor: aref, 354 - Direction: v.Dir.String(), 355 - IndexedAt: v.UpdatedAt, 356 - CreatedAt: v.Created, 357 - }, nil 358 - } 359 - 360 - func (fg *FeedGenerator) GetVotes(ctx context.Context, uri string, pcid cid.Cid, limit int, before string) ([]*HydratedVote, error) { 361 - if before != "" { 362 - fg.log.Warn("not respecting 'before' yet") 363 - } 364 - 365 - p, err := fg.ix.GetPost(ctx, uri) 366 - if err != nil { 367 - return nil, err 368 - } 369 - 370 - if p.Cid != pcid.String() { 371 - return nil, fmt.Errorf("listing likes of old post versions not supported") 372 - } 373 - 374 - var voterecs []models.VoteRecord 375 - if err := fg.db.Limit(limit).Find(&voterecs, "post = ?", p.ID).Error; err != nil { 376 - return nil, err 377 - } 378 - 379 - var out []*HydratedVote 380 - for _, vr := range voterecs { 381 - hv, err := fg.hydrateVote(ctx, &vr) 382 - if err != nil { 383 - return nil, err 384 - } 385 - out = append(out, hv) 386 - } 387 - 388 - return out, nil 389 - } 390 - 391 - type FollowInfo struct { 392 - Follower *bsky.ActorDefs_ProfileViewBasic 393 - Subject *bsky.ActorDefs_ProfileViewBasic 394 - CreatedAt string 395 - IndexedAt string 396 - } 397 - 398 - func (fg *FeedGenerator) GetFollows(ctx context.Context, user string, limit int, before string) ([]*FollowInfo, error) { 399 - var follows []models.FollowRecord 400 - if err := fg.db.Limit(limit).Find(&follows, "follower = (?)", fg.db.Model(models.ActorInfo{}).Where("did = ? or handle = ?", user, user).Select("uid")).Error; err != nil { 401 - return nil, err 402 - } 403 - 404 - profile, err := fg.GetActorProfile(ctx, user) 405 - if err != nil { 406 - return nil, err 407 - } 408 - 409 - ai, err := fg.getActorRefInfo(ctx, profile.Uid) 410 - if err != nil { 411 - return nil, err 412 - } 413 - 414 - out := []*FollowInfo{} 415 - for _, f := range follows { 416 - fai, err := fg.getActorRefInfo(ctx, f.Target) 417 - if err != nil { 418 - return nil, err 419 - } 420 - 421 - out = append(out, &FollowInfo{ 422 - Follower: ai, 423 - Subject: fai, 424 - CreatedAt: f.CreatedAt.Format(time.RFC3339), 425 - IndexedAt: f.CreatedAt.Format(time.RFC3339), 426 - }) 427 - 428 - } 429 - 430 - return nil, nil 431 - }
-74
pds/server.go
··· 24 24 "github.com/bluesky-social/indigo/plc" 25 25 "github.com/bluesky-social/indigo/repomgr" 26 26 "github.com/bluesky-social/indigo/util" 27 - bsutil "github.com/bluesky-social/indigo/util" 28 27 "github.com/bluesky-social/indigo/xrpc" 29 28 gojwt "github.com/golang-jwt/jwt" 30 29 "github.com/gorilla/websocket" 31 - "github.com/ipfs/go-cid" 32 30 "github.com/labstack/echo/v4" 33 31 "github.com/labstack/echo/v4/middleware" 34 32 "github.com/lestrrat-go/jwx/v2/jwt" ··· 40 38 db *gorm.DB 41 39 cs carstore.CarStore 42 40 repoman *repomgr.RepoManager 43 - feedgen *FeedGenerator 44 41 indexer *indexer.Indexer 45 42 events *events.EventManager 46 43 signingKey *did.PrivKey ··· 105 102 //ix.SendRemoteFollow = s.sendRemoteFollow 106 103 ix.CreateExternalUser = s.createExternalUser 107 104 108 - feedgen, err := NewFeedGenerator(db, ix, s.readRecordFunc, s.log) 109 - if err != nil { 110 - return nil, err 111 - } 112 - 113 - s.feedgen = feedgen 114 - 115 105 return s, nil 116 106 } 117 107 ··· 119 109 return s.echo.Shutdown(ctx) 120 110 } 121 111 122 - func (s *Server) handleFedEvent(ctx context.Context, host *Peering, env *events.XRPCStreamEvent) error { 123 - fmt.Printf("[%s] got fed event from %q\n", s.serviceUrl, host.Host) 124 - switch { 125 - case env.RepoCommit != nil: 126 - evt := env.RepoCommit 127 - u, err := s.lookupUserByDid(ctx, evt.Repo) 128 - if err != nil { 129 - if !errors.Is(err, gorm.ErrRecordNotFound) { 130 - return fmt.Errorf("looking up event user: %w", err) 131 - } 132 - 133 - subj, err := s.createExternalUser(ctx, evt.Repo) 134 - if err != nil { 135 - return err 136 - } 137 - 138 - u = new(User) 139 - u.ID = subj.Uid 140 - } 141 - 142 - return s.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, evt.Since, evt.Rev, evt.Blocks, evt.Ops) 143 - default: 144 - return fmt.Errorf("invalid fed event") 145 - } 146 - } 147 - 148 112 func (s *Server) createExternalUser(ctx context.Context, did string) (*models.ActorInfo, error) { 149 113 doc, err := s.plc.GetDocument(ctx, did) 150 114 if err != nil { ··· 223 187 } 224 188 225 189 return subj, nil 226 - } 227 - 228 - func (s *Server) repoEventToFedEvent(ctx context.Context, evt *repomgr.RepoEvent) (*comatproto.SyncSubscribeRepos_Commit, error) { 229 - did, err := s.indexer.DidForUser(ctx, evt.User) 230 - if err != nil { 231 - return nil, err 232 - } 233 - 234 - out := &comatproto.SyncSubscribeRepos_Commit{ 235 - Blocks: evt.RepoSlice, 236 - Repo: did, 237 - Time: time.Now().Format(bsutil.ISO8601), 238 - //PrivUid: evt.User, 239 - } 240 - 241 - for _, op := range evt.Ops { 242 - out.Ops = append(out.Ops, &comatproto.SyncSubscribeRepos_RepoOp{ 243 - Path: op.Collection + "/" + op.Rkey, 244 - Action: string(op.Kind), 245 - Cid: (*lexutil.LexLink)(op.RecCid), 246 - }) 247 - } 248 - 249 - return out, nil 250 - } 251 - 252 - func (s *Server) readRecordFunc(ctx context.Context, user models.Uid, c cid.Cid) (lexutil.CBOR, error) { 253 - bs, err := s.cs.ReadOnlySession(user) 254 - if err != nil { 255 - return nil, err 256 - } 257 - 258 - blk, err := bs.Get(ctx, c) 259 - if err != nil { 260 - return nil, err 261 - } 262 - 263 - return lexutil.CborDecodeValue(blk.RawData()) 264 190 } 265 191 266 192 func (s *Server) RunAPI(addr string) error {
-48
util/cliutil/util.go
··· 55 55 } 56 56 } 57 57 58 - type CliConfig struct { 59 - filename string 60 - PDS string 61 - } 62 - 63 - func readGoskyConfig() (*CliConfig, error) { 64 - // TODO: use os.UserConfigDir()/gosky, falling back to os.UserHomeDir()/.gosky for backwards compatibility. 65 - d, err := os.UserHomeDir() 66 - if err != nil { 67 - return nil, fmt.Errorf("cannot read Home directory") 68 - } 69 - 70 - f := filepath.Join(d, ".gosky") 71 - 72 - b, err := os.ReadFile(f) 73 - if os.IsNotExist(err) { 74 - return nil, nil 75 - } 76 - 77 - var out CliConfig 78 - if err := json.Unmarshal(b, &out); err != nil { 79 - return nil, err 80 - } 81 - 82 - out.filename = f 83 - return &out, nil 84 - } 85 - 86 - var Config *CliConfig 87 - 88 - func TryReadConfig() { 89 - cfg, err := readGoskyConfig() 90 - if err != nil { 91 - fmt.Println(err) 92 - } else { 93 - Config = cfg 94 - } 95 - } 96 - 97 - func WriteConfig(cfg *CliConfig) error { 98 - b, err := json.Marshal(cfg) 99 - if err != nil { 100 - return err 101 - } 102 - 103 - return os.WriteFile(cfg.filename, b, 0664) 104 - } 105 - 106 58 func GetXrpcClient(cctx *cli.Context, authreq bool) (*xrpc.Client, error) { 107 59 h := "http://localhost:4989" 108 60 if pdsurl := cctx.String("pds-host"); pdsurl != "" {
-85
util/tierbs.go
··· 1 - package util 2 - 3 - import ( 4 - "context" 5 - "fmt" 6 - 7 - blockformat "github.com/ipfs/go-block-format" 8 - "github.com/ipfs/go-cid" 9 - blockstore "github.com/ipfs/go-ipfs-blockstore" 10 - ipld "github.com/ipfs/go-ipld-format" 11 - ) 12 - 13 - type ReadThroughBstore struct { 14 - base blockstore.Blockstore 15 - fresh blockstore.Blockstore 16 - } 17 - 18 - func NewReadThroughBstore(base, fresh blockstore.Blockstore) *ReadThroughBstore { 19 - return &ReadThroughBstore{ 20 - base: base, 21 - fresh: fresh, 22 - } 23 - } 24 - 25 - var _ blockstore.Blockstore = (*ReadThroughBstore)(nil) 26 - 27 - func (bs *ReadThroughBstore) DeleteBlock(ctx context.Context, c cid.Cid) error { 28 - return bs.fresh.DeleteBlock(ctx, c) 29 - 30 - } 31 - 32 - func (bs *ReadThroughBstore) Has(ctx context.Context, c cid.Cid) (bool, error) { 33 - h, err := bs.fresh.Has(ctx, c) 34 - if err != nil { 35 - return false, err 36 - } 37 - 38 - if h { 39 - return true, nil 40 - } 41 - 42 - return bs.base.Has(ctx, c) 43 - } 44 - 45 - func (bs *ReadThroughBstore) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) { 46 - blk, err := bs.fresh.Get(ctx, c) 47 - if err == nil { 48 - return blk, nil 49 - } 50 - 51 - if !ipld.IsNotFound(err) { 52 - return nil, err 53 - } 54 - 55 - return bs.base.Get(ctx, c) 56 - } 57 - 58 - func (bs *ReadThroughBstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { 59 - size, err := bs.fresh.GetSize(ctx, c) 60 - if err == nil { 61 - return size, nil 62 - } 63 - 64 - if !ipld.IsNotFound(err) { 65 - return -1, err 66 - } 67 - 68 - return bs.base.GetSize(ctx, c) 69 - } 70 - 71 - func (bs *ReadThroughBstore) Put(context.Context, blockformat.Block) error { 72 - return fmt.Errorf("writes not allows on readthrough blockstore") 73 - } 74 - 75 - func (bs *ReadThroughBstore) PutMany(context.Context, []blockformat.Block) error { 76 - return fmt.Errorf("writes not allows on readthrough blockstore") 77 - } 78 - 79 - func (bs *ReadThroughBstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { 80 - return nil, fmt.Errorf("iteration not supported on readthrough blockstore") 81 - } 82 - 83 - func (bs *ReadThroughBstore) HashOnRead(enabled bool) { 84 - 85 - }