this repo has no description
40
fork

Configure Feed

Select the types of activity you want to include in your feed.

at e4cd3e0e2cdbfd9b720547dee2a2f43d6b3cb694 922 lines 33 kB view raw
"""Bluesky helpers: session persistence, thread flattening, replies and custom records."""
import json
import logging
import os
import re
import sys
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

import dotenv
import yaml
from atproto_client import Client, Session, SessionEvent, models

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("bluesky_session_handler")

# Load the environment variables
dotenv.load_dotenv(override=True)

# Strip fields. A list of fields to remove from a JSON object
STRIP_FIELDS = [
    "cid",
    "rev",
    "did",
    "uri",
    "langs",
    "threadgate",
    "py_type",
    "labels",
    "avatar",
    "viewer",
    "indexed_at",
    "tags",
    "associated",
    "thread_context",
    "aspect_ratio",
    "thumb",
    "fullsize",
    "root",
    "created_at",
    "verification",
    "like_count",
    "quote_count",
    "reply_count",
    "repost_count",
    "embedding_disabled",
    "thread_muted",
    "reply_disabled",
    "pinned",
    "like",
    "repost",
    "blocked_by",
    "blocking",
    "blocking_by_list",
    "followed_by",
    "following",
    "known_followers",
    "muted",
    "muted_by_list",
    "root_author_like",
    "entities",
    "ref",
    "mime_type",
    "size",
]

# Upper bound on the number of posts in one threaded reply chain.
_MAX_THREAD_REPLIES = 15

# Rich-text patterns, matched against the UTF-8 encoded post text so that the
# match offsets are byte offsets (which is what facet ByteSlice indices need).
# Group 1 is the mention/URL itself, excluding the optional leading separator.
_MENTION_REGEX = re.compile(
    rb"(?:^|[$|\W])(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)"
)
_URL_REGEX = re.compile(
    rb"(?:^|[$|\W])(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)"
)


