BYOK Personal Data Server (PDS) written in Go
ipfs vow atproto pds go
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: Add mime type to blobs and use in sync response

+21 -61
+1
models/models.go
··· 104 104 Did string `gorm:"index;index:idx_blob_did_cid"` 105 105 Cid []byte `gorm:"index;index:idx_blob_did_cid"` 106 106 RefCount int 107 + MimeType string 107 108 } 108 109 109 110 type EventRecord struct {
+13 -50
server/handle_repo_upload_blob.go
··· 2 2 3 3 import ( 4 4 "bytes" 5 + "context" 5 6 "fmt" 6 7 "io" 7 - "mime/multipart" 8 8 "net/http" 9 9 10 + "github.com/ipfs/boxo/files" 10 11 "github.com/ipfs/go-cid" 12 + caopts "github.com/ipfs/kubo/core/coreiface/options" 11 13 "pkg.rbrt.fr/vow/internal/helpers" 12 14 "pkg.rbrt.fr/vow/models" 13 15 ) ··· 64 66 } 65 67 } 66 68 67 - c, err := s.addBlobToIPFS(fulldata.Bytes(), mime) 69 + c, err := s.addBlobToIPFS(ctx, fulldata.Bytes()) 68 70 if err != nil { 69 71 logger.Error("error adding blob to ipfs", "error", err) 70 72 helpers.ServerError(w, nil) ··· 78 80 RefCount: 0, 79 81 CreatedAt: s.repoman.clock.Next().String(), 80 82 Cid: c.Bytes(), 83 + MimeType: mime, 81 84 } 82 85 83 86 if err := s.db.Create(ctx, &blob, nil).Error; err != nil { ··· 96 99 } 97 100 98 101 // addBlobToIPFS adds raw blob data to the configured IPFS node via the Kubo 99 - // HTTP RPC API (/api/v0/add) and returns the resulting CID. 100 - func (s *Server) addBlobToIPFS(data []byte, mimeType string) (cid.Cid, error) { 101 - endpoint := s.ipfsConfig.NodeURL + "/api/v0/add?cid-version=1&hash=sha2-256&pin=true&quieter=true" 102 - 103 - body := new(bytes.Buffer) 104 - writer := multipart.NewWriter(body) 102 + // RPC client and returns the resulting CID. 103 + func (s *Server) addBlobToIPFS(ctx context.Context, data []byte) (cid.Cid, error) { 104 + s.logger.Debug("adding blob to ipfs", "size", len(data)) 105 105 106 - part, err := writer.CreateFormFile("file", "blob") 106 + p, err := s.ipfsAPI.Unixfs().Add(ctx, files.NewBytesFile(data), 107 + caopts.Unixfs.CidVersion(1), 108 + ) 107 109 if err != nil { 108 - return cid.Undef, fmt.Errorf("error creating multipart field: %w", err) 109 - } 110 - 111 - if _, err := part.Write(data); err != nil { 112 - return cid.Undef, fmt.Errorf("error writing blob data to multipart: %w", err) 113 - } 114 - 115 - if err := writer.Close(); err != nil { 116 - return cid.Undef, fmt.Errorf("error closing multipart writer: %w", err) 110 + return cid.Undef, fmt.Errorf("error adding blob to ipfs: %w", err) 117 111 } 118 112 119 - req, err := http.NewRequest(http.MethodPost, endpoint, body) 120 - if err != nil { 121 - return cid.Undef, fmt.Errorf("error building ipfs add request: %w", err) 122 - } 123 - req.Header.Set("Content-Type", writer.FormDataContentType()) 124 - 125 - resp, err := s.http.Do(req) 126 - if err != nil { 127 - return cid.Undef, fmt.Errorf("error calling ipfs add: %w", err) 128 - } 129 - defer func() { _ = resp.Body.Close() }() 130 - 131 - if resp.StatusCode != http.StatusOK { 132 - msg, _ := io.ReadAll(resp.Body) 133 - return cid.Undef, fmt.Errorf("ipfs add returned status %d: %s", resp.StatusCode, string(msg)) 134 - } 135 - 136 - // Kubo with ?quieter=true returns a single JSON line: {"Hash":"<cid>","Size":"<n>"} 137 - var result struct { 138 - Hash string `json:"Hash"` 139 - } 140 - 141 - if err := readJSON(resp.Body, &result); err != nil { 142 - return cid.Undef, fmt.Errorf("error decoding ipfs add response: %w", err) 143 - } 144 - 145 - c, err := cid.Parse(result.Hash) 146 - if err != nil { 147 - return cid.Undef, fmt.Errorf("error parsing cid from ipfs add response: %w", err) 148 - } 149 - 150 - return c, nil 113 + return p.RootCid(), nil 151 114 }
+5 -4
server/handle_sync_get_blob.go
··· 7 7 8 8 "github.com/ipfs/go-cid" 9 9 "pkg.rbrt.fr/vow/internal/helpers" 10 + "pkg.rbrt.fr/vow/models" 10 11 ) 11 12 12 13 func (s *Server) handleSyncGetBlob(w http.ResponseWriter, r *http.Request) { ··· 47 48 48 49 // Verify this blob is registered to the given DID. We don't store the 49 50 // blob bytes here — just the metadata row that proves ownership. 50 - var count int64 51 - if err := s.db.Raw(ctx, "SELECT COUNT(*) FROM blobs WHERE did = ? AND cid = ?", nil, did, c.Bytes()).Scan(&count).Error; err != nil { 51 + var blob models.Blob 52 + if err := s.db.Raw(ctx, "SELECT * FROM blobs WHERE did = ? AND cid = ?", nil, did, c.Bytes()).Scan(&blob).Error; err != nil { 52 53 logger.Error("error looking up blob", "error", err) 53 54 helpers.ServerError(w, nil) 54 55 return 55 56 } 56 - if count == 0 { 57 + if blob.Did == "" { 57 58 helpers.InputError(w, new("BlobNotFound")) 58 59 return 59 60 } ··· 94 95 } 95 96 96 97 w.Header().Set("Content-Disposition", "attachment; filename="+c.String()) 97 - w.Header().Set("Content-Type", "application/octet-stream") 98 + w.Header().Set("Content-Type", blob.MimeType) 98 99 w.WriteHeader(http.StatusOK) 99 100 if _, err := io.Copy(w, resp.Body); err != nil { 100 101 logger.Error("failed to stream blob response", "error", err)
-7
server/ipfs.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "encoding/json" 6 - "io" 7 5 8 6 "github.com/ipfs/boxo/path" 9 7 "github.com/ipfs/go-cid" 10 8 caopts "github.com/ipfs/kubo/core/coreiface/options" 11 9 ) 12 - 13 - // readJSON decodes a single JSON value from r into dst. 14 - func readJSON(r io.Reader, dst any) error { 15 - return json.NewDecoder(r).Decode(dst) 16 - } 17 10 18 11 // unpinFromIPFS asks the local Kubo node to remove the recursive pin for the 19 12 // given CID so the content becomes eligible for garbage collection.
+2
server/repo.go
··· 350 350 // or an error occurs. Standard ATProto clients see a normal (slightly slower) 351 351 // response; the signing round-trip is invisible to them. 352 352 func (rm *RepoMan) applyWrites(ctx context.Context, urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) { 353 + _ = swapCommit // TODO: eventually use this. 354 + 353 355 rootcid, err := cid.Cast(urepo.Root) 354 356 if err != nil { 355 357 return nil, err