BYOK Personal Data Server (PDS) written in Go
ipfs vow atproto pds go
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: code quality

+137 -82
+3 -3
blockstore/ipfs.go
··· 29 29 // NewIPFS creates a blockstore. 30 30 func NewIPFS(did string, cli *rpc.HttpApi) *IPFSBlockstore { 31 31 return &IPFSBlockstore{ 32 - did: did, 33 - cli: cli, 34 - inserts: make(map[cid.Cid]blocks.Block), 32 + did: did, 33 + cli: cli, 34 + inserts: make(map[cid.Cid]blocks.Block), 35 35 } 36 36 } 37 37
+30 -21
identity/identity.go
··· 11 11 12 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 13 "github.com/bluesky-social/indigo/util" 14 + "golang.org/x/sync/singleflight" 14 15 ) 16 + 17 + var docFlight singleflight.Group 15 18 16 19 func ResolveHandleFromTXT(ctx context.Context, handle string) (string, error) { 17 20 name := fmt.Sprintf("_atproto.%s", handle) ··· 104 107 cli = util.RobustHTTPClient() 105 108 } 106 109 107 - ustr, err := DidToDocUrl(did) 108 - if err != nil { 109 - return nil, err 110 - } 110 + v, err, _ := docFlight.Do(did, func() (any, error) { 111 + ustr, err := DidToDocUrl(did) 112 + if err != nil { 113 + return nil, err 114 + } 111 115 112 - req, err := http.NewRequestWithContext(ctx, "GET", ustr, nil) 113 - if err != nil { 114 - return nil, err 115 - } 116 + req, err := http.NewRequestWithContext(ctx, "GET", ustr, nil) 117 + if err != nil { 118 + return nil, err 119 + } 116 120 117 - resp, err := cli.Do(req) 118 - if err != nil { 119 - return nil, err 120 - } 121 - defer func() { _ = resp.Body.Close() }() 121 + resp, err := cli.Do(req) 122 + if err != nil { 123 + return nil, err 124 + } 125 + defer func() { _ = resp.Body.Close() }() 122 126 123 - if resp.StatusCode != 200 { 124 - _, _ = io.Copy(io.Discard, resp.Body) 125 - return nil, fmt.Errorf("unable to find did doc at url. did: %s. url: %s", did, ustr) 126 - } 127 + if resp.StatusCode != 200 { 128 + _, _ = io.Copy(io.Discard, resp.Body) 129 + return nil, fmt.Errorf("unable to find did doc at url. did: %s. url: %s", did, ustr) 130 + } 127 131 128 - var diddoc DidDoc 129 - if err := json.NewDecoder(resp.Body).Decode(&diddoc); err != nil { 132 + var diddoc DidDoc 133 + if err := json.NewDecoder(resp.Body).Decode(&diddoc); err != nil { 134 + return nil, err 135 + } 136 + 137 + return &diddoc, nil 138 + }) 139 + if err != nil { 130 140 return nil, err 131 141 } 132 - 133 - return &diddoc, nil 142 + return v.(*DidDoc), nil 134 143 } 135 144 136 145 func FetchDidData(ctx context.Context, cli *http.Client, did string) (*DidData, error) {
+6 -2
internal/helpers/helpers.go
··· 5 5 "encoding/hex" 6 6 "encoding/json" 7 7 "errors" 8 - "math/rand" 8 + "math/big" 9 9 "net/http" 10 10 "net/url" 11 11 ··· 127 127 func RandomVarchar(length int) string { 128 128 b := make([]rune, length) 129 129 for i := range b { 130 - b[i] = letters[rand.Intn(len(letters))] 130 + n, err := crand.Int(crand.Reader, big.NewInt(int64(len(letters)))) 131 + if err != nil { 132 + panic(err) 133 + } 134 + b[i] = letters[n.Int64()] 131 135 } 132 136 return string(b) 133 137 }
+3 -3
oauth/client/manager.go
··· 254 254 "default_max_age", 255 255 "userinfo_signed_response_alg", 256 256 "id_token_signed_response_alg", 257 - "userinfo_encryhpted_response_alg", 257 + "userinfo_encrypted_response_alg", 258 258 "authorization_encrypted_response_enc", 259 259 "authorization_encrypted_response_alg", 260 260 "tls_client_certificate_bound_access_tokens", ··· 367 367 } 368 368 369 369 if !slices.Contains(metadata.ResponseTypes, "code") { 370 - return nil, errors.New("response_types must inclue `code`") 370 + return nil, errors.New("response_types must include `code`") 371 371 } 372 372 373 373 if !slices.Contains(metadata.GrantTypes, "authorization_code") { ··· 427 427 return nil, fmt.Errorf("loopback redirect uri %s must use http", ruri) 428 428 } 429 429 case u.Scheme == "http": 430 - return nil, errors.New("only loopbvack redirect uris are allowed to use the `http` scheme") 430 + return nil, errors.New("only loopback redirect uris are allowed to use the `http` scheme") 431 431 case u.Scheme == "https": 432 432 if isLocalHostname(u.Hostname()) { 433 433 return nil, fmt.Errorf("redirect uri %s's domain must not be a local hostname", ruri)
+16 -4
oauth/helpers.go
··· 12 12 ) 13 13 14 14 func GenerateCode() string { 15 - h, _ := helpers.RandomHex(constants.CodeBytesLength) 15 + h, err := helpers.RandomHex(constants.CodeBytesLength) 16 + if err != nil { 17 + panic(fmt.Sprintf("GenerateCode: %v", err)) 18 + } 16 19 return constants.CodePrefix + h 17 20 } 18 21 19 22 func GenerateTokenId() string { 20 - h, _ := helpers.RandomHex(constants.TokenIdBytesLength) 23 + h, err := helpers.RandomHex(constants.TokenIdBytesLength) 24 + if err != nil { 25 + panic(fmt.Sprintf("GenerateTokenId: %v", err)) 26 + } 21 27 return constants.TokenIdPrefix + h 22 28 } 23 29 24 30 func GenerateRefreshToken() string { 25 - h, _ := helpers.RandomHex(constants.RefreshTokenBytesLength) 31 + h, err := helpers.RandomHex(constants.RefreshTokenBytesLength) 32 + if err != nil { 33 + panic(fmt.Sprintf("GenerateRefreshToken: %v", err)) 34 + } 26 35 return constants.RefreshTokenPrefix + h 27 36 } 28 37 29 38 func GenerateRequestId() string { 30 - h, _ := helpers.RandomHex(constants.RequestIdBytesLength) 39 + h, err := helpers.RandomHex(constants.RequestIdBytesLength) 40 + if err != nil { 41 + panic(fmt.Sprintf("GenerateRequestId: %v", err)) 42 + } 31 43 return constants.RequestIdPrefix + h 32 44 } 33 45
+1 -1
plc/types.go
··· 4 4 "encoding/json" 5 5 6 6 "github.com/bluesky-social/indigo/atproto/atdata" 7 - "pkg.rbrt.fr/vow/identity" 8 7 cbg "github.com/whyrusleeping/cbor-gen" 8 + "pkg.rbrt.fr/vow/identity" 9 9 ) 10 10 11 11 type DidCredentials struct {
+3 -1
server/handle_account_signin.go
··· 108 108 } 109 109 110 110 if err := bcrypt.CompareHashAndPassword([]byte(repo.Password), []byte(req.Password)); err != nil { 111 - if err != bcrypt.ErrMismatchedHashAndPassword { 111 + if errors.Is(err, bcrypt.ErrMismatchedHashAndPassword) { 112 112 sess.AddFlash("Handle or password is incorrect", "error") 113 113 } else { 114 114 sess.AddFlash("Something went wrong!", "error") ··· 124 124 Path: "/", 125 125 MaxAge: int(AccountSessionMaxAge.Seconds()), 126 126 HttpOnly: true, 127 + Secure: true, 128 + SameSite: http.SameSiteLaxMode, 127 129 } 128 130 129 131 sess.Values = map[any]any{}
+2 -3
server/handle_repo_get_record.go
··· 3 3 import ( 4 4 "fmt" 5 5 "io" 6 + "maps" 6 7 "net/http" 7 8 "strings" 8 9 ··· 81 82 } 82 83 defer func() { _ = resp.Body.Close() }() 83 84 84 - for k, v := range resp.Header { 85 - w.Header()[k] = v 86 - } 85 + maps.Copy(w.Header(), resp.Header) 87 86 w.WriteHeader(resp.StatusCode) 88 87 _, _ = io.Copy(w, resp.Body) 89 88 }
+1 -1
server/handle_repo_list_missing_blobs.go
··· 6 6 "strconv" 7 7 8 8 "github.com/bluesky-social/indigo/atproto/atdata" 9 + "github.com/ipfs/go-cid" 9 10 "pkg.rbrt.fr/vow/internal/helpers" 10 11 "pkg.rbrt.fr/vow/models" 11 - "github.com/ipfs/go-cid" 12 12 ) 13 13 14 14 type ComAtprotoRepoListMissingBlobsResponse struct {
+14 -22
server/handle_root.go
··· 46 46 ctx := r.Context() 47 47 48 48 var stats homeStats 49 - 50 - var totalAccounts int64 51 - s.db.Raw(ctx, "SELECT COUNT(*) FROM repos", nil).Scan(&totalAccounts) 52 - stats.TotalAccounts = totalAccounts 53 - 54 - var activeAccounts int64 55 - s.db.Raw(ctx, "SELECT COUNT(*) FROM repos WHERE deactivated = 0", nil).Scan(&activeAccounts) 56 - stats.ActiveAccounts = activeAccounts 57 - 58 - var totalRecords int64 59 - s.db.Raw(ctx, "SELECT COUNT(*) FROM records", nil).Scan(&totalRecords) 60 - stats.TotalRecords = totalRecords 61 - 62 - var totalBlobs int64 63 - s.db.Raw(ctx, "SELECT COUNT(*) FROM blobs", nil).Scan(&totalBlobs) 64 - stats.TotalBlobs = totalBlobs 65 - 66 - var totalInviteCodes int64 67 - s.db.Raw(ctx, "SELECT COUNT(*) FROM invite_codes WHERE disabled = 0 AND remaining_use_count > 0", nil).Scan(&totalInviteCodes) 68 - stats.TotalInviteCodes = totalInviteCodes 49 + if err := s.db.Raw(ctx, ` 50 + SELECT 51 + (SELECT COUNT(*) FROM repos) AS total_accounts, 52 + (SELECT COUNT(*) FROM repos WHERE deactivated = 0) AS active_accounts, 53 + (SELECT COUNT(*) FROM records) AS total_records, 54 + (SELECT COUNT(*) FROM blobs) AS total_blobs, 55 + (SELECT COUNT(*) FROM invite_codes WHERE disabled = 0 AND remaining_use_count > 0) AS total_invite_codes 56 + `, nil).Scan(&stats).Error; err != nil { 57 + s.logger.Error("error fetching home stats", "error", err) 58 + } 69 59 70 - _ = s.renderTemplate(w, "home.html", homeData{ 60 + if err := s.renderTemplate(w, "home.html", homeData{ 71 61 Hostname: s.config.Hostname, 72 62 Did: s.config.Did, 73 63 ContactEmail: s.config.ContactEmail, 74 64 Version: s.config.Version, 75 65 RequireInvite: s.config.RequireInvite, 76 66 Stats: stats, 77 - }) 67 + }); err != nil { 68 + s.logger.Error("failed to render template", "error", err) 69 + } 78 70 }
+4 -1
server/handle_server_create_account.go
··· 75 75 return 76 76 } 77 77 } 78 + 79 + helpers.InputError(w, nil) 80 + return 78 81 } 79 82 80 83 var signupDid string ··· 178 181 signupDid = did 179 182 } 180 183 181 - hashed, err := bcrypt.GenerateFromPassword([]byte(request.Password), 10) 184 + hashed, err := bcrypt.GenerateFromPassword([]byte(request.Password), 12) 182 185 if err != nil { 183 186 logger.Error("error hashing password", "error", err) 184 187 helpers.ServerError(w, nil)
+3
server/handle_server_create_session.go
··· 53 53 return 54 54 } 55 55 } 56 + 57 + helpers.InputError(w, nil) 58 + return 56 59 } 57 60 58 61 req.Identifier = strings.ToLower(req.Identifier)
+6 -1
server/handle_server_get_session.go
··· 3 3 import ( 4 4 "net/http" 5 5 6 + "pkg.rbrt.fr/vow/internal/helpers" 6 7 "pkg.rbrt.fr/vow/models" 7 8 ) 8 9 ··· 16 17 } 17 18 18 19 func (s *Server) handleGetSession(w http.ResponseWriter, r *http.Request) { 19 - repo, _ := getContextValue[*models.RepoActor](r, contextKeyRepo) 20 + repo, ok := getContextValue[*models.RepoActor](r, contextKeyRepo) 21 + if !ok { 22 + helpers.UnauthorizedError(w, nil) 23 + return 24 + } 20 25 21 26 s.writeJSON(w, 200, ComAtprotoServerGetSessionResponse{ 22 27 Handle: repo.Handle,
+1 -1
server/handle_signer_connect.go
··· 46 46 // wsSignJWTRequest is the JSON envelope pushed to the signer for service-auth 47 47 // JWT signing (CompatMode). 48 48 type wsSignJWTRequest struct { 49 - Type string `json:"type"` // always "sign_jwt_request" 49 + Type string `json:"type"` // always "sign_jwt_request" 50 50 RequestID string `json:"requestId"` 51 51 JWTPayload string `json:"jwtPayload"` // base64url-encoded header.payload 52 52 Aud string `json:"aud"` // the audience (service DID)
+1 -1
server/handle_sync_get_latest_commit.go
··· 3 3 import ( 4 4 "net/http" 5 5 6 - "pkg.rbrt.fr/vow/internal/helpers" 7 6 "github.com/ipfs/go-cid" 7 + "pkg.rbrt.fr/vow/internal/helpers" 8 8 ) 9 9 10 10 type ComAtprotoSyncGetLatestCommitResponse struct {
+2 -2
server/handle_sync_get_record.go
··· 5 5 "net/http" 6 6 7 7 "github.com/bluesky-social/indigo/carstore" 8 - "pkg.rbrt.fr/vow/internal/helpers" 9 - "pkg.rbrt.fr/vow/models" 10 8 "github.com/ipfs/go-cid" 11 9 cbor "github.com/ipfs/go-ipld-cbor" 12 10 "github.com/ipld/go-car" 11 + "pkg.rbrt.fr/vow/internal/helpers" 12 + "pkg.rbrt.fr/vow/models" 13 13 ) 14 14 15 15 func (s *Server) handleSyncGetRecord(w http.ResponseWriter, r *http.Request) {
+3 -2
server/middleware.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "crypto/subtle" 5 6 "errors" 6 7 "fmt" 7 8 "net/http" ··· 45 46 func (s *Server) handleAdminMiddleware(next http.Handler) http.Handler { 46 47 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 47 48 username, password, ok := r.BasicAuth() 48 - if !ok || username != "admin" || password != s.config.AdminPassword { 49 + if !ok || username != "admin" || subtle.ConstantTimeCompare([]byte(password), []byte(s.config.AdminPassword)) != 1 { 49 50 helpers.InputError(w, new("Unauthorized")) 50 51 return 51 52 } ··· 276 277 277 278 exp, ok := claims["exp"].(float64) 278 279 if !ok { 279 - logger.Error("error getting iat from token") 280 + logger.Error("error getting exp from token") 280 281 helpers.ServerError(w, nil) 281 282 return 282 283 }
+3 -1
server/persist.go
··· 4 4 "bytes" 5 5 "context" 6 6 "fmt" 7 + "log/slog" 7 8 "sync" 8 9 "time" 9 10 ··· 173 174 } 174 175 175 176 func (p *DbPersister) cleanupRoutine() { 177 + logger := slog.Default().With("component", "event-cleanup") 176 178 ticker := time.NewTicker(time.Hour) 177 179 defer ticker.Stop() 178 180 179 181 for range ticker.C { 180 182 cutoff := time.Now().Add(-p.Retention) 181 183 if err := p.Db.Where("created_at < ?", cutoff).Delete(&models.EventRecord{}).Error; err != nil { 182 - continue 184 + logger.Error("failed to cleanup old events", "error", err) 183 185 } 184 186 } 185 187 }
+11 -7
server/server.go
··· 46 46 ) 47 47 48 48 const ( 49 - AccountSessionMaxAge = 30 * 24 * time.Hour // one week 49 + AccountSessionMaxAge = 30 * 24 * time.Hour 50 50 ) 51 51 52 52 // IPFSConfig holds configuration for the IPFS node that the PDS runs ··· 264 264 } 265 265 266 266 if args.SessionSecret == "" { 267 - panic("SESSION SECRET WAS NOT SET. THIS IS REQUIRED. ") 267 + return nil, fmt.Errorf("session secret is required") 268 268 } 269 269 270 270 r := chi.NewRouter() ··· 316 316 httpd := &http.Server{ 317 317 Addr: args.Addr, 318 318 Handler: r, 319 - // shitty defaults but okay for now, needed for import repo 319 + // Extended timeouts to accommodate repo imports and large blob uploads. 320 320 ReadTimeout: 5 * time.Minute, 321 321 WriteTimeout: 5 * time.Minute, 322 322 IdleTimeout: 5 * time.Minute, ··· 328 328 if err != nil { 329 329 return nil, fmt.Errorf("failed to open sqlite database: %w", err) 330 330 } 331 - gdb.Exec("PRAGMA journal_mode=WAL") 332 - gdb.Exec("PRAGMA synchronous=NORMAL") 331 + if err := gdb.Exec("PRAGMA journal_mode=WAL").Error; err != nil { 332 + return nil, fmt.Errorf("failed to set journal_mode=WAL: %w", err) 333 + } 334 + if err := gdb.Exec("PRAGMA synchronous=NORMAL").Error; err != nil { 335 + return nil, fmt.Errorf("failed to set synchronous=NORMAL: %w", err) 336 + } 333 337 logger.Info("connected to SQLite database", "path", args.DbName) 334 338 dbw := db.NewDB(gdb) 335 339 ··· 459 463 460 464 s.loadTemplates() 461 465 462 - s.repoman = NewRepoMan(s) // TODO: this is way too lazy, stop it 466 + s.repoman = NewRepoMan(s) 463 467 464 468 // TODO: should validate these args 465 469 if args.SmtpUser == "" || args.SmtpPass == "" || args.SmtpHost == "" || args.SmtpPort == "" || args.SmtpEmail == "" || args.SmtpName == "" { ··· 657 661 658 662 <-ctx.Done() 659 663 660 - fmt.Println("shut down") 664 + s.logger.Info("shut down") 661 665 662 666 return nil 663 667 }
+23 -4
server/service_auth.go
··· 24 24 if err != nil { 25 25 return err 26 26 } 27 - return key.(atcrypto.PublicKey).HashAndVerifyLenient([]byte(signingString), signatureBytes) 27 + k, ok := key.(atcrypto.PublicKey) 28 + if !ok { 29 + return fmt.Errorf("invalid key type for ES256K verification") 30 + } 31 + return k.HashAndVerifyLenient([]byte(signingString), signatureBytes) 28 32 } 29 33 30 34 func (m *ES256KSigningMethod) Sign(signingString string, key any) (string, error) { ··· 42 46 token := strings.TrimSpace(rawToken) 43 47 44 48 parsedToken, err := jwt.ParseWithClaims(token, jwt.MapClaims{}, func(token *jwt.Token) (any, error) { 45 - did := syntax.DID(token.Claims.(jwt.MapClaims)["iss"].(string)) 49 + claims, ok := token.Claims.(jwt.MapClaims) 50 + if !ok { 51 + return nil, fmt.Errorf("invalid token claims") 52 + } 53 + iss, ok := claims["iss"].(string) 54 + if !ok { 55 + return nil, fmt.Errorf("missing iss claim in token") 56 + } 57 + did := syntax.DID(iss) 46 58 didDoc, err := s.passport.FetchDoc(ctx, did.String()) 47 59 if err != nil { 48 60 return nil, fmt.Errorf("unable to resolve did %s: %s", did, err) ··· 93 105 return "", fmt.Errorf("invalid token: %s", err) 94 106 } 95 107 96 - claims := parsedToken.Claims.(jwt.MapClaims) 108 + claims, ok := parsedToken.Claims.(jwt.MapClaims) 109 + if !ok { 110 + return "", fmt.Errorf("invalid token claims") 111 + } 97 112 if claims["lxm"] != nsid { 98 113 return "", fmt.Errorf("bad jwt lexicon method (\"lxm\"). must match: %s", nsid) 99 114 } 100 - return claims["iss"].(string), nil 115 + iss, ok := claims["iss"].(string) 116 + if !ok { 117 + return "", fmt.Errorf("missing iss claim in token") 118 + } 119 + return iss, nil 101 120 }
+1 -1
server/session.go
··· 32 32 if err != nil { 33 33 return "", fmt.Errorf("marshaling payload: %w", err) 34 34 } 35 - encpayload := strings.TrimRight(base64.RawURLEncoding.EncodeToString(pj), "=") 35 + encpayload := base64.RawURLEncoding.EncodeToString(pj) 36 36 37 37 signingString := fmt.Sprintf("%s.%s", encheader, encpayload) 38 38