backend for xcvr appview
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

add stuff to allow for lexicon event stream in thread

rachel-mp4 7c5343ce a177c56b

+451 -70
+36 -5
lexicons/org/xcvr/lrc/defs.json
··· 4 4 "defs": { 5 5 "messageView": { 6 6 "type": "object", 7 - "required": ["uri", "author", "body", "startedAt", "postedAt"], 7 + "required": ["uri", "author", "body", "signetURI", "postedAt"], 8 8 "properties": { 9 9 "uri": { 10 10 "type": "string", ··· 15 15 "ref": "org.xcvr.actor.defs#profileView" 16 16 }, 17 17 "body": { 18 - "type": "string", 18 + "type": "string" 19 19 }, 20 20 "nick": { 21 21 "type": "string", ··· 26 26 "minimum": 0, 27 27 "maximum": 16777215 28 28 }, 29 - "startedAt": { 29 + "signetURI": { 30 + "type": "string", 31 + "format": "at-uri" 32 + }, 33 + "postedAt": { 30 34 "type": "string", 31 35 "format": "datetime" 36 + } 37 + } 38 + }, 39 + "signetView": { 40 + "type": "object", 41 + "required": ["uri", "issuer", "channelURI", "lrcID", "author", "startedAt"], 42 + "properties": { 43 + "uri": { 44 + "type": "string", 45 + "format": "at-uri" 32 46 }, 33 - "postedAt": { 47 + "issuer": { 48 + "type": "string", 49 + "format": "handle" 50 + }, 51 + "channelURI": { 52 + "type": "string", 53 + "format": "at-uri" 54 + }, 55 + "lrcID": { 56 + "type": "integer", 57 + "minimum": 0, 58 + "maximum": 4294967295 59 + }, 60 + "author": { 61 + "type": "string", 62 + "format": "handle" 63 + }, 64 + "startedAt": { 34 65 "type": "string", 35 66 "format": "datetime" 36 67 } 37 68 } 38 69 } 39 70 } 40 - } 71 + }
+1 -1
migrations/001_init.up.sql
··· 37 37 CREATE TABLE signets ( 38 38 uri TEXT PRIMARY KEY, 39 39 issuer_did TEXT NOT NULL, 40 - did TEXT NOT NULL, 40 + author_handle TEXT NOT NULL, 41 41 channel_uri TEXT NOT NULL, 42 42 FOREIGN KEY (channel_uri) REFERENCES channels(uri) ON DELETE CASCADE, 43 43 message_id INTEGER CHECK (message_id BETWEEN 0 AND 4294967295),
+3 -3
migrations/002_populate.up.sql
··· 18 18 ('at://did:example:alice/org.xcvr.feed.channel/general', 'chanCid1', 'did:example:alice', 'xcvr.org', 'General Chat', 'All-purpose chatter', now() - interval '2 days'), 19 19 ('at://did:example:bob/org.xcvr.feed.channel/help', 'chanCid2', 'did:example:bob', 'xcvr.org', 'Help Channel', 'Support and help', now() - interval '1 day'); 20 20 21 - INSERT INTO signets (uri, issuer_did, did, channel_uri, message_id, cid) 21 + INSERT INTO signets (uri, issuer_did, author_handle, channel_uri, message_id, cid) 22 22 VALUES 23 - ('at://did:example:xcvr/org.xcvr.lrc.signet/signet1', 'did:example:xcvr', 'did:example:alice', 'at://did:example:alice/org.xcvr.feed.channel/general', 1, 'signetCid1'), 24 - ('at://did:example:xcvr/org.xcvr.lrc.signet/signet2', 'did:example:xcvr', 'did:example:bob', 'at://did:example:bob/org.xcvr.feed.channel/help', 2, 'signetCid2'); 23 + ('at://did:example:xcvr/org.xcvr.lrc.signet/signet1', 'did:example:xcvr', 'alice.com', 'at://did:example:alice/org.xcvr.feed.channel/general', 1, 'signetCid1'), 24 + ('at://did:example:xcvr/org.xcvr.lrc.signet/signet2', 'did:example:xcvr', 'bob.net', 'at://did:example:bob/org.xcvr.feed.channel/help', 2, 'signetCid2'); 25 25 26 26 INSERT INTO messages (uri, did, signet_uri, body, nick, color, cid) 27 27 VALUES
+3 -3
server/cmd/main.go
··· 57 57 panic(err) 58 58 } 59 59 h := handler.New(store, logger, oauthclient, xrpc, model) 60 - go consumeLoop(context.Background(), store, logger) 60 + go consumeLoop(context.Background(), store, logger, xrpc) 61 61 http.ListenAndServe(":8080", h.WithCORSAll()) 62 62 63 63 } ··· 66 66 defaultServerAddr = "wss://jetstream.atproto.tools/subscribe" 67 67 ) 68 68 69 - func consumeLoop(ctx context.Context, db *db.Store, l *log.Logger) { 69 + func consumeLoop(ctx context.Context, db *db.Store, l *log.Logger, cli *oauth.PasswordClient) { 70 70 jsServerAddr := os.Getenv("JS_SERVER_ADDR") 71 71 if jsServerAddr == "" { 72 72 jsServerAddr = defaultServerAddr 73 73 } 74 - consumer := atplistener.NewConsumer(jsServerAddr, l, db) 74 + consumer := atplistener.NewConsumer(jsServerAddr, l, db, cli) 75 75 for { 76 76 err := consumer.Consume(ctx) 77 77 if err != nil {
+36 -11
server/internal/atplistener/jetstream.go
··· 14 14 "xcvr-backend/internal/db" 15 15 "xcvr-backend/internal/lex" 16 16 "xcvr-backend/internal/log" 17 + "xcvr-backend/internal/oauth" 17 18 "xcvr-backend/internal/types" 18 19 ) 19 20 ··· 24 25 } 25 26 26 27 type handler struct { 27 - db *db.Store 28 - l *log.Logger 28 + db *db.Store 29 + l *log.Logger 30 + cli *oauth.PasswordClient 29 31 } 30 32 31 - func NewConsumer(jsAddr string, l *log.Logger, db *db.Store) *Consumer { 33 + func NewConsumer(jsAddr string, l *log.Logger, db *db.Store, cli *oauth.PasswordClient) *Consumer { 32 34 cfg := client.DefaultClientConfig() 33 35 if jsAddr != "" { 34 36 cfg.WebsocketURL = jsAddr ··· 43 45 return &Consumer{ 44 46 cfg: cfg, 45 47 logger: l, 46 - handler: &handler{db: db, l: l}, 48 + handler: &handler{db: db, l: l, cli: cli}, 47 49 } 48 50 } 49 51 ··· 196 198 if err != nil { 197 199 return errors.New("error parsing: " + err.Error()) 198 200 } 201 + host, _ := atputils.DidFromUri(message.SignetURI) 202 + rkey, err := atputils.RkeyFromUri(message.SignetURI) 203 + if err != nil { 204 + return errors.New("i think the record is borked ngl") 205 + } 206 + if host == atputils.GetMyDid() { 207 + dne, err := h.cli.DeleteXCVRSignet(rkey, ctx) 208 + if err != nil { 209 + if dne { 210 + err = h.db.DeleteSignet(message.SignetURI, ctx) 211 + if err != nil { 212 + return errors.New("a lot of stuff happened yikers!" + err.Error()) 213 + } 214 + return nil 215 + } 216 + return errors.New("failed to delete signet after infetterance: " + err.Error()) 217 + } 218 + err = h.db.DeleteSignet(message.SignetURI, ctx) 219 + if err != nil { 220 + return errors.New("i deleted the signet, however i couldn't delete it from my db: " + err.Error()) 221 + } 222 + return nil 223 + } 199 224 return h.db.UpdateMessage(message, ctx) 200 225 } 201 226 ··· 279 304 then = time.Now() 280 305 } 281 306 signet := types.Signet{ 282 - URI: fmt.Sprintf("at://%s/org.xcvr.feed.channel/%s", event.Did, event.Commit.RKey), 283 - CID: event.Commit.CID, 284 - IssuerDID: event.Did, 285 - DID: sr.Author, 286 - ChannelURI: sr.ChannelURI, 287 - MessageID: uint32(sr.LRCID), 288 - StartedAt: then, 307 + URI: fmt.Sprintf("at://%s/org.xcvr.feed.channel/%s", event.Did, event.Commit.RKey), 308 + CID: event.Commit.CID, 309 + IssuerDID: event.Did, 310 + AuthorHandle: sr.AuthorHandle, 311 + ChannelURI: sr.ChannelURI, 312 + MessageID: uint32(sr.LRCID), 313 + StartedAt: then, 289 314 } 290 315 return &signet, nil 291 316 }
+44
server/internal/atputils/uri.go
··· 1 1 package atputils 2 2 3 3 import ( 4 + "errors" 4 5 "fmt" 6 + "strings" 5 7 ) 6 8 7 9 func URI(did string, collection string, rkey string) string { 8 10 return fmt.Sprintf("at://%s/%s/%s", did, collection, rkey) 9 11 } 12 + 13 + func DidFromUri(uri string) (did string, err error) { 14 + s, err := trimScheme(uri) 15 + if err != nil { 16 + return 17 + } 18 + ss, err := uriFragSplit(s) 19 + if err != nil { 20 + return 21 + } 22 + did = ss[0] 23 + return 24 + } 25 + 26 + func trimScheme(uri string) (string, error) { 27 + s, ok := strings.CutPrefix(uri, "at://") 28 + if !ok { 29 + return "", errors.New("not a uri, missing at:// scheme") 30 + } 31 + return s, nil 32 + } 33 + 34 + func uriFragSplit(urifrag string) ([]string, error) { 35 + ss := strings.Split(urifrag, "/") 36 + if len(ss) != 3 { 37 + return nil, errors.New("not a urifrag, incorrect number of bits") 38 + } 39 + return ss, nil 40 + } 41 + 42 + func RkeyFromUri(uri string) (rkey string, err error) { 43 + s, err := trimScheme(uri) 44 + if err != nil { 45 + return 46 + } 47 + ss, err := uriFragSplit(s) 48 + if err != nil { 49 + return 50 + } 51 + rkey = ss[2] 52 + return 53 + }
+13 -2
server/internal/db/db.go
··· 135 135 } 136 136 137 137 type URIHost struct { 138 - URI string 139 - Host string 138 + URI string 139 + Host string 140 + LastID uint32 140 141 } 141 142 142 143 func (s *Store) GetChannelURIs(ctx context.Context) ([]URIHost, error) { ··· 157 158 if err != nil { 158 159 return nil, err 159 160 } 161 + var maxMessageID uint32 162 + err = s.pool.QueryRow(ctx, ` 163 + SELECT COALESCE(MAX(message_id), 0) 164 + FROM signets 165 + WHERE channel_uri = $1 166 + `, urihost.URI).Scan(&maxMessageID) 167 + if err != nil { 168 + return nil, err 169 + } 170 + urihost.LastID = maxMessageID 160 171 urihosts = append(urihosts, urihost) 161 172 } 162 173 return urihosts, nil
+4 -4
server/internal/db/lexicon.go
··· 209 209 INSERT INTO signets ( 210 210 uri, 211 211 issuer_did, 212 - did, 212 + author_handle, 213 213 channel_uri, 214 214 message_id, 215 215 cid, ··· 217 217 ) VALUES ( 218 218 $1, $2, $3, $4, $5, $6, $7 219 219 ) ON CONFLICT (uri) DO NOTHING 220 - `, signet.URI, signet.IssuerDID, signet.DID, signet.ChannelURI, signet.MessageID, signet.CID, signet.StartedAt) 220 + `, signet.URI, signet.IssuerDID, signet.AuthorHandle, signet.ChannelURI, signet.MessageID, signet.CID, signet.StartedAt) 221 221 if err != nil { 222 222 err = errors.New("SOMETHING BAD HAPPENED: " + err.Error()) 223 223 } ··· 229 229 INSERT INTO signets ( 230 230 uri, 231 231 issuer_did, 232 - did, 232 + AuthorHandle, 233 233 channel_uri, 234 234 message_id, 235 235 cid, ··· 237 237 ) VALUES ( 238 238 $1, $2, $3, $4, $5, $6, $7 239 239 ) 240 - `, signet.URI, signet.IssuerDID, signet.DID, signet.ChannelURI, signet.MessageID, signet.CID, signet.StartedAt) 240 + `, signet.URI, signet.IssuerDID, signet.AuthorHandle, signet.ChannelURI, signet.MessageID, signet.CID, signet.StartedAt) 241 241 if err != nil { 242 242 err = errors.New("SOMETHING BAD HAPPENED: " + err.Error()) 243 243 }
+1
server/internal/handler/handler.go
··· 40 40 mux.HandleFunc("GET /xrpc/org.xcvr.lrc.getMessages", h.getMessages) 41 41 mux.HandleFunc("GET /xrpc/org.xcvr.actor.resolveChannel", h.resolveChannel) 42 42 mux.HandleFunc("GET /xrpc/org.xcvr.actor.getProfileView", h.getProfileView) 43 + mux.HandleFunc("GET /xrpc/org.xcvr.lrc.getLexStream", h.subscribeLexStream) 43 44 // backend metadata handlers 44 45 mux.HandleFunc(clientMetadataPath(), h.serveClientMetadata) 45 46 mux.HandleFunc(clientTOSPath(), h.serveTOS)
+14
server/internal/handler/lrcHandlers.go
··· 240 240 } 241 241 h.getChannels(w, r) 242 242 } 243 + 244 + func (h *Handler) subscribeLexStream(w http.ResponseWriter, r *http.Request) { 245 + rkey := r.URL.Query().Get("rkey") 246 + user := r.URL.Query().Get("user") 247 + uri := fmt.Sprintf("at://%s/org.xcvr.feed.channel/%s", user, rkey) 248 + f, err := h.model.GetLexStreamFrom(uri) 249 + if err != nil { 250 + http.NotFound(w, r) 251 + h.logger.Deprintf("couldn't find user %s's server %s", user, rkey) 252 + h.logger.Println(err.Error()) 253 + return 254 + } 255 + f(w, r) 256 + }
+5 -5
server/internal/lex/lexicons_cbor.go
··· 981 981 return err 982 982 } 983 983 984 - if len(t.Author) > 8192 { 984 + if len(t.AuthorHandle) > 8192 { 985 985 return xerrors.Errorf("Value in field t.Author was too long") 986 986 } 987 987 988 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Author))); err != nil { 988 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.AuthorHandle))); err != nil { 989 989 return err 990 990 } 991 - if _, err := cw.WriteString(string(t.Author)); err != nil { 991 + if _, err := cw.WriteString(string(t.AuthorHandle)); err != nil { 992 992 return err 993 993 } 994 994 ··· 1126 1126 1127 1127 switch string(nameBuf[:nameLen]) { 1128 1128 // t.Author (string) (string) 1129 - case "nick": 1129 + case "authorHandle": 1130 1130 1131 1131 { 1132 1132 sval, err := cbg.ReadStringWithMax(cr, 8192) ··· 1134 1134 return err 1135 1135 } 1136 1136 1137 - t.Author = string(sval) 1137 + t.AuthorHandle = string(sval) 1138 1138 } 1139 1139 // t.LexiconTypeID (string) (string) 1140 1140 case "$type":
+1 -1
server/internal/lex/types.go
··· 41 41 LexiconTypeID string `json:"$type,const=org.xcvr.lrc.signet" cborgen:"$type,const=org.xcvr.lrc.signet"` 42 42 ChannelURI string `json:"channelURI" cborgen:"channelURI"` 43 43 LRCID uint64 `json:"lrcID" cborgen:"lrcID"` 44 - Author string `json:"author" cborgen:"nick"` 44 + AuthorHandle string `json:"authorHandle" cborgen:"authorHandle"` 45 45 StartedAt *string `json:"startedAt,omitempty" cborgen:"startedAt,omitempty"` 46 46 }
+82 -24
server/internal/model/channel.go
··· 21 21 22 22 type Model struct { 23 23 store *db.Store 24 - uriMap map[string]*serverModel 24 + uriMap map[string]*channelModel 25 25 logger *log.Logger 26 26 cli *oauth.PasswordClient 27 27 mu sync.Mutex 28 28 } 29 29 30 + type channelModel struct { 31 + uri string 32 + serverModel *serverModel 33 + streamModel *lexStreamModel 34 + } 35 + 30 36 type serverModel struct { 31 - valid bool 32 - server *lrcd.Server 33 - lastID uint32 34 - initChan chan lrcpb.Event_Init 35 - cancelFunc func() 37 + server *lrcd.Server 38 + lastID uint32 39 + initChan chan lrcpb.Event_Init 40 + ctx context.Context 41 + cancel func() 42 + } 43 + 44 + type lexStreamModel struct { 45 + clients map[*client]bool 46 + clientsmu sync.Mutex 47 + ctx context.Context 48 + cancel func() 49 + signetBus chan types.SignetView 50 + messageBus chan types.MessageView 36 51 } 37 52 38 53 func (m *Model) GetWSHandlerFrom(uri string) (http.HandlerFunc, error) { ··· 43 58 return server.WSHandler(), nil 44 59 } 45 60 61 + func (m *Model) GetLexStreamFrom(uri string) (http.HandlerFunc, error) { 62 + lsm, err := m.getLexStream(uri) 63 + if err != nil { 64 + return nil, err 65 + } 66 + return lsm.WSHandler(uri, m), nil 67 + } 68 + 69 + func (m *Model) getLexStream(uri string) (*lexStreamModel, error) { 70 + m.mu.Lock() 71 + defer m.mu.Unlock() 72 + cm := m.uriMap[uri] 73 + if cm == nil { 74 + return nil, errors.New("Not a valid server") 75 + } 76 + if cm.streamModel == nil { 77 + ctx, cancel := context.WithCancel(context.Background()) 78 + lsm := lexStreamModel{ 79 + clients: make(map[*client]bool), 80 + clientsmu: sync.Mutex{}, 81 + ctx: ctx, 82 + cancel: cancel, 83 + signetBus: make(chan types.SignetView, 10), 84 + messageBus: make(chan types.MessageView, 10), 85 + } 86 + cm.streamModel = &lsm 87 + go lsm.broadcaster() 88 + } 89 + return cm.streamModel, nil 90 + } 91 + 46 92 func Init(store *db.Store, logger *log.Logger, cli *oauth.PasswordClient) *Model { 47 93 uris, err := store.GetChannelURIs(context.Background()) 48 94 if err != nil { 49 95 panic(err) 50 96 } 51 - uriToServerModel := make(map[string]*serverModel, len(uris)) 97 + uriToServerModel := make(map[string]*channelModel, len(uris)) 52 98 myid := os.Getenv("MY_IDENTITY") 53 99 for _, uri := range uris { 54 100 valid := (uri.Host == myid) 55 - beep := serverModel{valid: valid} 101 + beep := channelModel{ 102 + uri: uri.URI, 103 + } 104 + if valid { 105 + beep.serverModel = &serverModel{lastID: uri.LastID} 106 + } 56 107 uriToServerModel[uri.URI] = &beep 57 108 } 58 109 return &Model{ ··· 68 119 m.mu.Lock() 69 120 defer m.mu.Unlock() 70 121 71 - sm := m.uriMap[uri] 72 - if sm == nil { 122 + cm := m.uriMap[uri] 123 + if cm == nil { 73 124 return nil, errors.New("Not a valid server") 74 125 } 75 - if !sm.valid { 126 + sm := cm.serverModel 127 + if sm == nil { 76 128 return nil, errors.New("Not hosted on this backend!") 77 129 } 78 130 ··· 95 147 return nil, errors.New("Error starting server") 96 148 } 97 149 98 - if sm.cancelFunc != nil { 99 - sm.cancelFunc() 150 + if sm.cancel != nil { 151 + m.logger.Println("that's weird, old cancel lying around") 152 + sm.cancel() 100 153 } 101 154 102 155 ctx, cancel := context.WithCancel(context.Background()) 103 156 sm.server = server 104 157 sm.initChan = initChan 105 - sm.cancelFunc = cancel 158 + sm.cancel = cancel 159 + sm.ctx = ctx 106 160 107 161 go m.handleInitEvents(ctx, uri, initChan) 108 162 } ··· 119 173 return 120 174 case <-ticker.C: 121 175 m.mu.Lock() 122 - sm := m.uriMap[uri] 123 - if sm == nil || sm.server == nil { 176 + cm := m.uriMap[uri] 177 + if cm == nil || cm.serverModel == nil || cm.serverModel.server == nil { 124 178 m.mu.Unlock() 125 179 return 126 180 } 181 + sm := cm.serverModel 127 182 128 183 c := sm.server.Connected() 129 184 if c == 0 { ··· 135 190 sm.lastID = lastID 136 191 sm.server = nil 137 192 sm.initChan = nil 193 + sm.cancel() 194 + sm.cancel = nil 138 195 m.mu.Unlock() 139 196 return 140 197 } 141 198 m.mu.Unlock() 142 199 case e, ok := <-initChan: 143 200 if !ok { 201 + m.logger.Println("this is a weird case!") 144 202 return 145 203 } 146 204 signet := lex.SignetRecord{} ··· 149 207 h := "" 150 208 handle = &h 151 209 } 152 - signet.Author = *handle 210 + signet.AuthorHandle = *handle 153 211 if e.Init.Id == nil { 154 212 m.logger.Deprintln("initchannel gave me a nil id") 155 213 continue ··· 167 225 continue 168 226 } 169 227 sr := types.Signet{ 170 - URI: recorduri, 171 - IssuerDID: atputils.GetMyDid(), 172 - DID: signet.Author, 173 - ChannelURI: uri, 174 - MessageID: *e.Init.Id, 175 - CID: cid, 176 - StartedAt: nowTime, 228 + URI: recorduri, 229 + IssuerDID: atputils.GetMyDid(), 230 + AuthorHandle: signet.AuthorHandle, 231 + ChannelURI: uri, 232 + MessageID: *e.Init.Id, 233 + CID: cid, 234 + StartedAt: nowTime, 177 235 } 178 236 err = m.store.StoreSignet(&sr, context.Background()) 179 237 if err != nil {
+145
server/internal/model/channelLexiconStream.go
··· 1 + package model 2 + 3 + import ( 4 + "context" 5 + "github.com/gorilla/websocket" 6 + "net/http" 7 + "xcvr-backend/internal/types" 8 + ) 9 + 10 + type client struct { 11 + conn *websocket.Conn 12 + bus chan any 13 + } 14 + 15 + func (lsm *lexStreamModel) WSHandler(uri string, m *Model) http.HandlerFunc { 16 + return func(w http.ResponseWriter, r *http.Request) { 17 + upgrader := &websocket.Upgrader{ 18 + CheckOrigin: func(r *http.Request) bool { 19 + return true 20 + }, 21 + } 22 + conn, err := upgrader.Upgrade(w, r, nil) 23 + if err != nil { 24 + return 25 + } 26 + defer conn.Close() 27 + 28 + bus := make(chan any, 10) 29 + client := &client{ 30 + conn, 31 + bus, 32 + } 33 + lsm.clientsmu.Lock() 34 + lsm.clients[client] = true 35 + lsm.clientsmu.Unlock() 36 + 37 + client.wsWriter(lsm.ctx) 38 + 39 + lsm.clientsmu.Lock() 40 + delete(lsm.clients, client) 41 + if len(lsm.clients) == 0 { 42 + lsm.cancel() 43 + m.uriMap[uri].streamModel = nil 44 + } 45 + lsm.clientsmu.Unlock() 46 + } 47 + } 48 + 49 + func (c *client) wsWriter(ctx context.Context) { 50 + for { 51 + select { 52 + case <-ctx.Done(): 53 + return 54 + case e, ok := <-c.bus: 55 + if !ok { 56 + return 57 + } 58 + c.conn.WriteJSON(e) 59 + } 60 + } 61 + } 62 + 63 + func (lsm *lexStreamModel) broadcaster() { 64 + for { 65 + select { 66 + case <-lsm.ctx.Done(): 67 + lsm.cleanUp() 68 + return 69 + case m, ok := <-lsm.messageBus: 70 + if !ok { 71 + lsm.cleanUp() 72 + return 73 + } 74 + lsm.broadcast(m) 75 + case s, ok := <-lsm.signetBus: 76 + if !ok { 77 + lsm.cleanUp() 78 + return 79 + } 80 + lsm.broadcast(s) 81 + } 82 + } 83 + } 84 + 85 + func (lsm *lexStreamModel) cleanUp() { 86 + lsm.clientsmu.Lock() 87 + defer lsm.clientsmu.Unlock() 88 + for cli := range lsm.clients { 89 + close(cli.bus) 90 + } 91 + } 92 + 93 + func (lsm *lexStreamModel) broadcast(a any) { 94 + lsm.clientsmu.Lock() 95 + defer lsm.clientsmu.Unlock() 96 + for cli := range lsm.clients { 97 + select { 98 + case cli.bus <- a: 99 + default: 100 + delete(lsm.clients, cli) 101 + close(cli.bus) 102 + } 103 + } 104 + } 105 + 106 + func (m *Model) BroadcastSignet(uri string, s types.Signet) { 107 + lsm := m.uriMap[uri] 108 + if lsm == nil { 109 + return 110 + } 111 + ihandle, err := m.store.ResolveDid(s.IssuerDID, context.Background()) 112 + if err != nil { 113 + return 114 + } 115 + sv := types.SignetView{ 116 + URI: s.URI, 117 + IssuerHandle: ihandle, 118 + ChannelURI: s.ChannelURI, 119 + LrcId: s.MessageID, 120 + AuthorHandle: s.AuthorHandle, 121 + StartedAt: s.StartedAt, 122 + } 123 + lsm.streamModel.signetBus <- sv 124 + } 125 + 126 + func (m *Model) BroadcastMessage(uri string, msg types.Message) { 127 + lsm := m.uriMap[uri] 128 + if lsm == nil { 129 + return 130 + } 131 + pv, err := m.store.GetProfileView(msg.DID, context.Background()) 132 + if err != nil { 133 + return 134 + } 135 + mv := types.MessageView{ 136 + URI: msg.URI, 137 + Author: *pv, 138 + Body: msg.Body, 139 + Nick: msg.Nick, 140 + Color: msg.Color, 141 + SignetURI: msg.SignetURI, 142 + PostedAt: msg.PostedAt, 143 + } 144 + lsm.streamModel.messageBus <- mv 145 + }
+43
server/internal/oauth/passwordclient.go
··· 110 110 uri = out.Uri 111 111 return 112 112 } 113 + 114 + func (c *PasswordClient) DeleteXCVRSignet(rkey string, ctx context.Context) (bool, error) { 115 + getOut, err := atproto.RepoGetRecord(ctx, c.xrpc, "", "org.xcvr.lrc.signet", *c.did, rkey) 116 + if err != nil { 117 + return true, errors.New("nothing to delete :3") 118 + } 119 + input := atproto.RepoDeleteRecord_Input{ 120 + Repo: *c.did, 121 + Collection: "org.xcvr.lrc.signet", 122 + Rkey: rkey, 123 + SwapRecord: getOut.Cid, 124 + } 125 + err = c.deleteMyRecord(input, ctx) 126 + if err != nil { 127 + return false, errors.New("failed to delete") 128 + } 129 + return true, nil 130 + } 131 + 132 + func (c *PasswordClient) deleteMyRecord(input atproto.RepoDeleteRecord_Input, ctx context.Context) (err error) { 133 + if c.accessjwt == nil { 134 + err = errors.New("must create a session first") 135 + return 136 + } 137 + c.xrpc.Headers.Set("Authorization", fmt.Sprintf("Bearer %s", *c.accessjwt)) 138 + var out atproto.RepoDeleteRecord_Output 139 + err = c.xrpc.LexDo(ctx, "POST", "application/json", "com.atproto.repo.deleteRecord", nil, input, &out) 140 + if err != nil { 141 + err1 := err.Error() 142 + err = c.RefreshSession(ctx) 143 + if err != nil { 144 + err = errors.New(fmt.Sprintf("failed to refresh session while deleting %s! first %s then %s", input.Collection, err1, err.Error())) 145 + return 146 + } 147 + c.xrpc.Headers.Set("Authorization", fmt.Sprintf("Bearer %s", *c.accessjwt)) 148 + err = c.xrpc.LexDo(ctx, "POST", "application/json", "com.atproto.repo.deleteRecord", nil, input, &out) 149 + if err != nil { 150 + err = errors.New(fmt.Sprintf("not good, failed to delete %s after failing then refreshing session! first %s then %s", input.Collection, err1, err.Error())) 151 + return 152 + } 153 + } 154 + return 155 + }
+20 -11
server/internal/types/lexicons.go
··· 80 80 } 81 81 82 82 type Signet struct { 83 - URI string 84 - IssuerDID string 85 - DID string 86 - ChannelURI string 87 - MessageID uint32 88 - CID string 89 - StartedAt time.Time 90 - IndexedAt time.Time 83 + URI string 84 + IssuerDID string 85 + AuthorHandle string 86 + ChannelURI string 87 + MessageID uint32 88 + CID string 89 + StartedAt time.Time 90 + IndexedAt time.Time 91 + } 92 + 93 + type SignetView struct { 94 + URI string `json:"uri"` 95 + IssuerHandle string `json:"issuerHandle"` 96 + ChannelURI string `json:"channelURI"` 97 + LrcId uint32 `json:"lrcID"` 98 + AuthorHandle string `json:"authorHandle"` 99 + StartedAt time.Time `json:"startedAt"` 91 100 } 92 101 93 102 type Message struct { ··· 115 124 URI string `json:"uri"` 116 125 Author ProfileView `json:"author"` 117 126 Body string `json:"body"` 118 - Nick string `json:"nick,omitempty"` 119 - Color int `json:"color"` 120 - StartedAt time.Time `json:"startedAt"` 127 + Nick *string `json:"nick,omitempty"` 128 + Color *uint32 `json:"color,omitempty"` 129 + SignetURI string `json:"signetURI"` 121 130 PostedAt time.Time `json:"postedAt"` 122 131 }