Don't forget to lycansubscribe 🐺
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

ah fuck it, just use the appview

Kuba Suder 0793e2f3 71d241c0

+70 -55
+4 -4
app/importer.rb
··· 6 6 require_relative 'models/user' 7 7 8 8 class Importer 9 - attr_accessor :post_queue, :report 9 + attr_accessor :like_queue, :report 10 10 11 11 def initialize(user_did) 12 12 @did = DID.new(user_did) ··· 68 68 else 69 69 like_stub = @user.likes.create!(rkey: like_rkey, time: like_time, post_uri: post_uri) 70 70 71 - if @post_queue 72 - @post_queue.push(like_stub) 73 - @report&.update(queue: { length: @post_queue.length }) 71 + if @like_queue 72 + @like_queue.push(like_stub) 73 + @report&.update(queue: { length: @like_queue.length }) 74 74 end 75 75 end 76 76 rescue StandardError => e
+28
app/like_queue.rb
··· 1 + class LikeQueue 2 + BATCH_SIZE = 25 3 + 4 + def initialize 5 + @mutex = Mutex.new 6 + @queue = [] 7 + end 8 + 9 + def push(like) 10 + @mutex.synchronize { 11 + @queue << like 12 + } 13 + end 14 + 15 + def pop_batch 16 + @mutex.synchronize { 17 + batch = @queue[0...BATCH_SIZE] 18 + @queue = @queue[BATCH_SIZE..-1] || [] 19 + batch 20 + } 21 + end 22 + 23 + def length 24 + @mutex.synchronize { 25 + @queue.length 26 + } 27 + end 28 + end
+31 -24
app/post_downloader.rb
··· 3 3 4 4 require_relative 'models/post' 5 5 require_relative 'models/user' 6 - require_relative 'user_pds_cache' 7 6 8 7 class PostDownloader 9 - attr_accessor :report, :name 8 + attr_accessor :report 10 9 11 10 def initialize 12 - @name = :downloader 11 + @sky = Minisky.new(ENV['APPVIEW'] || 'public.api.bsky.app', nil) 13 12 end 14 13 15 14 def import_from_queue(queue) 16 15 count = 0 16 + oldest = Time.now 17 17 18 18 loop do 19 - like = queue.pop 19 + likes = queue.pop_batch 20 + 21 + if likes.empty? 22 + sleep 1 23 + next 24 + end 25 + 20 26 @report&.update(queue: { length: queue.length }) 21 27 22 28 begin 23 - post = import_post(like.post_uri) 24 - like.update!(post: post, post_uri: nil) 25 - count += 1 29 + response = @sky.get_request('app.bsky.feed.getPosts', { uris: likes.map(&:post_uri) }) 30 + 31 + response['posts'].each do |data| 32 + begin 33 + like = likes.detect { |x| x.post_uri == data['uri'] } 34 + likes.delete(like) 35 + 36 + post = save_post(data['uri'], data['record']) 37 + like.update!(post: post, post_uri: nil) 38 + count += 1 39 + oldest = [oldest, like.time].min 26 40 27 - @report&.update(@name => { downloaded_posts: count, oldest_date: like.time }) 41 + @report&.update(downloader: { downloaded_posts: count, oldest_date: oldest }) 42 + rescue StandardError => e 43 + puts "Error in PostDownloader: #{like.post_uri}: #{e}" 44 + end 45 + end 28 46 rescue StandardError => e 29 - puts "Error in PostDownloader: #{like.post_uri}: #{e}" 47 + puts "Error in PostDownloader: #{e}" 30 48 end 31 49 end 32 50 end 33 51 34 - def import_post(post_uri) 52 + def save_post(post_uri, record) 35 53 did, _, rkey = post_uri.split('/')[2..4] 36 - post_record = fetch_post_record(did, rkey) 37 54 38 - json = post_record['value'] 39 - text = json.delete('text') 40 - created = json.delete('createdAt') 55 + text = record.delete('text') 56 + created = record.delete('createdAt') 41 57 42 58 author = User.find_or_create_by!(did: did) 43 59 ··· 46 62 rkey: rkey, 47 63 time: Time.parse(created), 48 64 text: text, 49 - data: JSON.generate(json) 65 + data: JSON.generate(record) 50 66 ) 51 - end 52 - 53 - def fetch_post_record(did, rkey) 54 - UserPDSCache[did] ||= DID.new(did).document.pds_host 55 - 56 - pds = UserPDSCache[did] 57 - sky = Minisky.new(pds, nil) 58 - 59 - sky.get_request('com.atproto.repo.getRecord', { repo: did, collection: 'app.bsky.feed.post', rkey: rkey }) 60 67 end 61 68 end
-13
app/user_pds_cache.rb
··· 1 - class UserPDSCache 2 - class << self 3 - def [](did) 4 - @pds_cache ||= {} 5 - @pds_cache[did] 6 - end 7 - 8 - def []=(did, pds) 9 - @pds_cache ||= {} 10 - @pds_cache[did] = pds 11 - end 12 - end 13 - end
+7 -14
lib/tasks/import.rake
··· 1 1 require_relative '../../app/importer' 2 + require_relative '../../app/like_queue' 2 3 require_relative '../../app/post_downloader' 3 4 4 5 class ImportReport ··· 24 25 puts "Importer: imported likes = #{@data.dig(:importer, :imported_likes) || 0} (until: #{@data.dig(:importer, :oldest_date)})" 25 26 26 27 print " " * 80 + "\r" 27 - downloaders = @data.select { |k, v| k.start_with?('downloader') } 28 - sum = downloaders.map { |k, v| v[:downloaded_posts] }.reduce(0, &:+) 29 - oldest = downloaders.map { |k, v| v[:oldest_date] }.sort.first 30 - puts "Downloaders: imported posts = #{sum} (until: #{oldest})" 28 + puts "Downloader: imported posts = #{@data.dig(:downloader, :downloaded_posts) || 0} (until: #{@data.dig(:downloader, :oldest_date)})" 31 29 32 30 print " " * 80 + "\r" 33 31 puts "Queue size: #{@data.dig(:queue, :length) || 0}" ··· 41 39 raise "Required USER parameter missing" 42 40 end 43 41 44 - queue = Queue.new 42 + queue = LikeQueue.new 45 43 report = ImportReport.new 46 44 47 45 importer = Importer.new(ENV['USER']) 48 - importer.post_queue = queue 46 + importer.like_queue = queue 49 47 importer.report = report 50 48 51 - workers = ENV['WORKERS']&.to_i || 1 52 - 53 - 1.upto(workers) do |i| 54 - downloader = PostDownloader.new 55 - downloader.report = report 56 - downloader.name = "downloader#{i}".to_sym 49 + downloader = PostDownloader.new 50 + downloader.report = report 57 51 58 - Thread.new { downloader.import_from_queue(queue) } 59 - end 52 + Thread.new { downloader.import_from_queue(queue) } 60 53 61 54 importer.run_import(ENV['UNTIL']) 62 55 end