backend for xcvr appview
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor to add record manager, probably borken

rachel-mp4 4c11118b ca2a6a5f

+811 -572
+3 -1
server/cmd/main.go
··· 12 12 "rvcx/internal/log" 13 13 "rvcx/internal/model" 14 14 "rvcx/internal/oauth" 15 + "rvcx/internal/recordmanager" 15 16 "time" 16 17 17 18 "github.com/joho/godotenv" ··· 56 57 logger.Println(err.Error()) 57 58 panic(err) 58 59 } 59 - h := handler.New(store, logger, oauthclient, xrpc, model) 60 + recordmanager := recordmanager.New(logger, store, xrpc, model) 61 + h := handler.New(store, logger, oauthclient, model, recordmanager) 60 62 go consumeLoop(context.Background(), store, logger, xrpc) 61 63 http.ListenAndServe(":8080", h.Serve()) 62 64
+6 -4
server/internal/atplistener/jetstream.go
··· 14 14 "rvcx/internal/lex" 15 15 "rvcx/internal/log" 16 16 "rvcx/internal/oauth" 17 + "rvcx/internal/recordmanager" 17 18 "rvcx/internal/types" 18 19 "time" 19 20 ) ··· 26 27 27 28 type handler struct { 28 29 db *db.Store 30 + rm *recordmanager.RecordManager 29 31 l *log.Logger 30 32 cli *oauth.PasswordClient 31 33 } 32 34 33 - func NewConsumer(jsAddr string, l *log.Logger, db *db.Store, cli *oauth.PasswordClient) *Consumer { 35 + func NewConsumer(jsAddr string, l *log.Logger, db *db.Store, cli *oauth.PasswordClient, rm *recordmanager.RecordManager) *Consumer { 34 36 cfg := client.DefaultClientConfig() 35 37 if jsAddr != "" { 36 38 cfg.WebsocketURL = jsAddr ··· 45 47 return &Consumer{ 46 48 cfg: cfg, 47 49 logger: l, 48 - handler: &handler{db: db, l: l, cli: cli}, 50 + handler: &handler{db: db, l: l, cli: cli, rm: rm}, 49 51 } 50 52 } 51 53 ··· 114 116 to.Status = pr.Status 115 117 to.UpdateColor = pr.Color != nil 116 118 to.Color = pr.Color 117 - return h.db.UpdateProfile(to, ctx) 119 + return h.db.UpdateProfile(&to, ctx) 118 120 } 119 121 120 122 func (h *handler) handleProfileDelete(ctx context.Context, event *models.Event) error { ··· 139 141 if err != nil { 140 142 return errors.New("i couldn't create the channel: " + err.Error()) 141 143 } 142 - return h.db.StoreChannel(channel, ctx) 144 + return h.rm.AcceptChannel(channel, ctx) 143 145 } 144 146 145 147 func (h *handler) handleChannelUpdate(ctx context.Context, event *models.Event) error {
+1 -1
server/internal/db/lexicon.go
··· 47 47 UpdateColor bool 48 48 } 49 49 50 - func (s *Store) UpdateProfile(to ProfileUpdate, ctx context.Context) error { 50 + func (s *Store) UpdateProfile(to *ProfileUpdate, ctx context.Context) error { 51 51 setParts := []string{} 52 52 args := []any{to.DID} 53 53 idx := 2
+4 -5
server/internal/handler/handler.go
··· 9 9 "rvcx/internal/log" 10 10 "rvcx/internal/model" 11 11 "rvcx/internal/oauth" 12 + "rvcx/internal/recordmanager" 12 13 ) 13 14 14 15 type Handler struct { ··· 17 18 router *http.ServeMux 18 19 logger *log.Logger 19 20 oauth *oauth.Service 20 - myClient *oauth.PasswordClient 21 - clientmap *oauth.ClientMap 22 21 model *model.Model 22 + rm *recordmanager.RecordManager 23 23 } 24 24 25 - func New(db *db.Store, logger *log.Logger, oauthserv *oauth.Service, xrpc *oauth.PasswordClient, model *model.Model) *Handler { 25 + func New(db *db.Store, logger *log.Logger, oauthserv *oauth.Service, model *model.Model, recordmanager *recordmanager.RecordManager) *Handler { 26 26 mux := http.NewServeMux() 27 27 sessionStore := sessions.NewCookieStore([]byte(os.Getenv("SESSION_KEY"))) 28 - clientmap := oauth.NewClientMap() 29 - h := &Handler{db, sessionStore, mux, logger, oauthserv, xrpc, clientmap, model} 28 + h := &Handler{db, sessionStore, mux, logger, oauthserv, model, recordmanager} 30 29 // lrc handlers 31 30 mux.HandleFunc("GET /lrc/{user}/{rkey}/ws", h.WithCORS(h.acceptWebsocket)) 32 31 mux.HandleFunc("DELETE /lrc/{user}/{rkey}/ws", h.deleteChannel)
+40 -223
server/internal/handler/lrcHandlers.go
··· 8 8 "net/http" 9 9 "os" 10 10 "rvcx/internal/atputils" 11 - "rvcx/internal/lex" 12 11 "rvcx/internal/types" 13 - "slices" 14 - "time" 15 - 16 - "github.com/bluesky-social/indigo/atproto/syntax" 17 - "github.com/rachel-mp4/lrcd" 18 12 ) 19 13 20 14 func (h *Handler) acceptWebsocket(w http.ResponseWriter, r *http.Request) { ··· 32 26 } 33 27 34 28 func (h *Handler) postChannel(w http.ResponseWriter, r *http.Request) { 35 - session, _ := h.sessionStore.Get(r, "oauthsession") 36 - _, ok := session.Values["id"].(int) 37 - if !ok { 38 - h.postMyChannel(w, r) 39 - return 40 - } 41 - client, err := h.getClient(r) 42 - if err != nil { 43 - h.serverError(w, errors.New("couldn't find client: "+err.Error())) 44 - return 45 - } 46 - lcr, now, err := h.parseChannelRequest(r) 29 + cr, err := h.parseChannelRequest(r) 47 30 if err != nil { 48 31 h.badRequest(w, err) 49 32 return 50 33 } 51 - uri, cid, err := client.CreateXCVRChannel(lcr, r.Context()) 52 - if err != nil { 53 - h.serverError(w, errors.New("something bad probs happened when posting a channel "+err.Error())) 54 - return 55 - } 56 - channel := types.Channel{ 57 - URI: uri, 58 - CID: cid, 59 - DID: session.Values["did"].(string), 60 - Host: lcr.Host, 61 - Title: lcr.Title, 62 - Topic: lcr.Topic, 63 - CreatedAt: *now, 64 - IndexedAt: time.Now(), 65 - } 66 - h.postPostChannelPostHandler(&channel, w, r) 67 - } 68 - 69 - func (h *Handler) parseChannelRequest(r *http.Request) (*lex.ChannelRecord, *time.Time, error) { 70 - var cr types.PostChannelRequest 71 - decoder := json.NewDecoder(r.Body) 72 - err := decoder.Decode(&cr) 73 - if err != nil { 74 - return nil, nil, errors.New("i think they messed up: " + err.Error()) 75 - } 76 - 77 - var lcr lex.ChannelRecord 78 - if cr.Title == "" || atputils.ValidateGraphemesAndLength(cr.Title, 64, 640) { 79 - return nil, nil, errors.New("title empty or too long") 80 - } 81 - lcr.Title = cr.Title 82 - if cr.Host == "" { 83 - return nil, nil, errors.New("no host") 84 - } 85 - lcr.Host = cr.Host 86 - if cr.Topic != nil { 87 - if atputils.ValidateGraphemesAndLength(*cr.Topic, 256, 2560) { 88 - return nil, nil, errors.New("topic too long") 34 + session, _ := h.sessionStore.Get(r, "oauthsession") 35 + id, ok := session.Values["id"].(int) 36 + var uri, did string 37 + if !ok { 38 + uri, did, err = h.rm.PostMyChannel(r.Context(), cr) 39 + } else { 40 + udid, ok := session.Values["did"].(string) 41 + if !ok { 42 + h.badRequest(w, errors.New("user session has no did?")) 43 + return 89 44 } 90 - lcr.Topic = cr.Topic 45 + uri, did, err = h.rm.PostChannel(id, udid, r.Context(), cr) 91 46 } 92 - 93 - dtn := syntax.DatetimeNow() 94 - lcr.CreatedAt = dtn.String() 95 - time := dtn.Time() 96 - return &lcr, &time, nil 97 - } 98 - 99 - func (h *Handler) postMyChannel(w http.ResponseWriter, r *http.Request) { 100 - lcr, now, err := h.parseChannelRequest(r) 101 - if err != nil { 102 - h.badRequest(w, err) 103 - return 104 - } 105 - cid, uri, err := h.myClient.CreateXCVRChannel(lcr, r.Context()) 106 47 if err != nil { 107 48 h.serverError(w, err) 108 49 return 109 50 } 110 - channel := types.Channel{ 111 - URI: uri, 112 - CID: cid, 113 - DID: atputils.GetMyDid(), 114 - Host: lcr.Host, 115 - Title: lcr.Title, 116 - Topic: lcr.Topic, 117 - CreatedAt: *now, 118 - IndexedAt: time.Now(), 119 - } 120 - h.postPostChannelPostHandler(&channel, w, r) 121 - } 122 - 123 - func (h *Handler) postPostChannelPostHandler(channel *types.Channel, w http.ResponseWriter, r *http.Request) { 124 - err := h.db.StoreChannel(channel, r.Context()) 125 - if err != nil { 126 - h.serverError(w, errors.New("well... the record posted but i couldn't store it: "+err.Error())) 127 - return 128 - } 129 - err = h.model.AddChannel(channel) 51 + handle, err := h.db.ResolveDid(did, r.Context()) 130 52 if err != nil { 131 - h.serverError(w, errors.New("very strange situation: "+err.Error())) 132 - return 133 - } 134 - handle, err := h.db.ResolveDid(channel.DID, r.Context()) 135 - if err != nil { 136 - handle, err = atputils.TryLookupDid(r.Context(), channel.DID) 53 + handle, err = atputils.TryLookupDid(r.Context(), did) 137 54 if err != nil { 138 55 h.serverError(w, errors.New("couldn't find handle")) 139 56 return 140 57 } 141 - go h.db.StoreDidHandle(channel.DID, handle, context.Background()) 58 + go h.db.StoreDidHandle(did, handle, context.Background()) 142 59 } 143 - rkey, _ := atputils.RkeyFromUri(channel.URI) 60 + rkey, _ := atputils.RkeyFromUri(uri) 144 61 http.Redirect(w, r, fmt.Sprintf("/c/%s/%s", handle, rkey), http.StatusSeeOther) 145 62 } 146 63 147 - func (h *Handler) parseMessageRequest(r *http.Request) (lmr *lex.MessageRecord, now *time.Time, handle *string, nonce []byte, err error) { 148 - var mr types.PostMessageRequest 149 - lmr = &lex.MessageRecord{} 64 + func (h *Handler) parseChannelRequest(r *http.Request) (*types.PostChannelRequest, error) { 65 + var cr types.PostChannelRequest 150 66 decoder := json.NewDecoder(r.Body) 151 - err = decoder.Decode(&mr) 67 + err := decoder.Decode(&cr) 152 68 if err != nil { 153 - err = errors.New("couldn't decode: " + err.Error()) 154 - return 69 + return nil, errors.New("i think they messed up: " + err.Error()) 155 70 } 156 - if mr.SignetURI == nil { 157 - if mr.MessageID == nil || mr.ChannelURI == nil { 158 - err = errors.New("must provide a way to determine signet") 159 - return 160 - } 161 - signetUri, signetHandle, yorks := h.db.QuerySignet(*mr.ChannelURI, *mr.MessageID, r.Context()) 162 - if yorks != nil { 163 - err = errors.New("i couldn't find the signet :c : " + yorks.Error()) 164 - return 165 - } 166 - mr.SignetURI = &signetUri 167 - handle = &signetHandle 168 - } else { 169 - signetHandle, yorks := h.db.QuerySignetHandle(*mr.SignetURI, r.Context()) 170 - if yorks != nil { 171 - err = errors.New("yorks skooby 💀" + yorks.Error()) 172 - return 173 - } 174 - handle = &signetHandle 175 - } 176 - lmr.SignetURI = *mr.SignetURI 177 - lmr.Body = mr.Body 178 - if mr.Nick != nil { 179 - nick := *mr.Nick 180 - if atputils.ValidateLength(nick, 16) { 181 - err = errors.New("that nick is too long") 182 - return 183 - } 184 - } 185 - lmr.Nick = mr.Nick 186 - 187 - if mr.Color != nil { 188 - color := uint64(*mr.Color) 189 - if color > 16777215 { 190 - err = errors.New("that color is too big") 191 - return 192 - } 193 - lmr.Color = &color 194 - } 195 - 196 - nonce = mr.Nonce 197 - nowsyn := syntax.DatetimeNow() 198 - lmr.PostedAt = nowsyn.String() 199 - nt := nowsyn.Time() 200 - now = &nt 201 - return 71 + return &cr, nil 202 72 } 203 73 204 - func (h *Handler) postMyMessage(w http.ResponseWriter, r *http.Request) { 205 - lmr, now, handle, nonce, err := h.parseMessageRequest(r) 74 + func (h *Handler) parseMessageRequest(r *http.Request) (*types.PostMessageRequest, error) { 75 + var mr types.PostMessageRequest 76 + decoder := json.NewDecoder(r.Body) 77 + err := decoder.Decode(&mr) 206 78 if err != nil { 207 - h.badRequest(w, errors.New("no good! "+err.Error())) 208 - return 79 + return nil, errors.New("couldn't decode: " + err.Error()) 209 80 } 210 - if handle == nil || *handle != atputils.GetMyHandle() { 211 - h.badRequest(w, errors.New("i only post my messages")) 212 - return 213 - } 214 - curi, mid, err := h.db.QuerySignetChannelIdNum(lmr.SignetURI, r.Context()) 215 - if err != nil { 216 - h.serverError(w, err) 217 - return 218 - } 219 - correctNonce := lrcd.GenerateNonce(mid, curi, os.Getenv("LRCD_SECRET")) 220 - if !slices.Equal(nonce, correctNonce) { 221 - h.badRequest(w, errors.New("i think user tried to post someone else's post")) 222 - return 223 - } 224 - uri, cid, err := h.myClient.CreateXCVRMessage(lmr, r.Context()) 225 - if err != nil { 226 - h.serverError(w, errors.New("that didn't go as planneD: "+err.Error())) 227 - return 228 - } 229 - did := atputils.GetMyDid() 230 - h.postPostMessagePostHandler(uri, cid, did, now, lmr, w, r) 81 + return &mr, nil 231 82 } 232 83 233 - func (h *Handler) postPostMessagePostHandler(uri, cid, did string, now *time.Time, lmr *lex.MessageRecord, w http.ResponseWriter, r *http.Request) { 234 - var coloruint32ptr *uint32 235 - if lmr.Color != nil { 236 - color := uint32(*lmr.Color) 237 - coloruint32ptr = &color 238 - } 239 - message := types.Message{ 240 - URI: uri, 241 - DID: did, 242 - CID: cid, 243 - SignetURI: lmr.SignetURI, 244 - Body: lmr.Body, 245 - Nick: lmr.Nick, 246 - Color: coloruint32ptr, 247 - PostedAt: *now, 248 - } 249 - err := h.db.StoreMessage(&message, r.Context()) 84 + func (h *Handler) postMessage(w http.ResponseWriter, r *http.Request) { 85 + pmr, err := h.parseMessageRequest(r) 250 86 if err != nil { 251 - h.serverError(w, errors.New("sooo... the record posted but i couldn't store it: "+err.Error())) 87 + h.badRequest(w, errors.New("failed to parse message request: "+err.Error())) 252 88 return 253 89 } 254 - curi, err := h.db.GetMsgChannelURI(lmr.SignetURI, r.Context()) 255 - if err != nil { 256 - h.serverError(w, errors.New("aaaaaaaaaaaa "+err.Error())) 257 - } 258 - h.model.BroadcastMessage(curi, message) 259 - h.getMessages(w, r) 260 - } 261 - 262 - func (h *Handler) postMessage(w http.ResponseWriter, r *http.Request) { 263 90 session, _ := h.sessionStore.Get(r, "oauthsession") 264 - _, ok := session.Values["id"].(int) 91 + id, ok := session.Values["id"].(int) 265 92 if !ok { 266 - h.postMyMessage(w, r) 267 - return 268 - } 269 - client, err := h.getClient(r) 270 - if err != nil { 271 - h.serverError(w, errors.New("couldn't find client: "+err.Error())) 272 - return 273 - } 274 - 275 - lmr, now, _, _, err := h.parseMessageRequest(r) 276 - if err != nil { 277 - h.badRequest(w, errors.New("couldn't parse message "+err.Error())) 278 - return 93 + err = h.rm.PostMyMessage(r.Context(), pmr) 94 + } else { 95 + did, ok := session.Values["did"].(string) 96 + if !ok { 97 + h.badRequest(w, errors.New("has sid but doesn't have sdid!")) 98 + } 99 + err = h.rm.PostMessage(id, did, r.Context(), pmr) 279 100 } 280 - 281 - uri, cid, err := client.CreateXCVRMessage(*lmr, r.Context()) 282 101 if err != nil { 283 - h.serverError(w, errors.New("couldn't add to user repo: ")) 284 - return 102 + h.serverError(w, errors.New("error posting message: "+err.Error())) 285 103 } 286 - did := session.Values["did"].(string) 287 - h.postPostMessagePostHandler(uri, cid, did, now, lmr, w, r) 104 + w.Write(nil) 288 105 } 289 106 290 107 func (h *Handler) deleteChannel(w http.ResponseWriter, r *http.Request) {
+10 -64
server/internal/handler/oauthHandlers.go
··· 9 9 "net/url" 10 10 "os" 11 11 "rvcx/internal/atputils" 12 - "rvcx/internal/lex" 13 12 "rvcx/internal/oauth" 14 - "rvcx/internal/types" 15 13 16 14 "github.com/gorilla/sessions" 17 15 "github.com/haileyok/atproto-oauth-golang/helpers" ··· 124 122 h.serverError(w, err) 125 123 return 126 124 } 127 - go func() { 128 - nick := "wanderer" 129 - status := "just setting up my xcvr" 130 - color := uint64(3602605) 131 - handle, err := h.db.ResolveDid(req.Did, context.Background()) 132 - if err != nil { 133 - h.logger.Println("i couldn't find the handle, so i couldn't create default profile record. gootbye") 134 - return 135 - } 136 - 137 - defaultprofilerecord := lex.ProfileRecord{ 138 - DisplayName: &handle, 139 - DefaultNick: &nick, 140 - Status: &status, 141 - Color: &color, 142 - } 143 - client := h.setupClient(OauthSession) 144 - pr, err := client.CreateXCVRProfile(defaultprofilerecord, context.Background()) 145 - if err != nil { 146 - h.logger.Println("i couldn't create their profile, which is bad_" + err.Error()) 147 - return 148 - } 149 - h.logger.Deprintln("initializing....") 150 - err = h.db.InitializeProfile(req.Did, pr.DisplayName, pr.DefaultNick, pr.Status, pr.Color, context.Background()) 151 - if err != nil { 152 - h.logger.Deprintln("failed to initialize profile: " + err.Error()) 153 - return 154 - } 155 - }() 125 + err = h.rm.CreateInitialProfile(req.Did, req.ID, r.Context()) 126 + if err != nil { 127 + h.serverError(w, err) 128 + return 129 + } 156 130 157 131 session.Options = &sessions.Options{ 158 132 Path: "/", ··· 222 196 h.logger.Deprintln("handling nil error?") 223 197 } 224 198 225 - func (h *Handler) getClient(r *http.Request) (*oauth.OauthXRPCClient, error) { 199 + func (h *Handler) oauthLogout(w http.ResponseWriter, r *http.Request) { 226 200 s, _ := h.sessionStore.Get(r, "oauthsession") 227 201 id, ok := s.Values["id"].(int) 228 - if !ok { 229 - return nil, errors.New("not authorized") 230 - } 231 - client := h.clientmap.Map(id) 232 - if client == nil { 233 - client, err := h.resetClient(id, r.Context()) 202 + if ok { 203 + err := h.rm.DeleteSession(id, r.Context()) 234 204 if err != nil { 235 - return nil, err 205 + h.serverError(w, errors.New("couldn't log out: "+err.Error())) 206 + return 236 207 } 237 - return client, nil 238 - } 239 - return client, nil 240 - } 241 - 242 - func (h *Handler) resetClient(id int, ctx context.Context) (*oauth.OauthXRPCClient, error) { 243 - session, err := h.db.GetOauthSession(id, ctx) 244 - if err != nil { 245 - return nil, errors.New(fmt.Sprintf("errpr setting up session %d: %s", id, err.Error())) 246 - } 247 - return h.setupClient(session), nil 248 - } 249 - 250 - func (h *Handler) setupClient(session *types.Session) *oauth.OauthXRPCClient { 251 - client := oauth.NewOauthXRPCClient(h.db, h.logger, session) 252 - h.clientmap.Append(session.ID, client, session.Expiration) 253 - return client 254 - } 255 - 256 - func (h *Handler) oauthLogout(w http.ResponseWriter, r *http.Request) { 257 - s, _ := h.sessionStore.Get(r, "oauthsession") 258 - id, ok := s.Values["id"].(int) 259 - if ok { 260 - h.db.DeleteOauthSession(id, r.Context()) 261 - h.clientmap.Delete(id) 262 208 } 263 209 s.Values = make(map[interface{}]interface{}) 264 210 s.Options.MaxAge = -1
+15 -67
server/internal/handler/xcvrHandlers.go
··· 4 4 "encoding/json" 5 5 "errors" 6 6 "net/http" 7 - "rvcx/internal/atputils" 8 - "rvcx/internal/db" 9 - "rvcx/internal/lex" 10 7 "rvcx/internal/types" 11 8 ) 12 9 ··· 23 20 h.badRequest(w, errors.New("error decoding post profile request: "+err.Error())) 24 21 return 25 22 } 26 - var pu db.ProfileUpdate 27 - pu.DID = did 28 - if p.DisplayName != nil { 29 - if atputils.ValidateGraphemesAndLength(*p.DisplayName, 64, 640) { 30 - h.badRequest(w, errors.New("displayname too long")) 31 - return 32 - } 33 - pu.Name = p.DisplayName 34 - pu.UpdateName = true 35 - } 36 - if p.DefaultNick != nil { 37 - if atputils.ValidateLength(*p.DefaultNick, 16) { 38 - h.badRequest(w, errors.New("nick too long")) 39 - } 40 - pu.Nick = p.DefaultNick 41 - pu.UpdateNick = true 42 - } 43 - if p.Status != nil { 44 - if atputils.ValidateGraphemesAndLength(*p.Status, 640, 6400) { 45 - h.badRequest(w, errors.New("status too long")) 46 - } 47 - pu.Status = p.Status 48 - pu.UpdateStatus = true 49 - } 50 - if p.Avatar != nil { 51 - // TODO think about how to do avatars! 52 - pu.Avatar = p.Avatar 53 - pu.UpdateAvatar = true 54 - } 55 - if p.Color != nil { 56 - if *p.Color > 16777215 || *p.Color < 0 { 57 - h.badRequest(w, errors.New("color out of bounds")) 58 - } 59 - pu.Color = p.Color 60 - pu.UpdateColor = true 61 - } 62 - session, _ := h.sessionStore.Get(r, "oauthsession") 63 - _, ok := session.Values["id"].(int) 23 + s, _ := h.sessionStore.Get(r, "oauthsession") 24 + id, ok := s.Values["id"].(int) 64 25 if !ok { 65 - h.badRequest(w, errors.New("cannot update profile, not authenticated")) 26 + h.badRequest(w, errors.New("must be logged in!")) 66 27 return 67 28 } 68 - profilerecord := lex.ProfileRecord{ 69 - DisplayName: p.DisplayName, 70 - DefaultNick: p.DefaultNick, 71 - Status: p.Status, 72 - Color: p.Color, 73 - } 74 - client, err := h.getClient(r) 29 + err = h.rm.PostProfile(did, id, r.Context(), &p) 75 30 if err != nil { 76 - h.serverError(w, err) 77 - return 78 - } 79 - _, err = client.UpdateXCVRProfile(profilerecord, r.Context()) 80 - if err != nil { 81 - h.serverError(w, err) 82 - return 31 + h.serverError(w, errors.New("erroring in postprofile flow: "+err.Error())) 83 32 } 84 - 85 - err = h.db.UpdateProfile(pu, r.Context()) 86 - if err != nil { 87 - h.serverError(w, errors.New("error updating profile: "+err.Error())) 88 - return 89 - } 90 - 91 33 h.serveProfileView(did, handle, w, r) 92 34 } 93 35 94 36 func (h *Handler) beep(w http.ResponseWriter, r *http.Request) { 95 - client, err := h.getClient(r) 96 - 37 + s, _ := h.sessionStore.Get(r, "oauthsession") 38 + id, ok := s.Values["id"].(int) 39 + if !ok { 40 + h.badRequest(w, errors.New("must be logged in!")) 41 + return 42 + } 43 + err := h.rm.Beep(id, r.Context()) 97 44 if err != nil { 98 - h.serverError(w, errors.New("error finding client: "+err.Error())) 45 + h.badRequest(w, err) 46 + return 99 47 } 100 - client.MakeBskyPost("beep_", r.Context()) 48 + w.Write(nil) 101 49 }
+48 -127
server/internal/model/channel.go
··· 3 3 import ( 4 4 "context" 5 5 "errors" 6 - "github.com/bluesky-social/indigo/atproto/syntax" 7 6 "net/http" 8 7 "os" 9 - "rvcx/internal/atputils" 10 8 "rvcx/internal/db" 11 - "rvcx/internal/lex" 12 9 "rvcx/internal/log" 13 10 "rvcx/internal/oauth" 11 + "rvcx/internal/recordmanager" 14 12 "rvcx/internal/types" 15 13 "sync" 16 14 "time" ··· 25 23 logger *log.Logger 26 24 cli *oauth.PasswordClient 27 25 mu sync.Mutex 26 + rm *recordmanager.RecordManager 28 27 } 29 28 30 29 type channelModel struct { 31 - uri string 32 - welcome string 33 - serverModel *serverModel 34 - streamModel *lexStreamModel 35 - } 30 + uri string 31 + logger *log.Logger 32 + valid bool 36 33 37 - type serverModel struct { 34 + welcome string 38 35 server *lrcd.Server 39 36 lastID uint32 40 - initChan chan lrcpb.Event_Init 37 + initChan <-chan lrcpb.Event_Init 41 38 ctx context.Context 42 39 cancel func() 43 - } 44 40 45 - type lexStreamModel struct { 46 - clients map[*client]bool 47 - clientsmu sync.Mutex 48 - ctx context.Context 49 - cancel func() 50 - signetBus chan types.SignetView 51 - messageBus chan types.MessageView 52 - logger *log.Logger 41 + clients map[*client]bool 42 + clientsmu sync.Mutex 53 43 } 54 44 55 45 func (m *Model) GetWSHandlerFrom(uri string) (http.HandlerFunc, error) { ··· 61 51 } 62 52 63 53 func (m *Model) GetLexStreamFrom(uri string) (http.HandlerFunc, error) { 64 - lsm, err := m.getLexStream(uri) 65 - if err != nil { 66 - return nil, err 67 - } 68 - return lsm.WSHandler(uri, m), nil 69 - } 70 - 71 - func (m *Model) getLexStream(uri string) (*lexStreamModel, error) { 72 - m.mu.Lock() 73 - defer m.mu.Unlock() 74 - cm := m.uriMap[uri] 75 - if cm == nil { 76 - return nil, errors.New("Not a valid server") 77 - } 78 - if cm.streamModel == nil { 79 - m.logger.Deprintln("i'm making a server now") 80 - ctx, cancel := context.WithCancel(context.Background()) 81 - lsm := lexStreamModel{ 82 - clients: make(map[*client]bool), 83 - clientsmu: sync.Mutex{}, 84 - ctx: ctx, 85 - cancel: cancel, 86 - signetBus: make(chan types.SignetView, 10), 87 - messageBus: make(chan types.MessageView, 10), 88 - logger: m.logger, 89 - } 90 - cm.streamModel = &lsm 91 - go lsm.broadcaster() 54 + cm, ok := m.uriMap[uri] 55 + if !ok { 56 + return nil, errors.New("not a valid server") 92 57 } 93 - return cm.streamModel, nil 58 + return cm.WSHandler(uri, m), nil 94 59 } 95 60 96 - func Init(store *db.Store, logger *log.Logger, cli *oauth.PasswordClient) *Model { 61 + func Init(store *db.Store, logger *log.Logger, cli *oauth.PasswordClient, rm *recordmanager.RecordManager) *Model { 97 62 uris, err := store.GetChannelURIs(context.Background()) 98 63 if err != nil { 99 64 panic(err) ··· 105 70 beep := channelModel{ 106 71 welcome: uri.Topic, 107 72 uri: uri.URI, 108 - } 109 - if valid { 110 - beep.serverModel = &serverModel{lastID: uri.LastID} 73 + logger: logger, 74 + lastID: uri.LastID, 75 + valid: valid, 111 76 } 112 77 uriToServerModel[uri.URI] = &beep 113 78 } ··· 117 82 logger, 118 83 cli, 119 84 sync.Mutex{}, 85 + rm, 120 86 } 121 87 } 122 88 ··· 135 101 beep := channelModel{ 136 102 welcome: welcome, 137 103 uri: c.URI, 138 - } 139 - if valid { 140 - beep.serverModel = &serverModel{lastID: 1} 104 + logger: m.logger, 105 + lastID: 1, 106 + valid: valid, 141 107 } 142 108 m.uriMap[c.URI] = &beep 143 109 return nil ··· 149 115 150 116 cm := m.uriMap[uri] 151 117 if cm == nil { 152 - return nil, errors.New("Not a valid server") 118 + return nil, errors.New("uri doesn't refer to a channel i am aware of") 153 119 } 154 - sm := cm.serverModel 155 - if sm == nil { 120 + if !cm.valid { 156 121 return nil, errors.New("Not hosted on this backend!") 157 122 } 158 123 159 - if sm.server == nil { 124 + if cm.server == nil { 160 125 m.logger.Deprintln("i think the server should exist, so i'm making it") 161 126 var err error 162 - lastID := sm.lastID 127 + lastID := cm.lastID 163 128 initChan := make(chan lrcpb.Event_Init, 100) 164 129 165 130 server, err := lrcd.NewServer( ··· 178 143 return nil, errors.New("Error starting server") 179 144 } 180 145 181 - if sm.cancel != nil { 146 + if cm.cancel != nil { 182 147 m.logger.Println("that's weird, old cancel lying around") 183 - sm.cancel() 148 + cm.cancel() 184 149 } 185 150 186 151 ctx, cancel := context.WithCancel(context.Background()) 187 - sm.server = server 188 - sm.initChan = initChan 189 - sm.cancel = cancel 190 - sm.ctx = ctx 152 + cm.server = server 153 + cm.initChan = initChan 154 + cm.cancel = cancel 155 + cm.ctx = ctx 191 156 192 - go m.handleInitEvents(ctx, uri, initChan) 157 + go m.handleInitEvents(cm) 193 158 } 194 - return sm.server, nil 159 + return cm.server, nil 195 160 } 196 161 197 - func (m *Model) handleInitEvents(ctx context.Context, uri string, initChan <-chan lrcpb.Event_Init) { 162 + func (m *Model) handleInitEvents(cm *channelModel) { 198 163 ticker := time.NewTicker(5 * time.Minute) 199 164 defer ticker.Stop() 200 165 201 166 for { 202 167 select { 203 - case <-ctx.Done(): 204 - m.logger.Deprintln("i'm a handleinitevent goroutine and my context is done") 168 + case <-cm.ctx.Done(): 169 + cm.logger.Deprintln("i'm a handleinitevent goroutine and my context is done") 205 170 return 206 171 case <-ticker.C: 207 - m.mu.Lock() 208 - cm := m.uriMap[uri] 209 - if cm == nil || cm.serverModel == nil || cm.serverModel.server == nil { 210 - m.mu.Unlock() 211 - return 212 - } 213 - sm := cm.serverModel 214 - 215 - c := sm.server.Connected() 172 + c := cm.server.Connected() 216 173 if c == 0 { 217 - m.logger.Deprintln("i think the server is empty! gonna break some things") 218 - lastID, err := sm.server.Stop() 174 + cm.logger.Deprintln("i think the server is empty! gonna break some things") 175 + lastID, err := cm.server.Stop() 219 176 if err != nil { 220 - m.mu.Unlock() 221 177 panic(err) 222 178 } 223 - sm.lastID = lastID 224 - sm.server = nil 225 - sm.initChan = nil 226 - sm.cancel() 227 - sm.cancel = nil 228 - m.mu.Unlock() 179 + cm.lastID = lastID 180 + cm.server = nil 181 + cm.initChan = nil 182 + cm.cancel() 183 + cm.cancel = nil 229 184 return 230 185 } 231 - m.mu.Unlock() 232 - case e, ok := <-initChan: 186 + case e, ok := <-cm.initChan: 233 187 if !ok { 234 - m.logger.Println("this is a weird case!") 188 + cm.logger.Println("this is a weird case!") 235 189 return 236 190 } 237 - signet := lex.SignetRecord{} 238 - handle := e.Init.ExternalID 239 - if handle == nil { 240 - h := "" 241 - handle = &h 242 - } 243 - signet.AuthorHandle = *handle 244 - if e.Init.Id == nil { 245 - m.logger.Deprintln("initchannel gave me a nil id") 246 - continue 247 - } 248 - lrcid := uint64(*e.Init.Id) 249 - signet.LRCID = lrcid 250 - signet.ChannelURI = uri 251 - now := syntax.DatetimeNow() 252 - nowTime := now.Time() 253 - nowString := now.String() 254 - signet.StartedAt = &nowString 255 - cid, recorduri, err := m.cli.CreateXCVRSignet(&signet, context.Background()) 191 + err := m.rm.PostSignet(e, cm.uri, context.Background()) 256 192 if err != nil { 257 - m.logger.Deprintf("couldn't post a signet in %s: %s", uri, err.Error()) 258 - continue 193 + m.logger.Println("error posting signet: " + err.Error()) 259 194 } 260 - sr := types.Signet{ 261 - URI: recorduri, 262 - IssuerDID: atputils.GetMyDid(), 263 - AuthorHandle: signet.AuthorHandle, 264 - ChannelURI: uri, 265 - MessageID: *e.Init.Id, 266 - CID: cid, 267 - StartedAt: nowTime, 268 - } 269 - err = m.store.StoreSignet(&sr, context.Background()) 270 - if err != nil { 271 - m.logger.Println("failed to store signet!" + err.Error()) 272 - } 273 - m.BroadcastSignet(uri, sr) 274 195 } 275 196 } 276 197 }
+36 -72
server/internal/model/channelLexiconStream.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "errors" 5 6 "net/http" 6 7 "rvcx/internal/atputils" 7 8 "rvcx/internal/types" ··· 15 16 bus chan any 16 17 } 17 18 18 - func (lsm *lexStreamModel) WSHandler(uri string, m *Model) http.HandlerFunc { 19 + func (cm *channelModel) WSHandler(uri string, m *Model) http.HandlerFunc { 19 20 return func(w http.ResponseWriter, r *http.Request) { 20 21 upgrader := &websocket.Upgrader{ 21 22 CheckOrigin: func(r *http.Request) bool { ··· 33 34 conn, 34 35 bus, 35 36 } 36 - lsm.clientsmu.Lock() 37 - lsm.clients[client] = true 38 - lsm.clientsmu.Unlock() 37 + cm.clientsmu.Lock() 38 + cm.clients[client] = true 39 + cm.clientsmu.Unlock() 39 40 40 - client.wsWriter(lsm.ctx) 41 - lsm.logger.Deprintln("i am a lex stream wshandler and i am exiting") 41 + client.wsWriter(cm.ctx) 42 + cm.logger.Deprintln("i am a lex stream wshandler and i am exiting") 42 43 43 - lsm.clientsmu.Lock() 44 - delete(lsm.clients, client) 45 - if len(lsm.clients) == 0 { 46 - lsm.logger.Deprintln("i think that there are no more clients, so i will terminate the stream model ok") 47 - lsm.cancel() 48 - m.uriMap[uri].streamModel = nil 49 - } 50 - lsm.clientsmu.Unlock() 44 + cm.clientsmu.Lock() 45 + delete(cm.clients, client) 46 + cm.clientsmu.Unlock() 51 47 } 52 48 } 53 49 ··· 71 67 } 72 68 } 73 69 74 - func (lsm *lexStreamModel) broadcaster() { 75 - for { 76 - select { 77 - case <-lsm.ctx.Done(): 78 - lsm.logger.Deprintln("since lsm context ended, i am cleaning it up") 79 - lsm.cleanUp() 80 - return 81 - case m, ok := <-lsm.messageBus: 82 - if !ok { 83 - lsm.logger.Deprintln("since lsm message bus gave bad message, i am cleaning it up") 84 - lsm.cleanUp() 85 - return 86 - } 87 - lsm.broadcast(m) 88 - case s, ok := <-lsm.signetBus: 89 - if !ok { 90 - lsm.logger.Deprintln("since lsm signetbus gave bad message, i am cleaning it up") 91 - lsm.cleanUp() 92 - return 93 - } 94 - lsm.broadcast(s) 95 - } 96 - } 97 - } 98 - 99 - func (lsm *lexStreamModel) cleanUp() { 100 - lsm.clientsmu.Lock() 101 - defer lsm.clientsmu.Unlock() 102 - for cli := range lsm.clients { 103 - close(cli.bus) 104 - } 105 - } 70 + // func (cm *channelModel) cleanUp() { 71 + // cm.clientsmu.Lock() 72 + // defer cm.clientsmu.Unlock() 73 + // for cli := range cm.clients { 74 + // close(cli.bus) 75 + // } 76 + // } 106 77 107 - func (lsm *lexStreamModel) broadcast(a any) { 108 - lsm.clientsmu.Lock() 109 - defer lsm.clientsmu.Unlock() 110 - for cli := range lsm.clients { 78 + func (cm *channelModel) broadcast(a any) { 79 + cm.clientsmu.Lock() 80 + defer cm.clientsmu.Unlock() 81 + for cli := range cm.clients { 111 82 select { 112 83 case cli.bus <- a: 113 84 default: 114 - delete(lsm.clients, cli) 85 + delete(cm.clients, cli) 115 86 close(cli.bus) 116 87 } 117 88 } 118 89 } 119 90 120 - // should this be on lsm? 121 - func (m *Model) BroadcastSignet(uri string, s types.Signet) { 122 - lsm := m.uriMap[uri] 123 - if lsm == nil { 124 - m.logger.Println("AAAAAAAAAAA") 125 - return 91 + func (m *Model) BroadcastSignet(uri string, s *types.Signet) error { 92 + cm := m.uriMap[uri] 93 + if cm == nil { 94 + return errors.New("AAAAAAAAAAA") 126 95 } 127 96 ihandle, err := m.store.ResolveDid(s.IssuerDID, context.Background()) 128 97 if err != nil { 129 98 ihandle, err = atputils.TryLookupDid(context.Background(), s.IssuerDID) 130 99 if err != nil { 131 - m.logger.Println("AAAAAAAAAAAAAAAAAAAAA") 132 - return 100 + return errors.New("AAAAAAAAAAAAAAAAAAAAA") 133 101 } 134 102 go m.store.StoreDidHandle(s.IssuerDID, ihandle, context.Background()) 135 103 } ··· 141 109 AuthorHandle: s.AuthorHandle, 142 110 StartedAt: s.StartedAt, 143 111 } 144 - if lsm.streamModel == nil { 145 - m.logger.Println("curious *watches the world burn*") 146 - } 147 - lsm.streamModel.signetBus <- sv 112 + cm.broadcast(sv) 113 + return nil 148 114 } 149 115 150 - func (m *Model) BroadcastMessage(uri string, msg types.Message) { 151 - m.logger.Deprintln("broadcasting!") 152 - lsm := m.uriMap[uri] 153 - if lsm == nil { 154 - m.logger.Deprintln("failed to map uri to lsm!") 155 - return 116 + func (m *Model) BroadcastMessage(uri string, msg *types.Message) error { 117 + cm := m.uriMap[uri] 118 + if cm == nil { 119 + return errors.New("failed to map uri to lsm!") 156 120 } 157 121 pv, err := m.store.GetProfileView(msg.DID, context.Background()) 158 122 if err != nil { 159 - m.logger.Deprintln("failed to get profile view: " + err.Error()) 160 - return 123 + return errors.New("failed to get profile view: " + err.Error()) 161 124 } 162 125 mv := types.MessageView{ 163 126 URI: msg.URI, ··· 168 131 SignetURI: msg.SignetURI, 169 132 PostedAt: msg.PostedAt, 170 133 } 171 - lsm.streamModel.messageBus <- mv 134 + cm.broadcast(mv) 135 + return nil 172 136 }
+8 -8
server/internal/oauth/oauthclient.go
··· 79 79 return nil 80 80 } 81 81 82 - func (c *OauthXRPCClient) CreateXCVRProfile(profile lex.ProfileRecord, ctx context.Context) (p *lex.ProfileRecord, err error) { 82 + func (c *OauthXRPCClient) CreateXCVRProfile(profile *lex.ProfileRecord, ctx context.Context) (p *lex.ProfileRecord, err error) { 83 83 authargs, err := c.getOauthSessionAuthArgs() 84 84 if err != nil { 85 85 err = errors.New("failed to get oauthsessionauthargs while making post: " + err.Error()) ··· 108 108 Collection: "org.xcvr.actor.profile", 109 109 Repo: authargs.Did, 110 110 Rkey: &rkey, 111 - Record: &util.LexiconTypeDecoder{Val: &profile}, 111 + Record: &util.LexiconTypeDecoder{Val: profile}, 112 112 } 113 113 var out atproto.RepoCreateRecord_Output 114 114 err = c.xrpc.Do(ctx, authargs, "POST", "application/json", "com.atproto.repo.createRecord", nil, input, &out) ··· 116 116 err = errors.New("oops! failed to create a profile: " + err.Error()) 117 117 return 118 118 } 119 - return &profile, nil 119 + return profile, nil 120 120 } 121 121 122 122 func (c *OauthXRPCClient) CreateXCVRChannel(channel *lex.ChannelRecord, ctx context.Context) (uri string, cid string, err error) { ··· 141 141 return 142 142 } 143 143 144 - func (c *OauthXRPCClient) CreateXCVRMessage(message lex.MessageRecord, ctx context.Context) (uri string, cid string, err error) { 144 + func (c *OauthXRPCClient) CreateXCVRMessage(message *lex.MessageRecord, ctx context.Context) (uri string, cid string, err error) { 145 145 authargs, err := c.getOauthSessionAuthArgs() 146 146 if err != nil { 147 147 err = errors.New("uh oh... I couldn't make a XCVRMessage: " + err.Error()) ··· 150 150 input := atproto.RepoCreateRecord_Input{ 151 151 Collection: "org.xcvr.lrc.message", 152 152 Repo: authargs.Did, 153 - Record: &util.LexiconTypeDecoder{Val: &message}, 153 + Record: &util.LexiconTypeDecoder{Val: message}, 154 154 } 155 155 var out atproto.RepoCreateRecord_Output 156 156 err = c.xrpc.Do(ctx, authargs, "POST", "application/json", "com.atproto.repo.createRecord", nil, input, &out) ··· 163 163 return 164 164 } 165 165 166 - func (c *OauthXRPCClient) UpdateXCVRProfile(profile lex.ProfileRecord, ctx context.Context) (p *lex.ProfileRecord, err error) { 166 + func (c *OauthXRPCClient) UpdateXCVRProfile(profile *lex.ProfileRecord, ctx context.Context) (p *lex.ProfileRecord, err error) { 167 167 authargs, err := c.getOauthSessionAuthArgs() 168 168 if err != nil { 169 169 err = errors.New("failed to get oauthsessionauthargs while making post: " + err.Error()) ··· 182 182 Collection: "org.xcvr.actor.profile", 183 183 Repo: authargs.Did, 184 184 Rkey: rkey, 185 - Record: &util.LexiconTypeDecoder{Val: &profile}, 185 + Record: &util.LexiconTypeDecoder{Val: profile}, 186 186 SwapRecord: getOut.Cid, 187 187 } 188 188 var out atproto.RepoPutRecord_Output ··· 191 191 err = errors.New("oops! failed to update a profile: " + err.Error()) 192 192 return 193 193 } 194 - return &profile, nil 194 + return profile, nil 195 195 } 196 196 197 197 func getProfileRecord(pdsUrl string, did string, ctx context.Context) (*atproto.RepoGetRecord_Output, error) {
+14
server/internal/recordmanager/beep.go
··· 1 + package recordmanager 2 + 3 + import ( 4 + "context" 5 + ) 6 + 7 + func (rm *RecordManager) Beep(id int, ctx context.Context) error { 8 + 9 + client, err := rm.getClient(id, ctx) 10 + if err != nil { 11 + return err 12 + } 13 + return client.MakeBskyPost("beep_", ctx) 14 + }
+154
server/internal/recordmanager/channel.go
··· 1 + package recordmanager 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "github.com/bluesky-social/indigo/atproto/syntax" 7 + "rvcx/internal/atputils" 8 + "rvcx/internal/lex" 9 + "rvcx/internal/types" 10 + "time" 11 + ) 12 + 13 + // TODO: make sure initchannel works correctly 14 + func (rm *RecordManager) AcceptChannel(c *types.Channel, ctx context.Context) error { 15 + err := rm.storeChannel(c, ctx) 16 + if err != nil { 17 + return errors.New("failed to store channel: " + err.Error()) 18 + } 19 + err = rm.initChannel(c) 20 + if err != nil { 21 + return errors.New("failed to initialize channel: " + err.Error()) 22 + } 23 + return nil 24 + } 25 + 26 + func (rm *RecordManager) AcceptChannelUpdate(c *types.Channel, ctx context.Context) error { 27 + err := rm.updateChanneldb(c, ctx) 28 + if err != nil { 29 + return errors.New("failed to update channel: " + err.Error()) 30 + } 31 + err = rm.updateChannelmodel(c) 32 + if err != nil { 33 + return errors.New("failed to update channel model: " + err.Error()) 34 + } 35 + return nil 36 + } 37 + 38 + func (rm *RecordManager) PostMyChannel(ctx context.Context, pcr *types.PostChannelRequest) (did string, uri string, err error) { 39 + return rm.postchannelflow(rm.createMyChannel(), ctx, pcr) 40 + } 41 + 42 + func (rm *RecordManager) PostChannel(id int, udid string, ctx context.Context, pcr *types.PostChannelRequest) (did string, uri string, err error) { 43 + return rm.postchannelflow(rm.createChannel(id, udid), ctx, pcr) 44 + } 45 + 46 + func (rm *RecordManager) postchannelflow(f func(*lex.ChannelRecord, *time.Time, context.Context) (*types.Channel, error), ctx context.Context, pcr *types.PostChannelRequest) (did string, uri string, err error) { 47 + lcr, now, err := rm.validateChannel(pcr) 48 + if err != nil { 49 + err = errors.New("couldn't validate channel: " + err.Error()) 50 + return 51 + } 52 + channel, err := f(lcr, now, ctx) 53 + if err != nil { 54 + err = errors.New("couldn't create channel: " + err.Error()) 55 + return 56 + } 57 + err = rm.storeChannel(channel, ctx) 58 + if err != nil { 59 + err = errors.New("couldn't store channel: " + err.Error()) 60 + return 61 + } 62 + err = rm.initChannel(channel) 63 + if err != nil { 64 + err = errors.New("couldn't init channel: " + err.Error()) 65 + return 66 + } 67 + did = channel.DID 68 + uri = channel.URI 69 + return 70 + } 71 + 72 + func (rm *RecordManager) storeChannel(c *types.Channel, ctx context.Context) error { 73 + return rm.db.StoreChannel(c, ctx) 74 + } 75 + 76 + func (rm *RecordManager) initChannel(c *types.Channel) error { 77 + return rm.broadcaster.AddChannel(c) 78 + } 79 + 80 + func (rm *RecordManager) updateChanneldb(c *types.Channel, ctx context.Context) error { 81 + return rm.db.UpdateChannel(c, ctx) 82 + } 83 + 84 + // TODO: impl updatechannel 85 + func (rm *RecordManager) updateChannelmodel(c *types.Channel) error { 86 + return rm.broadcaster.AddChannel(c) 87 + } 88 + 89 + func (rm *RecordManager) createChannel(id int, did string) func(*lex.ChannelRecord, *time.Time, context.Context) (*types.Channel, error) { 90 + return func(lcr *lex.ChannelRecord, now *time.Time, ctx context.Context) (*types.Channel, error) { 91 + client, err := rm.getClient(id, ctx) 92 + if err != nil { 93 + return nil, errors.New("couldn't get client") 94 + } 95 + uri, cid, err := client.CreateXCVRChannel(lcr, ctx) 96 + if err != nil { 97 + return nil, errors.New("something bad probs happened when posting a channel " + err.Error()) 98 + } 99 + channel := types.Channel{ 100 + URI: uri, 101 + CID: cid, 102 + DID: did, 103 + Host: lcr.Host, 104 + Title: lcr.Title, 105 + Topic: lcr.Topic, 106 + CreatedAt: *now, 107 + IndexedAt: time.Now(), 108 + } 109 + return &channel, nil 110 + } 111 + } 112 + 113 + func (rm *RecordManager) createMyChannel() func(*lex.ChannelRecord, *time.Time, context.Context) (*types.Channel, error) { 114 + return func(lcr *lex.ChannelRecord, now *time.Time, ctx context.Context) (*types.Channel, error) { 115 + uri, cid, err := rm.myClient.CreateXCVRChannel(lcr, ctx) 116 + if err != nil { 117 + return nil, errors.New("something bad probs happened when posting a channel " + err.Error()) 118 + } 119 + channel := types.Channel{ 120 + URI: uri, 121 + CID: cid, 122 + DID: atputils.GetMyDid(), 123 + Host: lcr.Host, 124 + Title: lcr.Title, 125 + Topic: lcr.Topic, 126 + CreatedAt: *now, 127 + IndexedAt: time.Now(), 128 + } 129 + return &channel, nil 130 + } 131 + } 132 + 133 + func (rm *RecordManager) validateChannel(cr *types.PostChannelRequest) (*lex.ChannelRecord, *time.Time, error) { 134 + var lcr lex.ChannelRecord 135 + if cr.Title == "" || atputils.ValidateGraphemesAndLength(cr.Title, 64, 640) { 136 + return nil, nil, errors.New("title empty or too long") 137 + } 138 + lcr.Title = cr.Title 139 + if cr.Host == "" { 140 + return nil, nil, errors.New("no host") 141 + } 142 + lcr.Host = cr.Host 143 + if cr.Topic != nil { 144 + if atputils.ValidateGraphemesAndLength(*cr.Topic, 256, 2560) { 145 + return nil, nil, errors.New("topic too long") 146 + } 147 + lcr.Topic = cr.Topic 148 + } 149 + 150 + dtn := syntax.DatetimeNow() 151 + lcr.CreatedAt = dtn.String() 152 + time := dtn.Time() 153 + return &lcr, &time, nil 154 + }
+185
server/internal/recordmanager/message.go
··· 1 + package recordmanager 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "github.com/bluesky-social/indigo/atproto/syntax" 7 + "github.com/rachel-mp4/lrcd" 8 + "os" 9 + "rvcx/internal/atputils" 10 + "rvcx/internal/lex" 11 + "rvcx/internal/types" 12 + "slices" 13 + "time" 14 + ) 15 + 16 + func (rm *RecordManager) PostMessage(id int, udid string, ctx context.Context, pmr *types.PostMessageRequest) error { 17 + lmr, now, _, _, err := rm.validateMessage(pmr, ctx) 18 + if err != nil { 19 + return errors.New("failed to validate message: " + err.Error()) 20 + } 21 + m, err := rm.createMessage(id, udid, lmr, now, ctx) 22 + if err != nil { 23 + return errors.New("failed to create message: " + err.Error()) 24 + } 25 + err = rm.storeMessage(m, ctx) 26 + if err != nil { 27 + return errors.New("failed to store message: " + err.Error()) 28 + } 29 + err = rm.forwardMessage(m, ctx) 30 + if err != nil { 31 + return errors.New("failed to forward message: " + err.Error()) 32 + } 33 + return nil 34 + } 35 + 36 + func (rm *RecordManager) PostMyMessage(ctx context.Context, pmr *types.PostMessageRequest) error { 37 + lmr, now, handle, nonce, err := rm.validateMessage(pmr, ctx) 38 + if err != nil { 39 + return errors.New("failed to validate message: " + err.Error()) 40 + } 41 + err = rm.validateHandleAndNonce(handle, nonce, lmr.SignetURI, ctx) 42 + if err != nil { 43 + return errors.New("failed to validate my handle and nonce: " + err.Error()) 44 + } 45 + m, err := rm.createMyMessage(lmr, now, ctx) 46 + if err != nil { 47 + return errors.New("failed to create message: " + err.Error()) 48 + } 49 + err = rm.storeMessage(m, ctx) 50 + if err != nil { 51 + return errors.New("failed to store message: " + err.Error()) 52 + } 53 + err = rm.forwardMessage(m, ctx) 54 + if err != nil { 55 + return errors.New("failed to forward message: " + err.Error()) 56 + } 57 + return nil 58 + } 59 + 60 + func (rm *RecordManager) validateHandleAndNonce(handle *string, nonce []byte, signetUri string, ctx context.Context) error { 61 + if handle == nil || *handle != atputils.GetMyHandle() { 62 + return errors.New("i only post my messages") 63 + } 64 + curi, mid, err := rm.db.QuerySignetChannelIdNum(signetUri, ctx) 65 + if err != nil { 66 + return errors.New("failed to find signet") 67 + } 68 + correctNonce := lrcd.GenerateNonce(mid, curi, os.Getenv("LRCD_SECRET")) 69 + if !slices.Equal(nonce, correctNonce) { 70 + return errors.New("i think user tried to post someone else's post") 71 + } 72 + return nil 73 + } 74 + 75 + func (rm *RecordManager) createMyMessage(lmr *lex.MessageRecord, now *time.Time, ctx context.Context) (*types.Message, error) { 76 + uri, cid, err := rm.myClient.CreateXCVRMessage(lmr, ctx) 77 + if err != nil { 78 + return nil, errors.New("couldn't add to user repo: " + err.Error()) 79 + } 80 + var coloruint32ptr *uint32 81 + if lmr.Color != nil { 82 + color := uint32(*lmr.Color) 83 + coloruint32ptr = &color 84 + } 85 + message := &types.Message{ 86 + URI: uri, 87 + DID: atputils.GetMyDid(), 88 + CID: cid, 89 + SignetURI: lmr.SignetURI, 90 + Body: lmr.Body, 91 + Nick: lmr.Nick, 92 + Color: coloruint32ptr, 93 + PostedAt: *now, 94 + } 95 + return message, nil 96 + } 97 + 98 + func (rm *RecordManager) createMessage(id int, did string, lmr *lex.MessageRecord, now *time.Time, ctx context.Context) (*types.Message, error) { 99 + client, err := rm.getClient(id, ctx) 100 + if err != nil { 101 + return nil, errors.New("failed to get client: " + err.Error()) 102 + } 103 + uri, cid, err := client.CreateXCVRMessage(lmr, ctx) 104 + if err != nil { 105 + return nil, errors.New("couldn't add to user repo: " + err.Error()) 106 + } 107 + var coloruint32ptr *uint32 108 + if lmr.Color != nil { 109 + color := uint32(*lmr.Color) 110 + coloruint32ptr = &color 111 + } 112 + message := &types.Message{ 113 + URI: uri, 114 + DID: did, 115 + CID: cid, 116 + SignetURI: lmr.SignetURI, 117 + Body: lmr.Body, 118 + Nick: lmr.Nick, 119 + Color: coloruint32ptr, 120 + PostedAt: *now, 121 + } 122 + return message, nil 123 + } 124 + 125 + func (rm *RecordManager) storeMessage(m *types.Message, ctx context.Context) error { 126 + return rm.db.StoreMessage(m, ctx) 127 + } 128 + 129 + func (rm *RecordManager) forwardMessage(m *types.Message, ctx context.Context) error { 130 + curi, err := rm.db.GetMsgChannelURI(m.SignetURI, ctx) 131 + if err != nil { 132 + return errors.New("aaaaaaaaaaaa " + err.Error()) 133 + } 134 + return rm.broadcaster.BroadcastMessage(curi, m) 135 + } 136 + 137 + func (rm *RecordManager) validateMessage(mr *types.PostMessageRequest, ctx context.Context) (lmr *lex.MessageRecord, now *time.Time, handle *string, nonce []byte, err error) { 138 + lmr = &lex.MessageRecord{} 139 + if mr.SignetURI == nil { 140 + if mr.MessageID == nil || mr.ChannelURI == nil { 141 + err = errors.New("must provide a way to determine signet") 142 + return 143 + } 144 + signetUri, signetHandle, yorks := rm.db.QuerySignet(*mr.ChannelURI, *mr.MessageID, ctx) 145 + if yorks != nil { 146 + err = errors.New("i couldn't find the signet :c : " + yorks.Error()) 147 + return 148 + } 149 + mr.SignetURI = &signetUri 150 + handle = &signetHandle 151 + } else { 152 + signetHandle, yorks := rm.db.QuerySignetHandle(*mr.SignetURI, ctx) 153 + if yorks != nil { 154 + err = errors.New("yorks skooby 💀" + yorks.Error()) 155 + return 156 + } 157 + handle = &signetHandle 158 + } 159 + lmr.SignetURI = *mr.SignetURI 160 + lmr.Body = mr.Body 161 + if mr.Nick != nil { 162 + nick := *mr.Nick 163 + if atputils.ValidateLength(nick, 16) { 164 + err = errors.New("that nick is too long") 165 + return 166 + } 167 + } 168 + lmr.Nick = mr.Nick 169 + 170 + if mr.Color != nil { 171 + color := uint64(*mr.Color) 172 + if color > 16777215 { 173 + err = errors.New("that color is too big") 174 + return 175 + } 176 + lmr.Color = &color 177 + } 178 + 179 + nonce = mr.Nonce 180 + nowsyn := syntax.DatetimeNow() 181 + lmr.PostedAt = nowsyn.String() 182 + nt := nowsyn.Time() 183 + now = &nt 184 + return 185 + }
+131
server/internal/recordmanager/profile.go
··· 1 + package recordmanager 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "rvcx/internal/atputils" 7 + "rvcx/internal/db" 8 + "rvcx/internal/lex" 9 + "rvcx/internal/types" 10 + ) 11 + 12 + func (rm *RecordManager) CreateInitialProfile(did string, id int, ctx context.Context) error { 13 + nick := "wanderer" 14 + status := "just setting up my xcvr" 15 + color := uint64(3602605) 16 + handle, err := rm.db.ResolveDid(did, ctx) 17 + if err != nil { 18 + return errors.New("i couldn't find the handle, so i couldn't create default profile record. gootbye") 19 + } 20 + 21 + p, err := rm.createProfile(&handle, &nick, &status, &color, id, ctx) 22 + if err != nil { 23 + return errors.New("AAAAA error creating profile") 24 + } 25 + rm.log.Deprintln("initializing profile....") 26 + err = rm.db.InitializeProfile(did, p.DisplayName, p.DefaultNick, p.Status, p.Color, ctx) 27 + if err != nil { 28 + return errors.New("failed to initialize profile: " + err.Error()) 29 + } 30 + return nil 31 + 32 + } 33 + 34 + func (rm *RecordManager) PostProfile(did string, id int, ctx context.Context, p *types.PostProfileRequest) error { 35 + pu, err := rm.validateProfile(did, p) 36 + if err != nil { 37 + return errors.New("couldn't validate profile: " + err.Error()) 38 + } 39 + err = rm.updateProfile(p.DisplayName, p.DefaultNick, p.Status, p.Color, id, ctx) 40 + if err != nil { 41 + return errors.New("couldn't create profile: " + err.Error()) 42 + } 43 + err = rm.storeProfile(pu, ctx) 44 + if err != nil { 45 + return errors.New("couldn't store profile: " + err.Error()) 46 + } 47 + return nil 48 + } 49 + 50 + func (rm *RecordManager) storeProfile(pu *db.ProfileUpdate, ctx context.Context) error { 51 + err := rm.db.UpdateProfile(pu, ctx) 52 + if err != nil { 53 + return errors.New("error updating profile: " + err.Error()) 54 + } 55 + return nil 56 + } 57 + 58 + func (rm *RecordManager) updateProfile(name *string, nick *string, status *string, color *uint64, id int, ctx context.Context) error { 59 + profilerecord := &lex.ProfileRecord{ 60 + DisplayName: name, 61 + DefaultNick: nick, 62 + Status: status, 63 + Color: color, 64 + } 65 + client, err := rm.getClient(id, ctx) 66 + if err != nil { 67 + return err 68 + } 69 + _, err = client.UpdateXCVRProfile(profilerecord, ctx) 70 + if err != nil { 71 + return err 72 + } 73 + return nil 74 + } 75 + 76 + func (rm *RecordManager) createProfile(name *string, nick *string, status *string, color *uint64, id int, ctx context.Context) (*lex.ProfileRecord, error) { 77 + profilerecord := &lex.ProfileRecord{ 78 + DisplayName: name, 79 + DefaultNick: nick, 80 + Status: status, 81 + Color: color, 82 + } 83 + client, err := rm.getClient(id, ctx) 84 + if err != nil { 85 + return nil, err 86 + } 87 + p, err := client.CreateXCVRProfile(profilerecord, ctx) 88 + if err != nil { 89 + return nil, errors.New("failed to create profile: " + err.Error()) 90 + } 91 + return p, nil 92 + } 93 + 94 + func (rm *RecordManager) validateProfile(did string, p *types.PostProfileRequest) (*db.ProfileUpdate, error) { 95 + var pu db.ProfileUpdate 96 + pu.DID = did 97 + if p.DisplayName != nil { 98 + if atputils.ValidateGraphemesAndLength(*p.DisplayName, 64, 640) { 99 + return nil, errors.New("displayname too long") 100 + } 101 + pu.Name = p.DisplayName 102 + pu.UpdateName = true 103 + } 104 + if p.DefaultNick != nil { 105 + if atputils.ValidateLength(*p.DefaultNick, 16) { 106 + return nil, errors.New("nick too long") 107 + } 108 + pu.Nick = p.DefaultNick 109 + pu.UpdateNick = true 110 + } 111 + if p.Status != nil { 112 + if atputils.ValidateGraphemesAndLength(*p.Status, 640, 6400) { 113 + return nil, errors.New("status too long") 114 + } 115 + pu.Status = p.Status 116 + pu.UpdateStatus = true 117 + } 118 + if p.Avatar != nil { 119 + // TODO think about how to do avatars! 120 + pu.Avatar = p.Avatar 121 + pu.UpdateAvatar = true 122 + } 123 + if p.Color != nil { 124 + if *p.Color > 16777215 || *p.Color < 0 { 125 + return nil, errors.New("color out of bounds") 126 + } 127 + pu.Color = p.Color 128 + pu.UpdateColor = true 129 + } 130 + return &pu, nil 131 + }
+60
server/internal/recordmanager/recordmanager.go
··· 1 + package recordmanager 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "fmt" 7 + "rvcx/internal/db" 8 + "rvcx/internal/log" 9 + "rvcx/internal/oauth" 10 + "rvcx/internal/types" 11 + ) 12 + 13 + type LexBroadcaster interface { 14 + BroadcastSignet(uri string, s *types.Signet) error 15 + BroadcastMessage(uri string, m *types.Message) error 16 + AddChannel(c *types.Channel) error 17 + } 18 + 19 + type RecordManager struct { 20 + log *log.Logger 21 + db *db.Store 22 + myClient *oauth.PasswordClient 23 + clientmap *oauth.ClientMap 24 + broadcaster LexBroadcaster 25 + } 26 + 27 + func New(log *log.Logger, db *db.Store, myClient *oauth.PasswordClient, broadcaster LexBroadcaster) *RecordManager { 28 + clientmap := oauth.NewClientMap() 29 + return &RecordManager{log, db, myClient, clientmap, broadcaster} 30 + } 31 + 32 + func (rm *RecordManager) getClient(id int, ctx context.Context) (*oauth.OauthXRPCClient, error) { 33 + client := rm.clientmap.Map(id) 34 + if client == nil { 35 + client, err := rm.resetClient(id, ctx) 36 + if err != nil { 37 + return nil, err 38 + } 39 + return client, nil 40 + } 41 + return client, nil 42 + } 43 + 44 + func (rm *RecordManager) resetClient(id int, ctx context.Context) (*oauth.OauthXRPCClient, error) { 45 + session, err := rm.db.GetOauthSession(id, ctx) 46 + if err != nil { 47 + return nil, errors.New(fmt.Sprintf("errpr setting up session %d: %s", id, err.Error())) 48 + } 49 + return rm.setupClient(session), nil 50 + } 51 + 52 + func (rm *RecordManager) setupClient(session *types.Session) *oauth.OauthXRPCClient { 53 + client := oauth.NewOauthXRPCClient(rm.db, rm.log, session) 54 + rm.clientmap.Append(session.ID, client, session.Expiration) 55 + return client 56 + } 57 + 58 + // create - oauth 59 + // store - db 60 + // broadcast - channels model
+15
server/internal/recordmanager/session.go
··· 1 + package recordmanager 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + ) 7 + 8 + func (rm *RecordManager) DeleteSession(id int, ctx context.Context) error { 9 + err := rm.db.DeleteOauthSession(id, ctx) 10 + if err != nil { 11 + return errors.New("failed to delete session: " + err.Error()) 12 + } 13 + rm.clientmap.Delete(id) 14 + return nil 15 + }
+81
server/internal/recordmanager/signet.go
··· 1 + package recordmanager 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "github.com/bluesky-social/indigo/atproto/syntax" 7 + lrcpb "github.com/rachel-mp4/lrcproto/gen/go" 8 + "rvcx/internal/atputils" 9 + "rvcx/internal/lex" 10 + "rvcx/internal/types" 11 + "time" 12 + ) 13 + 14 + func (rm *RecordManager) PostSignet(e lrcpb.Event_Init, uri string, ctx context.Context) error { 15 + lsr, now, err := rm.validateSignet(e, uri) 16 + if err != nil { 17 + return errors.New("failed to validate signet: " + err.Error()) 18 + } 19 + signet, err := rm.createSignet(lsr, now, *e.Init.Id, ctx) 20 + if err != nil { 21 + return errors.New("failed to create signet: " + err.Error()) 22 + } 23 + err = rm.storeSignet(signet, ctx) 24 + if err != nil { 25 + return errors.New("failed to store signet: " + err.Error()) 26 + } 27 + err = rm.forwardSignet(signet, uri) 28 + if err != nil { 29 + return errors.New("failed to forward signet: " + err.Error()) 30 + } 31 + return nil 32 + } 33 + 34 + func (rm *RecordManager) validateSignet(e lrcpb.Event_Init, uri string) (*lex.SignetRecord, *time.Time, error) { 35 + signet := lex.SignetRecord{} 36 + handle := e.Init.ExternalID 37 + if handle == nil { 38 + h := "" 39 + handle = &h 40 + } 41 + signet.AuthorHandle = *handle 42 + if e.Init.Id == nil { 43 + return nil, nil, errors.New("ID should not be nil") 44 + } 45 + lrcid := uint64(*e.Init.Id) 46 + signet.LRCID = lrcid 47 + signet.ChannelURI = uri 48 + now := syntax.DatetimeNow() 49 + nowTime := now.Time() 50 + nowString := now.String() 51 + signet.StartedAt = &nowString 52 + return &signet, &nowTime, nil 53 + } 54 + 55 + func (rm *RecordManager) createSignet(lsr *lex.SignetRecord, now *time.Time, id uint32, ctx context.Context) (*types.Signet, error) { 56 + cid, recorduri, err := rm.myClient.CreateXCVRSignet(lsr, ctx) 57 + if err != nil { 58 + return nil, errors.New("couldn't create signet: " + err.Error()) 59 + } 60 + if now == nil { 61 + return nil, errors.New("wasn't provided time") 62 + } 63 + sr := types.Signet{ 64 + URI: recorduri, 65 + IssuerDID: atputils.GetMyDid(), 66 + AuthorHandle: lsr.AuthorHandle, 67 + ChannelURI: lsr.ChannelURI, 68 + MessageID: id, 69 + CID: cid, 70 + StartedAt: *now, 71 + } 72 + return &sr, nil 73 + } 74 + 75 + func (rm *RecordManager) storeSignet(signet *types.Signet, ctx context.Context) error { 76 + return rm.db.StoreSignet(signet, ctx) 77 + } 78 + 79 + func (rm *RecordManager) forwardSignet(signet *types.Signet, uri string) error { 80 + return rm.broadcaster.BroadcastSignet(uri, signet) 81 + }