A POC for a push-based relay connection service
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 150 lines 3.6 kB view raw
1package main 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "io" 8 "log" 9 "log/slog" 10 "net/http" 11 "os" 12 "strings" 13 "time" 14 15 "github.com/bluesky-social/indigo/api/atproto" 16 "github.com/bluesky-social/indigo/xrpc" 17 "github.com/gorilla/websocket" 18 "github.com/joho/godotenv" 19) 20 21func main() { 22 err := godotenv.Load(".env") 23 if err != nil { 24 if !os.IsNotExist(err) { 25 log.Fatal("Error loading .env file") 26 } 27 } 28 29 srv := Server{ 30 upgrader: websocket.Upgrader{}, 31 conns: make(map[string]*websocket.Conn), 32 relays: strings.Split(os.Getenv("RELAYS"), ","), 33 hostname: os.Getenv("HOSTNAME"), 34 } 35 36 go srv.requestCrawl(context.Background()) 37 38 mux := http.NewServeMux() 39 mux.HandleFunc("/xrpc/com.atproto.sync.subscribeRepos", srv.handleSyncSubscribeRepos) 40 mux.HandleFunc("POST /events", srv.handleNewEvent) 41 42 if err := http.ListenAndServe(":3000", mux); err != nil { 43 slog.Error("http listen and serve", "error", err) 44 } 45} 46 47type Server struct { 48 upgrader websocket.Upgrader 49 conns map[string]*websocket.Conn 50 lastRequestCrawl time.Time 51 relays []string 52 hostname string 53} 54 55func (s *Server) handleSyncSubscribeRepos(w http.ResponseWriter, r *http.Request) { 56 ctx, cancel := context.WithCancel(r.Context()) 57 defer cancel() 58 59 conn, err := s.upgrader.Upgrade(w, r, w.Header()) 60 if err != nil { 61 slog.Error("unable to establish websocket with relay", "err", err) 62 w.Write([]byte("fail to upgrade to websocket")) 63 return 64 } 65 66 ident := r.RemoteAddr + "-" + r.UserAgent() 67 slog.Info("new connection established", "ident", ident) 68 69 s.conns[ident] = conn 70 71 defer func() { 72 // we should tell the relay to request a new crawl at this point if we got disconnected 73 // use a new context since the old one might be cancelled at this point 74 go func() { 75 retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second) 76 defer retryCancel() 77 if err := 
s.requestCrawl(retryCtx); err != nil { 78 slog.Error("error requesting crawls", "err", err) 79 } 80 }() 81 }() 82 83 for { 84 select { 85 case <-ctx.Done(): 86 return 87 default: 88 if _, _, err := conn.ReadMessage(); err != nil { 89 slog.Warn("websocket error", "err", err) 90 cancel() 91 return 92 } 93 } 94 } 95} 96 97func (s *Server) handleNewEvent(w http.ResponseWriter, r *http.Request) { 98 b, err := io.ReadAll(r.Body) 99 if err != nil { 100 slog.Error("read event body", "error", err) 101 w.Write([]byte("read event body")) 102 return 103 } 104 105 slog.Info("got event") 106 107 for ident, conn := range s.conns { 108 wc, err := conn.NextWriter(websocket.BinaryMessage) 109 if err != nil { 110 slog.Error("error writing message to relay", "err", err) 111 if errors.Is(err, websocket.ErrCloseSent) { 112 delete(s.conns, ident) 113 } 114 continue 115 } 116 117 _, err = wc.Write(b) 118 if err != nil { 119 slog.Error("writing data to conn", "error", err, "ident", ident) 120 continue 121 } 122 123 if err := wc.Close(); err != nil { 124 slog.Error("failed to flush-close our event write", "err", err, "ident", ident) 125 continue 126 } 127 } 128} 129 130func (s *Server) requestCrawl(ctx context.Context) error { 131 if time.Since(s.lastRequestCrawl) <= 1*time.Minute { 132 return fmt.Errorf("a crawl request has already been made within the last minute") 133 } 134 135 for _, relay := range s.relays { 136 slog.Info("requesting crawl from relay", "relay", relay) 137 cli := xrpc.Client{Host: relay} 138 if err := atproto.SyncRequestCrawl(ctx, &cli, &atproto.SyncRequestCrawl_Input{ 139 Hostname: s.hostname, 140 }); err != nil { 141 slog.Error("error requesting crawl", "err", err) 142 } else { 143 slog.Info("crawl requested successfully") 144 } 145 } 146 147 s.lastRequestCrawl = time.Now() 148 149 return nil 150}