this repo has no description
4
fork

Configure Feed

Select the types of activity you want to include in your feed.

Clean up scripts, remove duplicated code

+104 -176
+2 -32
scripts/crawl_follows.py
··· 11 11 from atproto import exceptions as at_exceptions 12 12 import pandas as pd 13 13 14 + from .utils import RateLimit 15 + 14 16 logger = logging.getLogger(__name__) 15 17 logger.setLevel(logging.INFO) 16 18 ··· 28 30 BATCH_SIZE = 10 29 31 CHECKPOINT_THRESHOLD = 100 30 32 REQUIRED_ENV = ("BSKY_USER", "BSKY_APP_PW") 31 - 32 - 33 - class RateLimit: 34 - def __init__(self, per_second: int): 35 - self.per_second = per_second 36 - self.cur_count = 0 37 - self.refresh_event = asyncio.Event() 38 - self.refresh_running = False 39 - 40 - async def sleep_then_refresh(self): 41 - await asyncio.sleep(1) 42 - self.cur_count = 0 43 - self.refresh_event.set() 44 - self.refresh_running = False 45 - 46 - async def acquire(self): 47 - # If we have remaining capacity in this second, dont block 48 - if self.cur_count < self.per_second: 49 - # Start timer for when our rate allocation refreshes 50 - if not self.refresh_running: 51 - self.refresh_running = True 52 - asyncio.create_task(self.sleep_then_refresh()) 53 - self.cur_count += 1 54 - return 55 - 56 - # Otherwise we need to wait until current second is over 57 - # and our rate allocation refreshes 58 - self.refresh_event.clear() 59 - await self.refresh_event.wait() 60 - 61 - # Just recursively call after waiting 62 - return await self.acquire() 63 33 64 34 65 35 async def get_all_follows(
+3 -48
scripts/get_follows.py
··· 1 1 import argparse 2 2 import asyncio 3 - from datetime import datetime 4 3 import gzip 5 4 import logging 6 5 import os ··· 10 9 from atproto import AsyncClient 11 10 from atproto import exceptions as at_exceptions 12 11 from atproto_client.models.app.bsky.graph.follow import Record as FollowRecord 13 - import pandas as pd 14 12 15 - from crawl_follows import RateLimit 13 + from utils import load_checkpoint, get_accounts, RateLimit 16 14 17 15 logger = logging.getLogger(__name__) 18 16 logger.setLevel(logging.INFO) ··· 80 78 graph_file: str, 81 79 checkpoint_dir: str, 82 80 ): 83 - 84 - # If checkpoint dir doesn't exist, try to create it 85 - if not os.path.isdir(checkpoint_dir): 86 - logger.info("Checkpoint dir doesn't exist, creating...") 87 - try: 88 - os.mkdir(checkpoint_dir) 89 - except Exception as e: 90 - logger.error(f"Failed to created checkpoint dir, {checkpoint_dir}\n{e}") 91 - sys.exit(1) 92 - 93 - # Checkpoint folders contain one file per user 94 - completed_accounts = set() 95 - try: 96 - files = os.listdir(checkpoint_dir) 97 - for file in files: 98 - # Grab entire file name except for .gz extension 99 - completed_accounts.add(file[:-3]) 100 - except Exception as e: 101 - logger.error( 102 - f"Failed to recover from checkpoint dir, {checkpoint_dir}\n{e}", 103 - exc_info=1, 104 - ) 105 - sys.exit(1) 106 - 107 - # Load follow graph parquet file 108 - to_explore = dict() 109 - try: 110 - logger.info("Parsing follower graph file...") 111 - follow_df = pd.read_parquet(graph_file) 112 - # Limit to only accounts following between 100 and 1000 followers 113 - follow_df = follow_df.loc[follow_df["follows"].str.len().between(100, 1000)] 114 - except Exception as e: 115 - logger.error(f"Failed to open follow graph file, {graph_file}\n{e}") 116 - sys.exit(1) 117 - 118 - for _, row in follow_df.iterrows(): 119 - for acct in row["follows"]: 120 - if acct not in completed_accounts: 121 - if acct not in to_explore: 122 - to_explore[acct] = 0 123 - to_explore[acct] += 1 124 - 125 - accts = [(acct, follows) for acct, follows in to_explore.items()] 126 - accts.sort(key=lambda x: -1 * x[1]) 127 - 81 + completed_accounts = load_checkpoint(checkpoint_dir) 82 + accts = get_accounts(graph_file, completed_accounts) 128 83 logger.info(f"Num of accounts to retrieve follows from: {len(accts)}") 129 84 130 85 client = AsyncClient()
+3 -46
scripts/get_likes.py
··· 10 10 from atproto import AsyncClient 11 11 from atproto import exceptions as at_exceptions 12 12 from atproto_client.models.app.bsky.feed.like import Record as LikeRecord 13 - import pandas as pd 14 13 15 - from crawl_follows import RateLimit 14 + from utils import load_checkpoint, get_accounts, RateLimit 16 15 17 16 logger = logging.getLogger(__name__) 18 17 logger.setLevel(logging.INFO) ··· 94 93 start_dt: datetime, 95 94 end_dt: datetime, 96 95 ): 97 - 98 - # If checkpoint dir doesn't exist, try to create it 99 - if not os.path.isdir(checkpoint_dir): 100 - logger.info("Checkpoint dir doesn't exist, creating...") 101 - try: 102 - os.mkdir(checkpoint_dir) 103 - except Exception as e: 104 - logger.error(f"Failed to created checkpoint dir, {checkpoint_dir}\n{e}") 105 - sys.exit(1) 106 - 107 96 # Checkpoint folders contain one file per user 108 - completed_accounts = set() 109 - try: 110 - files = os.listdir(checkpoint_dir) 111 - for file in files: 112 - # Grab entire file name except for .gz extension 113 - completed_accounts.add(file[:-3]) 114 - except Exception as e: 115 - logger.error( 116 - f"Failed to recover from checkpoint dir, {checkpoint_dir}\n{e}", 117 - exc_info=1, 118 - ) 119 - sys.exit(1) 120 - 121 - # Load follow graph parquet file 122 - to_explore = dict() 123 - try: 124 - logger.info("Parsing follower graph file...") 125 - follow_df = pd.read_parquet(graph_file) 126 - # Limit to only accounts following between 100 and 1000 followers 127 - follow_df = follow_df.loc[follow_df["follows"].str.len().between(100, 1000)] 128 - except Exception as e: 129 - logger.error(f"Failed to open follow graph file, {graph_file}\n{e}") 130 - sys.exit(1) 131 - 132 - for _, row in follow_df.iterrows(): 133 - for acct in row["follows"]: 134 - if acct not in completed_accounts: 135 - if acct not in to_explore: 136 - to_explore[acct] = 0 137 - to_explore[acct] += 1 138 - 139 - accts = [(acct, follows) for acct, follows in to_explore.items()] 140 - accts.sort(key=lambda x: -1 * x[1]) 141 - 97 + completed_accounts = load_checkpoint(checkpoint_dir) 98 + accts = get_accounts(graph_file, completed_accts=completed_accounts) 142 99 logger.info(f"Num of accounts to retrieve posts from: {len(accts)}") 143 100 144 101 client = AsyncClient()
+3 -50
scripts/get_posts.py
··· 1 1 import argparse 2 2 import asyncio 3 3 from datetime import datetime 4 - import decimal 5 4 import gzip 6 5 import json 7 6 import logging 8 7 import os 9 8 import sys 10 - import time 11 9 from typing import Tuple, List, Dict 12 10 13 11 from atproto import AsyncClient 14 12 from atproto import exceptions as at_exceptions 15 - from atproto_client.models.app.bsky.embed.record import ViewBlocked 16 13 from atproto_client.models.app.bsky.feed.defs import FeedViewPost 17 - import pandas as pd 18 - from rich import print 19 14 20 - from crawl_follows import RateLimit 15 + from utils import get_accounts, load_checkpoint, RateLimit 21 16 22 17 logger = logging.getLogger(__name__) 23 18 logger.setLevel(logging.INFO) ··· 157 152 start_dt: datetime, 158 153 end_dt: datetime, 159 154 ): 160 - 161 - # If checkpoint dir doesn't exist, try to create it 162 - if not os.path.isdir(checkpoint_dir): 163 - logger.info("Checkpoint dir doesn't exist, creating...") 164 - try: 165 - os.mkdir(checkpoint_dir) 166 - except Exception as e: 167 - logger.error(f"Failed to created checkpoint dir, {checkpoint_dir}\n{e}") 168 - sys.exit(1) 169 - 170 155 # Checkpoint folders contain one file per user 171 - completed_accounts = set() 172 - try: 173 - files = os.listdir(checkpoint_dir) 174 - for file in files: 175 - # Grab entire file name except for .gz extension 176 - completed_accounts.add(file[:-3]) 177 - except Exception as e: 178 - logger.error( 179 - f"Failed to recover from checkpoint dir, {checkpoint_dir}\n{e}", 180 - exc_info=1, 181 - ) 182 - sys.exit(1) 183 - 184 - # Load follow graph parquet file 185 - to_explore = dict() 186 - try: 187 - logger.info("Parsing follower graph file...") 188 - follow_df = pd.read_parquet(graph_file) 189 - # Limit to only accounts following between 100 and 1000 followers 190 - follow_df = follow_df.loc[follow_df["follows"].str.len().between(100, 1000)] 191 - except Exception as e: 192 - logger.error(f"Failed to open follow graph file, {graph_file}\n{e}") 193 - sys.exit(1) 194 - 195 - for _, row in follow_df.iterrows(): 196 - for acct in row["follows"]: 197 - if acct not in completed_accounts: 198 - if acct not in to_explore: 199 - to_explore[acct] = 0 200 - to_explore[acct] += 1 201 - 202 - accts = [(acct, follows) for acct, follows in to_explore.items()] 203 - accts.sort(key=lambda x: -1 * x[1]) 204 - 156 + completed_accounts = load_checkpoint(checkpoint_dir) 157 + accts = get_accounts(graph_file, completed_accounts) 205 158 logger.info(f"Num of accounts to retrieve posts from: {len(accts)}") 206 159 207 160 client = AsyncClient()
+93
scripts/utils.py
··· 1 + import asyncio 2 + import logging 3 + import os 4 + import sys 5 + 6 + import pandas as pd 7 + 8 + 9 + logger = logging.getLogger(__name__) 10 + logger.setLevel(logging.INFO) 11 + 12 + 13 + class RateLimit: 14 + def __init__(self, per_second: int): 15 + self.per_second = per_second 16 + self.cur_count = 0 17 + self.refresh_event = asyncio.Event() 18 + self.refresh_running = False 19 + 20 + async def sleep_then_refresh(self): 21 + await asyncio.sleep(1) 22 + self.cur_count = 0 23 + self.refresh_event.set() 24 + self.refresh_running = False 25 + 26 + async def acquire(self): 27 + # If we have remaining capacity in this second, dont block 28 + if self.cur_count < self.per_second: 29 + # Start timer for when our rate allocation refreshes 30 + if not self.refresh_running: 31 + self.refresh_running = True 32 + asyncio.create_task(self.sleep_then_refresh()) 33 + self.cur_count += 1 34 + return 35 + 36 + # Otherwise we need to wait until current second is over 37 + # and our rate allocation refreshes 38 + self.refresh_event.clear() 39 + await self.refresh_event.wait() 40 + 41 + # Just recursively call after waiting 42 + return await self.acquire() 43 + 44 + 45 + def load_checkpoint(ckpt_dir: str) -> set[str]: 46 + # If checkpoint dir doesn't exist, try to create it 47 + if not os.path.isdir(ckpt_dir): 48 + logger.info("Checkpoint dir doesn't exist, creating...") 49 + try: 50 + os.mkdir(ckpt_dir) 51 + except Exception as e: 52 + logger.error(f"Failed to created checkpoint dir, {ckpt_dir}\n{e}") 53 + sys.exit(1) 54 + 55 + # Checkpoint folders contain one file per user 56 + completed_accounts = set() 57 + try: 58 + files = os.listdir(ckpt_dir) 59 + for file in files: 60 + # Grab entire file name except for .gz extension 61 + completed_accounts.add(file[:-3]) 62 + except Exception as e: 63 + logger.error( 64 + f"Failed to recover from checkpoint dir, {ckpt_dir}\n{e}", 65 + exc_info=1, 66 + ) 67 + sys.exit(1) 68 + 69 + return completed_accounts 70 + 71 + 72 + def get_accounts(graph_path: str, completed_accts: set[str]) -> list[tuple[str, int]]: 73 + # Load follow graph parquet file 74 + to_explore = dict() 75 + try: 76 + logger.info("Parsing follower graph file...") 77 + follow_df = pd.read_parquet(graph_path) 78 + # Limit to only accounts following between 100 and 1000 followers 79 + follow_df = follow_df.loc[follow_df["follows"].str.len().between(100, 1000)] 80 + except Exception as e: 81 + logger.error(f"Failed to open follow graph file, {graph_path}\n{e}") 82 + sys.exit(1) 83 + 84 + for _, row in follow_df.iterrows(): 85 + for acct in row["follows"]: 86 + if acct not in completed_accts: 87 + if acct not in to_explore: 88 + to_explore[acct] = 0 89 + to_explore[acct] += 1 90 + 91 + accts = [(acct, follows) for acct, follows in to_explore.items()] 92 + accts.sort(key=lambda x: -1 * x[1]) 93 + return accts