A proof of concept (POC) for a push-based relay connection service
1package main
2
import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"log/slog"
	"net/http"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/xrpc"
	"github.com/gorilla/websocket"
	"github.com/joho/godotenv"
)
20
21func main() {
22 err := godotenv.Load(".env")
23 if err != nil {
24 if !os.IsNotExist(err) {
25 log.Fatal("Error loading .env file")
26 }
27 }
28
29 srv := Server{
30 upgrader: websocket.Upgrader{},
31 conns: make(map[string]*websocket.Conn),
32 relays: strings.Split(os.Getenv("RELAYS"), ","),
33 hostname: os.Getenv("HOSTNAME"),
34 }
35
36 go srv.requestCrawl(context.Background())
37
38 mux := http.NewServeMux()
39 mux.HandleFunc("/xrpc/com.atproto.sync.subscribeRepos", srv.handleSyncSubscribeRepos)
40 mux.HandleFunc("POST /events", srv.handleNewEvent)
41
42 if err := http.ListenAndServe(":3000", mux); err != nil {
43 slog.Error("http listen and serve", "error", err)
44 }
45}
46
47type Server struct {
48 upgrader websocket.Upgrader
49 conns map[string]*websocket.Conn
50 lastRequestCrawl time.Time
51 relays []string
52 hostname string
53}
54
55func (s *Server) handleSyncSubscribeRepos(w http.ResponseWriter, r *http.Request) {
56 ctx, cancel := context.WithCancel(r.Context())
57 defer cancel()
58
59 conn, err := s.upgrader.Upgrade(w, r, w.Header())
60 if err != nil {
61 slog.Error("unable to establish websocket with relay", "err", err)
62 w.Write([]byte("fail to upgrade to websocket"))
63 return
64 }
65
66 ident := r.RemoteAddr + "-" + r.UserAgent()
67 slog.Info("new connection established", "ident", ident)
68
69 s.conns[ident] = conn
70
71 defer func() {
72 // we should tell the relay to request a new crawl at this point if we got disconnected
73 // use a new context since the old one might be cancelled at this point
74 go func() {
75 retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second)
76 defer retryCancel()
77 if err := s.requestCrawl(retryCtx); err != nil {
78 slog.Error("error requesting crawls", "err", err)
79 }
80 }()
81 }()
82
83 for {
84 select {
85 case <-ctx.Done():
86 return
87 default:
88 if _, _, err := conn.ReadMessage(); err != nil {
89 slog.Warn("websocket error", "err", err)
90 cancel()
91 return
92 }
93 }
94 }
95}
96
97func (s *Server) handleNewEvent(w http.ResponseWriter, r *http.Request) {
98 b, err := io.ReadAll(r.Body)
99 if err != nil {
100 slog.Error("read event body", "error", err)
101 w.Write([]byte("read event body"))
102 return
103 }
104
105 slog.Info("got event")
106
107 for ident, conn := range s.conns {
108 wc, err := conn.NextWriter(websocket.BinaryMessage)
109 if err != nil {
110 slog.Error("error writing message to relay", "err", err)
111 if errors.Is(err, websocket.ErrCloseSent) {
112 delete(s.conns, ident)
113 }
114 continue
115 }
116
117 _, err = wc.Write(b)
118 if err != nil {
119 slog.Error("writing data to conn", "error", err, "ident", ident)
120 continue
121 }
122
123 if err := wc.Close(); err != nil {
124 slog.Error("failed to flush-close our event write", "err", err, "ident", ident)
125 continue
126 }
127 }
128}
129
130func (s *Server) requestCrawl(ctx context.Context) error {
131 if time.Since(s.lastRequestCrawl) <= 1*time.Minute {
132 return fmt.Errorf("a crawl request has already been made within the last minute")
133 }
134
135 for _, relay := range s.relays {
136 slog.Info("requesting crawl from relay", "relay", relay)
137 cli := xrpc.Client{Host: relay}
138 if err := atproto.SyncRequestCrawl(ctx, &cli, &atproto.SyncRequestCrawl_Input{
139 Hostname: s.hostname,
140 }); err != nil {
141 slog.Error("error requesting crawl", "err", err)
142 } else {
143 slog.Info("crawl requested successfully")
144 }
145 }
146
147 s.lastRequestCrawl = time.Now()
148
149 return nil
150}