Search and/or read your saved and liked Bluesky posts.
wails go svelte sqlite desktop bluesky
4
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 497 lines 12 kB view raw
1package main 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "reflect" 8 "sync" 9 "sync/atomic" 10 "time" 11 12 "github.com/bluesky-social/indigo/api/bsky" 13 "github.com/bluesky-social/indigo/atproto/auth/oauth" 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/wailsapp/wails/v2/pkg/runtime" 16) 17 18// IndexService provides indexing functionality via Wails bindings 19type IndexService struct { 20 ctx context.Context 21 indexing atomic.Bool 22 stats IndexStats 23 statsMu sync.RWMutex 24} 25 26// IndexStats tracks indexing progress 27type IndexStats struct { 28 Fetched int `json:"fetched"` 29 Inserted int `json:"inserted"` 30 Errors int `json:"errors"` 31 Total int `json:"total"` 32} 33 34// IndexResult contains the final indexing result 35type IndexResult struct { 36 Total int `json:"total"` 37 Errors int `json:"errors"` 38 Elapsed time.Duration `json:"elapsed"` 39} 40 41// PostResult carries either a Post or an error from fetching 42type PostResult struct { 43 Post *Post 44 Error error 45} 46 47// NewIndexService creates a new IndexService instance 48func NewIndexService() *IndexService { 49 return &IndexService{} 50} 51 52func (s *IndexService) setContext(ctx context.Context) { 53 s.ctx = ctx 54} 55 56// IsIndexing returns true if indexing is currently in progress 57func (s *IndexService) IsIndexing() bool { 58 return s.indexing.Load() 59} 60 61// Refresh fetches bookmarks and likes concurrently and indexes them 62func (s *IndexService) Refresh(limit int) error { 63 if !s.indexing.CompareAndSwap(false, true) { 64 return fmt.Errorf("indexing already in progress") 65 } 66 defer s.indexing.Store(false) 67 68 start := time.Now() 69 LogInfof("index refresh started: limit=%d", limit) 70 71 s.statsMu.Lock() 72 s.stats = IndexStats{} 73 s.statsMu.Unlock() 74 75 s.emitEvent("index:started", map[string]any{}) 76 77 client, err := s.createClient() 78 if err != nil { 79 s.emitEvent("index:done", IndexResult{Errors: 1, Elapsed: time.Since(start)}) 80 
return err 81 } 82 83 postCh := make(chan *PostResult, 100) 84 batchSize := 10 85 86 var wg sync.WaitGroup 87 wg.Add(2) 88 89 go func() { 90 defer wg.Done() 91 client.fetchBookmarks(limit, postCh, s) 92 }() 93 94 go func() { 95 defer wg.Done() 96 client.fetchLikes(limit, postCh, s) 97 }() 98 99 go func() { 100 wg.Wait() 101 close(postCh) 102 }() 103 104 successCount, errorCount := s.batchWriter(postCh, batchSize) 105 106 result := IndexResult{ 107 Total: successCount + errorCount, 108 Errors: errorCount, 109 Elapsed: time.Since(start), 110 } 111 112 LogInfof("index refresh completed: total=%d errors=%d elapsed=%s", result.Total, result.Errors, result.Elapsed) 113 s.emitEvent("index:done", result) 114 return nil 115} 116 117// emitEvent emits a Wails event with the given name and data 118func (s *IndexService) emitEvent(name string, data any) { 119 if s.ctx != nil { 120 runtime.EventsEmit(s.ctx, name, data) 121 } 122} 123 124// updateProgress updates stats and emits progress event 125func (s *IndexService) updateProgress(fetched, inserted, errors int) { 126 s.statsMu.Lock() 127 s.stats.Fetched += fetched 128 s.stats.Inserted += inserted 129 s.stats.Errors += errors 130 stats := s.stats 131 s.statsMu.Unlock() 132 133 s.emitEvent("index:progress", stats) 134} 135 136// createClient creates an authenticated Bluesky client 137func (s *IndexService) createClient() (*BlueskyClient, error) { 138 ctx := context.Background() 139 140 auth, err := GetAuth() 141 if err != nil { 142 return nil, fmt.Errorf("failed to load auth: %w", err) 143 } 144 if auth == nil { 145 return nil, fmt.Errorf("not authenticated") 146 } 147 148 if auth.SessionID == "" { 149 return nil, fmt.Errorf("session not found") 150 } 151 152 did, err := syntax.ParseDID(auth.DID) 153 if err != nil { 154 return nil, fmt.Errorf("invalid DID: %w", err) 155 } 156 157 store := NewSQLiteOAuthStore() 158 app := newOAuthApp(store) 159 160 session, err := app.ResumeSession(ctx, did, auth.SessionID) 161 if err != nil { 
162 return nil, fmt.Errorf("failed to resume session: %w", err) 163 } 164 165 return &BlueskyClient{ 166 session: session, 167 auth: auth, 168 }, nil 169} 170 171// batchWriter reads from channel and inserts posts in batches 172func (s *IndexService) batchWriter(ch <-chan *PostResult, batchSize int) (int, int) { 173 batch := make([]*Post, 0, batchSize) 174 successCount := 0 175 errorCount := 0 176 177 flushBatch := func() { 178 if len(batch) == 0 { 179 return 180 } 181 182 for _, post := range batch { 183 if err := InsertPost(post); err != nil { 184 errorCount++ 185 s.updateProgress(0, 0, 1) 186 } else { 187 successCount++ 188 s.updateProgress(0, 1, 0) 189 } 190 } 191 batch = batch[:0] 192 } 193 194 for result := range ch { 195 if result.Error != nil { 196 errorCount++ 197 s.updateProgress(0, 0, 1) 198 continue 199 } 200 201 if result.Post != nil { 202 batch = append(batch, result.Post) 203 204 if len(batch) >= batchSize { 205 flushBatch() 206 } 207 } 208 } 209 210 flushBatch() 211 return successCount, errorCount 212} 213 214// BlueskyClient wraps an authenticated OAuth session 215type BlueskyClient struct { 216 session *oauth.ClientSession 217 auth *Auth 218} 219 220// fetchBookmarks writes bookmarks to the provided channel in batches 221func (c *BlueskyClient) fetchBookmarks(maxPosts int, ch chan<- *PostResult, svc *IndexService) { 222 ctx := context.Background() 223 apiClient := c.session.APIClient() 224 var cursor string 225 seenCursors := make(map[string]struct{}) 226 batchSize := int64(100) 227 count := 0 228 229 for { 230 LogInfof("fetching bookmarks page: cursor=%q", cursor) 231 resp, err := bsky.BookmarkGetBookmarks(ctx, apiClient, cursor, batchSize) 232 if err != nil { 233 ch <- &PostResult{Error: fmt.Errorf("failed to fetch bookmarks: %w", err)} 234 return 235 } 236 nextCursor := "" 237 if resp.Cursor != nil { 238 nextCursor = *resp.Cursor 239 } 240 LogInfof("fetched bookmarks page: items=%d next_cursor=%q", len(resp.Bookmarks), nextCursor) 241 242 for 
_, bookmark := range resp.Bookmarks { 243 if bookmark.Item == nil { 244 continue 245 } 246 247 if bookmark.Item.FeedDefs_PostView != nil { 248 svc.updateProgress(1, 0, 0) 249 pv := bookmark.Item.FeedDefs_PostView 250 251 exists, err := PostExists(pv.Uri) 252 if err != nil { 253 continue 254 } 255 if exists { 256 continue 257 } 258 259 post := c.convertPostView(pv, "saved") 260 if post != nil { 261 ch <- &PostResult{Post: post} 262 count++ 263 264 if maxPosts > 0 && count >= maxPosts { 265 return 266 } 267 } 268 } 269 } 270 271 if resp.Cursor == nil || *resp.Cursor == "" { 272 LogInfof("bookmark fetch complete: processed=%d", count) 273 break 274 } 275 276 nextCursor = *resp.Cursor 277 if nextCursor == cursor { 278 LogWarnf("stopping bookmark pagination because cursor repeated: %s", nextCursor) 279 break 280 } 281 if _, seen := seenCursors[nextCursor]; seen { 282 LogWarnf("stopping bookmark pagination because cursor loop detected: %s", nextCursor) 283 break 284 } 285 seenCursors[nextCursor] = struct{}{} 286 cursor = nextCursor 287 } 288} 289 290// fetchLikes writes likes to the provided channel in batches 291func (c *BlueskyClient) fetchLikes(maxPosts int, ch chan<- *PostResult, svc *IndexService) { 292 ctx := context.Background() 293 apiClient := c.session.APIClient() 294 var cursor string 295 seenCursors := make(map[string]struct{}) 296 batchSize := int64(100) 297 count := 0 298 299 for { 300 LogInfof("fetching likes page: cursor=%q", cursor) 301 resp, err := bsky.FeedGetActorLikes(ctx, apiClient, c.auth.DID, cursor, batchSize) 302 if err != nil { 303 ch <- &PostResult{Error: fmt.Errorf("failed to fetch likes: %w", err)} 304 return 305 } 306 nextCursor := "" 307 if resp.Cursor != nil { 308 nextCursor = *resp.Cursor 309 } 310 LogInfof("fetched likes page: items=%d next_cursor=%q", len(resp.Feed), nextCursor) 311 312 for _, feedView := range resp.Feed { 313 if feedView.Post != nil { 314 svc.updateProgress(1, 0, 0) 315 pv := feedView.Post 316 317 exists, err := 
PostExists(pv.Uri) 318 if err != nil { 319 continue 320 } 321 if exists { 322 continue 323 } 324 325 post := c.convertPostView(pv, "liked") 326 if post != nil { 327 ch <- &PostResult{Post: post} 328 count++ 329 330 if maxPosts > 0 && count >= maxPosts { 331 return 332 } 333 } 334 } 335 } 336 337 if resp.Cursor == nil || *resp.Cursor == "" { 338 LogInfof("likes fetch complete: processed=%d", count) 339 break 340 } 341 342 nextCursor = *resp.Cursor 343 if nextCursor == cursor { 344 LogWarnf("stopping likes pagination because cursor repeated: %s", nextCursor) 345 break 346 } 347 if _, seen := seenCursors[nextCursor]; seen { 348 LogWarnf("stopping likes pagination because cursor loop detected: %s", nextCursor) 349 break 350 } 351 seenCursors[nextCursor] = struct{}{} 352 cursor = nextCursor 353 } 354} 355 356// convertPostView converts a FeedDefs_PostView to our Post struct 357func (c *BlueskyClient) convertPostView(pv *bsky.FeedDefs_PostView, source string) *Post { 358 if pv == nil { 359 return nil 360 } 361 362 record, facets, err := c.parsePostRecord(pv.Record) 363 if err != nil { 364 record = &postRecord{Text: "", CreatedAt: pv.IndexedAt} 365 } 366 367 var authorDID, authorHandle string 368 if pv.Author != nil { 369 authorDID = pv.Author.Did 370 authorHandle = pv.Author.Handle 371 } 372 373 likeCount := 0 374 if pv.LikeCount != nil { 375 likeCount = int(*pv.LikeCount) 376 } 377 378 repostCount := 0 379 if pv.RepostCount != nil { 380 repostCount = int(*pv.RepostCount) 381 } 382 383 replyCount := 0 384 if pv.ReplyCount != nil { 385 replyCount = int(*pv.ReplyCount) 386 } 387 388 createdAt, err := syntax.ParseDatetimeLenient(record.CreatedAt) 389 if err != nil { 390 createdAt, _ = syntax.ParseDatetimeLenient(pv.IndexedAt) 391 } 392 393 return &Post{ 394 URI: pv.Uri, 395 CID: pv.Cid, 396 AuthorDID: authorDID, 397 AuthorHandle: authorHandle, 398 Text: record.Text, 399 CreatedAt: createdAt.Time(), 400 LikeCount: likeCount, 401 RepostCount: repostCount, 402 ReplyCount: 
replyCount, 403 Source: source, 404 Facets: facets, 405 } 406} 407 408// postRecord represents the expected structure of a post record 409type postRecord struct { 410 Text string `json:"text"` 411 CreatedAt string `json:"createdAt"` 412} 413 414// parsePostRecord extracts post data and facets from the LexiconTypeDecoder 415func (c *BlueskyClient) parsePostRecord(decoder any) (*postRecord, string, error) { 416 if decoder == nil { 417 return &postRecord{Text: "", CreatedAt: ""}, "", nil 418 } 419 420 type lexDecoder struct{ Val any } 421 422 d, ok := decoder.(*lexDecoder) 423 if !ok { 424 switch v := decoder.(type) { 425 case *bsky.FeedPost: 426 facets := c.extractFacets(v) 427 return &postRecord{ 428 Text: v.Text, 429 CreatedAt: v.CreatedAt, 430 }, facets, nil 431 case bsky.FeedPost: 432 facets := c.extractFacets(&v) 433 return &postRecord{ 434 Text: v.Text, 435 CreatedAt: v.CreatedAt, 436 }, facets, nil 437 default: 438 return c.parsePostRecordWithReflection(decoder) 439 } 440 } 441 442 if d.Val == nil { 443 return &postRecord{Text: "", CreatedAt: ""}, "", nil 444 } 445 446 if feedPost, ok := d.Val.(*bsky.FeedPost); ok { 447 facets := c.extractFacets(feedPost) 448 return &postRecord{ 449 Text: feedPost.Text, 450 CreatedAt: feedPost.CreatedAt, 451 }, facets, nil 452 } 453 454 return &postRecord{Text: "", CreatedAt: ""}, "", fmt.Errorf("unknown record type: %T", d.Val) 455} 456 457// extractFacets extracts and serializes facets from a FeedPost 458func (c *BlueskyClient) extractFacets(feedPost *bsky.FeedPost) string { 459 if feedPost == nil || len(feedPost.Facets) == 0 { 460 return "" 461 } 462 463 facetsJSON, err := json.Marshal(feedPost.Facets) 464 if err != nil { 465 return "" 466 } 467 468 return string(facetsJSON) 469} 470 471// parsePostRecordWithReflection uses reflection to access the Val field 472func (c *BlueskyClient) parsePostRecordWithReflection(decoder any) (*postRecord, string, error) { 473 val := reflect.ValueOf(decoder) 474 if val.Kind() == 
reflect.Pointer { 475 val = val.Elem() 476 } 477 478 valField := val.FieldByName("Val") 479 if !valField.IsValid() { 480 return &postRecord{Text: "", CreatedAt: ""}, "", fmt.Errorf("no Val field found") 481 } 482 483 actualVal := valField.Interface() 484 if actualVal == nil { 485 return &postRecord{Text: "", CreatedAt: ""}, "", nil 486 } 487 488 if feedPost, ok := actualVal.(*bsky.FeedPost); ok { 489 facets := c.extractFacets(feedPost) 490 return &postRecord{ 491 Text: feedPost.Text, 492 CreatedAt: feedPost.CreatedAt, 493 }, facets, nil 494 } 495 496 return &postRecord{Text: "", CreatedAt: ""}, "", fmt.Errorf("unknown record type in Val: %T", actualVal) 497}