backend for xcvr appview
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

add jetstream, refactor xrpc clients, implement post channel endpoint

rachel-mp4 1b7e57ec a93b0b5a

+781 -380
+2 -2
migrations/001_init.up.sql
··· 71 71 pkce_verifier TEXT, 72 72 dpop_auth_server_nonce TEXT, 73 73 dpop_private_jwk TEXT, 74 - UNIQUE(did, state) 74 + UNIQUE(state) 75 75 ); 76 76 77 77 CREATE TABLE oauthsessions ( ··· 87 87 access_token TEXT, 88 88 refresh_token TEXT, 89 89 expiration TIMESTAMPTZ, 90 - UNIQUE(did, state) 90 + UNIQUE(state) 91 91 );
+24 -59
server/cmd/main.go
··· 1 1 package main 2 2 3 3 import ( 4 + "context" 5 + "errors" 4 6 "net/http" 5 7 "os" 6 8 "time" 9 + "xcvr-backend/internal/atplistener" 7 10 "xcvr-backend/internal/db" 8 11 "xcvr-backend/internal/handler" 9 12 "xcvr-backend/internal/log" ··· 40 43 panic(err) 41 44 } 42 45 h := handler.New(store, &logger, oauthclient) 46 + go consumeLoop(context.Background(), store, &logger) 43 47 http.ListenAndServe(":8080", h.WithCORSAll()) 44 48 45 49 } 46 50 47 - // func initChannel(w http.ResponseWriter, r *http.Request) { 48 - // decoder := json.NewDecoder(r.Body) 49 - // var c channel 50 - // err := decoder.Decode(&c) 51 - // if err != nil { 52 - // http.Error(w, "invalid json", http.StatusBadRequest) 53 - // } 54 - // switch isValidInit(c) { 55 - // case ieNoBand: 56 - // http.Error(w, "must give a band", http.StatusBadRequest) 57 - // return 58 - // case ieLongBand: 59 - // http.Error(w, "band must be shorter than 32 bytes", http.StatusBadRequest) 60 - // return 61 - // case ieCollision: 62 - // http.Error(w, "band must be unique", http.StatusBadRequest) 63 - // return 64 - // case ieLongSign: 65 - // http.Error(w, "sign must be shorter than 51 code points", http.StatusBadRequest) 66 - // return 67 - // case ieOK: 68 - // c, err = createChannel(c, false) 69 - // } 70 - // if err != nil { 71 - // http.Error(w, "uh oh", http.StatusTeapot) 72 - // } 73 - // fmt.Printf("created a channel on band: %s and call sign: %s\n", c.Band, c.Sign) 74 - // encoder := json.NewEncoder(w) 75 - // encoder.Encode(c) 76 - // } 51 + const ( 52 + defaultServerAddr = "wss://jetstream.atproto.tools/subscribe" 53 + ) 77 54 78 - // type initError = int 79 - 80 - // const ( 81 - // ieOK initError = iota 82 - // ieNoBand 83 - // ieLongBand 84 - // ieCollision 85 - // ieLongSign 86 - // ) 87 - 88 - // // TODO: can changes to bandToServer after unlock create data race? 89 - // func isValidInit(c channel) initError { 90 - // if c.Band == "" { 91 - // return ieNoBand 92 - // } 93 - // if len(c.Band) > 31 { 94 - // return ieLongBand 95 - // } 96 - // channelsMu.Lock() 97 - // _, ok := bandToServer[c.Band] 98 - // channelsMu.Unlock() 99 - // if ok { 100 - // return ieCollision 101 - // } 102 - // if utf8.RuneCountInString(c.Sign) > 50 { 103 - // return ieLongSign 104 - // } 105 - // return ieOK 106 - // } 55 + func consumeLoop(ctx context.Context, db *db.Store, l *log.Logger) { 56 + jsServerAddr := os.Getenv("JS_SERVER_ADDR") 57 + if jsServerAddr == "" { 58 + jsServerAddr = defaultServerAddr 59 + } 60 + consumer := atplistener.NewConsumer(jsServerAddr, l, db) 61 + for { 62 + err := consumer.Consume(ctx) 63 + if err != nil { 64 + l.Deprintf("error in consume loop: %s", err.Error()) 65 + if errors.Is(err, context.Canceled) { 66 + l.Deprintf("exiting consume loop") 67 + return 68 + } 69 + } 70 + } 71 + }
+9 -8
server/go.mod
··· 4 4 5 5 require ( 6 6 github.com/bluesky-social/indigo v0.0.0-20250616202859-d4516ea1d6cf 7 + github.com/bluesky-social/jetstream v0.0.0-20250414024304-d17bd81a945e 7 8 github.com/gorilla/sessions v1.4.0 8 9 github.com/haileyok/atproto-oauth-golang v0.0.2 10 + github.com/ipfs/go-cid v0.4.1 9 11 github.com/jackc/pgx/v5 v5.7.4 10 12 github.com/joho/godotenv v1.5.1 11 13 github.com/lestrrat-go/jwx/v2 v2.0.12 12 14 github.com/rachel-mp4/lrcd v0.0.0-20250603192958-089ba44e79a5 13 15 github.com/rivo/uniseg v0.4.7 14 16 github.com/whyrusleeping/cbor-gen v0.3.1 17 + golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 15 18 ) 16 19 17 20 require ( 18 21 github.com/beorn7/perks v1.0.1 // indirect 19 22 github.com/carlmjohnson/versioninfo v0.22.5 // indirect 20 - github.com/cespare/xxhash/v2 v2.2.0 // indirect 23 + github.com/cespare/xxhash/v2 v2.3.0 // indirect 21 24 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect 22 25 github.com/felixge/httpsnoop v1.0.4 // indirect 23 26 github.com/go-logr/logr v1.4.2 // indirect ··· 34 37 github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect 35 38 github.com/ipfs/bbloom v0.0.4 // indirect 36 39 github.com/ipfs/go-block-format v0.2.0 // indirect 37 - github.com/ipfs/go-cid v0.4.1 // indirect 38 40 github.com/ipfs/go-datastore v0.6.0 // indirect 39 41 github.com/ipfs/go-ipfs-blockstore v1.3.1 // indirect 40 42 github.com/ipfs/go-ipfs-ds-help v1.1.1 // indirect ··· 48 50 github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect 49 51 github.com/jackc/puddle/v2 v2.2.2 // indirect 50 52 github.com/jbenet/goprocess v0.1.4 // indirect 53 + github.com/klauspost/compress v1.17.9 // indirect 51 54 github.com/klauspost/cpuid/v2 v2.2.7 // indirect 52 55 github.com/lestrrat-go/blackmagic v1.0.2 // indirect 53 56 github.com/lestrrat-go/httpcc v1.0.1 // indirect ··· 55 58 github.com/lestrrat-go/iter v1.0.2 // indirect 56 59 github.com/lestrrat-go/option v1.0.1 // indirect 57 60 github.com/mattn/go-isatty v0.0.20 // indirect 58 - github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect 59 61 github.com/minio/sha256-simd v1.0.1 // indirect 60 62 github.com/mr-tron/base58 v1.2.0 // indirect 61 63 github.com/multiformats/go-base32 v0.1.0 // indirect ··· 65 67 github.com/multiformats/go-varint v0.0.7 // indirect 66 68 github.com/opentracing/opentracing-go v1.2.0 // indirect 67 69 github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f // indirect 68 - github.com/prometheus/client_golang v1.17.0 // indirect 69 - github.com/prometheus/client_model v0.5.0 // indirect 70 - github.com/prometheus/common v0.45.0 // indirect 71 - github.com/prometheus/procfs v0.12.0 // indirect 70 + github.com/prometheus/client_golang v1.19.1 // indirect 71 + github.com/prometheus/client_model v0.6.1 // indirect 72 + github.com/prometheus/common v0.54.0 // indirect 73 + github.com/prometheus/procfs v0.15.1 // indirect 72 74 github.com/rachel-mp4/lrcproto v0.0.0-20250527205756-58da8216f98c // indirect 73 75 github.com/segmentio/asm v1.2.0 // indirect 74 76 github.com/spaolacci/murmur3 v1.1.0 // indirect ··· 86 88 golang.org/x/sys v0.28.0 // indirect 87 89 golang.org/x/text v0.21.0 // indirect 88 90 golang.org/x/time v0.8.0 // indirect 89 - golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect 90 91 google.golang.org/protobuf v1.36.6 // indirect 91 92 lukechampine.com/blake3 v1.2.1 // indirect 92 93 )
+14 -12
server/go.sum
··· 4 4 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= 5 5 github.com/bluesky-social/indigo v0.0.0-20250616202859-d4516ea1d6cf h1:LFlwtY9r95lAI1yYKolCLTQnwK5VjgWO87mNsKdj3Qs= 6 6 github.com/bluesky-social/indigo v0.0.0-20250616202859-d4516ea1d6cf/go.mod h1:8FlFpF5cIq3DQG0kEHqyTkPV/5MDQoaWLcVwza5ZPJU= 7 + github.com/bluesky-social/jetstream v0.0.0-20250414024304-d17bd81a945e h1:P/O6TDHs53gwgV845uDHI+Nri889ixksRrh4bCkCdxo= 8 + github.com/bluesky-social/jetstream v0.0.0-20250414024304-d17bd81a945e/go.mod h1:WiYEeyJSdUwqoaZ71KJSpTblemUCpwJfh5oVXplK6T4= 7 9 github.com/carlmjohnson/versioninfo v0.22.5 h1:O00sjOLUAFxYQjlN/bzYTuZiS0y6fWDQjMRvwtKgwwc= 8 10 github.com/carlmjohnson/versioninfo v0.22.5/go.mod h1:QT9mph3wcVfISUKd0i9sZfVrPviHuSF+cUtLjm2WSf8= 9 - github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= 10 - github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= 11 + github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= 12 + github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= 11 13 github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= 12 14 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 13 15 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= ··· 99 101 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= 100 102 github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= 101 103 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= 104 + github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= 105 + github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= 102 106 github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= 103 107 github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= 104 108 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= ··· 125 129 github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= 126 130 github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= 127 131 github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= 128 - github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= 129 - github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= 130 132 github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= 131 133 github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8= 132 134 github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= ··· 149 151 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 150 152 github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f h1:VXTQfuJj9vKR4TCkEuWIckKvdHFeJH/huIFJ9/cXOB0= 151 153 github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f/go.mod h1:/zvteZs/GwLtCgZ4BL6CBsk9IKIlexP43ObX9AxTqTw= 152 - github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= 153 - github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= 154 - github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= 155 - github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= 156 - github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM= 157 - github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= 158 - github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= 159 - github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= 154 + github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= 155 + github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= 156 + github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= 157 + github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= 158 + github.com/prometheus/common v0.54.0 h1:ZlZy0BgJhTwVZUn7dLOkwCZHUkrAqd3WYtcFCWnM1D8= 159 + github.com/prometheus/common v0.54.0/go.mod h1:/TQgMJP5CuVYveyT7n/0Ix8yLNNXy9yRSkhnLTHPDIQ= 160 + github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= 161 + github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= 160 162 github.com/rachel-mp4/atproto-oauth-golang v0.0.0-20250616212213-a55a5f62b82d h1:FQ8YKfXnKmyEbKnO/blj3qWGhYdw+l3DtQCqSboJRvA= 161 163 github.com/rachel-mp4/atproto-oauth-golang v0.0.0-20250616212213-a55a5f62b82d/go.mod h1:vVRo6BPEmWOZnYk9LtXLzBPzfkY63fUaBahA+o4h55Q= 162 164 github.com/rachel-mp4/lrcd v0.0.0-20250603192958-089ba44e79a5 h1:NMDkC4XYysiYebcoFDnsPdBVr8/NEuahKM6xqQJITp0=
+117
server/internal/atplistener/jetstream.go
··· 1 + package atplistener 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "errors" 7 + "fmt" 8 + "github.com/bluesky-social/indigo/atproto/syntax" 9 + "github.com/bluesky-social/jetstream/pkg/client" 10 + "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 11 + "github.com/bluesky-social/jetstream/pkg/models" 12 + "time" 13 + "xcvr-backend/internal/db" 14 + "xcvr-backend/internal/lex" 15 + "xcvr-backend/internal/log" 16 + "xcvr-backend/internal/types" 17 + ) 18 + 19 + type Consumer struct { 20 + cfg *client.ClientConfig 21 + logger *log.Logger 22 + handler *handler 23 + } 24 + 25 + type handler struct { 26 + db *db.Store 27 + } 28 + 29 + func NewConsumer(jsAddr string, l *log.Logger, db *db.Store) *Consumer { 30 + cfg := client.DefaultClientConfig() 31 + if jsAddr != "" { 32 + cfg.WebsocketURL = jsAddr 33 + } 34 + cfg.WantedCollections = []string{ 35 + "org.xcvr.actor.profile", 36 + "org.xcvr.feed.channel", 37 + "org.xcvr.lrc.message", 38 + "org.xcvr.lrc.signet", 39 + } 40 + cfg.WantedDids = []string{} 41 + return &Consumer{ 42 + cfg: cfg, 43 + logger: l, 44 + handler: &handler{db: db}, 45 + } 46 + } 47 + 48 + func (c *Consumer) Consume(ctx context.Context) error { 49 + scheduler := sequential.NewScheduler("jetstream_localdev", c.logger.Slog, c.handler.HandleEvent) 50 + defer scheduler.Shutdown() 51 + client, err := client.NewClient(c.cfg, c.logger.Slog, scheduler) 52 + if err != nil { 53 + return errors.New("failed to create client: " + err.Error()) 54 + } 55 + cursor := time.Now().Add(1 * -time.Minute).UnixMicro() 56 + err = client.ConnectAndRead(ctx, &cursor) 57 + if err != nil { 58 + return errors.New("error connecting and reading: " + err.Error()) 59 + } 60 + return nil 61 + } 62 + 63 + func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error { 64 + if event.Commit == nil { 65 + return nil 66 + } 67 + 68 + switch event.Commit.Collection { 69 + case "org.xcvr.actor.profile": 70 + return h.handleProfile(ctx, event) 71 + case "org.xcvr.feed.channel": 72 + return h.handleChannel(ctx, event) 73 + } 74 + return nil 75 + } 76 + 77 + func (h *handler) handleProfile(ctx context.Context, event *models.Event) error { 78 + var pr lex.ProfileRecord 79 + err := json.Unmarshal(event.Commit.Record, &pr) 80 + if err != nil { 81 + return errors.New("error unmarshaling: " + err.Error()) 82 + } 83 + to := db.ProfileUpdate{ 84 + DID: event.Did, 85 + } 86 + to.UpdateName = pr.DisplayName != nil 87 + to.Name = pr.DisplayName 88 + to.UpdateNick = pr.DefaultNick != nil 89 + to.Nick = pr.DefaultNick 90 + to.UpdateStatus = pr.Status != nil 91 + to.Status = pr.Status 92 + to.UpdateColor = pr.Color != nil 93 + to.Color = pr.Color 94 + return h.db.UpdateProfile(to, ctx) 95 + } 96 + 97 + func (h *handler) handleChannel(ctx context.Context, event *models.Event) error { 98 + var cr lex.ChannelRecord 99 + err := json.Unmarshal(event.Commit.Record, &cr) 100 + if err != nil { 101 + return errors.New("error unmarshl: " + err.Error()) 102 + } 103 + then, err := syntax.ParseDatetimeTime(cr.CreatedAt) 104 + if err != nil { 105 + then = time.Now() 106 + } 107 + channel := types.Channel{ 108 + URI: fmt.Sprintf("at://%s/org.xcvr.feed.channel/%s", event.Did, event.Commit.RKey), 109 + CID: event.Commit.CID, 110 + DID: event.Did, 111 + Host: cr.Host, 112 + Title: cr.Title, 113 + Topic: cr.Topic, 114 + CreatedAt: then, 115 + } 116 + return h.db.StoreChannel(channel, ctx) 117 + }
+30
server/internal/atputils/identity.go
··· 9 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 10 "io" 11 11 "net/http" 12 + "os" 12 13 "strings" 13 14 ) 15 + 16 + var ( 17 + my_handle *string 18 + my_did *string 19 + ) 20 + 21 + func GetMyHandle() string { 22 + if my_handle != nil { 23 + return *my_handle 24 + } 25 + handle := os.Getenv("MY_IDENTITY") 26 + my_handle = &handle 27 + return *my_handle 28 + } 29 + 30 + func GetMyDid(ctx context.Context) (string, error) { 31 + if my_did != nil { 32 + return *my_did, nil 33 + } 34 + if my_handle == nil { 35 + GetMyHandle() 36 + } 37 + did, err := GetDidFromHandle(ctx, *my_handle) 38 + if err != nil { 39 + return "", err 40 + } 41 + my_did = &did 42 + return did, nil 43 + } 14 44 15 45 func GetHandleFromDid(ctx context.Context, did string) (string, error) { 16 46 sdid, err := syntax.ParseDID(did)
+20
server/internal/atputils/validation.go
··· 1 + package atputils 2 + 3 + import ( 4 + "github.com/rivo/uniseg" 5 + "unicode/utf16" 6 + ) 7 + 8 + func ValidateGraphemesAndLength(s string, maxgraphemes int, maxlength int) bool { 9 + return ValidateGraphemes(s, maxgraphemes) || ValidateLength(s, maxlength) 10 + } 11 + 12 + func ValidateGraphemes(s string, max int) bool { 13 + return uniseg.GraphemeClusterCount(s) > max 14 + } 15 + 16 + func ValidateLength(s string, max int) bool { 17 + runes := []rune(s) 18 + us := utf16.Encode(runes) 19 + return len(us) > max 20 + }
+17
server/internal/db/lexicon.go
··· 110 110 } 111 111 return &p, nil 112 112 } 113 + 114 + func (s *Store) StoreChannel(channel types.Channel, ctx context.Context) error { 115 + _, err := s.pool.Exec(ctx, ` 116 + INSERT INTO channels ( 117 + uri, 118 + cid, 119 + did, 120 + host, 121 + title, 122 + topic, 123 + created_at 124 + ) VALUES ( 125 + $1, $2, $3, $4, $5, $6, $7 126 + ) ON CONFLICT (uri) DO NOTHING 127 + `, channel.URI, channel.CID, channel.DID, channel.Host, channel.Title, channel.Topic, channel.CreatedAt) 128 + return err 129 + }
+7 -7
server/internal/db/oauth.go
··· 89 89 return &req, nil 90 90 } 91 91 92 - func (s *Store) GetOauthSesson(did string, ctx context.Context) (*types.Session, error) { 92 + func (s *Store) GetOauthSession(id uint, ctx context.Context) (*types.Session, error) { 93 93 rows, err := s.pool.Query(ctx, ` 94 94 SELECT 95 95 r.authserver_iss, ··· 103 103 r.refresh_token, 104 104 r.expiration 105 105 FROM oauthsessions r 106 - WHERE r.did = $1 106 + WHERE r.id = $1 107 107 LIMIT 1 108 - `, did) 108 + `, id) 109 109 if err != nil { 110 110 return nil, errors.New("error querying oauthsessions:" + err.Error()) 111 111 } ··· 142 142 return nil 143 143 } 144 144 145 - func (s *Store) SetDpopPdsNonce(did, dpopnonce string) error { 145 + func (s *Store) SetDpopPdsNonce(id uint, dpopnonce string) error { 146 146 _, err := s.pool.Exec(context.Background(), ` 147 - UPDATE oauthsessions SET dpop_pds_nonce = $1 WHERE did = $2 148 - `, dpopnonce, did) 147 + UPDATE oauthsessions SET dpop_pds_nonce = $1 WHERE id = $2 148 + `, dpopnonce, id) 149 149 if err != nil { 150 - return errors.New(fmt.Sprintf("error updating dpop nonce for did %s: %s", did, err.Error())) 150 + return errors.New(fmt.Sprintf("error updating dpop nonce for id %d: %s", id, err.Error())) 151 151 } 152 152 return nil 153 153 }
+8 -6
server/internal/handler/handler.go
··· 19 19 router *http.ServeMux 20 20 logger *log.Logger 21 21 oauth *oauth.Service 22 - xrpc *oauth.Client 22 + myClient *oauth.PasswordClient 23 + clientmap *oauth.ClientMap 23 24 } 24 25 25 26 func New(db *db.Store, logger *log.Logger, oauthserv *oauth.Service) *Handler { 26 27 mux := http.NewServeMux() 27 28 sessionStore := sessions.NewCookieStore([]byte(os.Getenv("SESSION_KEY"))) 28 - did, err := atputils.GetDidFromHandle(context.Background(), os.Getenv("MY_IDENTITY")) 29 + host, err := atputils.GetPDSFromHandle(context.Background(), atputils.GetMyHandle()) 29 30 if err != nil { 30 31 panic(err) 31 32 } 32 - pdshost, err := atputils.GetPDSFromDid(context.Background(), did, http.DefaultClient) 33 + did, err := atputils.GetMyDid(context.Background()) 33 34 if err != nil { 34 35 panic(err) 35 36 } 36 - xrpc := oauth.NewXRPCClient(db, logger, pdshost, did) 37 + xrpc := oauth.NewPasswordClient(did, host, logger) 37 38 err = xrpc.CreateSession(context.Background()) 38 39 if err != nil { 39 40 panic(err) 40 41 } 41 - err = xrpc.CreateXCVRSignet(lex.SignetRecord{ 42 + _, _, err = xrpc.CreateXCVRSignet(&lex.SignetRecord{ 42 43 ChannelURI: "beep.boop", 43 44 LRCID: 11, 44 45 Author: "sneep.snirp", ··· 46 47 if err != nil { 47 48 panic(err) 48 49 } 49 - h := &Handler{db, sessionStore, mux, logger, oauthserv, xrpc} 50 + clientmap := oauth.NewClientMap() 51 + h := &Handler{db, sessionStore, mux, logger, oauthserv, xrpc, clientmap} 50 52 // lrc handlers 51 53 mux.HandleFunc("GET /lrc/{user}/{rkey}/ws", h.acceptWebsocket) 52 54 mux.HandleFunc("POST /lrc/channel", h.postChannel)
+109
server/internal/handler/lrcHandlers.go
··· 1 1 package handler 2 2 3 3 import ( 4 + "encoding/json" 5 + "errors" 4 6 "fmt" 7 + "github.com/bluesky-social/indigo/atproto/syntax" 5 8 "net/http" 9 + "time" 10 + "xcvr-backend/internal/atputils" 11 + "xcvr-backend/internal/lex" 6 12 "xcvr-backend/internal/model" 13 + "xcvr-backend/internal/types" 7 14 ) 8 15 9 16 func (h *Handler) acceptWebsocket(w http.ResponseWriter, r *http.Request) { ··· 21 28 } 22 29 23 30 func (h *Handler) postChannel(w http.ResponseWriter, r *http.Request) { 31 + session, _ := h.sessionStore.Get(r, "oauthsession") 32 + _, ok := session.Values["id"].(uint) 33 + if !ok { 34 + h.postMyChannel(w, r) 35 + return 36 + } 37 + client, err := h.getClient(r) 38 + if err != nil { 39 + h.serverError(w, errors.New("couldn't find client: "+err.Error())) 40 + return 41 + } 42 + 43 + lcr, now, err := h.parseChannelRequest(r) 44 + if err != nil { 45 + h.badRequest(w, err) 46 + return 47 + } 48 + uri, cid, err := client.CreateXCVRChannel(lcr, r.Context()) 49 + if err != nil { 50 + h.serverError(w, errors.New("something bad probs happened when posting a channel "+err.Error())) 51 + return 52 + } 53 + channel := types.Channel{ 54 + URI: uri, 55 + CID: cid, 56 + DID: session.Values["did"].(string), 57 + Host: lcr.Host, 58 + Title: lcr.Title, 59 + Topic: lcr.Topic, 60 + CreatedAt: *now, 61 + IndexedAt: time.Now(), 62 + } 63 + err = h.db.StoreChannel(channel, r.Context()) 64 + if err != nil { 65 + h.serverError(w, errors.New("well... the record posted but i couldn't store it: "+err.Error())) 66 + return 67 + } 68 + h.getChannels(w, r) 69 + } 70 + 71 + func (h *Handler) parseChannelRequest(r *http.Request) (*lex.ChannelRecord, *time.Time, error) { 72 + var cr types.PostChannelRequest 73 + decoder := json.NewDecoder(r.Body) 74 + err := decoder.Decode(&cr) 75 + if err != nil { 76 + return nil, nil, errors.New("i think they messed up: " + err.Error()) 77 + } 78 + 79 + var lcr lex.ChannelRecord 80 + if cr.Title == "" || atputils.ValidateGraphemesAndLength(cr.Title, 64, 640) { 81 + return nil, nil, errors.New("title empty or too long") 82 + } 83 + lcr.Title = cr.Title 84 + if cr.Host == "" { 85 + return nil, nil, errors.New("no host") 86 + } 87 + lcr.Host = cr.Host 88 + if cr.Topic != nil { 89 + if atputils.ValidateGraphemesAndLength(*cr.Topic, 256, 2560) { 90 + return nil, nil, errors.New("topic too long") 91 + } 92 + lcr.Topic = cr.Topic 93 + } 94 + 95 + dtn := syntax.DatetimeNow() 96 + lcr.CreatedAt = dtn.String() 97 + time := dtn.Time() 98 + return &lcr, &time, nil 99 + } 100 + 101 + func (h *Handler) postMyChannel(w http.ResponseWriter, r *http.Request) { 102 + lcr, now, err := h.parseChannelRequest(r) 103 + if err != nil { 104 + h.badRequest(w, err) 105 + return 106 + } 107 + cid, uri, err := h.myClient.CreateXCVRChannel(lcr, r.Context()) 108 + if err != nil { 109 + h.serverError(w, err) 110 + return 111 + } 112 + mydid, err := atputils.GetMyDid(r.Context()) 113 + if err != nil { 114 + h.serverError(w, err) 115 + return 116 + } 117 + channel := types.Channel{ 118 + URI: uri, 119 + CID: cid, 120 + DID: mydid, 121 + Host: lcr.Host, 122 + Title: lcr.Title, 123 + Topic: lcr.Topic, 124 + CreatedAt: *now, 125 + IndexedAt: time.Now(), 126 + } 127 + err = h.db.StoreChannel(channel, r.Context()) 128 + if err != nil { 129 + h.serverError(w, errors.New("sooo... the record posted but i couldn't store it: "+err.Error())) 130 + return 131 + } 132 + h.getChannels(w, r) 24 133 25 134 } 26 135
+35 -1
server/internal/handler/oauthHandlers.go
··· 11 11 "xcvr-backend/internal/atputils" 12 12 "xcvr-backend/internal/lex" 13 13 "xcvr-backend/internal/oauth" 14 + "xcvr-backend/internal/types" 14 15 15 16 "github.com/gorilla/sessions" 16 17 "github.com/haileyok/atproto-oauth-golang/helpers" ··· 144 145 Status: &status, 145 146 Color: &color, 146 147 } 147 - err = h.xrpc.CreateXCVRProfile(defaultprofilerecord, OauthSession, context.Background()) 148 + client := h.setupClient(OauthSession) 149 + err = client.CreateXCVRProfile(defaultprofilerecord, context.Background()) 148 150 if err != nil { 149 151 h.logger.Println("#that happened (something went wrong when creating profile) " + err.Error()) 150 152 } ··· 157 159 } 158 160 session.Values = map[any]any{} 159 161 session.Values["did"] = req.Did 162 + session.Values["id"] = req.ID 160 163 err = session.Save(r, w) 161 164 if err != nil { 162 165 h.serverError(w, err) ··· 220 223 } 221 224 h.logger.Deprintln("handling nil error?") 222 225 } 226 + 227 + func (h *Handler) getClient(r *http.Request) (*oauth.OauthXRPCClient, error) { 228 + s, _ := h.sessionStore.Get(r, "oauthsession") 229 + id, ok := s.Values["id"].(uint) 230 + if !ok { 231 + return nil, errors.New("not authorized") 232 + } 233 + client := h.clientmap.Map(id) 234 + if client == nil { 235 + client, err := h.resetClient(id, r.Context()) 236 + if err != nil { 237 + return nil, err 238 + } 239 + return client, nil 240 + } 241 + return client, nil 242 + } 243 + 244 + func (h *Handler) resetClient(id uint, ctx context.Context) (*oauth.OauthXRPCClient, error) { 245 + session, err := h.db.GetOauthSession(id, ctx) 246 + if err != nil { 247 + return nil, errors.New("errpr setting up session: " + err.Error()) 248 + } 249 + return h.setupClient(session), nil 250 + } 251 + 252 + func (h *Handler) setupClient(session *types.Session) *oauth.OauthXRPCClient { 253 + client := oauth.NewOauthXRPCClient(h.db, h.logger, session) 254 + h.clientmap.Append(session.ID, client, session.Expiration) 255 + return client 256 + }
+30 -46
server/internal/handler/xcvrHandlers.go
··· 3 3 import ( 4 4 "encoding/json" 5 5 "errors" 6 - "github.com/rivo/uniseg" 7 6 "net/http" 8 - "unicode/utf16" 7 + "xcvr-backend/internal/atputils" 9 8 "xcvr-backend/internal/db" 10 9 "xcvr-backend/internal/lex" 11 10 "xcvr-backend/internal/types" ··· 27 26 var pu db.ProfileUpdate 28 27 pu.DID = did 29 28 if p.DisplayName != nil { 30 - if uniseg.GraphemeClusterCount(*p.DisplayName) > 64 { 31 - h.badRequest(w, errors.New("too many graphemes")) 32 - return 33 - } 34 - runes := []rune(*p.DisplayName) 35 - us := utf16.Encode(runes) 36 - if len(us) > 640 { 37 - h.badRequest(w, errors.New("too many utf16 code points")) 29 + if atputils.ValidateGraphemesAndLength(*p.DisplayName, 64, 640) { 30 + h.badRequest(w, errors.New("displayname too long")) 38 31 return 39 32 } 40 33 pu.Name = p.DisplayName 41 34 pu.UpdateName = true 42 35 } 43 36 if p.DefaultNick != nil { 44 - runes := []rune(*p.DefaultNick) 45 - us := utf16.Encode(runes) 46 - if len(us) > 16 { 47 - h.badRequest(w, errors.New("too many utf16 code points")) 48 - return 37 + if atputils.ValidateLength(*p.DefaultNick, 16) { 38 + h.badRequest(w, errors.New("nick too long")) 49 39 } 50 40 pu.Nick = p.DefaultNick 51 41 pu.UpdateNick = true 52 42 } 53 43 if p.Status != nil { 54 - if uniseg.GraphemeClusterCount(*p.DisplayName) > 640 { 55 - h.badRequest(w, errors.New("too many graphemes")) 56 - return 57 - } 58 - runes := []rune(*p.DisplayName) 59 - us := utf16.Encode(runes) 60 - if len(us) > 6400 { 61 - h.badRequest(w, errors.New("too many utf16 code points")) 62 - return 44 + if atputils.ValidateGraphemesAndLength(*p.Status, 640, 6400) { 45 + h.badRequest(w, errors.New("status too long")) 63 46 } 64 47 pu.Status = p.Status 65 48 pu.UpdateStatus = true ··· 76 59 pu.Color = p.Color 77 60 pu.UpdateColor = true 78 61 } 79 - err = h.db.UpdateProfile(pu, r.Context()) 80 - if err != nil { 81 - h.serverError(w, errors.New("error updating profile: "+err.Error())) 82 - return 83 - } 84 - 85 - //TODO switch order, only update db after we know the xrpc req went through correctly! 86 - 87 62 session, _ := h.sessionStore.Get(r, "oauthsession") 88 - did, ok := session.Values["did"].(string) 89 - if !ok || did == "" { 90 - h.badRequest(w, errors.New("cannot beep, not authenticated")) 63 + _, ok := session.Values["id"].(uint) 64 + if !ok { 65 + h.badRequest(w, errors.New("cannot update profile, not authenticated")) 66 + return 91 67 } 92 - s, err := h.db.GetOauthSesson(did, r.Context()) 93 68 profilerecord := lex.ProfileRecord{ 94 69 DisplayName: p.DisplayName, 95 70 DefaultNick: p.DefaultNick, 96 71 Status: p.Status, 97 72 Color: p.Color, 98 73 } 99 - err = h.xrpc.UpdateXCVRProfile(profilerecord, s, r.Context()) 74 + client, err := h.getClient(r) 75 + if err != nil { 76 + h.serverError(w, err) 77 + return 78 + } 79 + err = client.UpdateXCVRProfile(profilerecord, r.Context()) 100 80 if err != nil { 101 - h.logger.Deprintf("error updating profilerecord: %s", err.Error()) 81 + h.serverError(w, err) 82 + return 102 83 } 84 + 85 + err = h.db.UpdateProfile(pu, r.Context()) 86 + if err != nil { 87 + h.serverError(w, errors.New("error updating profile: "+err.Error())) 88 + return 89 + } 90 + 103 91 h.serveProfileView(did, handle, w, r) 104 92 } 105 93 106 94 func (h *Handler) beep(w http.ResponseWriter, r *http.Request) { 107 - session, _ := h.sessionStore.Get(r, "oauthsession") 108 - did, ok := session.Values["did"].(string) 109 - if !ok || did == "" { 110 - h.badRequest(w, errors.New("cannot beep, not authenticated")) 111 - } 112 - s, err := h.db.GetOauthSesson(did, r.Context()) 95 + client, err := h.getClient(r) 96 + 113 97 if err != nil { 114 - h.serverError(w, errors.New("error finding session: "+err.Error())) 98 + h.serverError(w, errors.New("error finding client: "+err.Error())) 115 99 } 116 - h.xrpc.MakeBskyPost("beep_", s, r.Context()) 100 + client.MakeBskyPost("beep_", r.Context()) 117 101 }
+6 -2
server/internal/log/log.go
··· 4 4 "fmt" 5 5 "io" 6 6 "log" 7 + "log/slog" 7 8 ) 8 9 9 10 type Logger struct { 10 11 debugLogger *log.Logger 11 - prodLogger *log.Logger 12 + prodLogger *log.Logger 13 + Slog *slog.Logger 12 14 } 13 15 14 16 func New(w io.Writer, verbose bool) Logger { ··· 17 19 if verbose { 18 20 l.debugLogger = log.New(w, "[debug]", log.Ldate|log.Ltime) 19 21 } 22 + slog.New(slog.NewTextHandler(w, nil)) 20 23 return l 21 24 } 22 25 ··· 38 41 39 42 func (l *Logger) Printf(format string, args ...any) { 40 43 l.Println(fmt.Sprintf(format, args...)) 41 - } 44 + } 45 +
+56
server/internal/oauth/clientmapper.go
··· 1 + package oauth 2 + 3 + import ( 4 + "sync" 5 + "time" 6 + ) 7 + 8 + type ClientMap struct { 9 + clients map[uint]*OauthXRPCClient 10 + expiry map[uint]time.Time 11 + mu sync.Mutex 12 + } 13 + 14 + func NewClientMap() *ClientMap { 15 + return &ClientMap{ 16 + clients: make(map[uint]*OauthXRPCClient, 10), 17 + expiry: make(map[uint]time.Time, 10), 18 + } 19 + } 20 + 21 + func (c *ClientMap) Map(id uint) *OauthXRPCClient { 22 + c.mu.Lock() 23 + defer c.mu.Unlock() 24 + return c.clients[id] 25 + } 26 + 27 + func (c *ClientMap) Append(id uint, client *OauthXRPCClient, expiration time.Time) { 28 + c.mu.Lock() 29 + defer c.mu.Unlock() 30 + c.clients[id] = client 31 + c.expiry[id] = expiration 32 + } 33 + 34 + func (c *ClientMap) Cleanup() { 35 + now := time.Now() 36 + c.mu.Lock() 37 + defer c.mu.Unlock() 38 + for id, client := range c.clients { 39 + expiry, ok := c.expiry[id] 40 + if !ok { 41 + delete(c.expiry, id) 42 + delete(c.clients, id) 43 + continue 44 + } 45 + if client == nil { 46 + delete(c.expiry, id) 47 + delete(c.clients, id) 48 + continue 49 + } 50 + if expiry.After(now) { 51 + delete(c.expiry, id) 52 + delete(c.clients, id) 53 + continue 54 + } 55 + } 56 + }
+179
server/internal/oauth/oauthclient.go
··· 1 + package oauth 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "github.com/bluesky-social/indigo/api/atproto" 7 + "github.com/bluesky-social/indigo/api/bsky" 8 + "github.com/bluesky-social/indigo/atproto/client" 9 + "github.com/bluesky-social/indigo/atproto/syntax" 10 + "github.com/bluesky-social/indigo/lex/util" 11 + "github.com/haileyok/atproto-oauth-golang" 12 + "github.com/haileyok/atproto-oauth-golang/helpers" 13 + 14 + "xcvr-backend/internal/db" 15 + "xcvr-backend/internal/lex" 16 + "xcvr-backend/internal/log" 17 + "xcvr-backend/internal/types" 18 + ) 19 + 20 + type OauthXRPCClient struct { 21 + xrpc *oauth.XrpcClient 22 + session *types.Session 23 + logger *log.Logger 24 + } 25 + 26 + func NewOauthXRPCClient(s *db.Store, l *log.Logger, session *types.Session) *OauthXRPCClient { 27 + return &OauthXRPCClient{ 28 + xrpc: &oauth.XrpcClient{ 29 + OnDpopPdsNonceChanged: func(did, newNonce string) { 30 + err := s.SetDpopPdsNonce(session.ID, newNonce) 31 + if err != nil { 32 + l.Println(err.Error()) 33 + return 34 + } 35 + session.DpopPdsNonce = newNonce 36 + }, 37 + }, 38 + session: session, 39 + logger: l, 40 + } 41 + } 42 + 43 + func (c *OauthXRPCClient) getOauthSessionAuthArgs() (*oauth.XrpcAuthedRequestArgs, error) { 44 + s := c.session 45 + privateJwk, err := helpers.ParseJWKFromBytes([]byte(s.DpopPrivKey)) 46 + if err != nil { 47 + return nil, errors.New("failed to parse jwk in getoauthsessionauthargs: " + err.Error()) 48 + } 49 + return &oauth.XrpcAuthedRequestArgs{ 50 + Did: s.Did, 51 + AccessToken: s.AccessToken, 52 + PdsUrl: s.PdsUrl, 53 + Issuer: s.AuthserverIss, 54 + DpopPdsNonce: s.DpopPdsNonce, 55 + DpopPrivateJwk: privateJwk, 56 + }, nil 57 + } 58 + 59 + func (c *OauthXRPCClient) MakeBskyPost(text string, ctx context.Context) error { 60 + authargs, err := c.getOauthSessionAuthArgs() 61 + if err != nil { 62 + return errors.New("failed to get oauthsessionauthargs while making post: " + err.Error()) 63 + } 64 + post := bsky.FeedPost{ 65 + Text: text, 66 + CreatedAt: syntax.DatetimeNow().String(), 67 + } 68 + input := atproto.RepoCreateRecord_Input{ 69 + Collection: "app.bsky.feed.post", 70 + Repo: authargs.Did, 71 + Record: &util.LexiconTypeDecoder{Val: &post}, 72 + } 73 + var out atproto.RepoCreateRecord_Output 74 + err = c.xrpc.Do(ctx, authargs, "POST", "application/json", "com.atproto.repo.createRecord", nil, input, &out) 75 + if err != nil { 76 + return errors.New("oops! failed to make post: " + err.Error()) 77 + } 78 + return nil 79 + } 80 + 81 + func (c *OauthXRPCClient) CreateXCVRProfile(profile lex.ProfileRecord, ctx context.Context) error { 82 + authargs, err := c.getOauthSessionAuthArgs() 83 + if err != nil { 84 + return errors.New("failed to get oauthsessionauthargs while making post: " + err.Error()) 85 + } 86 + getOut, err := getProfileRecord(authargs.PdsUrl, authargs.Did, ctx) 87 + if err != nil { 88 + return errors.New("failed to getProfileRecord while creating XCVR profile: " + err.Error()) 89 + } 90 + if getOut.Cid != nil { 91 + return errors.New("there already is a profileRecord, I don't want to overwrite it") 92 + } 93 + rkey := "self" 94 + input := atproto.RepoCreateRecord_Input{ 95 + Collection: "org.xcvr.actor.profile", 96 + Repo: authargs.Did, 97 + Rkey: &rkey, 98 + Record: &util.LexiconTypeDecoder{Val: &profile}, 99 + } 100 + var out atproto.RepoCreateRecord_Output 101 + err = c.xrpc.Do(ctx, authargs, "POST", "application/json", "com.atproto.repo.createRecord", nil, input, &out) 102 + if err != nil { 103 + return errors.New("oops! failed to create a profile: " + err.Error()) 104 + } 105 + return nil 106 + } 107 + 108 + func (c *OauthXRPCClient) CreateXCVRChannel(channel *lex.ChannelRecord, ctx context.Context) (uri string, cid string, err error) { 109 + authargs, err := c.getOauthSessionAuthArgs() 110 + if err != nil { 111 + err = errors.New("yikers! couldn't createXCVRChannel: " + err.Error()) 112 + return 113 + } 114 + input := atproto.RepoCreateRecord_Input{ 115 + Collection: "org.xcvr.feed.channel", 116 + Repo: authargs.Did, 117 + Record: &util.LexiconTypeDecoder{Val: channel}, 118 + } 119 + var out atproto.RepoCreateRecord_Output 120 + err = c.xrpc.Do(ctx, authargs, "POST", "application/json", "com.atproto.repo.createRecord", nil, input, &out) 121 + if err != nil { 122 + err = errors.New("that's not good! failed to create a XCVRChannel: " + err.Error()) 123 + return 124 + } 125 + uri = out.Uri 126 + cid = out.Cid 127 + return 128 + } 129 + 130 + func (c *OauthXRPCClient) CreateXCVRMessage(message lex.MessageRecord, ctx context.Context) error { 131 + authargs, err := c.getOauthSessionAuthArgs() 132 + if err != nil { 133 + return errors.New("uh oh... I couldn't make a XCVRMessage: " + err.Error()) 134 + } 135 + input := atproto.RepoCreateRecord_Input{ 136 + Collection: "org.xcvr.lrc.message", 137 + Repo: authargs.Did, 138 + Record: &util.LexiconTypeDecoder{Val: &message}, 139 + } 140 + var out atproto.RepoCreateRecord_Output 141 + err = c.xrpc.Do(ctx, authargs, "POST", "application/json", "com.atproto.repo.createRecord", nil, input, &out) 142 + if err != nil { 143 + return errors.New("i've got a bad feeling aobut this... failed to create XCVRMessage: " + err.Error()) 144 + } 145 + return nil 146 + } 147 + 148 + func (c *OauthXRPCClient) UpdateXCVRProfile(profile lex.ProfileRecord, ctx context.Context) error { 149 + authargs, err := c.getOauthSessionAuthArgs() 150 + if err != nil { 151 + return errors.New("failed to get oauthsessionauthargs while making post: " + err.Error()) 152 + } 153 + getOut, err := getProfileRecord(authargs.PdsUrl, authargs.Did, ctx) 154 + if err != nil { 155 + return errors.New("messed that up! " + err.Error()) 156 + } 157 + if getOut.Cid == nil { 158 + return c.CreateXCVRProfile(profile, ctx) 159 + } 160 + rkey := "self" 161 + input := atproto.RepoPutRecord_Input{ 162 + Collection: "org.xcvr.actor.profile", 163 + Repo: authargs.Did, 164 + Rkey: rkey, 165 + Record: &util.LexiconTypeDecoder{Val: &profile}, 166 + SwapRecord: getOut.Cid, 167 + } 168 + var out atproto.RepoPutRecord_Output 169 + err = c.xrpc.Do(ctx, authargs, "POST", "application/json", "com.atproto.repo.putRecord", nil, input, &out) 170 + if err != nil { 171 + return errors.New("oops! failed to update a profile: " + err.Error()) 172 + } 173 + return nil 174 + } 175 + 176 + func getProfileRecord(pdsUrl string, did string, ctx context.Context) (*atproto.RepoGetRecord_Output, error) { 177 + cli := client.NewAPIClient(pdsUrl) 178 + return atproto.RepoGetRecord(ctx, cli, "", "org.xcvr.actor.profile", did, "self") 179 + }
+112
server/internal/oauth/passwordclient.go
··· 1 + package oauth 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "fmt" 7 + "github.com/bluesky-social/indigo/api/atproto" 8 + "github.com/bluesky-social/indigo/atproto/client" 9 + "github.com/bluesky-social/indigo/lex/util" 10 + "os" 11 + "xcvr-backend/internal/lex" 12 + "xcvr-backend/internal/log" 13 + ) 14 + 15 + type PasswordClient struct { 16 + logger *log.Logger 17 + xrpc *client.APIClient 18 + accessjwt *string 19 + refreshjwt *string 20 + did *string 21 + } 22 + 23 + func NewPasswordClient(did string, host string, l *log.Logger) *PasswordClient { 24 + return &PasswordClient{ 25 + xrpc: client.NewAPIClient(host), 26 + did: &did, 27 + logger: l, 28 + } 29 + } 30 + 31 + func (c *PasswordClient) CreateSession(ctx context.Context) error { 32 + c.logger.Deprintln("creating session...") 33 + secret := os.Getenv("MY_SECRET") 34 + identity := os.Getenv("MY_IDENTITY") 35 + input := atproto.ServerCreateSession_Input{ 36 + Identifier: identity, 37 + Password: secret, 38 + } 39 + var out atproto.ServerCreateSession_Output 40 + err := c.xrpc.LexDo(ctx, "POST", "application/json", "com.atproto.server.createSession", nil, input, &out) 41 + if err != nil { 42 + return errors.New("I couldn't create a session: " + err.Error()) 43 + } 44 + c.accessjwt = &out.AccessJwt 45 + c.refreshjwt = &out.RefreshJwt 46 + c.logger.Deprintln("created session!") 47 + return nil 48 + } 49 + 50 + func (c *PasswordClient) RefreshSession(ctx context.Context) error { 51 + c.logger.Deprintln("refreshing session") 52 + c.xrpc.Headers.Set("Authorization", fmt.Sprintf("Bearer %s", *c.refreshjwt)) 53 + var out atproto.ServerRefreshSession_Output 54 + err := c.xrpc.LexDo(ctx, "POST", "application/json", "com.atproto.server.refreshSession", nil, nil, &out) 55 + if err != nil { 56 + c.logger.Println("FAILED TO REFRESH RESSION") 57 + return errors.New("failed to refresh session! " + err.Error()) 58 + } 59 + c.accessjwt = &out.AccessJwt 60 + c.refreshjwt = &out.RefreshJwt 61 + c.logger.Deprintln("refreshed session!") 62 + return nil 63 + } 64 + 65 + func (c *PasswordClient) CreateXCVRSignet(signet *lex.SignetRecord, ctx context.Context) (cid string, uri string, err error) { 66 + input := atproto.RepoCreateRecord_Input{ 67 + Collection: "org.xcvr.lrc.signet", 68 + Repo: *c.did, 69 + Record: &util.LexiconTypeDecoder{Val: signet}, 70 + } 71 + return c.createMyRecord(input, ctx) 72 + } 73 + 74 + func (c *PasswordClient) CreateXCVRChannel(channel *lex.ChannelRecord, ctx context.Context) (cid string, uri string, err error) { 75 + input := atproto.RepoCreateRecord_Input{ 76 + Collection: "org.xcvr.lrc.channel", 77 + Repo: *c.did, 78 + Record: &util.LexiconTypeDecoder{Val: channel}, 79 + } 80 + return c.createMyRecord(input, ctx) 81 + } 82 + 83 + func (c *PasswordClient) createMyRecord(input atproto.RepoCreateRecord_Input, ctx context.Context) (cid string, uri string, err error) { 84 + if c.accessjwt == nil { 85 + err = errors.New("must create a session first") 86 + return 87 + } 88 + c.xrpc.Headers.Set("Authorization", fmt.Sprintf("Bearer %s", *c.accessjwt)) 89 + var out atproto.RepoCreateRecord_Output 90 + err = c.xrpc.LexDo(ctx, "POST", "application/json", "com.atproto.repo.createRecord", nil, input, &out) 91 + if err != nil { 92 + err1 := err.Error() 93 + err = c.RefreshSession(ctx) 94 + if err != nil { 95 + err = errors.New(fmt.Sprintf("failed to refresh session while creating %s! first %s then %s", input.Collection, err1, err.Error())) 96 + return 97 + } 98 + c.xrpc.Headers.Set("Authorization", fmt.Sprintf("Bearer %s", *c.accessjwt)) 99 + out = atproto.RepoCreateRecord_Output{} 100 + err = c.xrpc.LexDo(ctx, "POST", "application/json", "com.atproto.repo.createRecord", nil, input, &out) 101 + if err != nil { 102 + err = errors.New(fmt.Sprintf("not good, failed to create %s after failing then refreshing session! first %s then %s", input.Collection, err1, err.Error())) 103 + return 104 + } 105 + cid = out.Cid 106 + uri = out.Uri 107 + return 108 + } 109 + cid = out.Cid 110 + uri = out.Uri 111 + return 112 + }
-237
server/internal/oauth/xrpcclient.go
··· 1 - package oauth 2 - 3 - import ( 4 - "context" 5 - "errors" 6 - "fmt" 7 - "github.com/bluesky-social/indigo/api/atproto" 8 - "github.com/bluesky-social/indigo/api/bsky" 9 - "github.com/bluesky-social/indigo/atproto/client" 10 - "github.com/bluesky-social/indigo/atproto/syntax" 11 - "github.com/bluesky-social/indigo/lex/util" 12 - "github.com/haileyok/atproto-oauth-golang" 13 - "github.com/haileyok/atproto-oauth-golang/helpers" 14 - "os" 15 - 16 - "xcvr-backend/internal/db" 17 - "xcvr-backend/internal/lex" 18 - "xcvr-backend/internal/log" 19 - "xcvr-backend/internal/types" 20 - ) 21 - 22 - type Client struct { 23 - xrpccli *oauth.XrpcClient 24 - xcvrcli *client.APIClient 25 - accessjwt *string 26 - refreshjwt *string 27 - did *string 28 - logger *log.Logger 29 - } 30 - 31 - func NewXRPCClient(s *db.Store, l *log.Logger, host string, did string) *Client { 32 - return &Client{ 33 - xrpccli: &oauth.XrpcClient{ 34 - OnDpopPdsNonceChanged: func(did, newNonce string) { 35 - err := s.SetDpopPdsNonce(did, newNonce) 36 - if err != nil { 37 - l.Deprintln(err.Error()) 38 - } 39 - }, 40 - }, xcvrcli: client.NewAPIClient(host), did: &did, logger: l, 41 - } 42 - } 43 - 44 - func getOauthSessionAuthArgs(s *types.Session) (*oauth.XrpcAuthedRequestArgs, error) { 45 - privateJwk, err := helpers.ParseJWKFromBytes([]byte(s.DpopPrivKey)) 46 - if err != nil { 47 - return nil, errors.New("failed to parse jwk in getoauthsessionauthargs: " + err.Error()) 48 - } 49 - return &oauth.XrpcAuthedRequestArgs{ 50 - Did: s.Did, 51 - AccessToken: s.AccessToken, 52 - PdsUrl: s.PdsUrl, 53 - Issuer: s.AuthserverIss, 54 - DpopPdsNonce: s.DpopPdsNonce, 55 - DpopPrivateJwk: privateJwk, 56 - }, nil 57 - } 58 - 59 - func (c *Client) MakeBskyPost(text string, s *types.Session, ctx context.Context) error { 60 - authargs, err := getOauthSessionAuthArgs(s) 61 - if err != nil { 62 - return errors.New("failed to get oauthsessionauthargs while making post: " + err.Error()) 63 - } 64 - post := bsky.FeedPost{ 65 - Text: text, 66 - CreatedAt: syntax.DatetimeNow().String(), 67 - } 68 - input := atproto.RepoCreateRecord_Input{ 69 - Collection: "app.bsky.feed.post", 70 - Repo: authargs.Did, 71 - Record: &util.LexiconTypeDecoder{Val: &post}, 72 - } 73 - var out atproto.RepoCreateRecord_Output 74 - err = c.xrpccli.Do(ctx, authargs, "POST", "application/json", "com.atproto.repo.createRecord", nil, input, &out) 75 - if err != nil { 76 - return errors.New("oops! failed to make post: " + err.Error()) 77 - } 78 - return nil 79 - } 80 - 81 - func (c *Client) CreateXCVRProfile(profile lex.ProfileRecord, s *types.Session, ctx context.Context) error { 82 - authargs, err := getOauthSessionAuthArgs(s) 83 - if err != nil { 84 - return errors.New("failed to get oauthsessionauthargs while making post: " + err.Error()) 85 - } 86 - getOut, err := getProfileRecord(authargs.PdsUrl, authargs.Did, ctx) 87 - if err != nil { 88 - return errors.New("failed to getProfileRecord while creating XCVR profile: " + err.Error()) 89 - } 90 - if getOut.Cid != nil { 91 - return errors.New("there already is a profileRecord, I don't want to overwrite it") 92 - } 93 - rkey := "self" 94 - input := atproto.RepoCreateRecord_Input{ 95 - Collection: "org.xcvr.actor.profile", 96 - Repo: authargs.Did, 97 - Rkey: &rkey, 98 - Record: &util.LexiconTypeDecoder{Val: &profile}, 99 - } 100 - var out atproto.RepoCreateRecord_Output 101 - err = c.xrpccli.Do(ctx, authargs, "POST", "application/json", "com.atproto.repo.createRecord", nil, input, &out) 102 - if err != nil { 103 - return errors.New("oops! failed to create a profile: " + err.Error()) 104 - } 105 - return nil 106 - } 107 - 108 - func (c *Client) CreateXCVRChannel(channel lex.ChannelRecord, s *types.Session, ctx context.Context) error { 109 - authargs, err := getOauthSessionAuthArgs(s) 110 - if err != nil { 111 - return errors.New("yikers! couldn't createXCVRChannel: " + err.Error()) 112 - } 113 - input := atproto.RepoCreateRecord_Input{ 114 - Collection: "org.xcvr.feed.channel", 115 - Repo: authargs.Did, 116 - Record: &util.LexiconTypeDecoder{Val: &channel}, 117 - } 118 - var out atproto.RepoCreateRecord_Output 119 - err = c.xrpccli.Do(ctx, authargs, "POST", "application/json", "com.atproto.repo.createRecord", nil, input, &out) 120 - if err != nil { 121 - return errors.New("that's not good! failed to create a XCVRChannel: " + err.Error()) 122 - } 123 - return nil 124 - } 125 - 126 - func (c *Client) CreateXCVRMessage(message lex.MessageRecord, s *types.Session, ctx context.Context) error { 127 - authargs, err := getOauthSessionAuthArgs(s) 128 - if err != nil { 129 - return errors.New("uh oh... I couldn't make a XCVRMessage: " + err.Error()) 130 - } 131 - input := atproto.RepoCreateRecord_Input{ 132 - Collection: "org.xcvr.lrc.message", 133 - Repo: authargs.Did, 134 - Record: &util.LexiconTypeDecoder{Val: &message}, 135 - } 136 - var out atproto.RepoCreateRecord_Output 137 - err = c.xrpccli.Do(ctx, authargs, "POST", "application/json", "com.atproto.repo.createRecord", nil, input, &out) 138 - if err != nil { 139 - return errors.New("i've got a bad feeling aobut this... failed to create XCVRMessage: " + err.Error()) 140 - } 141 - return nil 142 - } 143 - 144 - func (c *Client) CreateSession(ctx context.Context) error { 145 - c.logger.Deprintln("creating session...") 146 - secret := os.Getenv("MY_SECRET") 147 - identity := os.Getenv("MY_IDENTITY") 148 - input := atproto.ServerCreateSession_Input{ 149 - Identifier: identity, 150 - Password: secret, 151 - } 152 - var out atproto.ServerCreateSession_Output 153 - err := c.xcvrcli.LexDo(ctx, "POST", "application/json", "com.atproto.server.createSession", nil, input, &out) 154 - if err != nil { 155 - return errors.New("I couldn't create a session: " + err.Error()) 156 - } 157 - c.accessjwt = &out.AccessJwt 158 - c.refreshjwt = &out.RefreshJwt 159 - c.logger.Deprintln("created session!") 160 - return nil 161 - } 162 - 163 - func (c *Client) RefreshSession(ctx context.Context) error { 164 - c.logger.Deprintln("refreshing session") 165 - c.xcvrcli.Headers.Set("Authorization", fmt.Sprintf("Bearer %s", *c.refreshjwt)) 166 - var out atproto.ServerRefreshSession_Output 167 - err := c.xcvrcli.LexDo(ctx, "POST", "application/json", "com.atproto.server.refreshSession", nil, nil, &out) 168 - if err != nil { 169 - c.logger.Println("FAILED TO REFRESH RESSION") 170 - return errors.New("failed to refresh session! " + err.Error()) 171 - } 172 - c.accessjwt = &out.AccessJwt 173 - c.refreshjwt = &out.RefreshJwt 174 - c.logger.Deprintln("refreshed session!") 175 - return nil 176 - } 177 - 178 - func (c *Client) CreateXCVRSignet(signet lex.SignetRecord, ctx context.Context) error { 179 - if c.accessjwt == nil { 180 - return errors.New("must create a session first") 181 - } 182 - c.xcvrcli.Headers.Set("Authorization", fmt.Sprintf("Bearer %s", *c.accessjwt)) 183 - input := atproto.RepoCreateRecord_Input{ 184 - Collection: "org.xcvr.lrc.signet", 185 - Repo: *c.did, 186 - Record: &util.LexiconTypeDecoder{Val: &signet}, 187 - } 188 - var out atproto.RepoCreateRecord_Output 189 - err := c.xcvrcli.LexDo(ctx, "POST", "application/json", "com.atproto.repo.createRecord", nil, input, &out) 190 - if err != nil { 191 - err1 := err.Error() 192 - err = c.RefreshSession(ctx) 193 - if err != nil { 194 - return errors.New("failed to refresh session while creating signet! first " + err1 + " then " + err.Error()) 195 - } 196 - c.xcvrcli.Headers.Set("Authorization", fmt.Sprintf("Bearer %s", *c.accessjwt)) 197 - out = atproto.RepoCreateRecord_Output{} 198 - err = c.xcvrcli.LexDo(ctx, "POST", "application/json", "com.atproto.repo.createRecord", nil, input, &out) 199 - if err != nil { 200 - return errors.New("not good, failed to create signet after failing then refreshing session! first " + err1 + " then " + err.Error()) 201 - } 202 - } 203 - return nil 204 - } 205 - 206 - func (c *Client) UpdateXCVRProfile(profile lex.ProfileRecord, s *types.Session, ctx context.Context) error { 207 - authargs, err := getOauthSessionAuthArgs(s) 208 - if err != nil { 209 - return errors.New("failed to get oauthsessionauthargs while making post: " + err.Error()) 210 - } 211 - getOut, err := getProfileRecord(authargs.PdsUrl, authargs.Did, ctx) 212 - if err != nil { 213 - return errors.New("messed that up! " + err.Error()) 214 - } 215 - if getOut.Cid == nil { 216 - return c.CreateXCVRProfile(profile, s, ctx) 217 - } 218 - rkey := "self" 219 - input := atproto.RepoPutRecord_Input{ 220 - Collection: "org.xcvr.actor.profile", 221 - Repo: authargs.Did, 222 - Rkey: rkey, 223 - Record: &util.LexiconTypeDecoder{Val: &profile}, 224 - SwapRecord: getOut.Cid, 225 - } 226 - var out atproto.RepoPutRecord_Output 227 - err = c.xrpccli.Do(ctx, authargs, "POST", "application/json", "com.atproto.repo.putRecord", nil, input, &out) 228 - if err != nil { 229 - return errors.New("oops! failed to update a profile: " + err.Error()) 230 - } 231 - return nil 232 - } 233 - 234 - func getProfileRecord(pdsUrl string, did string, ctx context.Context) (*atproto.RepoGetRecord_Output, error) { 235 - cli := client.NewAPIClient(pdsUrl) 236 - return atproto.RepoGetRecord(ctx, cli, "", "org.xcvr.actor.profile", did, "self") 237 - }
+6
server/internal/types/lexicons.go
··· 48 48 IndexedAt time.Time 49 49 } 50 50 51 + type PostChannelRequest struct { 52 + Title string `json:"title"` 53 + Topic *string `json:"topic,omitempty"` 54 + Host string `json:"host"` 55 + } 56 + 51 57 type ResolveChannelRequest struct { 52 58 DID *string `json:"did,omitempty"` 53 59 Handle *string `json:"handle,omitempty"`