A tool for measuring the coverage of Bluesky/ATProto relays
1#!/usr/bin/env ruby
2
3require 'bundler/setup'
4require 'json'
5require 'skyfall'
6require 'yaml'
7
8require_relative 'init'
9require_relative 'opts'
10require_relative 'report'
11require_relative 'test_run'
12
# Extracts the host of every source entry listed in the config.
#
# An entry is either a Hash, in which case its 'host' value is used, or any
# other value, which is passed through unchanged.
#
# @param entries [Array, nil] list of entries, or nil/false when the section is absent
# @return [Array] one element per entry; an empty array when entries is nil/false
def get_hosts(entries)
  return [] unless entries

  entries.map do |entry|
    case entry
    when Hash then entry['host']
    else entry
    end
  end
end
20
# Load the sources file. An empty YAML document parses to nil/false, so fall
# back to an empty hash to keep the config lookups below from raising
# NoMethodError.
config = YAML.load(File.read(SOURCES)) || {}
options = parse_options(ARGV)

# Hosts passed through the options replace the configured lists entirely:
# if only one of the two kinds is given, the other one is left empty.
if options[:relays] || options[:jetstreams]
  relays = options[:relays] || []
  jetstreams = options[:jetstreams] || []
else
  relays = get_hosts(config['relays'])
  jetstreams = get_hosts(config['jetstreams'])
end

# Length of the longest host name, used to align the verbose progress lines.
# Defaults to 0 when there are no hosts so it is never nil.
maxlen = (relays + jetstreams).map(&:length).max || 0

verbose = options[:verbose]

# Test length in seconds (it is passed to sleep): the option wins over the
# config value, and 15 minutes is used when neither is set.
duration = options[:duration] || config['duration']&.to_i || 60 * 15
test_start_time = Time.now
37
# Set is only autoloaded on Ruby 3.2+; require it explicitly so the workers
# do not depend on some other library having loaded it first.
require 'set'

# Bookkeeping for one forked worker: the host it connects to, the source type
# (:firehose or :jetstream), the child's pid and the read end of the pipe on
# which the child reports its result.
Worker = Struct.new(:host, :type, :pid, :pipe)
workers = []

sources = relays.map { |h| [:firehose, h] } + jetstreams.map { |h| [:jetstream, h] }

# Every source is measured in its own child process, which sends a single
# JSON line with its counters back to the parent through a pipe.
sources.each do |type, host|
  input, output = IO.pipe

  pid = fork do
    # The child only writes to the pipe.
    input.close
    sky = (type == :firehose) ? Skyfall::Firehose.new(host) : Skyfall::Jetstream.new(host)

    events = 0
    users = Set.new
    minute = Time.now.to_i / 60
    connected = false
    reconnects = 0
    errors = 0

    sky.on_connecting { puts "[#{Time.now}] #{host}: Connecting..." }
    sky.on_connect { puts "[#{Time.now}] #{host}: Connected ✓"; connected = true }
    sky.on_reconnect { puts "[#{Time.now}] #{host}: Connection lost, reconnecting..."; reconnects += 1 }
    sky.on_error { |e| puts "[#{Time.now}] #{host}: ERROR: #{e.message}"; errors += 1 }

    sky.on_message do |msg|
      events += 1

      # NOTE(review): some message types presumably carry no DID — confirm
      # against Skyfall. A nil must not be counted as a distinct user.
      did = msg.did
      users << did if did

      if verbose
        # Print a progress line at most once per wall-clock minute.
        now = Time.now.to_i / 60
        if now > minute
          puts "[#{Time.now}] #{host.ljust(maxlen)} | events: #{events.to_s.ljust(8)} | users: #{users.size}"
          minute = now
        end
      end
    end

    # The parent stops the test by sending SIGINT to the worker.
    trap('SIGINT') { sky.disconnect }

    # Presumably blocks until the stream is disconnected; the final report
    # below is written only after it returns.
    sky.connect
    puts "[#{Time.now}] #{host}: Finished."

    output.puts(JSON.generate({ events: events, users: users.size, connected: connected, errors: errors, reconnects: reconnects }))
  end

  # The parent only reads from the pipe; closing the write end here also
  # keeps it from being inherited by the workers forked afterwards.
  output.close

  workers << Worker.new(host, type, pid, input)
end
87
# Asks every worker that has not been reaped yet to stop. The pids are
# signalled one by one: Process.kill raises ArgumentError when called without
# any pid (no workers), and a multi-pid call stops at the first missing
# process, leaving the remaining ones unsignalled.
stop_workers = lambda do
  workers.each do |w|
    begin
      Process.kill('SIGINT', w.pid)
    rescue Errno::ESRCH
      # The process is already gone; nothing to stop.
    end
  end
end

# Waits for the next child process to exit, removes its worker from the list
# and returns [worker, line], where line is what the child wrote to its pipe
# (nil if it exited without writing anything).
reap_next = lambda do
  pid = Process.wait
  worker = workers.find { |w| w.pid == pid }
  workers.delete(worker)

  line = worker.pipe.gets
  worker.pipe.close
  [worker, line]
end

begin
  # Let the workers collect data for the whole test duration.
  sleep(duration)

  stop_workers.call

  unless options[:dont_save]
    test = TestRun.create!(start_time: test_start_time, duration: duration)
  end

  until workers.empty?
    worker, line = reap_next.call

    # A worker that exited without reporting produces no record.
    next if line.nil?

    result = JSON.parse(line)
    puts "#{worker.host}: #{result.inspect}" if verbose

    unless options[:dont_save]
      test.reports.create!(
        host: worker.host,
        source_type: worker.type,
        users: result['users'],
        events: result['events'],
        connected: result['connected'],
        error_count: result['errors'],
        reconnect_count: result['reconnects']
      )
    end
  end

  # test is nil when saving is disabled.
  test&.update_max_users
rescue Interrupt
  # Interrupted by the user: stop and reap the workers, discarding their
  # results without saving anything.
  puts
  puts "Stopping..."

  stop_workers.call
  reap_next.call until workers.empty?
end
135end