def convert_to_basic_types(obj):
    """Convert complex Python objects to basic types for JSON/YAML serialization.

    Objects exposing ``__dict__`` are replaced by their (recursively converted)
    attribute dictionary; dicts and lists are converted element-wise; str, int,
    float, bool and None pass through; anything else becomes ``str(obj)``.
    """
    if hasattr(obj, '__dict__'):
        # Checked first so that model objects are expanded rather than stringified.
        return convert_to_basic_types(obj.__dict__)
    if isinstance(obj, dict):
        return {key: convert_to_basic_types(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [convert_to_basic_types(item) for item in obj]
    if isinstance(obj, (str, int, float, bool)) or obj is None:
        return obj
    # For other types, fall back to the string representation.
    return str(obj)


def _is_empty_value(value) -> bool:
    """Return True for None, an empty dict/list, or a blank/whitespace-only string."""
    if value is None:
        return True
    if isinstance(value, (dict, list)):
        return len(value) == 0
    if isinstance(value, str):
        return value.strip() == ""
    return False


def strip_fields(obj, strip_field_list):
    """Recursively strip fields from a JSON-like object, in place.

    For dicts: keys listed in ``strip_field_list`` and string keys starting with
    ``"__"`` are removed, the remaining values are processed recursively, and
    entries whose processed value is None, an empty dict/list or a blank string
    are dropped. For lists: items are processed recursively and None items are
    removed. Other values are returned unchanged.

    Args:
        obj: The dict/list/scalar to clean. Dicts and lists are mutated.
        strip_field_list: Container of key names to remove.

    Returns:
        The same ``obj`` that was passed in.
    """
    if isinstance(obj, dict):
        # Remove fields from the strip list and dunder-style metadata keys.
        # The isinstance guard keeps non-string keys from raising on startswith.
        for field in list(obj.keys()):
            if field in strip_field_list or (
                isinstance(field, str) and field.startswith("__")
            ):
                obj.pop(field, None)

        # Recursively process remaining values; iterate over a snapshot because
        # entries may be removed while walking.
        for key, value in list(obj.items()):
            cleaned = strip_fields(value, strip_field_list)
            if _is_empty_value(cleaned):
                obj.pop(key, None)
            else:
                obj[key] = cleaned

    elif isinstance(obj, list):
        for i, value in enumerate(obj):
            obj[i] = strip_fields(value, strip_field_list)
        # Remove None values from the list (slice-assign keeps the same list object).
        obj[:] = [item for item in obj if item is not None]

    return obj


def flatten_thread_structure(thread_data):
    """
    Flatten a nested thread structure into a list while preserving all data.

    Only the ``parent`` chain of the starting node is followed; each node's
    ``post`` is shallow-copied into a dict (``{}`` if it is neither an object
    with ``__dict__`` nor a dict).

    Args:
        thread_data: The thread data from get_post_thread

    Returns:
        Dict with 'posts' key containing a list of posts, ancestors first
    """
    posts = []

    def traverse_thread(node):
        """Recursively walk up the parent chain, then record this node's post."""
        if not node:
            return

        # Visit the parent first so ancestors appear before their replies.
        if hasattr(node, 'parent') and node.parent:
            traverse_thread(node.parent)

        if hasattr(node, 'post') and node.post:
            if hasattr(node.post, '__dict__'):
                post_dict = node.post.__dict__.copy()
            elif isinstance(node.post, dict):
                post_dict = node.post.copy()
            else:
                post_dict = {}

            posts.append(post_dict)

    # Start from the main thread node, whether exposed as attribute or via __dict__.
    if hasattr(thread_data, 'thread'):
        traverse_thread(thread_data.thread)
    elif hasattr(thread_data, '__dict__') and 'thread' in thread_data.__dict__:
        traverse_thread(thread_data.__dict__['thread'])

    return {'posts': posts}


def thread_to_yaml_string(thread, strip_metadata=True):
    """
    Convert thread data to a YAML-formatted string for LLM parsing.

    Args:
        thread: The thread data from get_post_thread
        strip_metadata: Whether to strip metadata fields for cleaner output

    Returns:
        YAML-formatted string representation of the thread
    """
    # First flatten the thread structure to avoid deep nesting
    flattened = flatten_thread_structure(thread)

    # Convert complex objects to basic types (this builds fresh dicts/lists)
    basic_thread = convert_to_basic_types(flattened)

    if strip_metadata:
        # strip_fields mutates in place; basic_thread is already a fresh
        # structure, so the caller's thread object is not affected.
        cleaned_thread = strip_fields(basic_thread, STRIP_FIELDS)
    else:
        cleaned_thread = basic_thread

    return yaml.dump(cleaned_thread, indent=2, allow_unicode=True, default_flow_style=False)


def get_session(username: str) -> Optional[str]:
    """Return the saved session string from ``session_<username>.txt``, or None if absent."""
    try:
        with open(f"session_{username}.txt", encoding="UTF-8") as f:
            return f.read()
    except FileNotFoundError:
        logger.debug(f"No existing session found for {username}")
        return None


def save_session(username: str, session_string: str) -> None:
    """Write ``session_string`` to ``session_<username>.txt`` in the working directory."""
    with open(f"session_{username}.txt", "w", encoding="UTF-8") as f:
        f.write(session_string)
    logger.debug(f"Session saved for {username}")


def on_session_change(username: str, event: SessionEvent, session: Session) -> None:
    """Persist the exported session whenever it is created or refreshed."""
    logger.debug(f"Session changed: {event} {repr(session)}")
    if event in (SessionEvent.CREATE, SessionEvent.REFRESH):
        logger.debug(f"Saving changed session for {username}")
        save_session(username, session.export())


def init_client(username: str, password: str) -> Client:
    """Create a logged-in client, reusing a saved session when one exists.

    The PDS URI comes from the Bluesky config (``pds_uri``); if loading the
    config raises ValueError/KeyError, the ``PDS_URI`` environment variable or
    ``https://bsky.social`` is used instead.

    Args:
        username: Account identifier, also used to name the session file.
        password: Password used only when no saved session is available.

    Returns:
        The authenticated client.
    """
    from config_loader import get_bluesky_config
    try:
        bluesky_config = get_bluesky_config()
        pds_uri = bluesky_config.get('pds_uri', 'https://bsky.social')
    except (ValueError, KeyError):
        pds_uri = os.getenv("PDS_URI", "https://bsky.social")
        logger.warning(
            "Failed to load PDS URI from config. Using environment variable or default."
        )

    logger.debug(f"Using PDS URI: {pds_uri}")

    client = Client(pds_uri)
    # Register before login so the session produced by login is saved too.
    client.on_session_change(
        lambda event, session: on_session_change(username, event, session)
    )

    session_string = get_session(username)
    if session_string:
        logger.debug(f"Reusing existing session for {username}")
        client.login(session_string=session_string)
    else:
        logger.debug(f"Creating new session for {username}")
        client.login(username, password)

    return client


def default_login() -> Client:
    """Log in with the username/password from the Bluesky config.

    Returns:
        The authenticated client.

    Raises:
        SystemExit: With status 1 if the configuration cannot be loaded or the
            username/password is missing.
    """
    from config_loader import get_bluesky_config
    try:
        bluesky_config = get_bluesky_config()
        username = bluesky_config['username']
        password = bluesky_config['password']
    except (ValueError, KeyError) as e:
        logger.error(f"Failed to load Bluesky configuration: {e}")
        # sys.exit instead of the site-provided exit(): always available, and a
        # non-zero status so callers/shells can tell the login failed.
        sys.exit(1)

    if username is None:
        logger.error(
            "No username provided. Please provide a username using the BSKY_USERNAME environment variable."
        )
        sys.exit(1)

    if password is None:
        logger.error(
            "No password provided. Please provide a password using the BSKY_PASSWORD environment variable."
        )
        sys.exit(1)

    return init_client(username, password)


def remove_outside_quotes(text: str) -> str:
    """
    Remove outside double quotes from response text.

    Only handles double quotes to avoid interfering with contractions:
    - Double quotes: "text" → text
    - Preserves single quotes and internal quotes

    Args:
        text: The text to process

    Returns:
        Text with outside double quotes removed. Inputs that are empty or
        shorter than two characters are returned untouched; otherwise the
        result is whitespace-stripped.
    """
    if not text or len(text) < 2:
        return text

    text = text.strip()

    # Re-check the length after stripping: a lone '"' both starts and ends with
    # a quote and would otherwise be "unwrapped" into an empty string.
    if len(text) >= 2 and text.startswith('"') and text.endswith('"'):
        return text[1:-1]

    return text


def reply_to_post(client: Client, text: str, reply_to_uri: str, reply_to_cid: str, root_uri: Optional[str] = None, root_cid: Optional[str] = None, lang: Optional[str] = None) -> Dict[str, Any]:
    """
    Reply to a post on Bluesky with rich text support.

    Mentions (``@handle.tld``) and http(s) URLs found in ``text`` are turned
    into facets. A mention whose handle cannot be resolved is skipped (logged
    at debug level) rather than failing the whole post.

    Args:
        client: Authenticated Bluesky client
        text: The reply text
        reply_to_uri: The URI of the post being replied to (parent)
        reply_to_cid: The CID of the post being replied to (parent)
        root_uri: The URI of the root post (if replying to a reply). If None, uses reply_to_uri
        root_cid: The CID of the root post (if replying to a reply). If None, uses reply_to_cid
        lang: Language code for the post (e.g., 'en-US', 'es', 'ja')

    Returns:
        The response from sending the post
    """
    # If root is not provided, this is a reply to the root post
    if root_uri is None:
        root_uri = reply_to_uri
        root_cid = reply_to_cid

    # Create references for the reply
    parent_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=reply_to_uri, cid=reply_to_cid))
    root_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=root_uri, cid=root_cid))

    # Facet offsets are byte offsets, so match against the encoded text.
    facets = []
    text_bytes = text.encode("UTF-8")

    for m in _MENTION_REGEX.finditer(text_bytes):
        handle = m.group(1)[1:].decode("UTF-8")  # Remove @ prefix
        try:
            # Resolve handle to DID using the API
            resolve_resp = client.app.bsky.actor.get_profile({'actor': handle})
            if resolve_resp and hasattr(resolve_resp, 'did'):
                facets.append(
                    models.AppBskyRichtextFacet.Main(
                        index=models.AppBskyRichtextFacet.ByteSlice(
                            # group(1) bounds exclude the optional leading separator
                            byteStart=m.start(1),
                            byteEnd=m.end(1)
                        ),
                        features=[models.AppBskyRichtextFacet.Mention(did=resolve_resp.did)]
                    )
                )
        except Exception as e:
            # Best effort: an unresolvable handle just stays plain text.
            logger.debug(f"Failed to resolve handle {handle}: {e}")
            continue

    for m in _URL_REGEX.finditer(text_bytes):
        url = m.group(1).decode("UTF-8")
        facets.append(
            models.AppBskyRichtextFacet.Main(
                index=models.AppBskyRichtextFacet.ByteSlice(
                    byteStart=m.start(1),
                    byteEnd=m.end(1)
                ),
                features=[models.AppBskyRichtextFacet.Link(uri=url)]
            )
        )

    post_kwargs = {
        "text": text,
        "reply_to": models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref),
        "langs": [lang] if lang else None,
    }
    # Only pass facets when some were found, exactly as before.
    if facets:
        post_kwargs["facets"] = facets
    response = client.send_post(**post_kwargs)

    logger.info(f"Reply sent successfully: {response.uri}")
    return response


