Search and/or read your saved and liked Bluesky posts.
wails
go
svelte
sqlite
desktop
bluesky
1package main
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "reflect"
8 "sync"
9 "sync/atomic"
10 "time"
11
12 "github.com/bluesky-social/indigo/api/bsky"
13 "github.com/bluesky-social/indigo/atproto/auth/oauth"
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/wailsapp/wails/v2/pkg/runtime"
16)
17
18// IndexService provides indexing functionality via Wails bindings
19type IndexService struct {
20 ctx context.Context
21 indexing atomic.Bool
22 stats IndexStats
23 statsMu sync.RWMutex
24}
25
// IndexStats is the progress snapshot published with each "index:progress"
// event.
type IndexStats struct {
	// Fetched counts post views seen in API responses.
	Fetched int `json:"fetched"`
	// Inserted counts posts stored successfully.
	Inserted int `json:"inserted"`
	// Errors counts failed fetches and failed inserts.
	Errors int `json:"errors"`
	// Total is part of the JSON payload; NOTE(review): nothing in the code
	// visible here assigns it, so it presumably stays zero. Confirm.
	Total int `json:"total"`
}
33
// IndexResult is the summary published with the "index:done" event when a
// Refresh finishes.
type IndexResult struct {
	// Total is the number of posts handled, successful and failed combined.
	Total int `json:"total"`
	// Errors is the number of failures included in Total.
	Errors int `json:"errors"`
	// Elapsed is the wall-clock duration of the Refresh. As a time.Duration
	// it is encoded in JSON as an integer number of nanoseconds.
	Elapsed time.Duration `json:"elapsed"`
}
40
41// PostResult carries either a Post or an error from fetching
42type PostResult struct {
43 Post *Post
44 Error error
45}
46
47// NewIndexService creates a new IndexService instance
48func NewIndexService() *IndexService {
49 return &IndexService{}
50}
51
52func (s *IndexService) setContext(ctx context.Context) {
53 s.ctx = ctx
54}
55
56// IsIndexing returns true if indexing is currently in progress
57func (s *IndexService) IsIndexing() bool {
58 return s.indexing.Load()
59}
60
61// Refresh fetches bookmarks and likes concurrently and indexes them
62func (s *IndexService) Refresh(limit int) error {
63 if !s.indexing.CompareAndSwap(false, true) {
64 return fmt.Errorf("indexing already in progress")
65 }
66 defer s.indexing.Store(false)
67
68 start := time.Now()
69 LogInfof("index refresh started: limit=%d", limit)
70
71 s.statsMu.Lock()
72 s.stats = IndexStats{}
73 s.statsMu.Unlock()
74
75 s.emitEvent("index:started", map[string]any{})
76
77 client, err := s.createClient()
78 if err != nil {
79 s.emitEvent("index:done", IndexResult{Errors: 1, Elapsed: time.Since(start)})
80 return err
81 }
82
83 postCh := make(chan *PostResult, 100)
84 batchSize := 10
85
86 var wg sync.WaitGroup
87 wg.Add(2)
88
89 go func() {
90 defer wg.Done()
91 client.fetchBookmarks(limit, postCh, s)
92 }()
93
94 go func() {
95 defer wg.Done()
96 client.fetchLikes(limit, postCh, s)
97 }()
98
99 go func() {
100 wg.Wait()
101 close(postCh)
102 }()
103
104 successCount, errorCount := s.batchWriter(postCh, batchSize)
105
106 result := IndexResult{
107 Total: successCount + errorCount,
108 Errors: errorCount,
109 Elapsed: time.Since(start),
110 }
111
112 LogInfof("index refresh completed: total=%d errors=%d elapsed=%s", result.Total, result.Errors, result.Elapsed)
113 s.emitEvent("index:done", result)
114 return nil
115}
116
117// emitEvent emits a Wails event with the given name and data
118func (s *IndexService) emitEvent(name string, data any) {
119 if s.ctx != nil {
120 runtime.EventsEmit(s.ctx, name, data)
121 }
122}
123
124// updateProgress updates stats and emits progress event
125func (s *IndexService) updateProgress(fetched, inserted, errors int) {
126 s.statsMu.Lock()
127 s.stats.Fetched += fetched
128 s.stats.Inserted += inserted
129 s.stats.Errors += errors
130 stats := s.stats
131 s.statsMu.Unlock()
132
133 s.emitEvent("index:progress", stats)
134}
135
136// createClient creates an authenticated Bluesky client
137func (s *IndexService) createClient() (*BlueskyClient, error) {
138 ctx := context.Background()
139
140 auth, err := GetAuth()
141 if err != nil {
142 return nil, fmt.Errorf("failed to load auth: %w", err)
143 }
144 if auth == nil {
145 return nil, fmt.Errorf("not authenticated")
146 }
147
148 if auth.SessionID == "" {
149 return nil, fmt.Errorf("session not found")
150 }
151
152 did, err := syntax.ParseDID(auth.DID)
153 if err != nil {
154 return nil, fmt.Errorf("invalid DID: %w", err)
155 }
156
157 store := NewSQLiteOAuthStore()
158 app := newOAuthApp(store)
159
160 session, err := app.ResumeSession(ctx, did, auth.SessionID)
161 if err != nil {
162 return nil, fmt.Errorf("failed to resume session: %w", err)
163 }
164
165 return &BlueskyClient{
166 session: session,
167 auth: auth,
168 }, nil
169}
170
171// batchWriter reads from channel and inserts posts in batches
172func (s *IndexService) batchWriter(ch <-chan *PostResult, batchSize int) (int, int) {
173 batch := make([]*Post, 0, batchSize)
174 successCount := 0
175 errorCount := 0
176
177 flushBatch := func() {
178 if len(batch) == 0 {
179 return
180 }
181
182 for _, post := range batch {
183 if err := InsertPost(post); err != nil {
184 errorCount++
185 s.updateProgress(0, 0, 1)
186 } else {
187 successCount++
188 s.updateProgress(0, 1, 0)
189 }
190 }
191 batch = batch[:0]
192 }
193
194 for result := range ch {
195 if result.Error != nil {
196 errorCount++
197 s.updateProgress(0, 0, 1)
198 continue
199 }
200
201 if result.Post != nil {
202 batch = append(batch, result.Post)
203
204 if len(batch) >= batchSize {
205 flushBatch()
206 }
207 }
208 }
209
210 flushBatch()
211 return successCount, errorCount
212}
213
214// BlueskyClient wraps an authenticated OAuth session
215type BlueskyClient struct {
216 session *oauth.ClientSession
217 auth *Auth
218}
219
220// fetchBookmarks writes bookmarks to the provided channel in batches
221func (c *BlueskyClient) fetchBookmarks(maxPosts int, ch chan<- *PostResult, svc *IndexService) {
222 ctx := context.Background()
223 apiClient := c.session.APIClient()
224 var cursor string
225 seenCursors := make(map[string]struct{})
226 batchSize := int64(100)
227 count := 0
228
229 for {
230 LogInfof("fetching bookmarks page: cursor=%q", cursor)
231 resp, err := bsky.BookmarkGetBookmarks(ctx, apiClient, cursor, batchSize)
232 if err != nil {
233 ch <- &PostResult{Error: fmt.Errorf("failed to fetch bookmarks: %w", err)}
234 return
235 }
236 nextCursor := ""
237 if resp.Cursor != nil {
238 nextCursor = *resp.Cursor
239 }
240 LogInfof("fetched bookmarks page: items=%d next_cursor=%q", len(resp.Bookmarks), nextCursor)
241
242 for _, bookmark := range resp.Bookmarks {
243 if bookmark.Item == nil {
244 continue
245 }
246
247 if bookmark.Item.FeedDefs_PostView != nil {
248 svc.updateProgress(1, 0, 0)
249 pv := bookmark.Item.FeedDefs_PostView
250
251 exists, err := PostExists(pv.Uri)
252 if err != nil {
253 continue
254 }
255 if exists {
256 continue
257 }
258
259 post := c.convertPostView(pv, "saved")
260 if post != nil {
261 ch <- &PostResult{Post: post}
262 count++
263
264 if maxPosts > 0 && count >= maxPosts {
265 return
266 }
267 }
268 }
269 }
270
271 if resp.Cursor == nil || *resp.Cursor == "" {
272 LogInfof("bookmark fetch complete: processed=%d", count)
273 break
274 }
275
276 nextCursor = *resp.Cursor
277 if nextCursor == cursor {
278 LogWarnf("stopping bookmark pagination because cursor repeated: %s", nextCursor)
279 break
280 }
281 if _, seen := seenCursors[nextCursor]; seen {
282 LogWarnf("stopping bookmark pagination because cursor loop detected: %s", nextCursor)
283 break
284 }
285 seenCursors[nextCursor] = struct{}{}
286 cursor = nextCursor
287 }
288}
289
290// fetchLikes writes likes to the provided channel in batches
291func (c *BlueskyClient) fetchLikes(maxPosts int, ch chan<- *PostResult, svc *IndexService) {
292 ctx := context.Background()
293 apiClient := c.session.APIClient()
294 var cursor string
295 seenCursors := make(map[string]struct{})
296 batchSize := int64(100)
297 count := 0
298
299 for {
300 LogInfof("fetching likes page: cursor=%q", cursor)
301 resp, err := bsky.FeedGetActorLikes(ctx, apiClient, c.auth.DID, cursor, batchSize)
302 if err != nil {
303 ch <- &PostResult{Error: fmt.Errorf("failed to fetch likes: %w", err)}
304 return
305 }
306 nextCursor := ""
307 if resp.Cursor != nil {
308 nextCursor = *resp.Cursor
309 }
310 LogInfof("fetched likes page: items=%d next_cursor=%q", len(resp.Feed), nextCursor)
311
312 for _, feedView := range resp.Feed {
313 if feedView.Post != nil {
314 svc.updateProgress(1, 0, 0)
315 pv := feedView.Post
316
317 exists, err := PostExists(pv.Uri)
318 if err != nil {
319 continue
320 }
321 if exists {
322 continue
323 }
324
325 post := c.convertPostView(pv, "liked")
326 if post != nil {
327 ch <- &PostResult{Post: post}
328 count++
329
330 if maxPosts > 0 && count >= maxPosts {
331 return
332 }
333 }
334 }
335 }
336
337 if resp.Cursor == nil || *resp.Cursor == "" {
338 LogInfof("likes fetch complete: processed=%d", count)
339 break
340 }
341
342 nextCursor = *resp.Cursor
343 if nextCursor == cursor {
344 LogWarnf("stopping likes pagination because cursor repeated: %s", nextCursor)
345 break
346 }
347 if _, seen := seenCursors[nextCursor]; seen {
348 LogWarnf("stopping likes pagination because cursor loop detected: %s", nextCursor)
349 break
350 }
351 seenCursors[nextCursor] = struct{}{}
352 cursor = nextCursor
353 }
354}
355
356// convertPostView converts a FeedDefs_PostView to our Post struct
357func (c *BlueskyClient) convertPostView(pv *bsky.FeedDefs_PostView, source string) *Post {
358 if pv == nil {
359 return nil
360 }
361
362 record, facets, err := c.parsePostRecord(pv.Record)
363 if err != nil {
364 record = &postRecord{Text: "", CreatedAt: pv.IndexedAt}
365 }
366
367 var authorDID, authorHandle string
368 if pv.Author != nil {
369 authorDID = pv.Author.Did
370 authorHandle = pv.Author.Handle
371 }
372
373 likeCount := 0
374 if pv.LikeCount != nil {
375 likeCount = int(*pv.LikeCount)
376 }
377
378 repostCount := 0
379 if pv.RepostCount != nil {
380 repostCount = int(*pv.RepostCount)
381 }
382
383 replyCount := 0
384 if pv.ReplyCount != nil {
385 replyCount = int(*pv.ReplyCount)
386 }
387
388 createdAt, err := syntax.ParseDatetimeLenient(record.CreatedAt)
389 if err != nil {
390 createdAt, _ = syntax.ParseDatetimeLenient(pv.IndexedAt)
391 }
392
393 return &Post{
394 URI: pv.Uri,
395 CID: pv.Cid,
396 AuthorDID: authorDID,
397 AuthorHandle: authorHandle,
398 Text: record.Text,
399 CreatedAt: createdAt.Time(),
400 LikeCount: likeCount,
401 RepostCount: repostCount,
402 ReplyCount: replyCount,
403 Source: source,
404 Facets: facets,
405 }
406}
407
// postRecord holds the two fields of a post record that the indexer keeps.
type postRecord struct {
	// Text is the body of the post.
	Text string `json:"text"`
	// CreatedAt is the creation timestamp as a string, not yet parsed.
	CreatedAt string `json:"createdAt"`
}
413
414// parsePostRecord extracts post data and facets from the LexiconTypeDecoder
415func (c *BlueskyClient) parsePostRecord(decoder any) (*postRecord, string, error) {
416 if decoder == nil {
417 return &postRecord{Text: "", CreatedAt: ""}, "", nil
418 }
419
420 type lexDecoder struct{ Val any }
421
422 d, ok := decoder.(*lexDecoder)
423 if !ok {
424 switch v := decoder.(type) {
425 case *bsky.FeedPost:
426 facets := c.extractFacets(v)
427 return &postRecord{
428 Text: v.Text,
429 CreatedAt: v.CreatedAt,
430 }, facets, nil
431 case bsky.FeedPost:
432 facets := c.extractFacets(&v)
433 return &postRecord{
434 Text: v.Text,
435 CreatedAt: v.CreatedAt,
436 }, facets, nil
437 default:
438 return c.parsePostRecordWithReflection(decoder)
439 }
440 }
441
442 if d.Val == nil {
443 return &postRecord{Text: "", CreatedAt: ""}, "", nil
444 }
445
446 if feedPost, ok := d.Val.(*bsky.FeedPost); ok {
447 facets := c.extractFacets(feedPost)
448 return &postRecord{
449 Text: feedPost.Text,
450 CreatedAt: feedPost.CreatedAt,
451 }, facets, nil
452 }
453
454 return &postRecord{Text: "", CreatedAt: ""}, "", fmt.Errorf("unknown record type: %T", d.Val)
455}
456
457// extractFacets extracts and serializes facets from a FeedPost
458func (c *BlueskyClient) extractFacets(feedPost *bsky.FeedPost) string {
459 if feedPost == nil || len(feedPost.Facets) == 0 {
460 return ""
461 }
462
463 facetsJSON, err := json.Marshal(feedPost.Facets)
464 if err != nil {
465 return ""
466 }
467
468 return string(facetsJSON)
469}
470
471// parsePostRecordWithReflection uses reflection to access the Val field
472func (c *BlueskyClient) parsePostRecordWithReflection(decoder any) (*postRecord, string, error) {
473 val := reflect.ValueOf(decoder)
474 if val.Kind() == reflect.Pointer {
475 val = val.Elem()
476 }
477
478 valField := val.FieldByName("Val")
479 if !valField.IsValid() {
480 return &postRecord{Text: "", CreatedAt: ""}, "", fmt.Errorf("no Val field found")
481 }
482
483 actualVal := valField.Interface()
484 if actualVal == nil {
485 return &postRecord{Text: "", CreatedAt: ""}, "", nil
486 }
487
488 if feedPost, ok := actualVal.(*bsky.FeedPost); ok {
489 facets := c.extractFacets(feedPost)
490 return &postRecord{
491 Text: feedPost.Text,
492 CreatedAt: feedPost.CreatedAt,
493 }, facets, nil
494 }
495
496 return &postRecord{Text: "", CreatedAt: ""}, "", fmt.Errorf("unknown record type in Val: %T", actualVal)
497}