a digital entity named phi that roams bsky phi.zzstoatzz.io
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

add post splitting for long replies

splits text exceeding 300 graphemes into a self-reply thread, preferring
paragraph breaks > sentence boundaries > word boundaries. includes
regression tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+133 -8
+77 -8
src/bot/core/atproto_client.py
··· 2 2 from pathlib import Path 3 3 4 4 from atproto import Client, Session, SessionEvent 5 + from atproto_client import models 5 6 6 7 from bot.config import settings 7 8 from bot.core.rich_text import create_facets ··· 37 38 _save_session_string(session.export()) 38 39 39 40 41 + MAX_GRAPHEMES = 300 42 + 43 + 44 + def _split_text(text: str, max_len: int = MAX_GRAPHEMES) -> list[str]: 45 + """Split text into chunks that fit within bluesky's grapheme limit. 46 + 47 + Prefers splitting at paragraph breaks, then sentence boundaries, then word boundaries. 48 + """ 49 + if len(text) <= max_len: 50 + return [text] 51 + 52 + chunks = [] 53 + remaining = text 54 + 55 + while remaining: 56 + if len(remaining) <= max_len: 57 + chunks.append(remaining) 58 + break 59 + 60 + # scan backwards from limit for best break point 61 + split_at = -1 62 + 63 + # prefer paragraph break (newline) 64 + for i in range(max_len - 1, max_len // 2, -1): 65 + if remaining[i] == "\n": 66 + split_at = i + 1 67 + break 68 + 69 + # then sentence boundary (.!?) followed by space or end 70 + if split_at < 0: 71 + for i in range(max_len - 1, max_len // 2, -1): 72 + if remaining[i] in ".!?" and (i + 1 >= len(remaining) or remaining[i + 1] in " \n"): 73 + split_at = i + 1 74 + break 75 + 76 + # then word boundary 77 + if split_at < 0: 78 + split_at = remaining.rfind(" ", 0, max_len) 79 + if split_at < max_len // 2: 80 + split_at = max_len # hard break as last resort 81 + 82 + chunks.append(remaining[:split_at].rstrip()) 83 + remaining = remaining[split_at:].lstrip() 84 + 85 + return chunks 86 + 87 + 40 88 class BotClient: 41 89 def __init__(self): 42 90 self.client = Client(base_url=settings.bluesky_service) ··· 92 140 self.client.app.bsky.notification.update_seen({"seenAt": seen_at}) 93 141 94 142 async def create_post(self, text: str, reply_to=None): 95 - """Create a new post or reply with rich text support""" 143 + """Create a new post or reply. Splits long text into a self-reply thread.""" 96 144 await self.authenticate() 97 145 98 - # Create facets for mentions and URLs 99 - facets = create_facets(text, self.client) 146 + if len(text) <= 300: 147 + facets = create_facets(text, self.client) 148 + if reply_to: 149 + return self.client.send_post(text=text, reply_to=reply_to, facets=facets) 150 + return self.client.send_post(text=text, facets=facets) 151 + 152 + chunks = _split_text(text) 153 + root_ref = reply_to.root if reply_to else None 154 + last_result = None 155 + 156 + for i, chunk in enumerate(chunks): 157 + facets = create_facets(chunk, self.client) 158 + 159 + if i == 0: 160 + last_result = self.client.send_post(text=chunk, reply_to=reply_to, facets=facets) 161 + if root_ref is None: 162 + root_ref = models.ComAtprotoRepoStrongRef.Main( 163 + uri=last_result.uri, cid=last_result.cid 164 + ) 165 + else: 166 + parent_ref = models.ComAtprotoRepoStrongRef.Main( 167 + uri=last_result.uri, cid=last_result.cid 168 + ) 169 + thread_ref = models.AppBskyFeedPost.ReplyRef( 170 + parent=parent_ref, root=root_ref 171 + ) 172 + last_result = self.client.send_post(text=chunk, reply_to=thread_ref, facets=facets) 100 173 101 - # Use send_post with facets 102 - if reply_to: 103 - return self.client.send_post(text=text, reply_to=reply_to, facets=facets) 104 - else: 105 - return self.client.send_post(text=text, facets=facets) 174 + return last_result 106 175 107 176 async def get_thread(self, uri: str, depth: int = 10): 108 177 """Get a thread by URI"""
+56
tests/test_split_text.py
··· 1 + """Regression tests for post splitting (grapheme limit 300).""" 2 + 3 + from bot.core.atproto_client import _split_text 4 + 5 + 6 + def test_short_text_unchanged(): 7 + assert _split_text("hello world") == ["hello world"] 8 + 9 + 10 + def test_exactly_300_unchanged(): 11 + text = "a" * 300 12 + assert _split_text(text) == [text] 13 + 14 + 15 + def test_splits_at_sentence_boundary(): 16 + # Two sentences, second pushes past 300 17 + first = "a" * 250 + "." 18 + second = " " + "b" * 100 19 + text = first + second 20 + chunks = _split_text(text) 21 + assert len(chunks) == 2 22 + assert chunks[0] == first 23 + assert chunks[1] == "b" * 100 24 + 25 + 26 + def test_splits_at_word_boundary(): 27 + # No sentence boundaries, should split at last space 28 + text = " ".join(["word"] * 100) # 499 chars 29 + chunks = _split_text(text) 30 + assert all(len(c) <= 300 for c in chunks) 31 + assert " ".join(chunks) == text 32 + 33 + 34 + def test_splits_at_paragraph_break(): 35 + first = "a" * 200 + "\n" 36 + second = "b" * 200 37 + text = first + second 38 + chunks = _split_text(text) 39 + assert len(chunks) == 2 40 + assert chunks[0] == "a" * 200 41 + assert chunks[1] == "b" * 200 42 + 43 + 44 + def test_three_way_split(): 45 + text = ". ".join(["x" * 280] * 3) 46 + chunks = _split_text(text) 47 + assert len(chunks) == 3 48 + assert all(len(c) <= 300 for c in chunks) 49 + 50 + 51 + def test_hard_break_no_spaces(): 52 + text = "a" * 600 53 + chunks = _split_text(text) 54 + assert len(chunks) == 2 55 + assert chunks[0] == "a" * 300 56 + assert chunks[1] == "a" * 300