def get_post_thread(client: Client, uri: str) -> Optional[Dict[str, Any]]:
    """
    Get the thread containing a post to find root post information.

    Args:
        client: Authenticated Bluesky client
        uri: The URI of the post

    Returns:
        The thread data or None if the request raised (the error is logged)
    """
    try:
        thread = client.app.bsky.feed.get_post_thread({'uri': uri, 'parent_height': 60, 'depth': 10})
        return thread
    except Exception as e:
        logger.error(f"Error fetching post thread: {e}")
        return None


def _resolve_reply_target(notification: Any) -> Optional[Tuple[str, str, str, str]]:
    """Extract (post_uri, post_cid, root_uri, root_cid) from a notification.

    Accepts either a dict or an object with ``uri``/``cid`` attributes. The
    root comes from the notification record's ``reply.root`` when that carries
    both a uri and a cid; otherwise the notification's own post is the root.

    Returns:
        The four-tuple, or None (after logging an error) when the notification
        has no uri/cid.
    """
    if isinstance(notification, dict):
        post_uri = notification.get('uri')
        post_cid = notification.get('cid')
        record = notification.get('record', {})
        reply_info = record.get('reply') if isinstance(record, dict) else None
    elif hasattr(notification, 'uri') and hasattr(notification, 'cid'):
        post_uri = notification.uri
        post_cid = notification.cid
        reply_info = None
        if hasattr(notification, 'record') and hasattr(notification.record, 'reply'):
            reply_info = notification.record.reply
    else:
        post_uri = None
        post_cid = None
        reply_info = None

    if not post_uri or not post_cid:
        logger.error("Notification doesn't have required uri/cid fields")
        return None

    root_uri = None
    root_cid = None
    if isinstance(reply_info, dict):
        root_ref = reply_info.get('root')
        if isinstance(root_ref, dict):
            root_uri = root_ref.get('uri')
            root_cid = root_ref.get('cid')
    elif reply_info:
        root_ref = getattr(reply_info, 'root', None)
        root_uri = getattr(root_ref, 'uri', None)
        root_cid = getattr(root_ref, 'cid', None)

    # A root reference is only usable as a uri+cid pair; never mix the root's
    # uri with the post's cid (or vice versa). No usable root means this post
    # IS the root.
    if not root_uri or not root_cid:
        root_uri = post_uri
        root_cid = post_cid

    return post_uri, post_cid, root_uri, root_cid


