this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

WIP identity resolution

+636 -149
+49 -127
cmd/butterfly/cmd_discover.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "encoding/json" 6 5 "fmt" 7 6 "log" 8 7 "os" 9 8 9 + "github.com/bluesky-social/indigo/cmd/butterfly/identity" 10 10 "github.com/bluesky-social/indigo/cmd/butterfly/remote" 11 + "github.com/bluesky-social/indigo/cmd/butterfly/store" 11 12 "github.com/urfave/cli/v2" 12 13 ) 13 14 ··· 46 47 Value: 100, 47 48 Usage: "Maximum number of repositories to return", 48 49 }, 49 - // Output format 50 + // Common flags 50 51 &cli.StringFlag{ 51 - Name: "format", 52 - Value: "text", 53 - Usage: "Output format: text, json, or csv", 52 + Name: "store", 53 + Value: "stdout", 54 + Usage: "Storage mode: stdout, tarfiles, or duckdb", 54 55 }, 55 - // Identity resolution 56 - &cli.BoolFlag{ 57 - Name: "resolve", 58 - Usage: "Resolve DIDs to handles using identity resolution", 56 + &cli.StringFlag{ 57 + Name: "storage-dir", 58 + Value: "./output", 59 + Usage: "Output directory for tarfiles mode", 59 60 }, 60 - &cli.BoolFlag{ 61 - Name: "cache", 62 - Usage: "Enable identity resolution caching", 63 - Value: true, 61 + &cli.StringFlag{ 62 + Name: "db", 63 + Value: "./butterfly.db", 64 + Usage: "Path to DuckDB database file", 64 65 }, 65 66 }, 66 67 Action: runDiscover, ··· 77 78 collection := c.String("collection") 78 79 cursor := c.String("cursor") 79 80 limit := c.Int("limit") 80 - format := c.String("format") 81 - resolve := c.Bool("resolve") 82 - enableCache := c.Bool("cache") 81 + storeMode := c.String("store") 82 + storageDir := c.String("storage-dir") 83 + dbPath := c.String("db") 83 84 84 85 // Create remote based on input mode 85 86 var r remote.Remote ··· 101 102 // Create context 102 103 ctx := context.Background() 103 104 105 + // Create store based on storage mode 106 + var s store.Store 107 + switch storeMode { 108 + case "stdout": 109 + s = &store.StdoutStore{Mode: store.StdoutStoreModeStats} 110 + case "tarfiles": 111 + s = store.NewTarfilesStore(storageDir) 112 + case "duckdb": 113 + s = store.NewDuckdbStore(dbPath) 114 + default: 115 + return fmt.Errorf("unknown output mode: %s", storeMode) 116 + } 117 + 118 + // Initialize store 119 + if err := s.Setup(ctx); err != nil { 120 + return fmt.Errorf("failed to setup store: %w", err) 121 + } 122 + defer func() { 123 + if err := s.Close(); err != nil { 124 + logger.Printf("failed to close store: %v", err) 125 + } 126 + }() 127 + 128 + // Set up identity resolver 129 + var resolver *identity.IdentityResolver = identity.NewIdentityResolver(identity.IdentityResolverConfig{ 130 + Store: s, 131 + }) 132 + 104 133 // Prepare ListRepos parameters 105 134 params := remote.ListReposParams{ 106 135 Collection: collection, ··· 114 143 return fmt.Errorf("failed to list repos: %w", err) 115 144 } 116 145 117 - // Set up identity resolver if requested 118 - var resolver *IdentityResolver 119 - if resolve { 120 - resolver = NewIdentityResolverWithConfig(IdentityResolverConfig{ 121 - EnableCache: enableCache, 122 - }) 123 - } 124 - 125 - // Format and output results 126 - switch format { 127 - case "text": 128 - return outputDiscoverText(result, resolver, logger) 129 - case "json": 130 - return outputDiscoverJSON(result, resolver) 131 - case "csv": 132 - return outputDiscoverCSV(result, resolver) 133 - default: 134 - return fmt.Errorf("unknown output format: %s", format) 135 - } 136 - } 137 - 138 - func outputDiscoverText(result *remote.ListReposResult, resolver *IdentityResolver, logger *log.Logger) error { 146 + // Resolve all results 139 147 fmt.Printf("Found %d repositories\n\n", len(result.Dids)) 140 - 141 - for i, did := range result.Dids { 142 - fmt.Printf("%d. %s", i+1, did) 143 - 144 - // Resolve to handle if requested 145 - if resolver != nil { 146 - ctx := context.Background() 147 - ident, err := resolver.ResolveDID(ctx, did) 148 - if err != nil { 149 - logger.Printf("Failed to resolve %s: %v", did, err) 150 - fmt.Println() 151 - } else { 152 - fmt.Printf(" (%s)", ident.Handle) 153 - if pds, err := resolver.GetPDSEndpoint(ident); err == nil { 154 - fmt.Printf(" - PDS: %s", pds) 155 - } 156 - fmt.Println() 157 - } 158 - } else { 159 - fmt.Println() 160 - } 161 - } 162 - 163 - if result.Cursor != "" { 164 - fmt.Printf("\nNext cursor: %s\n", result.Cursor) 165 - } 166 - 167 - return nil 168 - } 169 - 170 - func outputDiscoverJSON(result *remote.ListReposResult, resolver *IdentityResolver) error { 171 - output := map[string]interface{}{ 172 - "count": len(result.Dids), 173 - "cursor": result.Cursor, 174 - } 175 - 176 - if resolver != nil { 177 - // Resolve DIDs and include additional info 178 - repos := make([]map[string]interface{}, 0, len(result.Dids)) 179 - ctx := context.Background() 180 - 181 - for _, did := range result.Dids { 182 - repo := map[string]interface{}{ 183 - "did": did, 184 - } 185 - 186 - if ident, err := resolver.ResolveDID(ctx, did); err == nil { 187 - repo["handle"] = ident.Handle.String() 188 - if pds, err := resolver.GetPDSEndpoint(ident); err == nil { 189 - repo["pds"] = pds 190 - } 191 - } 192 - 193 - repos = append(repos, repo) 194 - } 195 - output["repos"] = repos 196 - } else { 197 - output["dids"] = result.Dids 198 - } 199 - 200 - // Pretty print JSON 201 - encoder := json.NewEncoder(os.Stdout) 202 - encoder.SetIndent("", " ") 203 - return encoder.Encode(output) 204 - } 205 - 206 - func outputDiscoverCSV(result *remote.ListReposResult, resolver *IdentityResolver) error { 207 - // Print CSV header 208 - if resolver != nil { 209 - fmt.Println("did,handle,pds") 210 - } else { 211 - fmt.Println("did") 212 - } 213 - 214 - // Print rows 215 - ctx := context.Background() 216 148 for _, did := range result.Dids { 217 - if resolver != nil { 218 - ident, err := resolver.ResolveDID(ctx, did) 219 - if err != nil { 220 - fmt.Printf("%s,,\n", did) 221 - } else { 222 - pds := "" 223 - if endpoint, err := resolver.GetPDSEndpoint(ident); err == nil { 224 - pds = endpoint 225 - } 226 - fmt.Printf("%s,%s,%s\n", did, ident.Handle, pds) 227 - } 228 - } else { 229 - fmt.Println(did) 149 + _, err := resolver.ResolveDID(ctx, did) 150 + if err != nil { 151 + fmt.Printf("Failed to resolve %s: %v", did, err) 230 152 } 231 153 } 232 154
+11 -13
cmd/butterfly/cmd_sync.go
··· 46 46 Usage: "DID to fetch (required for carfile/pds modes)", 47 47 }, 48 48 &cli.StringFlag{ 49 - Name: "output", 50 - Value: "stats", 51 - Usage: "Output mode: stats, passthrough, tarfiles, or duckdb", 49 + Name: "store", 50 + Value: "stdout", 51 + Usage: "Storage mode: stdout, tarfiles, or duckdb", 52 52 }, 53 53 &cli.StringFlag{ 54 - Name: "output-dir", 54 + Name: "storage-dir", 55 55 Value: "./output", 56 56 Usage: "Output directory for tarfiles mode", 57 57 }, ··· 75 75 relayService := c.String("relay") 76 76 jetService := c.String("jetstream") 77 77 did := c.String("did") 78 - outputMode := c.String("output") 79 - outputDir := c.String("output-dir") 78 + storeMode := c.String("store") 79 + storageDir := c.String("storage-dir") 80 80 dbPath := c.String("db") 81 81 82 82 // Create remote based on input mode ··· 106 106 return fmt.Errorf("unknown input mode: %s", inputMode) 107 107 } 108 108 109 - // Create store based on output mode 109 + // Create store based on storage mode 110 110 var s store.Store 111 - switch outputMode { 112 - case "passthrough": 113 - s = &store.StdoutStore{Mode: store.StdoutStoreModePassthrough} 114 - case "stats": 111 + switch storeMode { 112 + case "stdout": 115 113 s = &store.StdoutStore{Mode: store.StdoutStoreModeStats} 116 114 case "tarfiles": 117 - s = store.NewTarfilesStore(outputDir) 115 + s = store.NewTarfilesStore(storageDir) 118 116 case "duckdb": 119 117 s = store.NewDuckdbStore(dbPath) 120 118 default: 121 - return fmt.Errorf("unknown output mode: %s", outputMode) 119 + return fmt.Errorf("unknown output mode: %s", storeMode) 122 120 } 123 121 124 122 // Create context
-9
cmd/butterfly/identity.go
··· 1 - // Package main provides identity resolution infrastructure for Butterfly 2 - package main 3 - 4 - // TODO: Implement identity resolution functionality 5 - // This will likely include: 6 - // - DID resolution 7 - // - Handle resolution 8 - // - Identity caching 9 - // - Integration with the atproto identity package
+172
cmd/butterfly/identity/identity.go
··· 1 + package identity 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "net/http" 7 + "time" 8 + 9 + "github.com/bluesky-social/indigo/atproto/identity" 10 + "github.com/bluesky-social/indigo/atproto/syntax" 11 + "golang.org/x/time/rate" 12 + 13 + "github.com/bluesky-social/indigo/cmd/butterfly/store" 14 + ) 15 + 16 + // IdentityResolver wraps the atproto identity directory with butterfly-specific configuration 17 + type IdentityResolver struct { 18 + directory identity.Directory 19 + } 20 + 21 + // IdentityResolverConfig contains configuration options for identity resolution 22 + type IdentityResolverConfig struct { 23 + // URL for PLC directory (defaults to https://plc.directory) 24 + PLCURL string 25 + // Store for caching 26 + Store store.Store 27 + // Cache TTL for successful lookups 28 + CacheTTL time.Duration 29 + // HTTP client timeout 30 + HTTPTimeout time.Duration 31 + // User agent string 32 + UserAgent string 33 + // Rate limit for PLC requests (requests per second) 34 + PLCRateLimit float64 35 + // Enable authoritative DNS fallback 36 + TryAuthoritativeDNS bool 37 + } 38 + 39 + // NewIdentityResolver creates a new identity resolver with custom configuration 40 + func NewIdentityResolver(config IdentityResolverConfig) *IdentityResolver { 41 + // Set defaults 42 + if config.PLCURL == "" { 43 + config.PLCURL = "https://plc.directory" 44 + } 45 + if config.HTTPTimeout == 0 { 46 + config.HTTPTimeout = 30 * time.Second 47 + } 48 + if config.UserAgent == "" { 49 + config.UserAgent = "butterfly/0.0.1" 50 + } 51 + if config.PLCRateLimit == 0 { 52 + config.PLCRateLimit = 10 // 10 requests per second default 53 + } 54 + 55 + // Create base directory 56 + baseDir := &identity.BaseDirectory{ 57 + PLCURL: config.PLCURL, 58 + PLCLimiter: rate.NewLimiter(rate.Limit(config.PLCRateLimit), 1), 59 + HTTPClient: http.Client{ 60 + Timeout: config.HTTPTimeout, 61 + }, 62 + UserAgent: config.UserAgent, 63 + TryAuthoritativeDNS: config.TryAuthoritativeDNS, 64 + } 65 + 66 + // Wrap with store caching 67 + if config.CacheTTL == 0 { 68 + config.CacheTTL = 5 * time.Minute 69 + } 70 + cacheDir := NewStoreDirectory(baseDir, config.Store, config.CacheTTL, time.Minute, 5*time.Minute) 71 + 72 + return &IdentityResolver{ 73 + directory: &cacheDir, 74 + } 75 + } 76 + 77 + // ResolveHandle looks up a handle and returns the associated identity 78 + func (r *IdentityResolver) ResolveHandle(ctx context.Context, handle string) (*identity.Identity, error) { 79 + h, err := syntax.ParseHandle(handle) 80 + if err != nil { 81 + return nil, fmt.Errorf("invalid handle %q: %w", handle, err) 82 + } 83 + 84 + ident, err := r.directory.LookupHandle(ctx, h) 85 + if err != nil { 86 + return nil, fmt.Errorf("failed to resolve handle %q: %w", handle, err) 87 + } 88 + 89 + return ident, nil 90 + } 91 + 92 + // ResolveDID looks up a DID and returns the associated identity 93 + func (r *IdentityResolver) ResolveDID(ctx context.Context, did string) (*identity.Identity, error) { 94 + d, err := syntax.ParseDID(did) 95 + if err != nil { 96 + return nil, fmt.Errorf("invalid DID %q: %w", did, err) 97 + } 98 + 99 + ident, err := r.directory.LookupDID(ctx, d) 100 + if err != nil { 101 + return nil, fmt.Errorf("failed to resolve DID %q: %w", did, err) 102 + } 103 + 104 + return ident, nil 105 + } 106 + 107 + // Resolve looks up either a handle or DID and returns the associated identity 108 + func (r *IdentityResolver) Resolve(ctx context.Context, identifier string) (*identity.Identity, error) { 109 + atid, err := syntax.ParseAtIdentifier(identifier) 110 + if err != nil { 111 + return nil, fmt.Errorf("invalid identifier %q: %w", identifier, err) 112 + } 113 + 114 + ident, err := r.directory.Lookup(ctx, *atid) 115 + if err != nil { 116 + return nil, fmt.Errorf("failed to resolve identifier %q: %w", identifier, err) 117 + } 118 + 119 + return ident, nil 120 + } 121 + 122 + // GetPDSEndpoint returns the PDS endpoint for a given identity 123 + func (r *IdentityResolver) GetPDSEndpoint(ident *identity.Identity) (string, error) { 124 + if ident == nil { 125 + return "", fmt.Errorf("identity is nil") 126 + } 127 + 128 + // Look for the atproto_pds service endpoint 129 + if svc, ok := ident.Services["atproto_pds"]; ok { 130 + return svc.URL, nil 131 + } 132 + 133 + return "", fmt.Errorf("no PDS endpoint found for DID %s", ident.DID) 134 + } 135 + 136 + // GetPublicKey returns the public key for a given identity and key ID 137 + func (r *IdentityResolver) GetPublicKey(ident *identity.Identity, keyID string) (*identity.VerificationMethod, error) { 138 + if ident == nil { 139 + return nil, fmt.Errorf("identity is nil") 140 + } 141 + 142 + if key, ok := ident.Keys[keyID]; ok { 143 + return &key, nil 144 + } 145 + 146 + return nil, fmt.Errorf("key %q not found for DID %s", keyID, ident.DID) 147 + } 148 + 149 + // GetSigningKey returns the primary signing key for a given identity 150 + func (r *IdentityResolver) GetSigningKey(ident *identity.Identity) (*identity.VerificationMethod, error) { 151 + // Try the atproto signing key first 152 + if key, err := r.GetPublicKey(ident, "atproto"); err == nil { 153 + return key, nil 154 + } 155 + 156 + // Fall back to the first available key 157 + for _, key := range ident.Keys { 158 + return &key, nil 159 + } 160 + 161 + return nil, fmt.Errorf("no signing key found for DID %s", ident.DID) 162 + } 163 + 164 + // Purge removes an identifier from the cache (if caching is enabled) 165 + func (r *IdentityResolver) Purge(ctx context.Context, identifier string) error { 166 + atid, err := syntax.ParseAtIdentifier(identifier) 167 + if err != nil { 168 + return fmt.Errorf("invalid identifier %q: %w", identifier, err) 169 + } 170 + 171 + return r.directory.Purge(ctx, *atid) 172 + }
+350
cmd/butterfly/identity/store_directory.go
··· 1 + package identity 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "sync" 8 + "time" 9 + 10 + "github.com/bluesky-social/indigo/atproto/identity" 11 + "github.com/bluesky-social/indigo/atproto/syntax" 12 + 13 + "github.com/bluesky-social/indigo/cmd/butterfly/store" 14 + ) 15 + 16 + const identityCache = "identity-cache" 17 + const handleCache = "handle-cache" 18 + 19 + // StoreDirectory is an implementation of identity.Directory with cache of Handle and DID in the provided Store 20 + type StoreDirectory struct { 21 + Inner identity.Directory 22 + ErrTTL time.Duration 23 + InvalidHandleTTL time.Duration 24 + store store.Store 25 + didLookupChans sync.Map 26 + handleLookupChans sync.Map 27 + } 28 + 29 + type handleEntry struct { 30 + Updated time.Time 31 + DID syntax.DID 32 + Err error 33 + } 34 + 35 + type identityEntry struct { 36 + Updated time.Time 37 + Identity *identity.Identity 38 + Err error 39 + } 40 + 41 + // var _ identity.Directory = (*StoreDirectory)(nil) TODO is this needed? 42 + 43 + // Ttl of zero means unlimited duration. 44 + func NewStoreDirectory(inner identity.Directory, store store.Store, hitTTL, errTTL, invalidHandleTTL time.Duration) StoreDirectory { 45 + return StoreDirectory{ 46 + ErrTTL: errTTL, 47 + InvalidHandleTTL: invalidHandleTTL, 48 + Inner: inner, 49 + store: store, 50 + } 51 + } 52 + 53 + func (d *StoreDirectory) isHandleStale(e *handleEntry) bool { 54 + if e.Err != nil && time.Since(e.Updated) > d.ErrTTL { 55 + return true 56 + } 57 + return false 58 + } 59 + 60 + func (d *StoreDirectory) isIdentityStale(e *identityEntry) bool { 61 + if e.Err != nil && time.Since(e.Updated) > d.ErrTTL { 62 + return true 63 + } 64 + if e.Identity != nil && e.Identity.Handle.IsInvalidHandle() && time.Since(e.Updated) > d.InvalidHandleTTL { 65 + return true 66 + } 67 + return false 68 + } 69 + 70 + func (d *StoreDirectory) updateHandle(ctx context.Context, h syntax.Handle) handleEntry { 71 + ident, err := d.Inner.LookupHandle(ctx, h) 72 + if err != nil { 73 + he := handleEntry{ 74 + Updated: time.Now(), 75 + DID: "", 76 + Err: err, 77 + } 78 + putHandle(d.store, h, &he) 79 + return he 80 + } 81 + 82 + entry := identityEntry{ 83 + Updated: time.Now(), 84 + Identity: ident, 85 + Err: nil, 86 + } 87 + he := handleEntry{ 88 + Updated: time.Now(), 89 + DID: ident.DID, 90 + Err: nil, 91 + } 92 + 93 + putIdent(d.store, ident.DID, &entry) 94 + putHandle(d.store, ident.Handle, &he) 95 + return he 96 + } 97 + 98 + func (d *StoreDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (syntax.DID, error) { 99 + h = h.Normalize() 100 + if h.IsInvalidHandle() { 101 + return "", fmt.Errorf("can not resolve handle: %w", identity.ErrInvalidHandle) 102 + } 103 + // start := time.Now() TODO 104 + entry, err := getHandle(d.store, h) 105 + if err == nil && !d.isHandleStale(entry) { 106 + // TODO 107 + // handleCacheHits.Inc() 108 + // handleResolution.WithLabelValues("lru", "cached").Inc() 109 + // handleResolutionDuration.WithLabelValues("lru", "cached").Observe(time.Since(start).Seconds()) 110 + return entry.DID, entry.Err 111 + } 112 + // handleCacheMisses.Inc() TODO 113 + 114 + // Coalesce multiple requests for the same Handle 115 + res := make(chan struct{}) 116 + val, loaded := d.handleLookupChans.LoadOrStore(h.String(), res) 117 + if loaded { 118 + // TODO 119 + // handleRequestsCoalesced.Inc() 120 + // handleResolution.WithLabelValues("lru", "coalesced").Inc() 121 + // handleResolutionDuration.WithLabelValues("lru", "coalesced").Observe(time.Since(start).Seconds()) 122 + // Wait for the result from the pending request 123 + select { 124 + case <-val.(chan struct{}): 125 + // The result should now be in the cache 126 + entry, err := getHandle(d.store, h) 127 + if err == nil && !d.isHandleStale(entry) { 128 + return entry.DID, entry.Err 129 + } 130 + return "", fmt.Errorf("identity not found in cache after coalesce returned") 131 + case <-ctx.Done(): 132 + return "", ctx.Err() 133 + } 134 + } 135 + 136 + // Update the Handle Entry from PLC and cache the result 137 + newEntry := d.updateHandle(ctx, h) 138 + 139 + // Cleanup the coalesce map and close the results channel 140 + d.handleLookupChans.Delete(h.String()) 141 + // Callers waiting will now get the result from the cache 142 + close(res) 143 + 144 + if newEntry.Err != nil { 145 + // TODO 146 + // handleResolution.WithLabelValues("lru", "error").Inc() 147 + // handleResolutionDuration.WithLabelValues("lru", "error").Observe(time.Since(start).Seconds()) 148 + return "", newEntry.Err 149 + } 150 + if newEntry.DID != "" { 151 + // TODO 152 + // handleResolution.WithLabelValues("lru", "success").Inc() 153 + // handleResolutionDuration.WithLabelValues("lru", "success").Observe(time.Since(start).Seconds()) 154 + return newEntry.DID, nil 155 + } 156 + return "", fmt.Errorf("unexpected control-flow error") 157 + } 158 + 159 + func (d *StoreDirectory) updateDID(ctx context.Context, did syntax.DID) identityEntry { 160 + ident, err := d.Inner.LookupDID(ctx, did) 161 + // persist the identity lookup error, instead of processing it immediately 162 + entry := identityEntry{ 163 + Updated: time.Now(), 164 + Identity: ident, 165 + Err: err, 166 + } 167 + var he *handleEntry 168 + // if *not* an error, then also update the handle cache 169 + if nil == err && !ident.Handle.IsInvalidHandle() { 170 + he = &handleEntry{ 171 + Updated: time.Now(), 172 + DID: did, 173 + Err: nil, 174 + } 175 + } 176 + 177 + putIdent(d.store, did, &entry) 178 + if he != nil { 179 + putHandle(d.store, ident.Handle, he) 180 + } 181 + return entry 182 + } 183 + 184 + func (d *StoreDirectory) LookupDID(ctx context.Context, did syntax.DID) (*identity.Identity, error) { 185 + id, _, err := d.LookupDIDWithCacheState(ctx, did) 186 + return id, err 187 + } 188 + 189 + func (d *StoreDirectory) LookupDIDWithCacheState(ctx context.Context, did syntax.DID) (*identity.Identity, bool, error) { 190 + // start := time.Now() TODO 191 + entry, err := getIdent(d.store, did) 192 + if err == nil && !d.isIdentityStale(entry) { 193 + // TODO 194 + // identityCacheHits.Inc() 195 + // didResolution.WithLabelValues("lru", "cached").Inc() 196 + // didResolutionDuration.WithLabelValues("lru", "cached").Observe(time.Since(start).Seconds()) 197 + return entry.Identity, true, entry.Err 198 + } 199 + // identityCacheMisses.Inc() TODO 200 + 201 + // Coalesce multiple requests for the same DID 202 + res := make(chan struct{}) 203 + val, loaded := d.didLookupChans.LoadOrStore(did.String(), res) 204 + if loaded { 205 + // TODO 206 + // identityRequestsCoalesced.Inc() 207 + // didResolution.WithLabelValues("lru", "coalesced").Inc() 208 + // didResolutionDuration.WithLabelValues("lru", "coalesced").Observe(time.Since(start).Seconds()) 209 + // Wait for the result from the pending request 210 + select { 211 + case <-val.(chan struct{}): 212 + // The result should now be in the cache 213 + entry, err := getIdent(d.store, did) 214 + if err == nil && !d.isIdentityStale(entry) { 215 + return entry.Identity, false, entry.Err 216 + } 217 + return nil, false, fmt.Errorf("identity not found in cache after coalesce returned") 218 + case <-ctx.Done(): 219 + return nil, false, ctx.Err() 220 + } 221 + } 222 + 223 + // Update the Identity Entry from PLC and cache the result 224 + newEntry := d.updateDID(ctx, did) 225 + 226 + // Cleanup the coalesce map and close the results channel 227 + d.didLookupChans.Delete(did.String()) 228 + // Callers waiting will now get the result from the cache 229 + close(res) 230 + 231 + if newEntry.Err != nil { 232 + // TODO 233 + // didResolution.WithLabelValues("lru", "error").Inc() 234 + // didResolutionDuration.WithLabelValues("lru", "error").Observe(time.Since(start).Seconds()) 235 + return nil, false, newEntry.Err 236 + } 237 + if newEntry.Identity != nil { 238 + // TODO 239 + // didResolution.WithLabelValues("lru", "success").Inc() 240 + // didResolutionDuration.WithLabelValues("lru", "success").Observe(time.Since(start).Seconds()) 241 + return newEntry.Identity, false, nil 242 + } 243 + return nil, false, fmt.Errorf("unexpected control-flow error") 244 + } 245 + 246 + func (d *StoreDirectory) LookupHandle(ctx context.Context, h syntax.Handle) (*identity.Identity, error) { 247 + ident, _, err := d.LookupHandleWithCacheState(ctx, h) 248 + return ident, err 249 + } 250 + 251 + func (d *StoreDirectory) LookupHandleWithCacheState(ctx context.Context, h syntax.Handle) (*identity.Identity, bool, error) { 252 + h = h.Normalize() 253 + did, err := d.ResolveHandle(ctx, h) 254 + if err != nil { 255 + return nil, false, err 256 + } 257 + ident, hit, err := d.LookupDIDWithCacheState(ctx, did) 258 + if err != nil { 259 + return nil, hit, err 260 + } 261 + 262 + declared, err := ident.DeclaredHandle() 263 + if err != nil { 264 + return nil, hit, fmt.Errorf("could not verify handle/DID mapping: %w", err) 265 + } 266 + // NOTE: DeclaredHandle() returns a normalized handle, and we already normalized 'h' above 267 + if declared != h { 268 + return nil, hit, fmt.Errorf("%w: %s != %s", identity.ErrHandleMismatch, declared, h) 269 + } 270 + return ident, hit, nil 271 + } 272 + 273 + func (d *StoreDirectory) Lookup(ctx context.Context, a syntax.AtIdentifier) (*identity.Identity, error) { 274 + handle, err := a.AsHandle() 275 + if nil == err { // if not an error, is a handle 276 + return d.LookupHandle(ctx, handle) 277 + } 278 + did, err := a.AsDID() 279 + if nil == err { // if not an error, is a DID 280 + return d.LookupDID(ctx, did) 281 + } 282 + return nil, fmt.Errorf("at-identifier neither a Handle nor a DID") 283 + } 284 + 285 + func (d *StoreDirectory) Purge(ctx context.Context, atid syntax.AtIdentifier) error { 286 + handle, err := atid.AsHandle() 287 + if nil == err { // if not an error, is a handle 288 + handle = handle.Normalize() 289 + delHandle(d.store, handle) 290 + return nil 291 + } 292 + did, err := atid.AsDID() 293 + if nil == err { // if not an error, is a DID 294 + delIdent(d.store, did) 295 + return nil 296 + } 297 + return fmt.Errorf("at-identifier neither a Handle nor a DID") 298 + } 299 + 300 + func getHandle(store store.Store, handle syntax.Handle) (*handleEntry, error) { 301 + entryJSON, err := store.KvGet(handleCache, string(handle)) 302 + if entryJSON != "" { 303 + // TODO - is this parse safe? do we need to be checking the output or anything? 304 + var entry handleEntry 305 + if err := json.Unmarshal([]byte(entryJSON), &entry); err != nil { 306 + return nil, err 307 + } 308 + return &entry, nil 309 + } 310 + return nil, err 311 + } 312 + 313 + func putHandle(store store.Store, handle syntax.Handle, entry *handleEntry) error { 314 + // TODO - is this right? 315 + entryJSON, err := json.Marshal(entry) 316 + if err != nil { 317 + return nil 318 + } 319 + return store.KvPut(handleCache, string(handle), string(entryJSON)) 320 + } 321 + 322 + func delHandle(store store.Store, handle syntax.Handle) error { 323 + return store.KvDel(handleCache, string(handle)) 324 + } 325 + 326 + func getIdent(store store.Store, did syntax.DID) (*identityEntry, error) { 327 + entryJSON, err := store.KvGet(identityCache, string(did)) 328 + if entryJSON != "" { 329 + // TODO - is this parse safe? do we need to be checking the output or anything? 330 + var entry identityEntry 331 + if err := json.Unmarshal([]byte(entryJSON), &entry); err != nil { 332 + return nil, err 333 + } 334 + return &entry, nil 335 + } 336 + return nil, err 337 + } 338 + 339 + func putIdent(store store.Store, did syntax.DID, entry *identityEntry) error { 340 + // TODO - is this right? 341 + entryJSON, err := json.Marshal(entry) 342 + if err != nil { 343 + return nil 344 + } 345 + return store.KvPut(identityCache, string(did), string(entryJSON)) 346 + } 347 + 348 + func delIdent(store store.Store, did syntax.DID) error { 349 + return store.KvDel(identityCache, string(did)) 350 + }
+15
cmd/butterfly/store/duckdb.go
··· 477 477 478 478 return stats, nil 479 479 } 480 + 481 + // KvGet retrieves a value from general KV storage (not yet implemented) 482 + func (d *DuckdbStore) KvGet(namespace string, key string) (string, error) { 483 + return "", fmt.Errorf("KvGet not yet implemented for duckdb store") 484 + } 485 + 486 + // KvPut stores a value in general KV storage (not yet implemented) 487 + func (d *DuckdbStore) KvPut(namespace string, key string, value string) error { 488 + return fmt.Errorf("KvPut not yet implemented for duckdb store") 489 + } 490 + 491 + // KvDel deletes a value from general KV storage (not yet implemented) 492 + func (d *DuckdbStore) KvDel(namespace string, key string) error { 493 + return fmt.Errorf("KvDel not yet implemented for duckdb store") 494 + }
+15
cmd/butterfly/store/stdout.go
··· 108 108 } 109 109 } 110 110 } 111 + 112 + // KvGet retrieves a value from general KV storage (not yet implemented) 113 + func (s *StdoutStore) KvGet(namespace string, key string) (string, error) { 114 + return "", fmt.Errorf("KvGet not yet implemented for stdout store") 115 + } 116 + 117 + // KvPut stores a value in general KV storage (not yet implemented) 118 + func (s *StdoutStore) KvPut(namespace string, key string, value string) error { 119 + return fmt.Errorf("KvPut not yet implemented for stdout store") 120 + } 121 + 122 + // KvDel deletes a value from general KV storage (not yet implemented) 123 + func (s *StdoutStore) KvDel(namespace string, key string) error { 124 + return fmt.Errorf("KvDel not yet implemented for stdout store") 125 + }
+9
cmd/butterfly/store/store.go
··· 22 22 // ActiveSync processes live update events from a remote stream 23 23 // The implementation should handle context cancellation appropriately 24 24 ActiveSync(ctx context.Context, stream *remote.RemoteStream) error 25 + 26 + // General-purpose KV storage 27 + 28 + // Get from general kv storage 29 + KvGet(namespace string, key string) (string, error) 30 + // Put to general kv storage 31 + KvPut(namespace string, key string, value string) error 32 + // Delete from general kv storage 33 + KvDel(namespace string, key string) error 25 34 } 26 35 27 36 // StoreType identifies the type of store
+15
cmd/butterfly/store/tarfiles.go
··· 347 347 348 348 return contents, nil 349 349 } 350 + 351 + // KvGet retrieves a value from general KV storage (not yet implemented) 352 + func (t *TarfilesStore) KvGet(namespace string, key string) (string, error) { 353 + return "", fmt.Errorf("KvGet not yet implemented for tarfiles store") 354 + } 355 + 356 + // KvPut stores a value in general KV storage (not yet implemented) 357 + func (t *TarfilesStore) KvPut(namespace string, key string, value string) error { 358 + return fmt.Errorf("KvPut not yet implemented for tarfiles store") 359 + } 360 + 361 + // KvDel deletes a value from general KV storage (not yet implemented) 362 + func (t *TarfilesStore) KvDel(namespace string, key string) error { 363 + return fmt.Errorf("KvDel not yet implemented for tarfiles store") 364 + }