package main import ( "context" "errors" "fmt" "io" "log" "log/slog" "net/http" "os" "strings" "time" "github.com/bluesky-social/indigo/api/atproto" "github.com/bluesky-social/indigo/xrpc" "github.com/gorilla/websocket" "github.com/joho/godotenv" ) func main() { err := godotenv.Load(".env") if err != nil { if !os.IsNotExist(err) { log.Fatal("Error loading .env file") } } srv := Server{ upgrader: websocket.Upgrader{}, conns: make(map[string]*websocket.Conn), relays: strings.Split(os.Getenv("RELAYS"), ","), hostname: os.Getenv("HOSTNAME"), } go srv.requestCrawl(context.Background()) mux := http.NewServeMux() mux.HandleFunc("/xrpc/com.atproto.sync.subscribeRepos", srv.handleSyncSubscribeRepos) mux.HandleFunc("POST /events", srv.handleNewEvent) if err := http.ListenAndServe(":3000", mux); err != nil { slog.Error("http listen and serve", "error", err) } } type Server struct { upgrader websocket.Upgrader conns map[string]*websocket.Conn lastRequestCrawl time.Time relays []string hostname string } func (s *Server) handleSyncSubscribeRepos(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithCancel(r.Context()) defer cancel() conn, err := s.upgrader.Upgrade(w, r, w.Header()) if err != nil { slog.Error("unable to establish websocket with relay", "err", err) w.Write([]byte("fail to upgrade to websocket")) return } ident := r.RemoteAddr + "-" + r.UserAgent() slog.Info("new connection established", "ident", ident) s.conns[ident] = conn defer func() { // we should tell the relay to request a new crawl at this point if we got disconnected // use a new context since the old one might be cancelled at this point go func() { retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second) defer retryCancel() if err := s.requestCrawl(retryCtx); err != nil { slog.Error("error requesting crawls", "err", err) } }() }() for { select { case <-ctx.Done(): return default: if _, _, err := conn.ReadMessage(); err != nil { slog.Warn("websocket error", "err", err) cancel() return } } } } func (s *Server) handleNewEvent(w http.ResponseWriter, r *http.Request) { b, err := io.ReadAll(r.Body) if err != nil { slog.Error("read event body", "error", err) w.Write([]byte("read event body")) return } slog.Info("got event") for ident, conn := range s.conns { wc, err := conn.NextWriter(websocket.BinaryMessage) if err != nil { slog.Error("error writing message to relay", "err", err) if errors.Is(err, websocket.ErrCloseSent) { delete(s.conns, ident) } continue } _, err = wc.Write(b) if err != nil { slog.Error("writing data to conn", "error", err, "ident", ident) continue } if err := wc.Close(); err != nil { slog.Error("failed to flush-close our event write", "err", err, "ident", ident) continue } } } func (s *Server) requestCrawl(ctx context.Context) error { if time.Since(s.lastRequestCrawl) <= 1*time.Minute { return fmt.Errorf("a crawl request has already been made within the last minute") } for _, relay := range s.relays { slog.Info("requesting crawl from relay", "relay", relay) cli := xrpc.Client{Host: relay} if err := atproto.SyncRequestCrawl(ctx, &cli, &atproto.SyncRequestCrawl_Input{ Hostname: s.hostname, }); err != nil { slog.Error("error requesting crawl", "err", err) } else { slog.Info("crawl requested successfully") } } s.lastRequestCrawl = time.Now() return nil }