def reply_to_notification(client: Client, notification: Any, reply_text: str, lang: str = "en-US") -> Optional[Dict[str, Any]]:
    """
    Reply to a notification (mention or reply).

    Args:
        client: Authenticated Bluesky client
        notification: The notification object from list_notifications
        reply_text: The text to reply with
        lang: Language code for the post (defaults to "en-US")

    Returns:
        The response from sending the reply or None if failed
    """
    try:
        target = _resolve_reply_target(notification)
        if target is None:
            return None
        post_uri, post_cid, root_uri, root_cid = target

        return reply_to_post(
            client=client,
            text=reply_text,
            reply_to_uri=post_uri,
            reply_to_cid=post_cid,
            root_uri=root_uri,
            root_cid=root_cid,
            lang=lang
        )

    except Exception as e:
        logger.error(f"Error replying to notification: {e}")
        return None


def reply_with_thread_to_notification(client: Client, notification: Any, reply_messages: List[str], lang: str = "en-US") -> Optional[List[Dict[str, Any]]]:
    """
    Reply to a notification with a threaded chain of messages (max 15).

    The first message replies to the notification's post; each following
    message replies to the previous one, all sharing the same root.

    Args:
        client: Authenticated Bluesky client
        notification: The notification object from list_notifications
        reply_messages: List of reply texts (max 15 messages, each max 300 chars)
        lang: Language code for the posts (defaults to "en-US")

    Returns:
        List of responses from sending the replies or None if failed
    """
    try:
        # Validate input
        if not reply_messages:
            logger.error("Reply messages list cannot be empty")
            return None
        if len(reply_messages) > _MAX_THREAD_REPLIES:
            logger.error(f"Cannot send more than {_MAX_THREAD_REPLIES} reply messages (got {len(reply_messages)})")
            return None

        target = _resolve_reply_target(notification)
        if target is None:
            return None
        post_uri, post_cid, root_uri, root_cid = target

        # Send replies in sequence, creating a thread
        responses = []
        current_parent_uri = post_uri
        current_parent_cid = post_cid
        total = len(reply_messages)

        for i, message in enumerate(reply_messages):
            logger.info(f"Sending reply {i+1}/{total}: {message[:50]}...")

            response = reply_to_post(
                client=client,
                text=message,
                reply_to_uri=current_parent_uri,
                reply_to_cid=current_parent_cid,
                root_uri=root_uri,
                root_cid=root_cid,
                lang=lang
            )

            if not response:
                logger.error(f"Failed to send reply {i+1}, posting system failure message")
                # Try to post a system failure message in place of the lost reply
                failure_response = reply_to_post(
                    client=client,
                    text="[SYSTEM FAILURE: COULD NOT POST MESSAGE, PLEASE TRY AGAIN]",
                    reply_to_uri=current_parent_uri,
                    reply_to_cid=current_parent_cid,
                    root_uri=root_uri,
                    root_cid=root_cid,
                    lang=lang
                )
                if failure_response:
                    responses.append(failure_response)
                    current_parent_uri = failure_response.uri
                    current_parent_cid = failure_response.cid
                else:
                    logger.error("Could not even send system failure message, stopping thread")
                    return responses if responses else None
            else:
                responses.append(response)
                # The next reply (if any) hangs off the one just sent.
                current_parent_uri = response.uri
                current_parent_cid = response.cid

        logger.info(f"Successfully sent {len(responses)} threaded replies")
        return responses

    except Exception as e:
        logger.error(f"Error sending threaded reply to notification: {e}")
        return None


