#!/usr/bin/env python3 """CLI tool for publishing stream.thought.blip records to ATProto.""" import sys import json import logging from typing import Optional, List from datetime import datetime, timezone from pathlib import Path import click from rich.console import Console from rich.logging import RichHandler from atproto import Client try: from .config_loader import load_config, get_bluesky_config from .models import PublishRequest, BlipRecord except ImportError: # Handle running as script directly import sys from pathlib import Path sys.path.insert(0, str(Path(__file__).parent)) from config_loader import load_config, get_bluesky_config from models import PublishRequest, BlipRecord # Set up logging console = Console() logging.basicConfig( level=logging.INFO, format="%(message)s", datefmt="[%X]", handlers=[RichHandler(console=console, rich_tracebacks=True)] ) logger = logging.getLogger(__name__) class BlipPublisher: """Publisher for stream.thought.blip records.""" def __init__(self, config: dict): """Initialize the publisher.""" self.config = config self.bluesky_config = get_bluesky_config(config) self.client: Optional[Client] = None def authenticate(self) -> bool: """Authenticate with ATProto using configured credentials.""" try: username = self.bluesky_config['username'] password = self.bluesky_config['password'] pds_uri = self.bluesky_config.get('pds_uri', 'https://bsky.social') logger.debug(f"Authenticating {username} via {pds_uri}") self.client = Client(base_url=pds_uri) self.client.login(username, password) logger.debug("Authentication successful") return True except Exception as e: logger.error(f"Authentication failed: {e}") return False @property def user_did(self) -> Optional[str]: """Get the authenticated user's DID.""" if self.client and hasattr(self.client, 'me'): return self.client.me.did return None def publish_blip(self, content: str, created_at: Optional[datetime] = None) -> Optional[dict]: """ Publish a single blip record. Args: content: The blip content created_at: Optional timestamp (defaults to now) Returns: Response dict with uri and cid if successful, None otherwise """ if not self.client: logger.error("Not authenticated") return None try: # Create publish request request = PublishRequest(content=content, created_at=created_at) blip_record = request.to_record() # Convert to dict for ATProto client record_data = { "$type": "stream.thought.blip", "content": blip_record.content, "createdAt": blip_record.created_at.isoformat().replace("+00:00", "Z") } # Get user DID from session user_did = self.client.me.did # Create the record using the low-level API response = self.client.com.atproto.repo.create_record({ "repo": user_did, "collection": "stream.thought.blip", "record": record_data }) logger.debug(f"Published: {response.uri}") return { "uri": response.uri, "cid": response.cid, "content": content, "createdAt": record_data["createdAt"] } except Exception as e: logger.error(f"Failed to publish blip: {e}") return None def publish_batch(self, messages: List[str]) -> List[Optional[dict]]: """ Publish multiple blip records. Args: messages: List of message contents Returns: List of responses (None for failed publishes) """ results = [] for i, content in enumerate(messages, 1): logger.debug(f"Publishing {i}/{len(messages)}") result = self.publish_blip(content) results.append(result) # Small delay between messages to avoid rate limiting if i < len(messages): import time time.sleep(0.5) successful = sum(1 for r in results if r is not None) logger.debug(f"Published {successful}/{len(messages)} successfully") return results def read_from_stdin() -> Optional[str]: """Read content from stdin.""" if sys.stdin.isatty(): return None try: content = sys.stdin.read().strip() return content if content else None except Exception as e: logger.error(f"Failed to read from stdin: {e}") return None def read_from_file(file_path: str) -> List[str]: """Read messages from a file (one per line).""" try: path = Path(file_path) if not path.exists(): logger.error(f"File not found: {file_path}") return [] with open(path, 'r', encoding='utf-8') as f: lines = [line.strip() for line in f if line.strip()] logger.info(f"Read {len(lines)} messages from {file_path}") return lines except Exception as e: logger.error(f"Failed to read file {file_path}: {e}") return [] def interactive_input() -> Optional[str]: """Get content through interactive input.""" try: console.print("Enter your blip content (press Ctrl+D when done):") lines = [] while True: try: line = input() lines.append(line) except EOFError: break content = '\n'.join(lines).strip() return content if content else None except KeyboardInterrupt: console.print("\nCancelled") return None @click.command() @click.argument('content', required=False) @click.option('--config', '-c', type=click.Path(exists=True), help='Path to configuration file') @click.option('--file', '-f', type=click.Path(exists=True), help='File containing messages (one per line)') @click.option('--interactive', '-i', is_flag=True, help='Interactive input mode') @click.option('--output', type=click.Choice(['json', 'simple']), default='simple', help='Output format') @click.option('--verbose', '-v', is_flag=True, help='Enable verbose logging') def main(content: Optional[str], config: Optional[str], file: Optional[str], interactive: bool, output: str, verbose: bool): """ Publish stream.thought.blip records to ATProto. CONTENT can be provided as: - Command line argument - Piped from stdin - From a file (--file) - Interactive input (--interactive) """ # Set up logging level if verbose: logging.getLogger().setLevel(logging.DEBUG) try: # Load configuration app_config = load_config(config) # Create publisher and authenticate publisher = BlipPublisher(app_config) if not publisher.authenticate(): sys.exit(1) # Determine input source and get content messages = [] if file: # Read from file messages = read_from_file(file) if not messages: logger.error("No messages found in file") sys.exit(1) elif interactive: # Interactive input content = interactive_input() if not content: logger.error("No content provided") sys.exit(1) messages = [content] elif content: # Command line argument messages = [content] else: # Try stdin stdin_content = read_from_stdin() if stdin_content: messages = [stdin_content] else: logger.error("No content provided. Use --help for usage information.") sys.exit(1) # Publish messages if len(messages) == 1: result = publisher.publish_blip(messages[0]) if result: if output == 'json': console.print(json.dumps(result, indent=2)) else: console.print(f"✅ Published: {result['uri']}") else: sys.exit(1) else: results = publisher.publish_batch(messages) successful_results = [r for r in results if r is not None] if output == 'json': console.print(json.dumps(successful_results, indent=2)) else: console.print(f"✅ Published {len(successful_results)}/{len(messages)} messages") for result in successful_results: console.print(f" - {result['uri']}") if len(successful_results) < len(messages): sys.exit(1) except KeyboardInterrupt: logger.info("Interrupted by user") sys.exit(1) except Exception as e: logger.error(f"Fatal error: {e}") sys.exit(1) if __name__ == '__main__': main()