this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: resurrect most-liked

+120 -59
+2 -5
feedgen.py
···
 10  10   import websockets
 11  11
 12  12   from feed_manager import feed_manager
 13     - from firehose_manager import FirehoseManager
 14  13
 15  14   logging.basicConfig(
 16  15       format='%(asctime)s - %(levelname)-5s - %(name)-20s - %(message)s',
···
 20  19   logging.getLogger('feeds').setLevel(logging.DEBUG)
 21  20   logging.getLogger('firehose').setLevel(logging.DEBUG)
 22  21
 23     - async def firehose_events(firehose_manager):
     22 + async def firehose_events():
 24  23       relay_url = 'ws://localhost:6008/subscribe'
 25  24
 26  25       logger = logging.getLogger('feeds.events')
···
 32  31               yield json.load(payload)
 33  32
 34  33   async def main():
 35     -     firehose_manager = FirehoseManager()
 36  34       event_count = 0
 37  35
 38     -     async for commit in firehose_events(firehose_manager):
     36 +     async for commit in firehose_events():
 39  37           feed_manager.process_commit(commit)
 40  38           event_count += 1
 41  39           if event_count % 2500 == 0:
 42  40               feed_manager.commit_changes()
 43     -             firehose_manager.set_sequence_number(commit['seq'])
 44  41
 45  42   if __name__ == '__main__':
 46  43       asyncio.run(main())
+118 -24
feeds/mostliked.py
···
  2   2
  3   3   import apsw
  4   4   import apsw.ext
      5 + import threading
      6 + import queue
  5   7
  6   8   from . import BaseFeed
  7   9
     10 + class DatabaseWorker(threading.Thread):
     11 +     def __init__(self, name, db_path, task_queue):
     12 +         super().__init__()
     13 +         self.db_cnx = apsw.Connection(db_path)
     14 +         self.db_cnx.pragma('foreign_keys', True)
     15 +         self.db_cnx.pragma('journal_mode', 'WAL')
     16 +         self.db_cnx.pragma('wal_autocheckpoint', '0')
     17 +         self.stop_signal = False
     18 +         self.task_queue = task_queue
     19 +         self.logger = logging.getLogger(f'feeds.db.{name}')
     20 +         self.changes = 0
     21 +
     22 +     def run(self):
     23 +         while not self.stop_signal:
     24 +             task = self.task_queue.get(block=True)
     25 +             if task == 'STOP':
     26 +                 self.stop_signal = True
     27 +             elif task == 'COMMIT':
     28 +                 self.logger.debug(f'committing {self.changes} changes')
     29 +                 if self.db_cnx.in_transaction:
     30 +                     self.db_cnx.execute('COMMIT')
     31 +                 checkpoint = self.db_cnx.execute('PRAGMA wal_checkpoint(PASSIVE)')
     32 +                 self.logger.debug(f'checkpoint: {checkpoint.fetchall()!r}')
     33 +                 self.changes = 0
     34 +                 self.logger.debug(f'qsize: {self.task_queue.qsize()}')
     35 +             else:
     36 +                 sql, bindings = task
     37 +                 if not self.db_cnx.in_transaction:
     38 +                     self.db_cnx.execute('BEGIN')
     39 +                 self.db_cnx.execute(sql, bindings)
     40 +                 self.changes += self.db_cnx.changes()
     41 +             self.task_queue.task_done()
     42 +         self.db_cnx.close()
     43 +
     44 +     def stop(self):
     45 +         self.task_queue.put('STOP')
     46 +
  8  47   class MostLikedFeed(BaseFeed):
  9  48       FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/most-liked'
 10     -     SERVE_FEED_QUERY = """
 11     -         select uri, create_ts, unixepoch('now', '-24 hours'), create_ts - unixepoch('now', '-24 hours'), likes
 12     -         from posts
 13     -         where create_ts >= unixepoch('now', '-24 hours')
 14     -         order by likes desc, create_ts asc
 15     -         limit :limit offset :offset
 16     -     """
 17  49       DELETE_OLD_POSTS_QUERY = """
 18     -         delete from posts
 19     -         where create_ts < unixepoch('now', '-24 hours')
     50 +         delete from posts where (
     51 +             create_ts < unixepoch('now', '-15 minutes') and likes < 2
     52 +         ) or create_ts < unixepoch('now', '-24 hours');
 20  53       """
 21  54
 22  55       def __init__(self):
 23  56           self.db_cnx = apsw.Connection('db/mostliked.db')
     57 +         self.db_cnx.pragma('foreign_keys', True)
 24  58           self.db_cnx.pragma('journal_mode', 'WAL')
 25  59           self.db_cnx.pragma('wal_autocheckpoint', '0')
 26  60
 27  61           with self.db_cnx:
 28  62               self.db_cnx.execute("""
 29  63               create table if not exists posts (
 30     -                 uri text, create_ts timestamp, likes int
     64 +                 uri text primary key,
     65 +                 create_ts timestamp,
     66 +                 likes int
     67 +             );
     68 +             create table if not exists langs (
     69 +                 uri text,
     70 +                 lang text,
     71 +                 foreign key(uri) references posts(uri) on delete cascade
 31  72               );
 32     -             create unique index if not exists uri_idx on posts(uri);
 33     -             create index if not exists create_ts_idx on posts(create_ts);
     73 +             create index if not exists ts_idx on posts(create_ts);
 34  74               """)
 35  75
 36  76           self.logger = logging.getLogger('feeds.mostliked')
 37  77
     78 +         self.db_writes = queue.Queue()
     79 +         db_worker = DatabaseWorker('mostliked', 'db/mostliked.db', self.db_writes)
     80 +         db_worker.start()
     81 +
 38  82       def process_commit(self, commit):
 39     -         return
     83 +         if commit['opType'] != 'c':
     84 +             return
 40  85
 41     -     def delete_old_posts(self):
 42     -         self.db_cnx.execute(self.DELETE_OLD_POSTS_QUERY)
 43     -         self.logger.debug('deleted {} old posts'.format(self.db_cnx.changes()))
     86 +         if commit['collection'] == 'app.bsky.feed.post':
     87 +             record = commit.get('record')
     88 +             post_uri = f"at://{commit['did']}/app.bsky.feed.post/{commit['rkey']}"
     89 +             task = (
     90 +                 'insert or ignore into posts (uri, create_ts, likes) values (:uri, :ts, 0)',
     91 +                 {'uri': post_uri, 'ts': self.safe_timestamp(record.get('createdAt')).timestamp()}
     92 +             )
     93 +             self.db_writes.put(task)
     94 +
     95 +             langs = record.get('langs', [])
     96 +             for lang in langs:
     97 +                 task = (
     98 +                     'insert or ignore into langs (uri, lang) values (:uri, :lang)',
     99 +                     {'uri': post_uri, 'lang': lang}
    100 +                 )
    101 +                 self.db_writes.put(task)
    102 +
    103 +         elif commit['collection'] == 'app.bsky.feed.like':
    104 +             record = commit.get('record')
    105 +             try:
    106 +                 subject_uri = record['subject']['uri']
    107 +             except KeyError:
    108 +                 return
    109 +
    110 +             task = (
    111 +                 'update posts set likes = likes + 1 where uri = :uri',
    112 +                 {'uri': subject_uri}
    113 +             )
    114 +             self.db_writes.put(task)
 44 115
 45 116       def commit_changes(self):
 46     -         self.delete_old_posts()
 47     -         self.logger.debug('committing changes')
 48     -         self.transaction_commit(self.db_cnx)
 49     -         self.wal_checkpoint(self.db_cnx, 'RESTART')
    117 +         self.db_writes.put((self.DELETE_OLD_POSTS_QUERY, {}))
    118 +         self.db_writes.put('COMMIT')
    119 +
    120 +     def generate_sql(self, limit, offset, langs):
    121 +         bindings = []
    122 +         sql = """
    123 +         select posts.uri, create_ts, create_ts - unixepoch('now', '-15 minutes') as rem, likes, lang
    124 +         from posts
    125 +         left join langs on posts.uri = langs.uri
    126 +         where
    127 +         """
    128 +         if not '*' in langs:
    129 +             lang_values = list(langs.values())
    130 +             bindings.extend(lang_values)
    131 +             sql += " OR ".join(['lang = ?'] * len(lang_values))
    132 +         else:
    133 +             sql += " 1=1 "
    134 +         sql += """
    135 +         order by likes desc, create_ts desc
    136 +         limit ? offset ?
    137 +         """
    138 +         bindings.extend([limit, offset])
    139 +         return sql, bindings
 50 140
 51 141       def serve_feed(self, limit, offset, langs):
 52     -         return [
 53     -             'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.post/3l3cgg5vbc72i'
 54     -         ]
    142 +         sql, bindings = self.generate_sql(limit, offset, langs)
    143 +         cur = self.db_cnx.execute(sql, bindings)
    144 +         return [row[0] for row in cur]
 55 145
 56 146       def serve_feed_debug(self, limit, offset, langs):
 57     -         pass
    147 +         sql, bindings = self.generate_sql(limit, offset, langs)
    148 +         return apsw.ext.format_query_table(
    149 +             self.db_cnx, sql, bindings,
    150 +             string_sanitize=2, text_width=9999, use_unicode=True
    151 +         )
-30
firehose_manager.py
···
  1     - import logging
  2     -
  3     - import apsw
  4     -
  5     - class FirehoseManager:
  6     -     def __init__(self, fname='db/firehose.db'):
  7     -         self.db_cnx = apsw.Connection(fname)
  8     -         self.db_cnx.pragma('journal_mode', 'WAL')
  9     -         with self.db_cnx:
 10     -             self.db_cnx.execute("create table if not exists firehose(key text unique, value text)")
 11     -
 12     -         self.logger = logging.getLogger('firehose.manager')
 13     -
 14     -     def get_sequence_number(self):
 15     -         row = self.db_cnx.execute("select * from firehose where key = 'seq'").fetchone()
 16     -         if row is None:
 17     -             return None
 18     -         (key, value) = row
 19     -         return int(value)
 20     -
 21     -     def set_sequence_number(self, value):
 22     -         self.logger.debug(f'setting sequence number = {value}')
 23     -
 24     -         with self.db_cnx:
 25     -             self.db_cnx.execute(
 26     -                 "insert into firehose (key, value) values ('seq', :value) on conflict(key) do update set value = :value",
 27     -                 dict(value=value)
 28     -             )
 29     -
 30     -         self.db_cnx.pragma('wal_checkpoint(RESTART)')