def _utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with a trailing ``Z``."""
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


def _get_session_credentials(client: Client) -> Optional[Tuple[str, str]]:
    """Return (access_token, did) from the client's session, or None if unavailable.

    Looks at ``client._session`` first, then at ``access_jwt``/``did`` directly
    on the client. Logs an error describing what is missing before returning None.
    """
    if hasattr(client, '_session') and client._session:
        access_token = client._session.access_jwt
        user_did = client._session.did
    elif hasattr(client, 'access_jwt'):
        access_token = client.access_jwt
        user_did = client.did if hasattr(client, 'did') else None
    else:
        logger.error("Cannot access client session information")
        return None

    if not access_token or not user_did:
        logger.error("Missing access token or DID from session")
        return None

    return access_token, user_did


def _get_pds_host() -> str:
    """Return the PDS base URL from config, else ``PDS_URI`` env var, else bsky.social."""
    from config_loader import get_bluesky_config
    try:
        return get_bluesky_config().get('pds_uri', 'https://bsky.social')
    except Exception:
        # Any config failure falls back to the environment; unlike a bare
        # except, this lets KeyboardInterrupt/SystemExit propagate.
        return os.getenv("PDS_URI", "https://bsky.social")


def _create_record(client: Client, collection: str, record: Dict[str, Any], failure_label: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """POST ``record`` to ``com.atproto.repo.createRecord`` in the user's repo.

    Args:
        client: Authenticated Bluesky client supplying the access token and DID.
        collection: Collection NSID to create the record in.
        record: The record body.
        failure_label: If given, a non-200 response is logged as
            ``"<label>: <status> - <body>"`` before the HTTP error is raised.

    Returns:
        The decoded JSON response, or None if session credentials are missing.

    Raises:
        Exception: Import, network and HTTP-status errors propagate; callers
            wrap this in their own try/except.
    """
    # Imported lazily so a missing `requests` only breaks record creation,
    # and is reported through the caller's error handling.
    import requests

    credentials = _get_session_credentials(client)
    if credentials is None:
        return None
    access_token, user_did = credentials

    pds_host = _get_pds_host()

    headers = {"Authorization": f"Bearer {access_token}"}
    create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord"
    create_data = {
        "repo": user_did,
        "collection": collection,
        "record": record
    }

    response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10)
    if failure_label and response.status_code != 200:
        logger.error(f"{failure_label}: {response.status_code} - {response.text}")
    response.raise_for_status()
    return response.json()


def create_synthesis_ack(client: Client, note: str) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.ack record for synthesis without a target post.

    This creates a synthesis acknowledgment with null subject field.

    Args:
        client: Authenticated Bluesky client
        note: The synthesis note/content

    Returns:
        The response from creating the acknowledgment record or None if failed
    """
    try:
        ack_record = {
            "$type": "stream.thought.ack",
            "subject": None,  # Null subject for synthesis
            "createdAt": _utc_now_iso(),
            "note": note
        }

        result = _create_record(client, "stream.thought.ack", ack_record)
        if result is None:
            return None

        logger.info("Successfully created synthesis acknowledgment")
        return result

    except Exception as e:
        logger.error(f"Error creating synthesis acknowledgment: {e}")
        return None


