this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

tidy

dholms ebc228fb b4aecdb4

+8 -26
+5 -21
nexus/backfill.go
··· 66 66 } 67 67 68 68 func (n *Nexus) backfillDid(ctx context.Context, did string) error { 69 - if err := n.db.Model(&models.Did{}). 70 - Where("did = ?", did). 71 - Updates(map[string]interface{}{ 72 - "state": models.RepoStateBackfilling, 73 - "rev": "", 74 - "error_msg": "", 75 - }).Error; err != nil { 76 - return fmt.Errorf("failed to update state to backfilling: %w", err) 77 - } 78 - 79 69 n.logger.Info("starting backfill", "did", did) 80 70 81 71 rev, err := n.doBackfill(ctx, did) ··· 108 98 } 109 99 110 100 func (n *Nexus) doBackfill(ctx context.Context, did string) (string, error) { 111 - // Resolve DID to PDS 112 101 ident, err := n.Dir.LookupDID(ctx, syntax.DID(did)) 113 102 if err != nil { 114 103 return "", fmt.Errorf("failed to resolve DID: %w", err) ··· 121 110 122 111 n.logger.Info("fetching repo from PDS", "did", did, "pds", pdsURL) 123 112 124 - // Create XRPC client 125 113 client := &xrpc.Client{ 126 114 Client: &http.Client{}, 127 115 Host: pdsURL, 128 116 } 129 117 130 - // Call com.atproto.sync.getRepo 131 118 repoBytes, err := comatproto.SyncGetRepo(ctx, client, did, "") 132 119 if err != nil { 133 120 return "", fmt.Errorf("failed to get repo: %w", err) ··· 135 122 136 123 n.logger.Info("parsing repo CAR", "did", did, "size", len(repoBytes)) 137 124 138 - // Parse the repo from CAR 139 125 r, err := repo.ReadRepoFromCar(ctx, io.NopCloser(bytes.NewReader(repoBytes))) 140 126 if err != nil { 141 127 return "", fmt.Errorf("failed to read repo from CAR: %w", err) ··· 144 130 rev := r.SignedCommit().Rev 145 131 n.logger.Info("iterating repo records", "did", did, "rev", rev) 146 132 147 - // Pre-load existing CID mappings for this DID into memory 148 133 var existingRecords []models.RepoRecord 149 134 if err := n.db.Find(&existingRecords, "did = ?", did).Error; err != nil { 150 135 return "", fmt.Errorf("failed to load existing records: %w", err) 151 136 } 152 137 153 - // Build map: "collection/rkey" -> CID 154 138 existingCids := make(map[string]string, len(existingRecords)) 155 139 for _, rec := range existingRecords { 156 140 key := rec.Collection + "/" + rec.Rkey ··· 172 156 cidStr := recCid.String() 173 157 174 158 existingCid, exists := existingCids[recPath] 159 + if exists && existingCid == cidStr { 160 + return nil 161 + } 162 + 175 163 action := "create" 176 164 if exists { 177 - if existingCid == cidStr { 178 - return nil 179 - } else { 180 - action = "update" 181 - } 165 + action = "update" 182 166 } 183 167 184 168 blk, err := r.Blockstore().Get(ctx, recCid)
-1
nexus/main.go
··· 26 26 } 27 27 }() 28 28 29 - // Start server in goroutine 30 29 go func() { 31 30 if err := nexus.Start(context.Background(), ":8080"); err != nil { 32 31 log.Printf("Server error: %v", err)
+3 -4
nexus/processor.go
··· 187 187 ep.Logger.Info("processing buffered backfill events", "did", did, "count", len(bufferedEvts)) 188 188 189 189 for _, evt := range bufferedEvts { 190 - var commit *Commit 191 - err := json.Unmarshal([]byte(evt.Data), commit) 192 - if err != nil { 190 + var commit Commit 191 + if err := json.Unmarshal([]byte(evt.Data), &commit); err != nil { 193 192 return fmt.Errorf("failed to unmarshal buffered event: %w", err) 194 193 } 195 194 ··· 200 199 } 201 200 } 202 201 203 - if err := ep.updateRepoState(commit); err != nil { 202 + if err := ep.updateRepoState(&commit); err != nil { 204 203 ep.Logger.Error("failed to update repo state", "did", commit.Did, "rev", commit.Rev, "error", err) 205 204 return err 206 205 }