this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add tarfile store

+698 -14
+5 -2
cmd/butterfly/main.go
··· 16 16 var ( 17 17 carFile = flag.String("car", "", "Path to CAR file to read") 18 18 did = flag.String("did", "", "DID to fetch (required)") 19 - outputMode = flag.String("output", "stats", "Output mode: stats or passthrough") 19 + outputMode = flag.String("output", "stats", "Output mode: stats, passthrough, or tarfiles") 20 + outputDir = flag.String("output-dir", "./output", "Output directory for tarfiles mode") 20 21 help = flag.Bool("help", false, "Show help") 21 22 ) 22 23 flag.Parse() 23 24 24 25 if *help || *carFile == "" || *did == "" { 25 - fmt.Fprintf(os.Stderr, "Usage: butterfly -car <path> -did <did> [-output stats|passthrough]\n") 26 + fmt.Fprintf(os.Stderr, "Usage: butterfly -car <path> -did <did> [-output stats|passthrough|tarfiles] [-output-dir <dir>]\n") 26 27 flag.PrintDefaults() 27 28 os.Exit(1) 28 29 } ··· 40 41 s = &store.StdoutStore{Mode: store.StdoutStoreModePassthrough} 41 42 case "stats": 42 43 s = &store.StdoutStore{Mode: store.StdoutStoreModeStats} 44 + case "tarfiles": 45 + s = store.NewTarfilesStore(*outputDir) 43 46 default: 44 47 logger.Fatalf("unknown output mode: %s", *outputMode) 45 48 }
+295 -12
cmd/butterfly/store/tarfiles.go
··· 1 - /* 2 - Write-to-tarfiles storage interface 3 - */ 4 - 1 + // Package store provides a tar file implementation of the Store interface 5 2 package store 6 3 7 4 import ( 5 + "archive/tar" 6 + "bytes" 7 + "context" 8 + "encoding/json" 8 9 "fmt" 10 + "io" 11 + "os" 12 + "path/filepath" 13 + "strings" 14 + "sync" 15 + "time" 9 16 10 17 "github.com/bluesky-social/indigo/cmd/butterfly/remote" 11 18 ) 12 19 20 + // TarfilesStore implements Store by writing repository data to tar files 13 21 type TarfilesStore struct { 14 22 // The directory to store the .tar files 15 23 // Each repository is stored as a single .tar file 16 24 // The contents of the .tar file is a collection of json files 17 - // The directory structure is based on the cllections 18 - dirpath string 25 + // The directory structure is based on the collections 26 + Dirpath string 27 + 28 + // Internal state 29 + mu sync.Mutex 30 + writers map[string]*tarWriter 31 + tempDir string 32 + } 33 + 34 + // tarWriter manages writing to a single tar file 35 + type tarWriter struct { 36 + file *os.File 37 + writer *tar.Writer 38 + entries map[string]bool // Track existing entries 39 + tempFile string 40 + finalFile string 41 + } 42 + 43 + // NewTarfilesStore creates a new TarfilesStore 44 + func NewTarfilesStore(dirpath string) *TarfilesStore { 45 + return &TarfilesStore{ 46 + Dirpath: dirpath, 47 + writers: make(map[string]*tarWriter), 48 + } 49 + } 50 + 51 + // Setup creates the directory if it doesn't exist 52 + func (t *TarfilesStore) Setup(ctx context.Context) error { 53 + if err := os.MkdirAll(t.Dirpath, 0755); err != nil { 54 + return fmt.Errorf("failed to create directory %s: %w", t.Dirpath, err) 55 + } 56 + 57 + // Create temp directory for atomic writes 58 + tempDir := filepath.Join(t.Dirpath, ".tmp") 59 + if err := os.MkdirAll(tempDir, 0755); err != nil { 60 + return fmt.Errorf("failed to create temp directory: %w", err) 61 + } 62 + t.tempDir = tempDir 63 + 64 + return nil 65 + } 66 + 67 + // Close finalizes all tar files and cleans up 68 + func (t *TarfilesStore) Close() error { 69 + t.mu.Lock() 70 + defer t.mu.Unlock() 71 + 72 + var errs []error 73 + for did, tw := range t.writers { 74 + if err := t.finalizeTarWriter(tw); err != nil { 75 + errs = append(errs, fmt.Errorf("failed to finalize tar for %s: %w", did, err)) 76 + } 77 + } 78 + 79 + // Clean up temp directory 80 + if t.tempDir != "" { 81 + os.RemoveAll(t.tempDir) 82 + } 83 + 84 + if len(errs) > 0 { 85 + return fmt.Errorf("errors closing tar files: %v", errs) 86 + } 87 + return nil 88 + } 89 + 90 + // Receive processes events from the stream 91 + func (t *TarfilesStore) Receive(ctx context.Context, stream *remote.RemoteStream) error { 92 + for event := range stream.Ch { 93 + select { 94 + case <-ctx.Done(): 95 + return ctx.Err() 96 + default: 97 + } 98 + 99 + if event.Kind != remote.EventKindCommit || event.Commit == nil { 100 + continue 101 + } 102 + 103 + if err := t.processCommit(ctx, event.Did, event.Commit); err != nil { 104 + // Log error but continue processing 105 + fmt.Fprintf(os.Stderr, "tarfiles: error processing commit for %s: %v\n", event.Did, err) 106 + } 107 + } 108 + return nil 109 + } 110 + 111 + // processCommit handles a single commit event 112 + func (t *TarfilesStore) processCommit(ctx context.Context, did string, commit *remote.StreamEventCommit) error { 113 + t.mu.Lock() 114 + tw, err := t.getTarWriter(did) 115 + t.mu.Unlock() 116 + if err != nil { 117 + return fmt.Errorf("failed to get tar writer: %w", err) 118 + } 119 + 120 + entryPath := fmt.Sprintf("%s/%s.json", commit.Collection, commit.Rkey) 121 + 122 + switch commit.Operation { 123 + case remote.OpDelete: 124 + // Write a JSON object with _deleted: true 125 + deletedRecord := map[string]any{"_deleted": true} 126 + data, err := json.MarshalIndent(deletedRecord, "", " ") 127 + if err != nil { 128 + return fmt.Errorf("failed to marshal deleted record: %w", err) 129 + } 130 + return t.writeTarEntry(tw, entryPath, data) 131 + 132 + case remote.OpCreate, remote.OpUpdate: 133 + // Marshal record to JSON 134 + data, err := json.MarshalIndent(commit.Record, "", " ") 135 + if err != nil { 136 + return fmt.Errorf("failed to marshal record: %w", err) 137 + } 138 + return t.writeTarEntry(tw, entryPath, data) 139 + 140 + default: 141 + return fmt.Errorf("unknown operation: %s", commit.Operation) 142 + } 143 + } 144 + 145 + // getTarWriter gets or creates a tar writer for a DID 146 + func (t *TarfilesStore) getTarWriter(did string) (*tarWriter, error) { 147 + if tw, exists := t.writers[did]; exists { 148 + return tw, nil 149 + } 150 + 151 + // Sanitize DID for filename 152 + filename := strings.ReplaceAll(did, ":", "_") 153 + finalPath := filepath.Join(t.Dirpath, filename+".tar") 154 + tempPath := filepath.Join(t.tempDir, filename+".tar.tmp") 155 + 156 + // Check if tar file already exists and load entries 157 + entries := make(map[string]bool) 158 + if _, err := os.Stat(finalPath); err == nil { 159 + if err := t.loadExistingEntries(finalPath, entries); err != nil { 160 + return nil, fmt.Errorf("failed to load existing tar: %w", err) 161 + } 162 + } 163 + 164 + // Create new temp file 165 + file, err := os.Create(tempPath) 166 + if err != nil { 167 + return nil, fmt.Errorf("failed to create tar file: %w", err) 168 + } 169 + 170 + // Create tar writer first 171 + newTarWriter := tar.NewWriter(file) 172 + 173 + // If we had existing entries, copy them to the new tar 174 + if len(entries) > 0 { 175 + if err := t.copyExistingTarEntries(finalPath, newTarWriter, entries); err != nil { 176 + newTarWriter.Close() 177 + file.Close() 178 + os.Remove(tempPath) 179 + return nil, fmt.Errorf("failed to copy existing tar: %w", err) 180 + } 181 + } 182 + 183 + tw := &tarWriter{ 184 + file: file, 185 + writer: newTarWriter, 186 + entries: entries, 187 + tempFile: tempPath, 188 + finalFile: finalPath, 189 + } 190 + 191 + t.writers[did] = tw 192 + return tw, nil 193 + } 194 + 195 + // loadExistingEntries reads the list of entries from an existing tar file 196 + func (t *TarfilesStore) loadExistingEntries(path string, entries map[string]bool) error { 197 + file, err := os.Open(path) 198 + if err != nil { 199 + return err 200 + } 201 + defer file.Close() 202 + 203 + reader := tar.NewReader(file) 204 + for { 205 + header, err := reader.Next() 206 + if err == io.EOF { 207 + break 208 + } 209 + if err != nil { 210 + return err 211 + } 212 + entries[header.Name] = true 213 + } 214 + return nil 215 + } 216 + 217 + // copyExistingTarEntries copies entries from an existing tar file to a tar writer 218 + func (t *TarfilesStore) copyExistingTarEntries(srcPath string, writer *tar.Writer, entries map[string]bool) error { 219 + src, err := os.Open(srcPath) 220 + if err != nil { 221 + return err 222 + } 223 + defer src.Close() 224 + 225 + reader := tar.NewReader(src) 226 + 227 + for { 228 + header, err := reader.Next() 229 + if err == io.EOF { 230 + break 231 + } 232 + if err != nil { 233 + return err 234 + } 235 + 236 + // Copy the entry 237 + if err := writer.WriteHeader(header); err != nil { 238 + return err 239 + } 240 + if _, err := io.Copy(writer, reader); err != nil { 241 + return err 242 + } 243 + } 244 + 245 + return nil 19 246 } 20 247 21 - func (self *TarfilesStore) Setup() error { 22 - return fmt.Errorf("Not yet implemented") 248 + // writeTarEntry writes a single entry to the tar file 249 + func (t *TarfilesStore) writeTarEntry(tw *tarWriter, path string, data []byte) error { 250 + header := &tar.Header{ 251 + Name: path, 252 + Mode: 0644, 253 + Size: int64(len(data)), 254 + ModTime: time.Now(), 255 + } 256 + 257 + if err := tw.writer.WriteHeader(header); err != nil { 258 + return fmt.Errorf("failed to write tar header: %w", err) 259 + } 260 + 261 + if _, err := tw.writer.Write(data); err != nil { 262 + return fmt.Errorf("failed to write tar data: %w", err) 263 + } 264 + 265 + tw.entries[path] = true 266 + return nil 23 267 } 24 268 25 - func (self *TarfilesStore) Close() error { 26 - return fmt.Errorf("Not yet implemented") 269 + // finalizeTarWriter closes a tar writer and moves the temp file to final location 270 + func (t *TarfilesStore) finalizeTarWriter(tw *tarWriter) error { 271 + if err := tw.writer.Close(); err != nil { 272 + return fmt.Errorf("failed to close tar writer: %w", err) 273 + } 274 + if err := tw.file.Close(); err != nil { 275 + return fmt.Errorf("failed to close file: %w", err) 276 + } 277 + 278 + // Atomically move temp file to final location 279 + if err := os.Rename(tw.tempFile, tw.finalFile); err != nil { 280 + return fmt.Errorf("failed to move tar file: %w", err) 281 + } 282 + 283 + return nil 27 284 } 28 285 29 - func (self *TarfilesStore) Receive(s *remote.RemoteStream) error { 30 - return fmt.Errorf("Not yet implemented") 286 + // ReadTarFile reads a tar file and returns its contents (for debugging/testing) 287 + func ReadTarFile(path string) (map[string][]byte, error) { 288 + file, err := os.Open(path) 289 + if err != nil { 290 + return nil, err 291 + } 292 + defer file.Close() 293 + 294 + contents := make(map[string][]byte) 295 + reader := tar.NewReader(file) 296 + 297 + for { 298 + header, err := reader.Next() 299 + if err == io.EOF { 300 + break 301 + } 302 + if err != nil { 303 + return nil, err 304 + } 305 + 306 + buf := new(bytes.Buffer) 307 + if _, err := io.Copy(buf, reader); err != nil { 308 + return nil, err 309 + } 310 + contents[header.Name] = buf.Bytes() 311 + } 312 + 313 + return contents, nil 31 314 }
+398
cmd/butterfly/store/tarfiles_test.go
··· 1 + package store 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "path/filepath" 7 + "strings" 8 + "testing" 9 + "time" 10 + 11 + "github.com/bluesky-social/indigo/cmd/butterfly/remote" 12 + "github.com/stretchr/testify/assert" 13 + "github.com/stretchr/testify/require" 14 + ) 15 + 16 + func TestTarfilesStore_Setup(t *testing.T) { 17 + tmpDir := t.TempDir() 18 + store := NewTarfilesStore(tmpDir) 19 + 20 + ctx := context.Background() 21 + err := store.Setup(ctx) 22 + require.NoError(t, err) 23 + 24 + // Check that directories were created 25 + assert.DirExists(t, tmpDir) 26 + assert.DirExists(t, filepath.Join(tmpDir, ".tmp")) 27 + } 28 + 29 + func TestTarfilesStore_BasicOperations(t *testing.T) { 30 + tmpDir := t.TempDir() 31 + store := NewTarfilesStore(tmpDir) 32 + 33 + ctx := context.Background() 34 + err := store.Setup(ctx) 35 + require.NoError(t, err) 36 + 37 + // Create a test stream 38 + stream := &remote.RemoteStream{ 39 + Ch: make(chan remote.StreamEvent, 10), 40 + } 41 + 42 + testDID := "did:plc:testuser123" 43 + 44 + // Send some test events 45 + go func() { 46 + defer close(stream.Ch) 47 + 48 + // Create a post 49 + stream.Ch <- remote.StreamEvent{ 50 + Did: testDID, 51 + Timestamp: time.Now(), 52 + Kind: remote.EventKindCommit, 53 + Commit: &remote.StreamEventCommit{ 54 + Rev: "rev123", 55 + Operation: remote.OpCreate, 56 + Collection: "app.bsky.feed.post", 57 + Rkey: "3jui7kd54zh2y", 58 + Record: map[string]any{ 59 + "text": "Hello, world!", 60 + "createdAt": time.Now().Format(time.RFC3339), 61 + }, 62 + Cid: "bafyreigvcqpnqk3dqg", 63 + }, 64 + } 65 + 66 + // Create a follow 67 + stream.Ch <- remote.StreamEvent{ 68 + Did: testDID, 69 + Timestamp: time.Now(), 70 + Kind: remote.EventKindCommit, 71 + Commit: &remote.StreamEventCommit{ 72 + Rev: "rev124", 73 + Operation: remote.OpCreate, 74 + Collection: "app.bsky.graph.follow", 75 + Rkey: "3jui7kd54zh3z", 76 + Record: map[string]any{ 77 + "subject": "did:plc:alice", 78 + "createdAt": time.Now().Format(time.RFC3339), 79 + }, 80 + Cid: "bafyreigvcqpnqk3dqh", 81 + }, 82 + } 83 + 84 + // Update the post 85 + stream.Ch <- remote.StreamEvent{ 86 + Did: testDID, 87 + Timestamp: time.Now(), 88 + Kind: remote.EventKindCommit, 89 + Commit: &remote.StreamEventCommit{ 90 + Rev: "rev125", 91 + Operation: remote.OpUpdate, 92 + Collection: "app.bsky.feed.post", 93 + Rkey: "3jui7kd54zh2y", 94 + Record: map[string]any{ 95 + "text": "Hello, world! (edited)", 96 + "createdAt": time.Now().Format(time.RFC3339), 97 + }, 98 + Cid: "bafyreigvcqpnqk3dqi", 99 + }, 100 + } 101 + 102 + // Delete the follow 103 + stream.Ch <- remote.StreamEvent{ 104 + Did: testDID, 105 + Timestamp: time.Now(), 106 + Kind: remote.EventKindCommit, 107 + Commit: &remote.StreamEventCommit{ 108 + Rev: "rev126", 109 + Operation: remote.OpDelete, 110 + Collection: "app.bsky.graph.follow", 111 + Rkey: "3jui7kd54zh3z", 112 + }, 113 + } 114 + }() 115 + 116 + // Process the stream 117 + err = store.Receive(ctx, stream) 118 + require.NoError(t, err) 119 + 120 + // Close the store to finalize tar files 121 + err = store.Close() 122 + require.NoError(t, err) 123 + 124 + // Verify the tar file was created 125 + expectedFile := filepath.Join(tmpDir, "did_plc_testuser123.tar") 126 + assert.FileExists(t, expectedFile) 127 + 128 + // Read and verify the tar contents 129 + contents, err := ReadTarFile(expectedFile) 130 + require.NoError(t, err) 131 + 132 + // Check that the post exists and was updated 133 + postData, exists := contents["app.bsky.feed.post/3jui7kd54zh2y.json"] 134 + assert.True(t, exists) 135 + 136 + var post map[string]any 137 + err = json.Unmarshal(postData, &post) 138 + require.NoError(t, err) 139 + assert.Equal(t, "Hello, world! (edited)", post["text"]) 140 + 141 + // Check that the follow was deleted (contains _deleted: true) 142 + followData, hasFollow := contents["app.bsky.graph.follow/3jui7kd54zh3z.json"] 143 + assert.True(t, hasFollow, "Follow record should exist with deletion marker") 144 + 145 + var follow map[string]any 146 + err = json.Unmarshal(followData, &follow) 147 + require.NoError(t, err) 148 + assert.Equal(t, true, follow["_deleted"], "Follow should be marked as deleted") 149 + } 150 + 151 + func TestTarfilesStore_MultipleRepos(t *testing.T) { 152 + tmpDir := t.TempDir() 153 + store := NewTarfilesStore(tmpDir) 154 + 155 + ctx := context.Background() 156 + err := store.Setup(ctx) 157 + require.NoError(t, err) 158 + 159 + // Create a test stream 160 + stream := &remote.RemoteStream{ 161 + Ch: make(chan remote.StreamEvent, 10), 162 + } 163 + 164 + testDIDs := []string{"did:plc:user1", "did:plc:user2", "did:plc:user3"} 165 + 166 + // Send events for multiple DIDs 167 + go func() { 168 + defer close(stream.Ch) 169 + 170 + for i, did := range testDIDs { 171 + stream.Ch <- remote.StreamEvent{ 172 + Did: did, 173 + Timestamp: time.Now(), 174 + Kind: remote.EventKindCommit, 175 + Commit: &remote.StreamEventCommit{ 176 + Rev: "rev" + string(rune(i)), 177 + Operation: remote.OpCreate, 178 + Collection: "app.bsky.actor.profile", 179 + Rkey: "self", 180 + Record: map[string]any{ 181 + "displayName": "User " + string(rune(i+1)), 182 + "createdAt": time.Now().Format(time.RFC3339), 183 + }, 184 + }, 185 + } 186 + } 187 + }() 188 + 189 + // Process the stream 190 + err = store.Receive(ctx, stream) 191 + require.NoError(t, err) 192 + 193 + // Close the store 194 + err = store.Close() 195 + require.NoError(t, err) 196 + 197 + // Verify tar files were created for each DID 198 + for _, did := range testDIDs { 199 + filename := strings.ReplaceAll(did, ":", "_") + ".tar" 200 + expectedFile := filepath.Join(tmpDir, filename) 201 + assert.FileExists(t, expectedFile) 202 + } 203 + } 204 + 205 + func TestTarfilesStore_ContextCancellation(t *testing.T) { 206 + tmpDir := t.TempDir() 207 + store := NewTarfilesStore(tmpDir) 208 + 209 + ctx, cancel := context.WithCancel(context.Background()) 210 + err := store.Setup(ctx) 211 + require.NoError(t, err) 212 + 213 + // Create a test stream 214 + stream := &remote.RemoteStream{ 215 + Ch: make(chan remote.StreamEvent, 10), 216 + } 217 + 218 + // Send events indefinitely 219 + go func() { 220 + defer close(stream.Ch) 221 + for i := 0; i < 100; i++ { 222 + stream.Ch <- remote.StreamEvent{ 223 + Did: "did:plc:testuser", 224 + Timestamp: time.Now(), 225 + Kind: remote.EventKindCommit, 226 + Commit: &remote.StreamEventCommit{ 227 + Operation: remote.OpCreate, 228 + Collection: "app.bsky.feed.post", 229 + Rkey: "post" + string(rune(i+1)), 230 + Record: map[string]any{"text": "Test post"}, 231 + }, 232 + } 233 + time.Sleep(10 * time.Millisecond) 234 + } 235 + }() 236 + 237 + // Cancel context after a short time 238 + go func() { 239 + time.Sleep(50 * time.Millisecond) 240 + cancel() 241 + }() 242 + 243 + // Process should stop when context is cancelled 244 + err = store.Receive(ctx, stream) 245 + assert.ErrorIs(t, err, context.Canceled) 246 + } 247 + 248 + func TestTarfilesStore_AppendToExisting(t *testing.T) { 249 + tmpDir := t.TempDir() 250 + testDID := "did:plc:testuser" 251 + 252 + // First run - create initial records 253 + { 254 + store := NewTarfilesStore(tmpDir) 255 + ctx := context.Background() 256 + err := store.Setup(ctx) 257 + require.NoError(t, err) 258 + 259 + stream := &remote.RemoteStream{ 260 + Ch: make(chan remote.StreamEvent, 2), 261 + } 262 + 263 + stream.Ch <- remote.StreamEvent{ 264 + Did: testDID, 265 + Timestamp: time.Now(), 266 + Kind: remote.EventKindCommit, 267 + Commit: &remote.StreamEventCommit{ 268 + Operation: remote.OpCreate, 269 + Collection: "app.bsky.feed.post", 270 + Rkey: "post1", 271 + Record: map[string]any{"text": "First post"}, 272 + }, 273 + } 274 + close(stream.Ch) 275 + 276 + err = store.Receive(ctx, stream) 277 + require.NoError(t, err) 278 + err = store.Close() 279 + require.NoError(t, err) 280 + } 281 + 282 + // Second run - append more records 283 + { 284 + store := NewTarfilesStore(tmpDir) 285 + ctx := context.Background() 286 + err := store.Setup(ctx) 287 + require.NoError(t, err) 288 + 289 + stream := &remote.RemoteStream{ 290 + Ch: make(chan remote.StreamEvent, 2), 291 + } 292 + 293 + stream.Ch <- remote.StreamEvent{ 294 + Did: testDID, 295 + Timestamp: time.Now(), 296 + Kind: remote.EventKindCommit, 297 + Commit: &remote.StreamEventCommit{ 298 + Operation: remote.OpCreate, 299 + Collection: "app.bsky.feed.post", 300 + Rkey: "post2", 301 + Record: map[string]any{"text": "Second post"}, 302 + }, 303 + } 304 + close(stream.Ch) 305 + 306 + err = store.Receive(ctx, stream) 307 + require.NoError(t, err) 308 + err = store.Close() 309 + require.NoError(t, err) 310 + } 311 + 312 + // Verify both records exist in the tar file 313 + expectedFile := filepath.Join(tmpDir, "did_plc_testuser.tar") 314 + contents, err := ReadTarFile(expectedFile) 315 + require.NoError(t, err) 316 + 317 + assert.Contains(t, contents, "app.bsky.feed.post/post1.json") 318 + assert.Contains(t, contents, "app.bsky.feed.post/post2.json") 319 + } 320 + 321 + func TestTarfilesStore_ErrorHandling(t *testing.T) { 322 + tmpDir := t.TempDir() 323 + store := NewTarfilesStore(tmpDir) 324 + 325 + ctx := context.Background() 326 + err := store.Setup(ctx) 327 + require.NoError(t, err) 328 + 329 + // Create a test stream with invalid events 330 + stream := &remote.RemoteStream{ 331 + Ch: make(chan remote.StreamEvent, 10), 332 + } 333 + 334 + go func() { 335 + defer close(stream.Ch) 336 + 337 + // Valid event 338 + stream.Ch <- remote.StreamEvent{ 339 + Did: "did:plc:testuser", 340 + Timestamp: time.Now(), 341 + Kind: remote.EventKindCommit, 342 + Commit: &remote.StreamEventCommit{ 343 + Operation: remote.OpCreate, 344 + Collection: "app.bsky.feed.post", 345 + Rkey: "valid", 346 + Record: map[string]any{"text": "Valid post"}, 347 + }, 348 + } 349 + 350 + // Event with nil commit (should be skipped) 351 + stream.Ch <- remote.StreamEvent{ 352 + Did: "did:plc:testuser", 353 + Timestamp: time.Now(), 354 + Kind: remote.EventKindCommit, 355 + Commit: nil, 356 + } 357 + 358 + // Non-commit event (should be skipped) 359 + stream.Ch <- remote.StreamEvent{ 360 + Did: "did:plc:testuser", 361 + Timestamp: time.Now(), 362 + Kind: remote.EventKindIdentity, 363 + Identity: &remote.StreamEventIdentity{ 364 + Did: "did:plc:testuser", 365 + Handle: "testuser.bsky.social", 366 + }, 367 + } 368 + 369 + // Another valid event 370 + stream.Ch <- remote.StreamEvent{ 371 + Did: "did:plc:testuser", 372 + Timestamp: time.Now(), 373 + Kind: remote.EventKindCommit, 374 + Commit: &remote.StreamEventCommit{ 375 + Operation: remote.OpCreate, 376 + Collection: "app.bsky.feed.post", 377 + Rkey: "valid2", 378 + Record: map[string]any{"text": "Another valid post"}, 379 + }, 380 + } 381 + }() 382 + 383 + // Should process without error, skipping invalid events 384 + err = store.Receive(ctx, stream) 385 + require.NoError(t, err) 386 + 387 + err = store.Close() 388 + require.NoError(t, err) 389 + 390 + // Verify only valid events were processed 391 + expectedFile := filepath.Join(tmpDir, "did_plc_testuser.tar") 392 + contents, err := ReadTarFile(expectedFile) 393 + require.NoError(t, err) 394 + 395 + assert.Len(t, contents, 2) 396 + assert.Contains(t, contents, "app.bsky.feed.post/valid.json") 397 + assert.Contains(t, contents, "app.bsky.feed.post/valid2.json") 398 + }