Retro Bulletin Board Systems on atproto. Web app and TUI. lazy mirror of alyraffauf/atbbs atbbs.xyz
forums python tui atproto bbs
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

tui: add sysop screens

+549 -1
+33 -1
tui/app.tcss
··· 104 104 } 105 105 106 106 ComposeThreadScreen Vertical, 107 - ComposeReplyScreen Vertical { 107 + ComposeReplyScreen Vertical, 108 + ComposeNewsScreen Vertical { 109 + padding: 1 4; 110 + } 111 + 112 + SysopEditScreen VerticalScroll, 113 + SysopModerateScreen VerticalScroll { 108 114 padding: 1 4; 109 115 } 110 116 ··· 170 176 171 177 LoginScreen Input:focus { 172 178 border: solid #404040; 179 + } 180 + 181 + /* sysop menu */ 182 + SysopScreen { 183 + align: center middle; 184 + } 185 + 186 + SysopScreen ListView { 187 + width: 40; 188 + height: auto; 189 + } 190 + 191 + /* sysop delete confirmation */ 192 + SysopDeleteScreen { 193 + align: center middle; 194 + } 195 + 196 + SysopDeleteScreen Vertical { 197 + width: 60; 198 + height: auto; 199 + padding: 1 2; 200 + } 201 + 202 + SysopDeleteScreen Button { 203 + width: 100%; 204 + margin-top: 1; 173 205 } 174 206 175 207 Footer {
+11
tui/screens/site.py
··· 9 9 from tui.screens.board import BoardScreen 10 10 from tui.screens.compose import ComposeNewsScreen 11 11 from tui.screens.news import NewsScreen 12 + from tui.screens.sysop import SysopScreen 12 13 from tui.util import require_session 13 14 from core.util import format_datetime_local as format_datetime 14 15 from tui.widgets.breadcrumb import Breadcrumb ··· 18 19 BINDINGS = [ 19 20 ("escape", "app.pop_screen", "back"), 20 21 ("ctrl+n", "new_news", "news"), 22 + ("ctrl+a", "sysop", "sysop"), 21 23 ] 22 24 23 25 def __init__(self, bbs: BBS, handle: str) -> None: ··· 75 77 self.app.push_screen(SiteScreen(bbs, self.handle)) 76 78 except Exception: 77 79 self.notify("Could not refresh.", severity="error") 80 + 81 + def action_sysop(self) -> None: 82 + session = require_session(self) 83 + if not session: 84 + return 85 + if session["did"] != self.bbs.identity.did: 86 + self.notify("Only the sysop can manage this BBS.", severity="error") 87 + return 88 + self.app.push_screen(SysopScreen(self.bbs, self.handle)) 78 89 79 90 def action_new_news(self) -> None: 80 91 session = require_session(self)
+505
tui/screens/sysop.py
··· 1 + from textual import work 2 + from textual.app import ComposeResult 3 + from textual.containers import Vertical, VerticalScroll 4 + from textual.screen import Screen 5 + from textual.widgets import Button, Footer, Input, ListItem, ListView, Static, TextArea 6 + 7 + from core import lexicon 8 + from core.models import AtUri, AuthError, BBS 9 + from core.records import ( 10 + create_ban_record, 11 + create_hidden_record, 12 + delete_record, 13 + list_pds_records, 14 + put_board_record, 15 + put_site_record, 16 + ) 17 + from core.slingshot import resolve_identities_batch, resolve_identity 18 + from core.util import now_iso 19 + from tui.util import require_session 20 + from tui.widgets.breadcrumb import Breadcrumb 21 + 22 + 23 + class SysopScreen(Screen): 24 + BINDINGS = [("escape", "app.pop_screen", "back")] 25 + 26 + def __init__(self, bbs: BBS, handle: str) -> None: 27 + super().__init__() 28 + self.bbs = bbs 29 + self.handle = handle 30 + 31 + def compose(self) -> ComposeResult: 32 + yield ListView( 33 + ListItem(Static(" Edit BBS"), name="edit"), 34 + ListItem(Static(" Moderation"), name="moderate"), 35 + ListItem(Static(" Delete BBS"), name="delete"), 36 + id="sysop-menu", 37 + ) 38 + yield Footer() 39 + 40 + def on_mount(self) -> None: 41 + self.query_one("#sysop-menu", ListView).focus() 42 + 43 + def on_list_view_selected(self, event: ListView.Selected) -> None: 44 + name = event.item.name 45 + if name == "edit": 46 + self.app.push_screen(SysopEditScreen(self.bbs, self.handle)) 47 + elif name == "moderate": 48 + self.app.push_screen(SysopModerateScreen(self.bbs, self.handle)) 49 + elif name == "delete": 50 + self.app.push_screen(SysopDeleteScreen(self.bbs, self.handle)) 51 + 52 + 53 + class SysopEditScreen(Screen): 54 + BINDINGS = [ 55 + ("escape", "app.pop_screen", "back"), 56 + ("ctrl+s", "save", "save"), 57 + ("ctrl+n", "add_board", "add board"), 58 + ("ctrl+d", "remove_board", "remove board"), 59 + ] 60 + 61 + def __init__(self, bbs: BBS, handle: str) -> None: 62 + super().__init__() 63 + self.bbs = bbs 64 + self.handle = handle 65 + self._boards = [ 66 + {"slug": b.slug, "name": b.name, "description": b.description, "created_at": b.created_at} 67 + for b in bbs.site.boards 68 + ] 69 + 70 + def compose(self) -> ComposeResult: 71 + yield Breadcrumb( 72 + ("@bbs", 3), 73 + (self.bbs.site.name, 2), 74 + ("sysop", 1), 75 + ("edit", 0), 76 + ) 77 + with VerticalScroll(id="edit-scroll"): 78 + yield Static("NAME", classes="section-label") 79 + yield Input(value=self.bbs.site.name, id="edit-name") 80 + yield Static("DESCRIPTION", classes="section-label") 81 + yield Input(value=self.bbs.site.description, id="edit-desc") 82 + yield Static("INTRO", classes="section-label") 83 + yield TextArea(self.bbs.site.intro, id="edit-intro", language=None) 84 + yield Static("BOARDS (ctrl+n add, ctrl+d remove)", classes="section-label", id="boards-label") 85 + for b in self._boards: 86 + yield Static(f" {b['slug']}", classes="subtitle", id=f"board-label-{b['slug']}") 87 + yield Input(value=b["name"], id=f"board-name-{b['slug']}") 88 + yield Input(value=b["description"], id=f"board-desc-{b['slug']}") 89 + yield Footer() 90 + 91 + def on_mount(self) -> None: 92 + self.query_one("#edit-name", Input).focus() 93 + 94 + def action_add_board(self) -> None: 95 + # Find a unique slug 96 + i = len(self._boards) + 1 97 + while any(b["slug"] == f"board-{i}" for b in self._boards): 98 + i += 1 99 + slug = f"board-{i}" 100 + self._boards.append({"slug": slug, "name": slug, "description": "", "created_at": now_iso()}) 101 + 102 + scroll = self.query_one("#edit-scroll", VerticalScroll) 103 + label = Static(f" {slug}", classes="subtitle", id=f"board-label-{slug}") 104 + name_input = Input(value=slug, id=f"board-name-{slug}") 105 + desc_input = Input(value="", id=f"board-desc-{slug}") 106 + scroll.mount(label) 107 + scroll.mount(name_input) 108 + scroll.mount(desc_input) 109 + name_input.focus() 110 + 111 + def action_remove_board(self) -> None: 112 + if len(self._boards) <= 1: 113 + self.notify("Must have at least one board.", severity="warning") 114 + return 115 + 116 + # Remove the last board 117 + board = self._boards.pop() 118 + slug = board["slug"] 119 + for widget_id in (f"board-label-{slug}", f"board-name-{slug}", f"board-desc-{slug}"): 120 + try: 121 + self.query_one(f"#{widget_id}").remove() 122 + except Exception: 123 + pass 124 + 125 + def action_save(self) -> None: 126 + self._do_save() 127 + 128 + @work(exclusive=True) 129 + async def _do_save(self) -> None: 130 + session = require_session(self) 131 + if not session: 132 + return 133 + 134 + store = self.app.session_store 135 + 136 + async def updater(d, field, value): 137 + store.update_session_field(d, field, value) 138 + 139 + name = self.query_one("#edit-name", Input).value.strip() 140 + description = self.query_one("#edit-desc", Input).value.strip() 141 + intro = self.query_one("#edit-intro", TextArea).text 142 + 143 + if not name: 144 + self.notify("Name cannot be empty.", severity="error") 145 + return 146 + 147 + now = now_iso() 148 + 149 + try: 150 + # Update/create board records 151 + for b in self._boards: 152 + board_name = self.query_one( 153 + f"#board-name-{b['slug']}", Input 154 + ).value.strip() 155 + board_desc = self.query_one( 156 + f"#board-desc-{b['slug']}", Input 157 + ).value.strip() 158 + await put_board_record( 159 + self.app.http_client, 160 + session, 161 + b["slug"], 162 + board_name or b["slug"], 163 + board_desc, 164 + b["created_at"], 165 + updater, 166 + ) 167 + 168 + # Delete removed boards 169 + current_slugs = {b["slug"] for b in self._boards} 170 + for board in self.bbs.site.boards: 171 + if board.slug not in current_slugs: 172 + await delete_record( 173 + self.app.http_client, session, lexicon.BOARD, board.slug, updater 174 + ) 175 + 176 + # Update site record 177 + await put_site_record( 178 + self.app.http_client, 179 + session, 180 + { 181 + "$type": lexicon.SITE, 182 + "name": name, 183 + "description": description, 184 + "intro": intro, 185 + "boards": [b["slug"] for b in self._boards], 186 + "createdAt": self.bbs.site.created_at or now, 187 + "updatedAt": now, 188 + }, 189 + updater, 190 + ) 191 + self.notify("BBS updated.") 192 + self.app.pop_screen() 193 + except AuthError: 194 + self.notify("Session expired. Please log in again.", severity="error") 195 + except Exception as e: 196 + self.notify(f"Could not update BBS: {e}", severity="error") 197 + 198 + 199 + class SysopModerateScreen(Screen): 200 + BINDINGS = [ 201 + ("escape", "app.pop_screen", "back"), 202 + ("ctrl+d", "remove", "remove"), 203 + ("ctrl+b", "add_ban", "ban"), 204 + ("ctrl+x", "add_hide", "hide"), 205 + ] 206 + 207 + def __init__(self, bbs: BBS, handle: str) -> None: 208 + super().__init__() 209 + self.bbs = bbs 210 + self.handle = handle 211 + self._ban_rkeys: dict[str, str] = {} 212 + self._hide_rkeys: dict[str, str] = {} 213 + 214 + def compose(self) -> ComposeResult: 215 + yield Breadcrumb( 216 + ("@bbs", 3), 217 + (self.bbs.site.name, 2), 218 + ("sysop", 1), 219 + ("moderation", 0), 220 + ) 221 + with VerticalScroll(): 222 + yield Static("BANNED USERS", classes="section-label") 223 + yield Input(placeholder="handle or DID to ban", id="ban-input") 224 + yield ListView(id="ban-list") 225 + yield Static("HIDDEN POSTS", classes="section-label") 226 + yield Input(placeholder="at:// URI to hide", id="hide-input") 227 + yield ListView(id="hide-list") 228 + yield Footer() 229 + 230 + def on_mount(self) -> None: 231 + self._load_data() 232 + 233 + @work(exclusive=True) 234 + async def _load_data(self) -> None: 235 + client = self.app.http_client 236 + session = self.app.user_session 237 + 238 + # Fetch ban records 239 + try: 240 + ban_records = await list_pds_records( 241 + client, session["pds_url"], session["did"], lexicon.BAN 242 + ) 243 + self._ban_rkeys = { 244 + r["value"]["did"]: r["uri"].split("/")[-1] for r in ban_records 245 + } 246 + except Exception: 247 + self._ban_rkeys = {} 248 + 249 + # Resolve banned handles 250 + banned_dids = list(self._ban_rkeys.keys()) 251 + if banned_dids: 252 + try: 253 + authors = await resolve_identities_batch(client, banned_dids) 254 + banned_handles = {did: authors[did].handle for did in authors} 255 + except Exception: 256 + banned_handles = {} 257 + else: 258 + banned_handles = {} 259 + 260 + ban_list = self.query_one("#ban-list", ListView) 261 + ban_list.clear() 262 + for did in banned_dids: 263 + label = banned_handles.get(did, did) 264 + await ban_list.append( 265 + ListItem(Static(f" {label}"), name=f"ban:{did}") 266 + ) 267 + 268 + # Fetch hide records 269 + try: 270 + hide_records = await list_pds_records( 271 + client, session["pds_url"], session["did"], lexicon.HIDE 272 + ) 273 + self._hide_rkeys = { 274 + r["value"]["uri"]: r["uri"].split("/")[-1] for r in hide_records 275 + } 276 + except Exception: 277 + self._hide_rkeys = {} 278 + 279 + hide_list = self.query_one("#hide-list", ListView) 280 + hide_list.clear() 281 + for uri in self._hide_rkeys: 282 + await hide_list.append( 283 + ListItem(Static(f" {uri}"), name=f"hide:{uri}") 284 + ) 285 + 286 + # Focus first list with items 287 + if banned_dids: 288 + ban_list.focus() 289 + elif self._hide_rkeys: 290 + hide_list.focus() 291 + 292 + def action_remove(self) -> None: 293 + for lv_id in ("ban-list", "hide-list"): 294 + lv = self.query_one(f"#{lv_id}", ListView) 295 + if lv.index is not None and lv.has_focus: 296 + item = lv.children[lv.index] 297 + if item.name: 298 + self._do_remove(item.name, item) 299 + return 300 + 301 + @work 302 + async def _do_remove(self, key: str, item) -> None: 303 + session = self.app.user_session 304 + store = self.app.session_store 305 + 306 + async def updater(d, field, value): 307 + store.update_session_field(d, field, value) 308 + 309 + kind, _, value = key.partition(":") 310 + try: 311 + if kind == "ban" and value in self._ban_rkeys: 312 + rkey = self._ban_rkeys[value] 313 + await delete_record( 314 + self.app.http_client, session, lexicon.BAN, rkey, updater 315 + ) 316 + del self._ban_rkeys[value] 317 + self.notify(f"Unbanned {value}.") 318 + elif kind == "hide" and value in self._hide_rkeys: 319 + rkey = self._hide_rkeys[value] 320 + await delete_record( 321 + self.app.http_client, session, lexicon.HIDE, rkey, updater 322 + ) 323 + del self._hide_rkeys[value] 324 + self.notify("Post unhidden.") 325 + await item.remove() 326 + except AuthError: 327 + self.notify("Session expired. Please log in again.", severity="error") 328 + except Exception: 329 + self.notify("Could not remove record.", severity="error") 330 + 331 + def action_add_ban(self) -> None: 332 + identifier = self.query_one("#ban-input", Input).value.strip() 333 + if not identifier: 334 + self.notify("Enter a handle or DID.", severity="warning") 335 + return 336 + self._do_add_ban(identifier) 337 + 338 + @work 339 + async def _do_add_ban(self, identifier: str) -> None: 340 + session = self.app.user_session 341 + store = self.app.session_store 342 + client = self.app.http_client 343 + 344 + async def updater(d, field, value): 345 + store.update_session_field(d, field, value) 346 + 347 + # Resolve handle to DID if needed 348 + did = identifier 349 + if not identifier.startswith("did:"): 350 + try: 351 + identity = await resolve_identity(client, identifier) 352 + did = identity.did 353 + except Exception: 354 + self.notify(f"Could not resolve {identifier}.", severity="error") 355 + return 356 + 357 + if did in self._ban_rkeys: 358 + self.notify("Already banned.", severity="warning") 359 + return 360 + 361 + try: 362 + await create_ban_record(client, session, did, updater) 363 + self.notify(f"Banned {did}.") 364 + self.query_one("#ban-input", Input).value = "" 365 + self._load_data() 366 + except AuthError: 367 + self.notify("Session expired. Please log in again.", severity="error") 368 + except Exception: 369 + self.notify("Could not ban user.", severity="error") 370 + 371 + def action_add_hide(self) -> None: 372 + uri = self.query_one("#hide-input", Input).value.strip() 373 + if not uri or not uri.startswith("at://"): 374 + self.notify("Enter a valid AT-URI.", severity="warning") 375 + return 376 + self._do_add_hide(uri) 377 + 378 + @work 379 + async def _do_add_hide(self, uri: str) -> None: 380 + session = self.app.user_session 381 + store = self.app.session_store 382 + 383 + async def updater(d, field, value): 384 + store.update_session_field(d, field, value) 385 + 386 + if uri in self._hide_rkeys: 387 + self.notify("Already hidden.", severity="warning") 388 + return 389 + 390 + try: 391 + await create_hidden_record(self.app.http_client, session, uri, updater) 392 + self.notify("Post hidden.") 393 + self.query_one("#hide-input", Input).value = "" 394 + self._load_data() 395 + except AuthError: 396 + self.notify("Session expired. Please log in again.", severity="error") 397 + except Exception: 398 + self.notify("Could not hide post.", severity="error") 399 + 400 + def refresh_data(self) -> None: 401 + self._load_data() 402 + 403 + 404 + class SysopDeleteScreen(Screen): 405 + BINDINGS = [("escape", "app.pop_screen", "cancel")] 406 + 407 + def __init__(self, bbs: BBS, handle: str) -> None: 408 + super().__init__() 409 + self.bbs = bbs 410 + self.handle = handle 411 + 412 + def compose(self) -> ComposeResult: 413 + with Vertical(): 414 + yield Static("Delete your BBS?", classes="title") 415 + yield Static( 416 + "This will delete your site record, all boards, news, " 417 + "bans, and hidden post records. Threads and replies from " 418 + "users will remain in their repos.", 419 + ) 420 + yield Button("delete", id="delete-confirm", variant="error") 421 + yield Button("cancel", id="delete-cancel") 422 + yield Footer() 423 + 424 + def on_mount(self) -> None: 425 + self.query_one("#delete-cancel", Button).focus() 426 + 427 + def on_button_pressed(self, event: Button.Pressed) -> None: 428 + if event.button.id == "delete-confirm": 429 + self._do_delete() 430 + else: 431 + self.app.pop_screen() 432 + 433 + @work(exclusive=True) 434 + async def _do_delete(self) -> None: 435 + session = self.app.user_session 436 + store = self.app.session_store 437 + client = self.app.http_client 438 + 439 + async def updater(d, field, value): 440 + store.update_session_field(d, field, value) 441 + 442 + failed = [] 443 + 444 + # Delete boards 445 + for board in self.bbs.site.boards: 446 + try: 447 + await delete_record( 448 + client, session, lexicon.BOARD, board.slug, updater 449 + ) 450 + except Exception: 451 + failed.append(f"board/{board.slug}") 452 + 453 + # Delete news 454 + from core.constellation import get_news 455 + 456 + site_uri = str(AtUri(session["did"], lexicon.SITE, "self")) 457 + try: 458 + backlinks = await get_news(client, site_uri) 459 + for ref in backlinks.records: 460 + if ref.did == session["did"]: 461 + try: 462 + await delete_record( 463 + client, session, lexicon.NEWS, ref.rkey, updater 464 + ) 465 + except Exception: 466 + failed.append(f"news/{ref.rkey}") 467 + except Exception: 468 + failed.append("news lookup") 469 + 470 + # Delete ban and hide records 471 + for collection in (lexicon.BAN, lexicon.HIDE): 472 + try: 473 + records = await list_pds_records( 474 + client, session["pds_url"], session["did"], collection 475 + ) 476 + for r in records: 477 + rkey = r["uri"].split("/")[-1] 478 + try: 479 + await delete_record( 480 + client, session, collection, rkey, updater 481 + ) 482 + except Exception: 483 + failed.append(f"{collection}/{rkey}") 484 + except Exception: 485 + failed.append(f"{collection} lookup") 486 + 487 + if failed: 488 + self.notify( 489 + f"Could not delete: {', '.join(failed)}. Site record not deleted.", 490 + severity="error", 491 + ) 492 + return 493 + 494 + # Delete site record 495 + try: 496 + await delete_record(client, session, lexicon.SITE, "self", updater) 497 + except Exception: 498 + self.notify("Could not delete site record.", severity="error") 499 + return 500 + 501 + self.notify("BBS deleted.") 502 + # Pop delete screen + sysop screen + site screen 503 + self.app.pop_screen() 504 + self.app.pop_screen() 505 + self.app.pop_screen()