backend for xcvr appview
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

add create update delete for jetstream on profiles, channels, messages, signets

rachel-mp4 076f0137 82e374f2

+271 -11
+180 -4
server/internal/atplistener/jetstream.go
··· 10 10 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 11 11 "github.com/bluesky-social/jetstream/pkg/models" 12 12 "time" 13 + "xcvr-backend/internal/atputils" 13 14 "xcvr-backend/internal/db" 14 15 "xcvr-backend/internal/lex" 15 16 "xcvr-backend/internal/log" ··· 24 25 25 26 type handler struct { 26 27 db *db.Store 27 - l *log.Logger 28 + l *log.Logger 28 29 } 29 30 30 31 func NewConsumer(jsAddr string, l *log.Logger, db *db.Store) *Consumer { ··· 71 72 return h.handleProfile(ctx, event) 72 73 case "org.xcvr.feed.channel": 73 74 return h.handleChannel(ctx, event) 75 + case "org.xcvr.lrc.message": 76 + return h.handleMessage(ctx, event) 77 + case "org.xcvr.lrc.signet": 78 + return h.handleSignet(ctx, event) 74 79 } 75 80 return nil 76 81 } 77 82 78 83 func (h *handler) handleProfile(ctx context.Context, event *models.Event) error { 79 84 h.l.Deprintln("handling profile") 85 + switch event.Commit.Operation { 86 + case "create", "update": 87 + return h.handleProfileCreateUpdate(ctx, event) 88 + case "delete": 89 + return h.handleProfileDelete(ctx, event) 90 + } 91 + return errors.New("unsupported commit operation") 92 + } 93 + 94 + func (h *handler) handleProfileCreateUpdate(ctx context.Context, event *models.Event) error { 80 95 var pr lex.ProfileRecord 81 96 err := json.Unmarshal(event.Commit.Record, &pr) 82 97 if err != nil { ··· 96 111 return h.db.UpdateProfile(to, ctx) 97 112 } 98 113 114 + func (h *handler) handleProfileDelete(ctx context.Context, event *models.Event) error { 115 + return h.db.DeleteProfile(event.Did, event.Commit.CID, ctx) 116 + } 117 + 99 118 func (h *handler) handleChannel(ctx context.Context, event *models.Event) error { 119 + h.l.Deprintln("handling channel") 120 + switch event.Commit.Operation { 121 + case "create": 122 + return h.handleChannelCreate(ctx, event) 123 + case "update": 124 + return h.handleChannelUpdate(ctx, event) 125 + case "delete": 126 + return h.handleChannelDelete(ctx, event) 127 + } 128 + return nil 129 + } 130 + 131 + func (h *handler) handleChannelCreate(ctx context.Context, event *models.Event) error { 132 + channel, err := parseChannelRecord(event) 133 + if err != nil { 134 + return errors.New("i couldn't create the channel: " + err.Error()) 135 + } 136 + return h.db.StoreChannel(channel, ctx) 137 + } 138 + 139 + func (h *handler) handleChannelUpdate(ctx context.Context, event *models.Event) error { 140 + channel, err := parseChannelRecord(event) 141 + if err != nil { 142 + return errors.New("i couldn't create the channel: " + err.Error()) 143 + } 144 + return h.db.UpdateChannel(channel, ctx) 145 + } 146 + 147 + func parseChannelRecord(event *models.Event) (*types.Channel, error) { 100 148 var cr lex.ChannelRecord 101 149 err := json.Unmarshal(event.Commit.Record, &cr) 102 150 if err != nil { 103 - return errors.New("error unmarshl: " + err.Error()) 151 + return nil, errors.New("error unmarshl: " + err.Error()) 104 152 } 105 153 then, err := syntax.ParseDatetimeTime(cr.CreatedAt) 106 154 if err != nil { 107 155 then = time.Now() 108 156 } 109 157 channel := types.Channel{ 110 - URI: fmt.Sprintf("at://%s/org.xcvr.feed.channel/%s", event.Did, event.Commit.RKey), 158 + URI: URI(event), 111 159 CID: event.Commit.CID, 112 160 DID: event.Did, 113 161 Host: cr.Host, ··· 115 163 Topic: cr.Topic, 116 164 CreatedAt: then, 117 165 } 118 - return h.db.StoreChannel(channel, ctx) 166 + return &channel, nil 167 + } 168 + 169 + func (h *handler) handleChannelDelete(ctx context.Context, event *models.Event) error { 170 + return h.db.DeleteChannel(URI(event), ctx) 171 + } 172 + 173 + func (h *handler) handleMessage(ctx context.Context, event *models.Event) error { 174 + h.l.Deprintln("handling message") 175 + switch event.Commit.Operation { 176 + case "create": 177 + return h.handleMessageCreate(ctx, event) 178 + case "update": 179 + return h.handleMessageUpdate(ctx, event) 180 + case "delete": 181 + return h.handleMessageDelete(ctx, event) 182 + } 183 + return errors.New("unimplemented Operation") 184 + } 185 + 186 + func (h *handler) handleMessageCreate(ctx context.Context, event *models.Event) error { 187 + message, err := parseMessageRecord(event) 188 + if err != nil { 189 + return errors.New("error parsing: " + err.Error()) 190 + } 191 + return h.db.StoreMessage(message, ctx) 192 + } 193 + 194 + func (h *handler) handleMessageUpdate(ctx context.Context, event *models.Event) error { 195 + message, err := parseMessageRecord(event) 196 + if err != nil { 197 + return errors.New("error parsing: " + err.Error()) 198 + } 199 + return h.db.UpdateMessage(message, ctx) 200 + } 201 + 202 + func (h *handler) handleMessageDelete(ctx context.Context, event *models.Event) error { 203 + return h.db.DeleteMessage(URI(event), ctx) 204 + } 205 + 206 + func parseMessageRecord(event *models.Event) (*types.Message, error) { 207 + var mr lex.MessageRecord 208 + err := json.Unmarshal(event.Commit.Record, &mr) 209 + if err != nil { 210 + return nil, errors.New("error unmarshl: " + err.Error()) 211 + } 212 + then, err := syntax.ParseDatetimeTime(mr.PostedAt) 213 + if err != nil { 214 + then = time.Now() 215 + } 216 + var color *uint32 217 + if mr.Color != nil { 218 + c := uint32(*mr.Color) 219 + color = &c 220 + } 221 + message := types.Message{ 222 + URI: URI(event), 223 + CID: event.Commit.CID, 224 + DID: event.Did, 225 + SignetURI: mr.SignetURI, 226 + Body: mr.Body, 227 + Nick: mr.Nick, 228 + Color: color, 229 + PostedAt: then, 230 + } 231 + return &message, nil 232 + } 233 + 234 + func (h *handler) handleSignet(ctx context.Context, event *models.Event) error { 235 + h.l.Deprintln("handling signet") 236 + switch event.Commit.Operation { 237 + case "create": 238 + return h.handleSignetCreate(ctx, event) 239 + case "update": 240 + return h.handleSignetUpdate(ctx, event) 241 + case "delete": 242 + return h.handleSignetDelete(ctx, event) 243 + } 244 + return errors.New("unimplemented Operation") 245 + } 246 + 247 + func (h *handler) handleSignetCreate(ctx context.Context, event *models.Event) error { 248 + signet, err := parseSignetRecord(event) 249 + if err != nil { 250 + return errors.New("failed to parse: " + err.Error()) 251 + } 252 + return h.db.StoreSignet(signet, ctx) 253 + } 254 + 255 + func (h *handler) handleSignetUpdate(ctx context.Context, event *models.Event) error { 256 + signet, err := parseSignetRecord(event) 257 + if err != nil { 258 + return errors.New("failed to parse: " + err.Error()) 259 + } 260 + return h.db.UpdateSignet(signet, ctx) 261 + } 262 + func (h *handler) handleSignetDelete(ctx context.Context, event *models.Event) error { 263 + return h.db.DeleteSignet(URI(event), ctx) 264 + } 265 + 266 + func parseSignetRecord(event *models.Event) (*types.Signet, error) { 267 + var sr lex.SignetRecord 268 + err := json.Unmarshal(event.Commit.Record, &sr) 269 + if err != nil { 270 + return nil, errors.New("error unmarshl: " + err.Error()) 271 + } 272 + var then time.Time 273 + if sr.StartedAt != nil { 274 + then, err = syntax.ParseDatetimeTime(*sr.StartedAt) 275 + if err != nil { 276 + then = time.Now() 277 + } 278 + } else { 279 + then = time.Now() 280 + } 281 + signet := types.Signet{ 282 + URI: fmt.Sprintf("at://%s/org.xcvr.feed.channel/%s", event.Did, event.Commit.RKey), 283 + CID: event.Commit.CID, 284 + IssuerDID: event.Did, 285 + DID: sr.Author, 286 + ChannelURI: sr.ChannelURI, 287 + MessageID: uint32(sr.LRCID), 288 + StartedAt: then, 289 + } 290 + return &signet, nil 291 + } 292 + 293 + func URI(event *models.Event) string { 294 + return atputils.URI(event.Did, event.Commit.Collection, event.Commit.RKey) 119 295 }
+9
server/internal/atputils/uri.go
··· 1 + package atputils 2 + 3 + import ( 4 + "fmt" 5 + ) 6 + 7 + func URI(did string, collection string, rkey string) string { 8 + return fmt.Sprintf("at://%s/%s/%s", did, collection, rkey) 9 + }
+78 -3
server/internal/db/lexicon.go
··· 88 88 return nil 89 89 } 90 90 91 + func (s *Store) DeleteProfile(did string, cid string, ctx context.Context) error { 92 + _, err := s.pool.Exec(ctx, ` 93 + DELETE FROM profiles p WHERE p.DID = $1 AND p.CID = $2 94 + `, did, cid) 95 + return err 96 + } 97 + 91 98 func (s *Store) GetProfileView(did string, ctx context.Context) (*types.ProfileView, error) { 92 99 row := s.pool.QueryRow(ctx, `SELECT 93 100 p.display_name, ··· 111 118 return &p, nil 112 119 } 113 120 114 - func (s *Store) StoreChannel(channel types.Channel, ctx context.Context) error { 121 + func (s *Store) StoreChannel(channel *types.Channel, ctx context.Context) error { 115 122 _, err := s.pool.Exec(ctx, ` 116 123 INSERT INTO channels ( 117 124 uri, ··· 128 135 return err 129 136 } 130 137 131 - func (s *Store) StoreMessage(message types.Message, ctx context.Context) error { 138 + func (s *Store) UpdateChannel(channel *types.Channel, ctx context.Context) error { 139 + _, err := s.pool.Exec(ctx, ` 140 + INSERT INTO channels ( 141 + uri, 142 + cid, 143 + did, 144 + host, 145 + title, 146 + topic, 147 + created_at 148 + ) VALUES ( 149 + $1, $2, $3, $4, $5, $6, $7 150 + )`, channel.URI, channel.CID, channel.DID, channel.Host, channel.Title, channel.Topic, channel.CreatedAt) 151 + return err 152 + } 153 + 154 + func (s *Store) DeleteMessage(uri string, ctx context.Context) error { 155 + _, err := s.pool.Exec(ctx, ` 156 + DELETE FROM messages m WHERE m.uri = $1 157 + `, uri) 158 + return err 159 + } 160 + 161 + func (s *Store) StoreMessage(message *types.Message, ctx context.Context) error { 162 + _, err := s.pool.Exec(ctx, ` 163 + INSERT INTO messages ( 164 + uri, 165 + cid, 166 + did, 167 + signet_uri, 168 + body, 169 + nick, 170 + color, 171 + posted_at 172 + ) VALUES ( 173 + $1, $2, $3, $4, $5, $6, $7, $8 174 + ) ON CONFLICT (uri) DO NOTHING 175 + `, message.URI, message.CID, message.DID, message.SignetURI, message.Body, message.Nick, message.Color, message.PostedAt) 176 + return err 177 + } 178 + 179 + func (s *Store) UpdateMessage(message *types.Message, ctx context.Context) error { 132 180 _, err := s.pool.Exec(ctx, ` 133 181 INSERT INTO messages ( 134 182 uri, ··· 156 204 return signetUri, nil 157 205 } 158 206 159 - func (s *Store) StoreSignet(signet types.Signet, ctx context.Context) error { 207 + func (s *Store) StoreSignet(signet *types.Signet, ctx context.Context) error { 160 208 _, err := s.pool.Exec(ctx, ` 161 209 INSERT INTO signets ( 162 210 uri, ··· 175 223 } 176 224 return err 177 225 } 226 + 227 + func (s *Store) UpdateSignet(signet *types.Signet, ctx context.Context) error { 228 + _, err := s.pool.Exec(ctx, ` 229 + INSERT INTO signets ( 230 + uri, 231 + issuer_did, 232 + did, 233 + channel_uri, 234 + message_id, 235 + cid, 236 + started_at 237 + ) VALUES ( 238 + $1, $2, $3, $4, $5, $6, $7 239 + ) 240 + `, signet.URI, signet.IssuerDID, signet.DID, signet.ChannelURI, signet.MessageID, signet.CID, signet.StartedAt) 241 + if err != nil { 242 + err = errors.New("SOMETHING BAD HAPPENED: " + err.Error()) 243 + } 244 + return err 245 + } 246 + 247 + func (s *Store) DeleteSignet(uri string, ctx context.Context) error { 248 + _, err := s.pool.Exec(ctx, ` 249 + DELETE FROM signets s WHERE s.uri = $1 250 + `, uri) 251 + return err 252 + }
+3 -3
server/internal/handler/lrcHandlers.go
··· 60 60 CreatedAt: *now, 61 61 IndexedAt: time.Now(), 62 62 } 63 - err = h.db.StoreChannel(channel, r.Context()) 63 + err = h.db.StoreChannel(&channel, r.Context()) 64 64 if err != nil { 65 65 h.serverError(w, errors.New("well... the record posted but i couldn't store it: "+err.Error())) 66 66 return ··· 119 119 CreatedAt: *now, 120 120 IndexedAt: time.Now(), 121 121 } 122 - err = h.db.StoreChannel(channel, r.Context()) 122 + err = h.db.StoreChannel(&channel, r.Context()) 123 123 if err != nil { 124 124 h.serverError(w, errors.New("sooo... the record posted but i couldn't store it: "+err.Error())) 125 125 return ··· 212 212 Color: coloruint32ptr, 213 213 PostedAt: *now, 214 214 } 215 - err = h.db.StoreMessage(message, r.Context()) 215 + err = h.db.StoreMessage(&message, r.Context()) 216 216 if err != nil { 217 217 h.serverError(w, errors.New("sooo... the record posted but i couldn't store it: "+err.Error())) 218 218 return
+1 -1
server/internal/model/channel.go
··· 175 175 CID: cid, 176 176 StartedAt: nowTime, 177 177 } 178 - err = m.store.StoreSignet(sr, context.Background()) 178 + err = m.store.StoreSignet(&sr, context.Background()) 179 179 if err != nil { 180 180 m.logger.Println("failed to store signet!" + err.Error()) 181 181 }