def acknowledge_post(client: Client, post_uri: str, post_cid: str, note: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.ack record to acknowledge a post.

    This creates a custom acknowledgment record instead of a standard Bluesky like,
    allowing void to track which posts it has engaged with.

    Args:
        client: Authenticated Bluesky client
        post_uri: The URI of the post to acknowledge
        post_cid: The CID of the post to acknowledge
        note: Optional note to attach to the acknowledgment

    Returns:
        The response from creating the acknowledgment record or None if failed
    """
    try:
        ack_record = {
            "$type": "stream.thought.ack",
            "subject": {
                "uri": post_uri,
                "cid": post_cid
            },
            "createdAt": _utc_now_iso(),
            "note": note  # Will be null if no note provided
        }

        result = _create_record(client, "stream.thought.ack", ack_record)
        if result is None:
            return None

        logger.info(f"Successfully acknowledged post: {post_uri}")
        return result

    except Exception as e:
        logger.error(f"Error acknowledging post: {e}")
        return None


def create_tool_call_record(client: Client, tool_name: str, arguments: str, tool_call_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.tool.call record to track tool usage.

    This creates a record of tool calls made by void during processing,
    allowing for analysis of tool usage patterns and debugging.

    Args:
        client: Authenticated Bluesky client
        tool_name: Name of the tool being called
        arguments: Raw JSON string of the tool arguments
        tool_call_id: Optional ID of the tool call for correlation

    Returns:
        The response from creating the tool call record or None if failed
    """
    try:
        tool_record = {
            "$type": "stream.thought.tool.call",
            "tool_name": tool_name,
            "arguments": arguments,  # Store as string to avoid parsing issues
            "createdAt": _utc_now_iso()
        }

        # Add tool_call_id if provided
        if tool_call_id:
            tool_record["tool_call_id"] = tool_call_id

        result = _create_record(
            client,
            "stream.thought.tool.call",
            tool_record,
            failure_label="Tool call record creation failed",
        )
        if result is None:
            return None

        logger.debug(f"Successfully recorded tool call: {tool_name}")
        return result

    except Exception as e:
        logger.error(f"Error creating tool call record: {e}")
        return None


def create_reasoning_record(client: Client, reasoning_text: str) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.reasoning record to track agent reasoning.

    This creates a record of void's reasoning during message processing,
    providing transparency into the decision-making process.

    Args:
        client: Authenticated Bluesky client
        reasoning_text: The reasoning text from the agent

    Returns:
        The response from creating the reasoning record or None if failed
    """
    try:
        reasoning_record = {
            "$type": "stream.thought.reasoning",
            "reasoning": reasoning_text,
            "createdAt": _utc_now_iso()
        }

        result = _create_record(client, "stream.thought.reasoning", reasoning_record)
        if result is None:
            return None

        logger.debug(f"Successfully recorded reasoning (length: {len(reasoning_text)} chars)")
        return result

    except Exception as e:
        logger.error(f"Error creating reasoning record: {e}")
        return None


if __name__ == "__main__":
    client = default_login()
    # do something with the client
    logger.info("Client is ready to use!")