backend for xcvr appview
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 618 lines 14 kB view raw
1package db 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "os" 8 "rvcx/internal/atputils" 9 "rvcx/internal/lex" 10 "rvcx/internal/types" 11 "time" 12 13 "github.com/jackc/pgx/v5" 14 "github.com/jackc/pgx/v5/pgxpool" 15) 16 17type Store struct { 18 pool *pgxpool.Pool 19} 20 21func Init() (*Store, error) { 22 pool, err := initialize() 23 return &Store{pool}, err 24} 25 26func (s *Store) Close() { 27 s.pool.Close() 28} 29 30func initialize() (*pgxpool.Pool, error) { 31 dbuser := os.Getenv("POSTGRES_USER") 32 dbpass := os.Getenv("POSTGRES_PASSWORD") 33 dbhost := "localhost" 34 dbport := os.Getenv("POSTGRES_PORT") 35 dbdb := os.Getenv("POSTGRES_DB") 36 dburl := fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", dbuser, dbpass, dbhost, dbport, dbdb) 37 pool, err := pgxpool.New(context.Background(), dburl) 38 if err != nil { 39 return nil, err 40 } 41 pingErr := pool.Ping(context.Background()) 42 if pingErr != nil { 43 return nil, pingErr 44 } 45 fmt.Println("connected!") 46 return pool, nil 47} 48 49func (s *Store) ResolveHandle(handle string, ctx context.Context) (string, error) { 50 row := s.pool.QueryRow(ctx, `SELECT h.did FROM did_handles h WHERE h.handle = $1`, handle) 51 var did string 52 err := row.Scan(&did) 53 if err != nil { 54 return "", err 55 } 56 return did, nil 57} 58 59func (s *Store) FullResolveHandle(hdl string, ctx context.Context) (string, error) { 60 did, err := s.ResolveHandle(hdl, ctx) 61 if err == nil { 62 return did, nil 63 } 64 did, err = atputils.TryLookupHandle(ctx, hdl) 65 if err != nil { 66 return "", errors.New("couldn't resolve: " + err.Error()) 67 } 68 s.StoreDidHandle(did, hdl, ctx) 69 return did, nil 70} 71 72func (s *Store) ResolveDid(did string, ctx context.Context) (string, error) { 73 row := s.pool.QueryRow(ctx, `SELECT h.handle FROM did_handles h WHERE h.did = $1`, did) 74 var handle string 75 err := row.Scan(&handle) 76 if err != nil { 77 return "", errors.New("error scanning row for handle: " + err.Error()) 78 } 79 return handle, nil 80} 81 82func (s *Store) FullResolveDid(did string, ctx context.Context) (string, error) { 83 hdl, err := s.ResolveDid(did, ctx) 84 if err == nil { 85 return hdl, nil 86 } 87 hdl, err = atputils.TryLookupDid(ctx, did) 88 if err != nil { 89 return "", errors.New("couldn't resolve: " + err.Error()) 90 } 91 s.StoreDidHandle(did, hdl, ctx) 92 return hdl, nil 93} 94 95func (s *Store) StoreDidHandle(did string, handle string, ctx context.Context) error { 96 _, err := s.pool.Exec(ctx, `INSERT INTO did_handles ( 97 handle, 98 did 99 ) VALUES ($1, $2) ON CONFLICT (handle) DO NOTHING`, handle, did) 100 if err != nil { 101 return errors.New("error storing did/handle: " + err.Error()) 102 } 103 return nil 104} 105 106func (s *Store) GetLastSeen(did string, ctx context.Context) (where *string, when *time.Time) { 107 row := s.pool.QueryRow(ctx, `SELECT 108 s.channel_uri, m.posted_at 109 FROM messages m 110 JOIN signets s ON m.signet_uri = s.uri 111 JOIN did_handles dh ON m.did = dh.did 112 WHERE m.did = $1 AND dh.handle = s.author_handle 113 ORDER BY m.posted_at DESC`, did) 114 row.Scan(&where, &when) 115 return 116} 117 118func (s *Store) GetHistory(channelURI string, limit int, cursor *int, ctx context.Context) ([]types.SignedItemView, error) { 119 queryFmt := ` 120 SELECT 121 'message' AS content_type, 122 m.uri, 123 m.did, 124 dh.handle, 125 p.display_name, 126 p.status, 127 p.color, 128 p.avatar_cid, 129 p.default_nick, 130 m.body, 131 NULL AS blob_cid, 132 NULL AS blob_mime, 133 NULL AS alt, 134 NULL AS height, 135 NULL AS width, 136 m.nick, 137 m.color, 138 s.uri, 139 s.issuer_did, 140 s.channel_uri, 141 s.message_id, 142 s.author, 143 s.author_handle, 144 s.started_at, 145 m.posted_at 146 FROM signets s 147 JOIN messages m ON s.uri = m.signet_uri 148 JOIN did_handles dh ON m.did = dh.did 149 JOIN profiles p ON m.did = p.did 150 WHERE s.channel_uri = $2 AND m.did = s.author %s 151 152 UNION ALL 153 154 SELECT 155 'image' AS content_type, 156 i.uri, 157 i.did, 158 dh.handle, 159 p.display_name, 160 p.status, 161 p.color, 162 p.avatar_cid, 163 p.default_nick, 164 NULL AS body, 165 i.blob_cid, 166 i.blob_mime, 167 i.alt, 168 i.height, 169 i.width, 170 i.nick, 171 i.color, 172 s.uri, 173 s.issuer_did, 174 s.channel_uri, 175 s.message_id, 176 s.author, 177 s.author_handle, 178 s.started_at, 179 i.posted_at 180 FROM signets s 181 JOIN images i ON s.uri = i.signet_uri 182 JOIN did_handles dh ON i.did = dh.did 183 JOIN profiles p ON i.did = p.did 184 WHERE s.channel_uri = $2 AND i.did = s.author %s 185 186 ORDER BY message_id DESC 187 LIMIT $1 188 ` 189 var query string 190 if cursor != nil { 191 query = fmt.Sprintf(queryFmt, "AND s.message_id < $3", "AND s.message_id < $3") 192 return s.evalGetItems(query, ctx, limit, channelURI, *cursor) 193 } else { 194 query = fmt.Sprintf(queryFmt, "", "") 195 return s.evalGetItems(query, ctx, limit, channelURI) 196 } 197} 198func (s *Store) evalGetItems(query string, ctx context.Context, limit int, params ...any) ([]types.SignedItemView, error) { 199 args := []any{limit} 200 args = append(args, params...) 201 rows, err := s.pool.Query(ctx, query, args...) 202 if err != nil { 203 return nil, err 204 } 205 defer rows.Close() 206 var items = make([]types.SignedItemView, 0) 207 for rows.Next() { 208 var t string 209 var p types.ProfileView 210 var uri string 211 var body *string 212 var image types.Image 213 var alt *string 214 var nick string 215 var color uint32 216 var s types.SignetView 217 var time time.Time 218 219 err := rows.Scan( 220 &t, 221 &uri, 222 &p.DID, 223 &p.Handle, 224 &p.DisplayName, 225 &p.Status, 226 &p.Color, 227 &p.Avatar, 228 &p.DefaultNick, 229 &body, 230 &image.BlobCID, 231 &image.BlobMIME, 232 &alt, 233 &image.Height, 234 &image.Width, 235 236 &nick, 237 &color, 238 239 &s.URI, 240 &s.Issuer, 241 &s.ChannelURI, 242 &s.LrcId, 243 &s.Author, 244 &s.AuthorHandle, 245 &s.StartedAt, 246 &time, 247 ) 248 if err != nil { 249 return nil, err 250 } 251 if t == "message" { 252 var msg types.SignedMessageView 253 if body != nil { 254 255 msg.Body = *body 256 } 257 if nick != "" { 258 msg.Nick = &nick 259 } 260 if color != 0 { 261 msg.Color = &color 262 } 263 msg.Author = p 264 msg.Signet = s 265 msg.PostedAt = time 266 msg.URI = uri 267 268 items = append(items, msg) 269 } else if t == "image" { 270 var img types.SignedMediaView 271 var imgview types.ImageView 272 if image.Height != nil && image.Width != nil { 273 var aspect lex.AspectRatio 274 aspect.Width = *image.Width 275 aspect.Height = *image.Height 276 imgview.AspectRatio = &aspect 277 } 278 if alt != nil { 279 imgview.Alt = *alt 280 } 281 base := os.Getenv("MY_IDENTITY") 282 src := fmt.Sprintf("https://%s/xrpc/org.xcvr.lrc.getImage?uri=%s", base, uri) 283 imgview.Src = &src 284 img.Image = &imgview 285 if nick != "" { 286 img.Nick = &nick 287 } 288 if color != 0 { 289 img.Color = &color 290 } 291 img.Author = p 292 img.Signet = s 293 img.PostedAt = time 294 img.URI = uri 295 296 items = append(items, img) 297 } else { 298 return nil, errors.New("recieved strange type t: " + t) 299 } 300 } 301 return items, nil 302 303} 304 305func (s *Store) GetMessages(channelURI string, limit int, cursor *int, ctx context.Context) ([]types.SignedMessageView, error) { 306 queryFmt := ` 307 SELECT 308 m.uri, 309 m.did, 310 dh.handle, 311 p.display_name, 312 p.status, 313 p.color, 314 p.avatar_cid, 315 p.default_nick, 316 m.body, 317 m.nick, 318 m.color, 319 s.uri, 320 issuer_dh.handle, 321 s.channel_uri, 322 s.message_id, 323 s.author_handle, 324 s.started_at, 325 m.posted_at 326 FROM messages m 327 JOIN signets s ON m.signet_uri = s.uri 328 JOIN did_handles dh ON m.did = dh.did 329 LEFT JOIN profiles p ON m.did = p.did 330 JOIN did_handles issuer_dh ON s.issuer_did = issuer_dh.did 331 WHERE s.channel_uri = $2 AND dh.handle = s.author_handle %s 332 ORDER BY s.message_id DESC 333 LIMIT $1 334 ` 335 var query string 336 if cursor != nil { 337 query = fmt.Sprintf(queryFmt, "AND s.message_id < $3") 338 return s.evalGetMessages(query, ctx, limit, channelURI, *cursor) 339 } else { 340 query = fmt.Sprintf(queryFmt, "") 341 return s.evalGetMessages(query, ctx, limit, channelURI) 342 } 343} 344 345func (s *Store) evalGetMessages(query string, ctx context.Context, limit int, params ...any) ([]types.SignedMessageView, error) { 346 args := []any{limit} 347 args = append(args, params...) 348 rows, err := s.pool.Query(ctx, query, args...) 349 if err != nil { 350 return nil, err 351 } 352 defer rows.Close() 353 var msgs = make([]types.SignedMessageView, 0) 354 for rows.Next() { 355 var msg types.SignedMessageView 356 err := rows.Scan( 357 &msg.URI, 358 359 &msg.Author.DID, 360 &msg.Author.Handle, 361 &msg.Author.DisplayName, 362 &msg.Author.Status, 363 &msg.Author.Color, 364 &msg.Author.Avatar, 365 &msg.Author.DefaultNick, 366 367 &msg.Body, 368 &msg.Nick, 369 &msg.Color, 370 371 &msg.Signet.URI, 372 &msg.Signet.Issuer, 373 &msg.Signet.ChannelURI, 374 &msg.Signet.LrcId, 375 &msg.Signet.AuthorHandle, 376 &msg.Signet.StartedAt, 377 378 &msg.PostedAt, 379 ) 380 if err != nil { 381 return nil, err 382 } 383 msgs = append(msgs, msg) 384 } 385 return msgs, nil 386} 387 388func (s *Store) GetChannelURI(handle string, title string, ctx context.Context) (string, error) { 389 rows, err := s.pool.Query(ctx, ` 390 SELECT 391 channels.uri 392 FROM channels 393 LEFT JOIN did_handles ON channels.did = did_handles.did 394 WHERE channels.title = $1 AND did_handles.handle = $2 395 ORDER BY channels.created_at DESC 396 LIMIT 1 397 `, title, handle) 398 if err != nil { 399 return "", err 400 } 401 defer rows.Close() 402 var uri string 403 rows.Next() 404 err = rows.Scan(&uri) 405 if err != nil { 406 return "", err 407 } 408 return uri, nil 409} 410 411type URIHost struct { 412 URI string 413 Host string 414 Topic string 415 LastID uint32 416} 417 418func (s *Store) GetChannelURIs(ctx context.Context) ([]URIHost, error) { 419 rows, err := s.pool.Query(ctx, ` 420 SELECT 421 channels.uri, 422 channels.host, 423 channels.topic 424 FROM channels 425 `) 426 if err != nil { 427 return nil, err 428 } 429 defer rows.Close() 430 var urihosts = make([]URIHost, 0, 100) 431 for rows.Next() { 432 var urihost URIHost 433 err := rows.Scan(&urihost.URI, &urihost.Host, &urihost.Topic) 434 if err != nil { 435 return nil, err 436 } 437 var maxMessageID uint32 438 err = s.pool.QueryRow(ctx, ` 439 SELECT COALESCE(MAX(message_id), 0) 440 FROM signets 441 WHERE channel_uri = $1 442 `, urihost.URI).Scan(&maxMessageID) 443 if err != nil { 444 return nil, err 445 } 446 urihost.LastID = maxMessageID 447 urihosts = append(urihosts, urihost) 448 } 449 return urihosts, nil 450} 451 452func (s *Store) GetChannelViews(limit int, ctx context.Context) ([]types.ChannelView, error) { 453 rows, err := s.pool.Query(ctx, ` 454 SELECT 455 channels.uri, 456 channels.host, 457 channels.title, 458 channels.topic, 459 channels.created_at, 460 did_handles.did, 461 did_handles.handle, 462 profiles.display_name, 463 profiles.status, 464 profiles.color, 465 profiles.avatar_cid 466 FROM channels 467 LEFT JOIN profiles ON channels.did = profiles.did 468 LEFT JOIN did_handles ON profiles.did = did_handles.did 469 ORDER BY channels.created_at DESC 470 LIMIT $1 471 `, limit) 472 if err != nil { 473 return nil, err 474 } 475 defer rows.Close() 476 var chans = make([]types.ChannelView, 0, limit) 477 for rows.Next() { 478 var c types.ChannelView 479 var p types.ProfileView 480 err := rows.Scan(&c.URI, &c.Host, &c.Title, &c.Topic, &c.CreatedAt, &p.DID, &p.Handle, &p.DisplayName, &p.Status, &p.Color, &p.Avatar) 481 if err != nil { 482 return nil, err 483 } 484 c.Creator = p 485 chans = append(chans, c) 486 } 487 return chans, nil 488} 489 490func (s *Store) GetChannelView(uri string, ctx context.Context) (*types.ChannelView, error) { 491 row := s.pool.QueryRow(ctx, ` 492 SELECT 493 channels.uri, 494 channels.host, 495 channels.title, 496 channels.topic, 497 channels.created_at, 498 did_handles.did, 499 did_handles.handle, 500 profiles.display_name, 501 profiles.status, 502 profiles.color, 503 profiles.avatar_cid 504 FROM channels 505 LEFT JOIN profiles ON channels.did = profiles.did 506 LEFT JOIN did_handles ON profiles.did = did_handles.did 507 WHERE channels.uri = $1 508 `, uri) 509 var c types.ChannelView 510 var p types.ProfileView 511 err := row.Scan(&c.URI, &c.Host, &c.Title, &c.Topic, &c.CreatedAt, &p.DID, &p.Handle, &p.DisplayName, &p.Status, &p.Color, &p.Avatar) 512 if err != nil { 513 return nil, err 514 } 515 c.Creator = p 516 return &c, nil 517} 518func (s *Store) GetChannelViewHR(handle string, rkey string, ctx context.Context) (*types.ChannelView, error) { 519 did, err := s.ResolveHandle(handle, ctx) 520 if err != nil { 521 return nil, err 522 } 523 uri := fmt.Sprintf("at://%s/org.xcvr.feed.channel/%s", did, rkey) 524 row := s.pool.QueryRow(ctx, ` 525 SELECT 526 channels.uri, 527 channels.host, 528 channels.title, 529 channels.topic, 530 channels.created_at, 531 did_handles.did, 532 did_handles.handle, 533 profiles.display_name, 534 profiles.status, 535 profiles.color, 536 profiles.avatar_cid 537 FROM channels 538 LEFT JOIN profiles ON channels.did = profiles.did 539 LEFT JOIN did_handles ON profiles.did = did_handles.did 540 WHERE channels.uri = $1 541 `, uri) 542 var c types.ChannelView 543 var p types.ProfileView 544 err = row.Scan(&c.URI, &c.Host, &c.Title, &c.Topic, &c.CreatedAt, &p.DID, &p.Handle, &p.DisplayName, &p.Status, &p.Color, &p.Avatar) 545 if err != nil { 546 return nil, err 547 } 548 c.Creator = p 549 return &c, nil 550} 551 552func (s *Store) DeleteChannel(uri string, ctx context.Context) error { 553 _, err := s.pool.Exec(ctx, `DELETE FROM channels WHERE uri = $1`, uri) 554 return err 555} 556 557func (s *Store) GetBanned(did string, ctx context.Context) (*types.Ban, error) { 558 row := s.pool.QueryRow(ctx, `SELECT 559 id, 560 reason, 561 till, 562 banned_at 563 FROM bans WHERE did = $1 ORDER BY id DESC`, did) 564 var ban types.Ban 565 err := row.Scan(&ban.Id, &ban.Reason, &ban.Till, &ban.BannedAt) 566 if err != nil { 567 return nil, err 568 } 569 ban.Did = did 570 return &ban, nil 571} 572 573func (s *Store) GetBanId(id int, ctx context.Context) (*types.Ban, error) { 574 row := s.pool.QueryRow(ctx, `SELECT 575 did, 576 reason, 577 till, 578 banned_at 579 FROM bans WHERE id = $1`, id) 580 var ban types.Ban 581 err := row.Scan(&ban.Did, &ban.Reason, &ban.Till, &ban.BannedAt) 582 if err != nil { 583 return nil, err 584 } 585 ban.Id = id 586 return &ban, nil 587} 588 589func (s *Store) AddBan(did string, reason *string, till *time.Time, ctx context.Context) error { 590 _, err := s.pool.Exec(ctx, `INSERT INTO bans ( 591 did, 592 reason, 593 till 594 ) VALUES ( 595 $1, $2, $3 596 ) 597 `, did, reason, till) 598 return err 599} 600 601func (s *Store) IsBanned(did string, ctx context.Context) (bool, error) { 602 ban, err := s.GetBanned(did, ctx) 603 if ban != nil { 604 defbanned := false 605 if ban.Till == nil { 606 defbanned = true 607 } else { 608 defbanned = time.Now().Before(*ban.Till) 609 } 610 if defbanned { 611 return true, nil 612 } 613 } 614 if err != nil && !errors.Is(err, pgx.ErrNoRows) { 615 return false, err 616 } 617 return false, nil 618}