this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Implement kvstore in duckdb

+100 -37
+2 -3
cmd/butterfly/cmd_discover.go
··· 145 145 146 146 // Resolve all results 147 147 fmt.Printf("Discovered %d repositories\n", len(result.Dids)) 148 + fmt.Println("Resolving identities...") 148 149 for _, did := range result.Dids { 149 150 _, err := resolver.ResolveDID(ctx, did) 150 151 if err != nil { 151 152 fmt.Printf("Failed to resolve %s: %v", did, err) 152 153 } 153 154 } 154 - 155 - v, err := resolver.ResolveDID(ctx, "did:plc:upo6iq6ekh66d4mbhmiy6se4") 156 - fmt.Println(v) 155 + fmt.Println("Done!") 157 156 158 157 return nil 159 158 }
+29 -28
cmd/butterfly/identity/store_directory.go
··· 101 101 return "", fmt.Errorf("can not resolve handle: %w", identity.ErrInvalidHandle) 102 102 } 103 103 // start := time.Now() TODO 104 - entry, err := getHandle(d.store, h) 105 - if err == nil && !d.isHandleStale(entry) { 104 + entry, _ := getHandle(d.store, h) 105 + if entry != nil && !d.isHandleStale(entry) { 106 106 // TODO 107 107 // handleCacheHits.Inc() 108 108 // handleResolution.WithLabelValues("lru", "cached").Inc() ··· 123 123 select { 124 124 case <-val.(chan struct{}): 125 125 // The result should now be in the cache 126 - entry, err := getHandle(d.store, h) 127 - if err == nil && !d.isHandleStale(entry) { 126 + entry, _ := getHandle(d.store, h) 127 + if entry != nil && !d.isHandleStale(entry) { 128 128 return entry.DID, entry.Err 129 129 } 130 130 return "", fmt.Errorf("identity not found in cache after coalesce returned") ··· 188 188 189 189 func (d *StoreDirectory) LookupDIDWithCacheState(ctx context.Context, did syntax.DID) (*identity.Identity, bool, error) { 190 190 // start := time.Now() TODO 191 - entry, err := getIdent(d.store, did) 192 - if err == nil && !d.isIdentityStale(entry) { 191 + entry, _ := getIdent(d.store, did) 192 + if entry != nil && !d.isIdentityStale(entry) { 193 193 // TODO 194 194 // identityCacheHits.Inc() 195 195 // didResolution.WithLabelValues("lru", "cached").Inc() ··· 210 210 select { 211 211 case <-val.(chan struct{}): 212 212 // The result should now be in the cache 213 - entry, err := getIdent(d.store, did) 214 - if err == nil && !d.isIdentityStale(entry) { 213 + entry, _ := getIdent(d.store, did) 214 + if entry != nil && !d.isIdentityStale(entry) { 215 215 return entry.Identity, false, entry.Err 216 216 } 217 217 return nil, false, fmt.Errorf("identity not found in cache after coalesce returned") ··· 299 299 300 300 func getHandle(store store.Store, handle syntax.Handle) (*handleEntry, error) { 301 301 entryJSON, err := store.KvGet(handleCache, string(handle)) 302 - if entryJSON != "" { 303 - var entry handleEntry 304 - if err := json.Unmarshal([]byte(entryJSON), &entry); err != nil { 305 - return nil, err 306 - } 307 - if err := validateHandleEntry(&entry); err != nil { 308 - return nil, err 309 - } 310 - return &entry, nil 302 + if entryJSON == "" { 303 + return nil, err 304 + } 305 + var entry handleEntry 306 + if err := json.Unmarshal([]byte(entryJSON), &entry); err != nil { 307 + return nil, err 308 + } 309 + if err := validateHandleEntry(&entry); err != nil { 310 + return nil, err 311 311 } 312 - return nil, err 312 + return &entry, nil 313 + 313 314 } 314 315 315 316 func putHandle(store store.Store, handle syntax.Handle, entry *handleEntry) error { ··· 326 327 327 328 func getIdent(store store.Store, did syntax.DID) (*identityEntry, error) { 328 329 entryJSON, err := store.KvGet(identityCache, string(did)) 329 - if entryJSON != "" { 330 - var entry identityEntry 331 - if err := json.Unmarshal([]byte(entryJSON), &entry); err != nil { 332 - return nil, err 333 - } 334 - if err := validateIdentityEntry(&entry); err != nil { 335 - return nil, err 336 - } 337 - return &entry, nil 330 + if entryJSON == "" { 331 + return nil, err 338 332 } 339 - return nil, err 333 + var entry identityEntry 334 + if err := json.Unmarshal([]byte(entryJSON), &entry); err != nil { 335 + return nil, err 336 + } 337 + if err := validateIdentityEntry(&entry); err != nil { 338 + return nil, err 339 + } 340 + return &entry, nil 340 341 } 341 342 342 343 func putIdent(store store.Store, did syntax.DID, entry *identityEntry) error {
+69 -6
cmd/butterfly/store/duckdb.go
··· 89 89 } 90 90 } 91 91 92 + // Create KV store table 93 + createKVTableSQL := ` 94 + CREATE TABLE IF NOT EXISTS kvstore ( 95 + namespace VARCHAR NOT NULL, 96 + key VARCHAR NOT NULL, 97 + value TEXT NOT NULL, 98 + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 99 + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 100 + PRIMARY KEY (namespace, key) 101 + )` 102 + 103 + if _, err := d.db.ExecContext(ctx, createKVTableSQL); err != nil { 104 + return fmt.Errorf("failed to create kvstore table: %w", err) 105 + } 106 + 107 + // Create compound index for KV store 108 + if _, err := d.db.ExecContext(ctx, "CREATE INDEX IF NOT EXISTS idx_kvstore_namespace_key ON kvstore(namespace, key)"); err != nil { 109 + return fmt.Errorf("failed to create kvstore index: %w", err) 110 + } 111 + 92 112 // Prepare statements 93 113 d.insertStmt, err = d.db.PrepareContext(ctx, ` 94 114 INSERT INTO records (did, collection, rkey, cid, rev, record, deleted, updated_at) ··· 478 498 return stats, nil 479 499 } 480 500 481 - // KvGet retrieves a value from general KV storage (not yet implemented) 501 + // KvGet retrieves a value from general KV storage 482 502 func (d *DuckdbStore) KvGet(namespace string, key string) (string, error) { 483 - return "", fmt.Errorf("KvGet not yet implemented for duckdb store") 503 + d.mu.Lock() 504 + defer d.mu.Unlock() 505 + 506 + var value string 507 + err := d.db.QueryRow( 508 + "SELECT value FROM kvstore WHERE namespace = ? AND key = ?", 509 + namespace, key, 510 + ).Scan(&value) 511 + 512 + if err == sql.ErrNoRows { 513 + return "", fmt.Errorf("key %q not found in namespace %q", key, namespace) 514 + } 515 + if err != nil { 516 + return "", fmt.Errorf("failed to query kvstore: %w", err) 517 + } 518 + 519 + return value, nil 484 520 } 485 521 486 - // KvPut stores a value in general KV storage (not yet implemented) 522 + // KvPut stores a value in general KV storage 487 523 func (d *DuckdbStore) KvPut(namespace string, key string, value string) error { 488 - return fmt.Errorf("KvPut not yet implemented for duckdb store") 524 + d.mu.Lock() 525 + defer d.mu.Unlock() 526 + 527 + _, err := d.db.Exec(` 528 + INSERT INTO kvstore (namespace, key, value, updated_at) 529 + VALUES (?, ?, ?, ?) 530 + ON CONFLICT (namespace, key) DO UPDATE SET 531 + value = excluded.value, 532 + updated_at = excluded.updated_at 533 + `, namespace, key, value, time.Now()) 534 + 535 + if err != nil { 536 + return fmt.Errorf("failed to insert/update kvstore: %w", err) 537 + } 538 + 539 + return nil 489 540 } 490 541 491 - // KvDel deletes a value from general KV storage (not yet implemented) 542 + // KvDel deletes a value from general KV storage 492 543 func (d *DuckdbStore) KvDel(namespace string, key string) error { 493 - return fmt.Errorf("KvDel not yet implemented for duckdb store") 544 + d.mu.Lock() 545 + defer d.mu.Unlock() 546 + 547 + _, err := d.db.Exec( 548 + "DELETE FROM kvstore WHERE namespace = ? AND key = ?", 549 + namespace, key, 550 + ) 551 + 552 + if err != nil { 553 + return fmt.Errorf("failed to delete from kvstore: %w", err) 554 + } 555 + 556 + return nil 494 557 }