this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Rough draft for working out who to send the alert to and allowing users to subscribe via dm

+604 -1
+28
cmd/main.go
··· 12 12 "os/signal" 13 13 "path" 14 14 "syscall" 15 + "time" 15 16 16 17 tangledalertbot "tangled.sh/willdot.net/tangled-alert-bot" 17 18 ··· 56 57 } 57 58 defer database.Close() 58 59 60 + dmService, err := tangledalertbot.NewDmService(database, time.Second*30) 61 + if err != nil { 62 + return fmt.Errorf("create dm service: %w", err) 63 + } 64 + 59 65 ctx, cancel := context.WithCancel(context.Background()) 60 66 defer cancel() 61 67 ··· 63 69 64 70 go startHttpServer(ctx, database) 65 71 72 + go dmService.Start(ctx) 73 + 66 74 <-signals 67 75 cancel() 68 76 ··· 101 109 mux := http.NewServeMux() 102 110 mux.HandleFunc("/issues", srv.handleListIssues) 103 111 mux.HandleFunc("/comments", srv.handleListComments) 112 + mux.HandleFunc("/users", srv.handleListUsers) 104 113 105 114 err := http.ListenAndServe(":3000", mux) 106 115 if err != nil { ··· 149 158 w.Header().Set("Content-Type", "application/json") 150 159 w.Write(b) 151 160 } 161 + 162 + func (s *server) handleListUsers(w http.ResponseWriter, r *http.Request) { 163 + users, err := s.db.GetUsers() 164 + if err != nil { 165 + slog.Error("getting users from DB", "error", err) 166 + http.Error(w, "error getting users from DB", http.StatusInternalServerError) 167 + return 168 + } 169 + 170 + b, err := json.Marshal(users) 171 + if err != nil { 172 + slog.Error("marshalling users from DB", "error", err) 173 + http.Error(w, "marshalling users from DB", http.StatusInternalServerError) 174 + return 175 + } 176 + 177 + w.Header().Set("Content-Type", "application/json") 178 + w.Write(b) 179 + }
+49 -1
consumer.go
··· 3 3 import ( 4 4 "context" 5 5 "encoding/json" 6 + "strings" 6 7 7 8 "fmt" 8 9 "log/slog" ··· 38 39 DeleteIssue(did, rkey string) error 39 40 DeleteComment(did, rkey string) error 40 41 DeleteCommentsForIssue(issueURI string) error 42 + GetUser(did string) (User, error) 43 + CreateUser(user User) error 41 44 } 42 45 43 46 // JetstreamConsumer is responsible for consuming from a jetstream instance ··· 217 220 } 218 221 219 222 // TODO: now send a notification to either the issue creator or whoever the comment was a reply to 223 + didToNotify := getUserToAlert(comment) 224 + if didToNotify == "" { 225 + slog.Info("could not work out did to send alert to", "comment", comment) 226 + return 227 + } 220 228 221 - slog.Info("created comment ", "value", comment, "did", did, "rkey", rkey) 229 + user, err := h.store.GetUser(didToNotify) 230 + if err != nil { 231 + slog.Error("getting user to send alert to", "error", err, "did", didToNotify) 232 + return 233 + } 234 + 235 + slog.Info("sending alert to user", "value", comment, "did", didToNotify, "convo", user.ConvoID) 222 236 } 223 237 224 238 func (h *Handler) handleDeleteIssueEvent(ctx context.Context, event *models.Event) { ··· 259 273 260 274 slog.Info("deleted comment ", "did", did, "rkey", rkey) 261 275 } 276 + 277 + // at://did:plc:dadhhalkfcq3gucaq25hjqon/sh.tangled.repo.issue.comment/3lzkp4va62m22 278 + func getUserToAlert(comment tangled.RepoIssueComment) string { 279 + if comment.ReplyTo != nil { 280 + return getDidFromCommentURI(*comment.ReplyTo) 281 + } 282 + return getDidFromIssueURI(comment.Issue) 283 + } 284 + 285 + func getDidFromCommentURI(uri string) string { 286 + split := strings.Split(uri, tangled.RepoIssueCommentNSID) 287 + if len(split) != 2 { 288 + slog.Error("invalid comment URI received", "uri", uri) 289 + return "" 290 + } 291 + 292 + did := strings.TrimPrefix(split[0], "at://") 293 + did = strings.TrimSuffix(did, "/") 294 + 295 + return did 296 + } 297 + 298 + func getDidFromIssueURI(uri string) string { 299 + split := strings.Split(uri, tangled.RepoIssueNSID) 300 + if len(split) != 2 { 301 + slog.Error("invalid issue URI received", "uri", uri) 302 + return "" 303 + } 304 + 305 + did := strings.TrimPrefix(split[0], "at://") 306 + did = strings.TrimSuffix(did, "/") 307 + 308 + return did 309 + }
+86
database.go
··· 44 44 return nil, fmt.Errorf("creating comments table: %w", err) 45 45 } 46 46 47 + err = createUsersTable(db) 48 + if err != nil { 49 + return nil, fmt.Errorf("creating users table: %w", err) 50 + } 51 + 47 52 return &Database{db: db}, nil 48 53 } 49 54 ··· 122 127 return nil 123 128 } 124 129 130 + func createUsersTable(db *sql.DB) error { 131 + createTableSQL := `CREATE TABLE IF NOT EXISTS users ( 132 + "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, 133 + "did" TEXT, 134 + "handle" TEXT, 135 + "convoId" TEXT, 136 + "createdAt" integer NOT NULL, 137 + UNIQUE(did) 138 + );` 139 + 140 + slog.Info("Create users table...") 141 + statement, err := db.Prepare(createTableSQL) 142 + if err != nil { 143 + return fmt.Errorf("prepare DB statement to create users table: %w", err) 144 + } 145 + _, err = statement.Exec() 146 + if err != nil { 147 + return fmt.Errorf("exec sql statement to create users table: %w", err) 148 + } 149 + slog.Info("users table created") 150 + 151 + return nil 152 + } 153 + 125 154 // CreateIssue will insert a issue into a database 126 155 func (d *Database) CreateIssue(issue Issue) error { 127 156 sql := `REPLACE INTO issues (authorDid, rkey, title, body, repo, createdAt) VALUES (?, ?, ?, ?, ?, ?);` ··· 138 167 _, err := d.db.Exec(sql, comment.AuthorDID, comment.RKey, comment.Body, comment.Issue, comment.CreatedAt) 139 168 if err != nil { 140 169 return fmt.Errorf("exec insert comment: %w", err) 170 + } 171 + return nil 172 + } 173 + 174 + // CreateUser will insert a user into a database 175 + func (d *Database) CreateUser(user User) error { 176 + sql := `REPLACE INTO users (did, handle, convoId, createdAt) VALUES (?, ?, ?, ?);` 177 + _, err := d.db.Exec(sql, user.DID, user.Handle, user.ConvoID, user.CreatedAt) 178 + if err != nil { 179 + return fmt.Errorf("exec insert user: %w", err) 141 180 } 142 181 return nil 143 182 } ··· 182 221 return results, nil 183 222 } 184 223 224 + func (d *Database) GetUser(did string) (User, error) { 225 + sql := "SELECT did, handle, convoId, createdAt FROM users WHERE did = ?;" 226 + rows, err := d.db.Query(sql, did) 227 + if err != nil { 228 + return User{}, fmt.Errorf("run query to get user: %w", err) 229 + } 230 + defer rows.Close() 231 + 232 + for rows.Next() { 233 + var user User 234 + if err := rows.Scan(&user.DID, &user.Handle, &user.ConvoID, &user.CreatedAt); err != nil { 235 + return User{}, fmt.Errorf("scan row: %w", err) 236 + } 237 + 238 + return user, nil 239 + } 240 + return User{}, fmt.Errorf("user not found") 241 + } 242 + 243 + func (d *Database) GetUsers() ([]User, error) { 244 + sql := "SELECT did, handle, convoId, createdAt FROM users;" 245 + rows, err := d.db.Query(sql) 246 + if err != nil { 247 + return nil, fmt.Errorf("run query to get user: %w", err) 248 + } 249 + defer rows.Close() 250 + 251 + var results []User 252 + for rows.Next() { 253 + var user User 254 + if err := rows.Scan(&user.DID, &user.Handle, &user.ConvoID, &user.CreatedAt); err != nil { 255 + return nil, fmt.Errorf("scan row: %w", err) 256 + } 257 + results = append(results, user) 258 + } 259 + return results, nil 260 + } 261 + 185 262 func (d *Database) DeleteIssue(did, rkey string) error { 186 263 sql := "DELETE FROM issues WHERE authorDid = ? AND rkey = ?;" 187 264 _, err := d.db.Exec(sql, did, rkey) ··· 208 285 } 209 286 return nil 210 287 } 288 + 289 + func (d *Database) DeleteUser(did string) error { 290 + sql := "DELETE FROM users WHERE did = ?;" 291 + _, err := d.db.Exec(sql, did) 292 + if err != nil { 293 + return fmt.Errorf("exec delete user") 294 + } 295 + return nil 296 + }
+441
dm_handler.go
··· 1 + package tangledalertbot 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "io" 9 + "log/slog" 10 + "net/http" 11 + "os" 12 + "strings" 13 + "time" 14 + 15 + "github.com/pkg/errors" 16 + ) 17 + 18 + const ( 19 + httpClientTimeoutDuration = time.Second * 5 20 + transportIdleConnTimeoutDuration = time.Second * 90 21 + baseBskyURL = "https://bsky.social/xrpc" 22 + ) 23 + 24 + type auth struct { 25 + AccessJwt string `json:"accessJwt"` 26 + RefershJWT string `json:"refreshJwt"` 27 + Did string `json:"did"` 28 + } 29 + 30 + type accessData struct { 31 + handle string 32 + appPassword string 33 + } 34 + 35 + type ListConvosResponse struct { 36 + Cursor string `json:"cursor"` 37 + Convos []Convo `json:"convos"` 38 + } 39 + 40 + type Convo struct { 41 + ID string `json:"id"` 42 + Members []ConvoMember `json:"members"` 43 + UnreadCount int `json:"unreadCount"` 44 + } 45 + 46 + type ConvoMember struct { 47 + Did string `json:"did"` 48 + Handle string `json:"handle"` 49 + } 50 + 51 + type ErrorResponse struct { 52 + Error string `json:"error"` 53 + } 54 + 55 + type MessageResp struct { 56 + Messages []Message `json:"messages"` 57 + Cursor string `json:"cursor"` 58 + } 59 + 60 + type Message struct { 61 + ID string `json:"id"` 62 + Sender MessageSender `json:"sender"` 63 + Text string `json:"text"` 64 + } 65 + 66 + type MessageSender struct { 67 + Did string `json:"did"` 68 + } 69 + 70 + type UpdateMessageReadRequest struct { 71 + ConvoID string `json:"convoId"` 72 + MessageID string `json:"messageId"` 73 + } 74 + 75 + type User struct { 76 + DID string 77 + Handle string 78 + ConvoID string 79 + CreatedAt int 80 + } 81 + 82 + type DmService struct { 83 + httpClient *http.Client 84 + accessData accessData 85 + auth auth 86 + timerDuration time.Duration 87 + pdsURL string 88 + store Store 89 + } 90 + 91 + func NewDmService(store Store, timerDuration time.Duration) (*DmService, error) { 92 + httpClient := http.Client{ 93 + Timeout: httpClientTimeoutDuration, 94 + Transport: &http.Transport{ 95 + IdleConnTimeout: transportIdleConnTimeoutDuration, 96 + }, 97 + } 98 + 99 + accessHandle := os.Getenv("MESSAGING_ACCESS_HANDLE") 100 + accessAppPassword := os.Getenv("MESSAGING_ACCESS_APP_PASSWORD") 101 + pdsURL := os.Getenv("MESSAGING_PDS_URL") 102 + 103 + service := DmService{ 104 + httpClient: &httpClient, 105 + accessData: accessData{ 106 + handle: accessHandle, 107 + appPassword: accessAppPassword, 108 + }, 109 + timerDuration: timerDuration, 110 + pdsURL: pdsURL, 111 + store: store, 112 + } 113 + 114 + auth, err := service.Authenicate() 115 + if err != nil { 116 + return nil, fmt.Errorf("authenticating: %w", err) 117 + } 118 + 119 + service.auth = auth 120 + 121 + return &service, nil 122 + } 123 + 124 + func (d *DmService) Start(ctx context.Context) { 125 + go d.RefreshTask(ctx) 126 + 127 + timer := time.NewTimer(d.timerDuration) 128 + defer timer.Stop() 129 + 130 + for { 131 + select { 132 + case <-ctx.Done(): 133 + slog.Warn("context canceled - stopping dm task") 134 + return 135 + case <-timer.C: 136 + err := d.HandleMessageTimer(ctx) 137 + if err != nil { 138 + slog.Error("handle message timer", "error", err) 139 + } 140 + timer.Reset(d.timerDuration) 141 + } 142 + } 143 + } 144 + 145 + func (d *DmService) RefreshTask(ctx context.Context) { 146 + timer := time.NewTimer(time.Hour) 147 + defer timer.Stop() 148 + 149 + for { 150 + select { 151 + case <-ctx.Done(): 152 + return 153 + case <-timer.C: 154 + err := d.RefreshAuthenication(ctx) 155 + if err != nil { 156 + slog.Error("handle refresh auth timer", "error", err) 157 + // TODO: better retry with backoff probably 158 + timer.Reset(time.Minute) 159 + continue 160 + } 161 + timer.Reset(time.Hour) 162 + } 163 + } 164 + } 165 + 166 + func (d *DmService) HandleMessageTimer(ctx context.Context) error { 167 + convoResp, err := d.GetUnreadMessages() 168 + if err != nil { 169 + return fmt.Errorf("get unread messages: %w", err) 170 + } 171 + 172 + // TODO: handle the cursor pagination 173 + 174 + for _, convo := range convoResp.Convos { 175 + if convo.UnreadCount == 0 { 176 + continue 177 + } 178 + 179 + messageResp, err := d.GetMessages(ctx, convo.ID) 180 + if err != nil { 181 + slog.Error("failed to get messages for convo", "error", err, "convo id", convo.ID) 182 + continue 183 + } 184 + 185 + unreadCount := convo.UnreadCount 186 + unreadMessages := make([]Message, 0, convo.UnreadCount) 187 + // TODO: handle cursor pagination 188 + for _, msg := range messageResp.Messages { 189 + // TODO: techincally if I get to a message that's from the bot account, then there shouldn't be 190 + // an more unread messages? 191 + if msg.Sender.Did == d.auth.Did { 192 + continue 193 + } 194 + 195 + unreadMessages = append(unreadMessages, msg) 196 + unreadCount-- 197 + if unreadCount == 0 { 198 + break 199 + } 200 + } 201 + 202 + for _, msg := range unreadMessages { 203 + d.handleMessage(msg, convo) 204 + 205 + err = d.MarkMessageRead(msg.ID, convo.ID) 206 + if err != nil { 207 + slog.Error("marking message read", "error", err) 208 + continue 209 + } 210 + } 211 + } 212 + 213 + return nil 214 + } 215 + 216 + func (d *DmService) handleMessage(msg Message, convo Convo) { 217 + // TODO: add or remote user the list of "subsribed" users 218 + if strings.ToLower(msg.Text) == "subscribe" { 219 + userHandle := "" 220 + for _, member := range convo.Members { 221 + if member.Did == msg.Sender.Did { 222 + userHandle = member.Handle 223 + break 224 + } 225 + } 226 + 227 + if userHandle == "" { 228 + slog.Error("user handle for sent message not found", "sender did", msg.Sender.Did, "convo members", convo.Members) 229 + return 230 + } 231 + 232 + user := User{ 233 + DID: msg.Sender.Did, 234 + ConvoID: convo.ID, 235 + Handle: userHandle, 236 + CreatedAt: int(time.Now().UnixMilli()), 237 + } 238 + 239 + err := d.store.CreateUser(user) 240 + if err != nil { 241 + slog.Error("error creating user", "error", err, "user", user) 242 + return 243 + } 244 + } 245 + } 246 + 247 + func (d *DmService) GetUnreadMessages() (ListConvosResponse, error) { 248 + url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.listConvos?readState=unread", d.pdsURL) 249 + request, err := http.NewRequest("GET", url, nil) 250 + if err != nil { 251 + return ListConvosResponse{}, fmt.Errorf("create new list convos http request: %w", err) 252 + } 253 + 254 + request.Header.Add("Content-Type", "application/json") 255 + request.Header.Add("Accept", "application/json") 256 + request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat") 257 + request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt)) 258 + 259 + resp, err := d.httpClient.Do(request) 260 + if err != nil { 261 + return ListConvosResponse{}, fmt.Errorf("do http request to list convos: %w", err) 262 + } 263 + defer resp.Body.Close() 264 + 265 + if resp.StatusCode != http.StatusOK { 266 + var errorResp ErrorResponse 267 + err = decodeResp(resp.Body, &errorResp) 268 + if err != nil { 269 + return ListConvosResponse{}, err 270 + } 271 + 272 + return ListConvosResponse{}, fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error) 273 + } 274 + 275 + var listConvoResp ListConvosResponse 276 + err = decodeResp(resp.Body, &listConvoResp) 277 + if err != nil { 278 + return ListConvosResponse{}, err 279 + } 280 + 281 + return listConvoResp, nil 282 + } 283 + 284 + func (d *DmService) MarkMessageRead(messageID, convoID string) error { 285 + bodyReq := UpdateMessageReadRequest{ 286 + ConvoID: convoID, 287 + MessageID: messageID, 288 + } 289 + 290 + bodyB, err := json.Marshal(bodyReq) 291 + if err != nil { 292 + return fmt.Errorf("marshal update message request body: %w", err) 293 + } 294 + 295 + r := bytes.NewReader(bodyB) 296 + 297 + url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.updateRead", d.pdsURL) 298 + request, err := http.NewRequest("POST", url, r) 299 + if err != nil { 300 + return fmt.Errorf("create new list convos http request: %w", err) 301 + } 302 + 303 + request.Header.Add("Content-Type", "application/json") 304 + request.Header.Add("Accept", "application/json") 305 + request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat") 306 + request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt)) 307 + 308 + resp, err := d.httpClient.Do(request) 309 + if err != nil { 310 + return fmt.Errorf("do http request to update message read: %w", err) 311 + } 312 + defer resp.Body.Close() 313 + 314 + if resp.StatusCode == http.StatusOK { 315 + return nil 316 + } 317 + 318 + var errorResp ErrorResponse 319 + err = decodeResp(resp.Body, &errorResp) 320 + if err != nil { 321 + return err 322 + } 323 + 324 + return fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error) 325 + 326 + } 327 + 328 + func (d *DmService) Authenicate() (auth, error) { 329 + url := fmt.Sprintf("%s/com.atproto.server.createSession", baseBskyURL) 330 + 331 + requestData := map[string]interface{}{ 332 + "identifier": d.accessData.handle, 333 + "password": d.accessData.appPassword, 334 + } 335 + 336 + data, err := json.Marshal(requestData) 337 + if err != nil { 338 + return auth{}, errors.Wrap(err, "failed to marshal request") 339 + } 340 + 341 + r := bytes.NewReader(data) 342 + 343 + request, err := http.NewRequest("POST", url, r) 344 + if err != nil { 345 + return auth{}, errors.Wrap(err, "failed to create request") 346 + } 347 + 348 + request.Header.Add("Content-Type", "application/json") 349 + 350 + resp, err := d.httpClient.Do(request) 351 + if err != nil { 352 + return auth{}, errors.Wrap(err, "failed to make request") 353 + } 354 + defer resp.Body.Close() 355 + 356 + var loginResp auth 357 + err = decodeResp(resp.Body, &loginResp) 358 + if err != nil { 359 + return auth{}, err 360 + } 361 + 362 + return loginResp, nil 363 + } 364 + 365 + func (d *DmService) RefreshAuthenication(ctx context.Context) error { 366 + url := fmt.Sprintf("%s/com.atproto.server.refreshSession", baseBskyURL) 367 + 368 + request, err := http.NewRequest("POST", url, nil) 369 + if err != nil { 370 + return errors.Wrap(err, "failed to create request") 371 + } 372 + 373 + request.Header.Add("Content-Type", "application/json") 374 + request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.RefershJWT)) 375 + 376 + resp, err := d.httpClient.Do(request) 377 + if err != nil { 378 + return errors.Wrap(err, "failed to make request") 379 + } 380 + defer resp.Body.Close() 381 + 382 + var loginResp auth 383 + err = decodeResp(resp.Body, &loginResp) 384 + if err != nil { 385 + return err 386 + } 387 + 388 + d.auth = loginResp 389 + 390 + return nil 391 + } 392 + 393 + func (d *DmService) GetMessages(ctx context.Context, convoID string) (MessageResp, error) { 394 + url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.getMessages?convoId=%s", d.pdsURL, convoID) 395 + request, err := http.NewRequest("GET", url, nil) 396 + if err != nil { 397 + return MessageResp{}, fmt.Errorf("create new get messages http request: %w", err) 398 + } 399 + 400 + request.Header.Add("Content-Type", "application/json") 401 + request.Header.Add("Accept", "application/json") 402 + request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat") 403 + request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt)) 404 + 405 + resp, err := d.httpClient.Do(request) 406 + if err != nil { 407 + return MessageResp{}, fmt.Errorf("do http request to get messages: %w", err) 408 + } 409 + defer resp.Body.Close() 410 + 411 + if resp.StatusCode != http.StatusOK { 412 + var errorResp ErrorResponse 413 + err = decodeResp(resp.Body, &errorResp) 414 + if err != nil { 415 + return MessageResp{}, err 416 + } 417 + 418 + return MessageResp{}, fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error) 419 + } 420 + 421 + var messageResp MessageResp 422 + err = decodeResp(resp.Body, &messageResp) 423 + if err != nil { 424 + return MessageResp{}, err 425 + } 426 + 427 + return messageResp, nil 428 + } 429 + 430 + func decodeResp(body io.Reader, result any) error { 431 + resBody, err := io.ReadAll(body) 432 + if err != nil { 433 + return errors.Wrap(err, "failed to read response") 434 + } 435 + 436 + err = json.Unmarshal(resBody, result) 437 + if err != nil { 438 + return errors.Wrap(err, "failed to unmarshal response") 439 + } 440 + return nil 441 + }