backend for xcvr appview
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

make jetstream use recordmanager

rachel-mp4 b8b628f6 12c0e55b

+178 -47
+10 -44
server/internal/atplistener/jetstream.go
··· 105 105 if err != nil { 106 106 return errors.New("error unmarshaling: " + err.Error()) 107 107 } 108 - to := db.ProfileUpdate{ 109 - DID: event.Did, 110 - } 111 - to.UpdateName = pr.DisplayName != nil 112 - to.Name = pr.DisplayName 113 - to.UpdateNick = pr.DefaultNick != nil 114 - to.Nick = pr.DefaultNick 115 - to.UpdateStatus = pr.Status != nil 116 - to.Status = pr.Status 117 - to.UpdateColor = pr.Color != nil 118 - to.Color = pr.Color 119 - return h.db.UpdateProfile(&to, ctx) 108 + return h.rm.AcceptProfile(pr, event.Did, ctx) 120 109 } 121 110 122 111 func (h *handler) handleProfileDelete(ctx context.Context, event *models.Event) error { 123 - return h.db.DeleteProfile(event.Did, event.Commit.CID, ctx) 112 + return h.rm.DeleteProfile(event.Did, event.Commit.CID, ctx) 124 113 } 125 114 126 115 func (h *handler) handleChannel(ctx context.Context, event *models.Event) error { ··· 149 138 if err != nil { 150 139 return errors.New("i couldn't create the channel: " + err.Error()) 151 140 } 152 - return h.db.UpdateChannel(channel, ctx) 141 + return h.rm.AcceptChannelUpdate(channel, ctx) 153 142 } 154 143 155 144 func parseChannelRecord(event *models.Event) (*types.Channel, error) { ··· 175 164 } 176 165 177 166 func (h *handler) handleChannelDelete(ctx context.Context, event *models.Event) error { 178 - return h.db.DeleteChannel(URI(event), ctx) 167 + return h.rm.AcceptChannelDelete(URI(event), ctx) 179 168 } 180 169 181 170 func (h *handler) handleMessage(ctx context.Context, event *models.Event) error { ··· 196 185 if err != nil { 197 186 return errors.New("error parsing: " + err.Error()) 198 187 } 199 - return h.db.StoreMessage(message, ctx) 188 + return h.rm.AcceptMessage(message, ctx) 200 189 } 201 190 202 191 func (h *handler) handleMessageUpdate(ctx context.Context, event *models.Event) error { ··· 204 193 if err != nil { 205 194 return errors.New("error parsing: " + err.Error()) 206 195 } 207 - host, _ := atputils.DidFromUri(message.SignetURI) 208 - rkey, err := atputils.RkeyFromUri(message.SignetURI) 209 - if err != nil { 210 - return errors.New("i think the record is borked ngl") 211 - } 212 - if host == atputils.GetMyDid() { 213 - dne, err := h.cli.DeleteXCVRSignet(rkey, ctx) 214 - if err != nil { 215 - if dne { 216 - err = h.db.DeleteSignet(message.SignetURI, ctx) 217 - if err != nil { 218 - return errors.New("a lot of stuff happened yikers!" + err.Error()) 219 - } 220 - return nil 221 - } 222 - return errors.New("failed to delete signet after infetterance: " + err.Error()) 223 - } 224 - err = h.db.DeleteSignet(message.SignetURI, ctx) 225 - if err != nil { 226 - return errors.New("i deleted the signet, however i couldn't delete it from my db: " + err.Error()) 227 - } 228 - return nil 229 - } 230 - return h.db.UpdateMessage(message, ctx) 196 + return h.rm.AcceptMessageUpdate(message, event.Did, ctx) 231 197 } 232 198 233 199 func (h *handler) handleMessageDelete(ctx context.Context, event *models.Event) error { 234 - return h.db.DeleteMessage(URI(event), ctx) 200 + return h.rm.AcceptMessageDelete(URI(event), ctx) 235 201 } 236 202 237 203 func parseMessageRecord(event *models.Event) (*types.Message, error) { ··· 280 246 if err != nil { 281 247 return errors.New("failed to parse: " + err.Error()) 282 248 } 283 - return h.db.StoreSignet(signet, ctx) 249 + return h.rm.AcceptSignet(signet, ctx) 284 250 } 285 251 286 252 func (h *handler) handleSignetUpdate(ctx context.Context, event *models.Event) error { ··· 288 254 if err != nil { 289 255 return errors.New("failed to parse: " + err.Error()) 290 256 } 291 - return h.db.UpdateSignet(signet, ctx) 257 + return h.rm.AcceptSignetUpdate(signet, ctx) 292 258 } 293 259 func (h *handler) handleSignetDelete(ctx context.Context, event *models.Event) error { 294 - return h.db.DeleteSignet(URI(event), ctx) 260 + return h.rm.AcceptSignetDelete(URI(event), ctx) 295 261 } 296 262 297 263 func parseSignetRecord(event *models.Event) (*types.Signet, error) {
+34
server/internal/model/channel.go
··· 111 111 return nil 112 112 } 113 113 114 + func (m *Model) UpdateChannel(c *types.Channel) error { 115 + cm, ok := m.uriMap[c.URI] 116 + if !ok { 117 + return m.AddChannel(c) 118 + } 119 + valid := (c.Host == os.Getenv("my_IDENTITY")) 120 + if valid != cm.valid { 121 + if valid { 122 + cm.valid = true 123 + } else { 124 + cm.valid = false 125 + cm.cancel() 126 + } 127 + } 128 + var welcome string 129 + if c.Topic == nil { 130 + welcome = "and now you're connected" 131 + } else { 132 + welcome = *c.Topic 133 + } 134 + cm.welcome = welcome 135 + return nil 136 + } 137 + 138 + func (m *Model) DeleteChannel(uri string) error { 139 + cm, ok := m.uriMap[uri] 140 + if !ok { 141 + return nil 142 + } 143 + delete(m.uriMap, uri) 144 + cm.cancel() 145 + return nil 146 + } 147 + 114 148 func (m *Model) getServer(uri string) (*lrcd.Server, error) { 115 149 m.mu.Lock() 116 150 defer m.mu.Unlock()
+9 -3
server/internal/recordmanager/channel.go
··· 10 10 "time" 11 11 ) 12 12 13 - // TODO: make sure initchannel works correctly 14 13 func (rm *RecordManager) AcceptChannel(c *types.Channel, ctx context.Context) error { 15 14 err := rm.storeChannel(c, ctx) 16 15 if err != nil { ··· 33 32 return errors.New("failed to update channel model: " + err.Error()) 34 33 } 35 34 return nil 35 + } 36 + 37 + func (rm *RecordManager) AcceptChannelDelete(uri string, ctx context.Context) error { 38 + err := rm.db.DeleteChannel(uri, ctx) 39 + if err != nil { 40 + return errors.New("failed to delete channel: " + err.Error()) 41 + } 42 + return rm.broadcaster.DeleteChannel(uri) 36 43 } 37 44 38 45 func (rm *RecordManager) PostMyChannel(ctx context.Context, pcr *types.PostChannelRequest) (did string, uri string, err error) { ··· 81 88 return rm.db.UpdateChannel(c, ctx) 82 89 } 83 90 84 - // TODO: impl updatechannel 85 91 func (rm *RecordManager) updateChannelmodel(c *types.Channel) error { 86 - return rm.broadcaster.AddChannel(c) 92 + return rm.broadcaster.UpdateChannel(c) 87 93 } 88 94 89 95 func (rm *RecordManager) createChannel(id int, did string) func(*lex.ChannelRecord, *time.Time, context.Context) (*types.Channel, error) {
+59
server/internal/recordmanager/message.go
··· 13 13 "time" 14 14 ) 15 15 16 + func (rm *RecordManager) AcceptMessage(m *types.Message, ctx context.Context) error { 17 + err := rm.storeMessage(m, ctx) 18 + if err != nil { 19 + return errors.New("failed to store message: " + err.Error()) 20 + } 21 + err = rm.forwardMessage(m, ctx) 22 + if err != nil { 23 + return errors.New("failed to forward message: " + err.Error()) 24 + } 25 + return nil 26 + } 27 + 28 + func (rm *RecordManager) AcceptMessageUpdate(m *types.Message, did string, ctx context.Context) error { 29 + err := rm.updateMessage(m, ctx) 30 + if err != nil { 31 + return errors.New("failed to store message: " + err.Error()) 32 + } 33 + err = rm.checkInterference(m, did, ctx) 34 + if err != nil { 35 + return errors.New("error while checking interference: " + err.Error()) 36 + } 37 + return nil 38 + } 39 + 40 + func (rm *RecordManager) AcceptMessageDelete(uri string, ctx context.Context) error { 41 + err := rm.db.DeleteMessage(uri, ctx) 42 + if err != nil { 43 + return errors.New("failed to delete message: " + err.Error()) 44 + } 45 + return nil 46 + } 47 + 48 + func (rm *RecordManager) checkInterference(m *types.Message, did string, ctx context.Context) error { 49 + handle, err := rm.db.QuerySignetHandle(m.SignetURI, ctx) 50 + if err != nil { 51 + return errors.New("couldn't find signet") 52 + } 53 + sdid, err := atputils.DidFromUri(m.SignetURI) 54 + if sdid != atputils.GetMyDid() { 55 + return nil 56 + } 57 + mhandle, err := rm.db.ResolveDid(did, ctx) 58 + if err != nil { 59 + return errors.New("couldn't resolve mhandle") 60 + } 61 + if handle != mhandle { 62 + return nil 63 + } 64 + err = rm.DeleteSignet(m.SignetURI, ctx) 65 + if err != nil { 66 + return errors.New("failed to delete signet") 67 + } 68 + return nil 69 + } 70 + 16 71 func (rm *RecordManager) PostMessage(id int, udid string, ctx context.Context, pmr *types.PostMessageRequest) error { 17 72 lmr, now, _, _, err := rm.validateMessage(pmr, ctx) 18 73 if err != nil { ··· 120 175 PostedAt: *now, 121 176 } 122 177 return message, nil 178 + } 179 + 180 + func (rm *RecordManager) updateMessage(m *types.Message, ctx context.Context) error { 181 + return rm.db.UpdateMessage(m, ctx) 123 182 } 124 183 125 184 func (rm *RecordManager) storeMessage(m *types.Message, ctx context.Context) error {
+32
server/internal/recordmanager/profile.go
··· 9 9 "rvcx/internal/types" 10 10 ) 11 11 12 + func (rm *RecordManager) AcceptProfile(p lex.ProfileRecord, did string, ctx context.Context) error { 13 + pu := convertToPu(p, did) 14 + err := rm.storeProfile(pu, ctx) 15 + if err != nil { 16 + return errors.New("failed to store profile: " + err.Error()) 17 + } 18 + return nil 19 + } 20 + 21 + func (rm *RecordManager) DeleteProfile(did string, cid string, ctx context.Context) error { 22 + return rm.db.DeleteProfile(did, cid, ctx) 23 + } 24 + 25 + func convertToPu(p lex.ProfileRecord, did string) *db.ProfileUpdate { 26 + avatar := p.Avatar.Ref.String() 27 + return &db.ProfileUpdate{ 28 + DID: did, 29 + Name: p.DisplayName, 30 + UpdateName: true, 31 + Nick: p.DefaultNick, 32 + UpdateNick: true, 33 + Status: p.Status, 34 + UpdateStatus: true, 35 + Color: p.Color, 36 + UpdateColor: true, 37 + Avatar: &avatar, 38 + UpdateAvatar: true, 39 + Mime: &p.Avatar.MimeType, 40 + UpdateMime: true, 41 + } 42 + } 43 + 12 44 func (rm *RecordManager) CreateInitialProfile(did string, id int, ctx context.Context) error { 13 45 nick := "wanderer" 14 46 status := "just setting up my xcvr"
+2
server/internal/recordmanager/recordmanager.go
··· 14 14 BroadcastSignet(uri string, s *types.Signet) error 15 15 BroadcastMessage(uri string, m *types.Message) error 16 16 AddChannel(c *types.Channel) error 17 + UpdateChannel(c *types.Channel) error 18 + DeleteChannel(uri string) error 17 19 } 18 20 19 21 type RecordManager struct {
+32
server/internal/recordmanager/signet.go
··· 31 31 return nil 32 32 } 33 33 34 + func (rm *RecordManager) DeleteSignet(uri string, ctx context.Context) error { 35 + rkey, err := atputils.RkeyFromUri(uri) 36 + if err != nil { 37 + return errors.New("invalid signet uri: " + err.Error()) 38 + } 39 + _, err = rm.myClient.DeleteXCVRSignet(rkey, ctx) 40 + if err != nil { 41 + return errors.New("failed to delete signet record from repo: " + err.Error()) 42 + } 43 + err = rm.db.DeleteSignet(uri, ctx) 44 + if err != nil { 45 + return errors.New("failed to delete signet from database") 46 + } 47 + return nil 48 + } 49 + 50 + func (rm *RecordManager) AcceptSignet(s *types.Signet, ctx context.Context) error { 51 + err := rm.storeSignet(s, ctx) 52 + if err != nil { 53 + return errors.New("failed to store signet") 54 + } 55 + return nil 56 + } 57 + 58 + func (rm *RecordManager) AcceptSignetDelete(uri string, ctx context.Context) error { 59 + return rm.db.DeleteSignet(uri, ctx) 60 + } 61 + 62 + func (rm *RecordManager) AcceptSignetUpdate(s *types.Signet, ctx context.Context) error { 63 + return rm.db.UpdateSignet(s, ctx) 64 + } 65 + 34 66 func (rm *RecordManager) validateSignet(e lrcpb.Event_Init, uri string) (*lex.SignetRecord, *time.Time, error) { 35 67 signet := lex.SignetRecord{} 36 68 handle := e.Init.ExternalID