A tool for measuring the coverage of Bluesky/ATProto relays
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 135 lines 3.2 kB view raw
#!/usr/bin/env ruby

# Measures how much traffic each configured relay / Jetstream host delivers.
#
# One child process is forked per source. Each child connects through Skyfall,
# counts the messages it receives and the distinct `did` values seen on them,
# and, once interrupted, writes a one-line JSON summary to a pipe. The parent
# sleeps for the test duration, sends SIGINT to the children, collects the
# summaries and (unless the :dont_save option is set) stores them as reports
# attached to a TestRun record.

require 'bundler/setup'
require 'json'
require 'set' # Set is only autoloaded on newer Rubies; require it explicitly.
require 'skyfall'
require 'yaml'

require_relative 'init'
require_relative 'opts'
require_relative 'report'
require_relative 'test_run'

# Returns the list of host names for a config section.
#
# @param entries [Array<String, Hash>, nil] config entries; each one is either
#   a bare host value or a Hash carrying the host under the 'host' key
# @return [Array] the host values, or an empty array when the section is absent
def get_hosts(entries)
  (entries || []).map { |entry| entry.is_a?(Hash) ? entry['host'] : entry }
end

# Sends SIGINT to every worker that is still being tracked.
#
# Process.kill requires at least one pid, so an empty worker list (no sources
# configured, or everything already reaped) is a no-op instead of an error.
#
# @param workers [Array<Worker>] workers that have not been reaped yet
# @return [void]
def interrupt_workers(workers)
  return if workers.empty?

  Process.kill('SIGINT', *workers.map(&:pid))
end

# Waits for any child process to exit, removes the matching worker from
# `workers` and reads the single result line it wrote to its pipe.
#
# @param workers [Array<Worker>] workers that have not been reaped yet (mutated)
# @return [Array(Worker, String), Array(nil, nil)] the finished worker and its
#   output line (nil line if it wrote nothing); both nil when the exited child
#   is not one of the tracked workers
def reap_worker(workers)
  pid = Process.wait
  worker = workers.find { |w| w.pid == pid }
  return [nil, nil] unless worker

  workers.delete(worker)

  line = worker.pipe.gets
  worker.pipe.close

  [worker, line]
end

# NOTE(review): SOURCES is not defined in this file - presumably set in 'init'.
config = YAML.load(File.read(SOURCES))
options = parse_options(ARGV)

# Hosts passed on the command line replace the configured lists entirely.
if options[:relays] || options[:jetstreams]
  relays = options[:relays] || []
  jetstreams = options[:jetstreams] || []
else
  relays = get_hosts(config['relays'])
  jetstreams = get_hosts(config['jetstreams'])
end

# Width used to align host names in the verbose per-minute output.
maxlen = (relays + jetstreams).map(&:length).max

verbose = options[:verbose]
duration = options[:duration] || config['duration']&.to_i || 60 * 15
test_start_time = Time.now

Worker = Struct.new(:host, :type, :pid, :pipe)
workers = []

sources = relays.map { |h| [:firehose, h] } + jetstreams.map { |h| [:jetstream, h] }

sources.each do |type, host|
  input, output = IO.pipe

  pid = fork do
    # The child only writes to the pipe.
    input.close
    sky = (type == :firehose) ? Skyfall::Firehose.new(host) : Skyfall::Jetstream.new(host)

    events = 0
    users = Set.new
    minute = Time.now.to_i / 60
    connected = false
    reconnects = 0
    errors = 0

    sky.on_connecting { puts "[#{Time.now}] #{host}: Connecting..." }
    sky.on_connect { puts "[#{Time.now}] #{host}: Connected ✓"; connected = true }
    sky.on_reconnect { puts "[#{Time.now}] #{host}: Connection lost, reconnecting..."; reconnects += 1 }
    sky.on_error { |e| puts "[#{Time.now}] #{host}: ERROR: #{e.message}"; errors += 1 }

    sky.on_message do |msg|
      events += 1
      users << msg.did

      if verbose
        # Print a progress line on the first message of each new wall-clock minute.
        now = Time.now.to_i / 60
        if now > minute
          puts "[#{Time.now}] #{host.ljust(maxlen)} | events: #{events.to_s.ljust(8)} | users: #{users.size}"
          minute = now
        end
      end
    end

    # The parent stops the test by sending SIGINT; disconnecting lets
    # `sky.connect` below return so the summary can be written.
    trap('SIGINT') { sky.disconnect }

    sky.connect
    puts "[#{Time.now}] #{host}: Finished."

    output.puts(JSON.generate({ events: events, users: users.size, connected: connected, errors: errors, reconnects: reconnects }))
  end

  # The parent only reads from the pipe.
  output.close

  workers << Worker.new(host, type, pid, input)
end

begin
  sleep(duration)

  interrupt_workers(workers)

  unless options[:dont_save]
    test = TestRun.create!(start_time: test_start_time, duration: duration)
  end

  until workers.empty?
    worker, line = reap_worker(workers)
    next if line.nil?

    result = JSON.parse(line)
    puts "#{worker.host}: #{result.inspect}" if verbose

    unless options[:dont_save]
      test.reports.create!(
        host: worker.host,
        source_type: worker.type,
        users: result['users'],
        events: result['events'],
        connected: result['connected'],
        error_count: result['errors'],
        reconnect_count: result['reconnects']
      )
    end
  end

  # `test` is nil when saving is disabled.
  test&.update_max_users
rescue Interrupt
  # Manual abort: stop the remaining workers and reap them, discarding results.
  puts
  puts "Stopping..."

  interrupt_workers(workers)
  reap_worker(workers) until workers.empty?
end