this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

import click
import datetime
import html
import json
import pathlib
import re
import time
import urllib.parse
import zipfile

from dateutil import parser
from requests_oauthlib import OAuth1Session
import sqlite_utils

# Twitter API error codes
RATE_LIMIT_ERROR_CODE = 88

# Integer type ids for the since_ids table, which stores the newest tweet id
# seen per timeline flavour so later runs can resume with --since.
SINCE_ID_TYPES = {
    "user": 1,
    "home": 2,
    "mentions": 3,
    "search": 4,
}
# Integer type ids for the count_history table (see save_user_counts).
COUNT_HISTORY_TYPES = {
    "followers": 1,
    "friends": 2,
    "listed": 3,
    # Don't track these - they're uninteresting and really noisy in terms
    # of writing new rows to the count_history table:
    # "favourites": 4,
    # "statuses": 5,
}

# Matches the HTML anchor tag Twitter returns for a tweet's "source" field,
# capturing the href as "url" and the link text as "name".
source_re = re.compile('<a href="(?P<url>.*?)".*?>(?P<name>.*?)</a>')


def open_database(db_path):
    "Open the SQLite database at db_path, applying migrations if it already has tables."
    db = sqlite_utils.Database(db_path)
    # Only run migrations if this is an existing DB (has tables)
    if db.tables:
        migrate(db)
    return db


def migrate(db):
    "Apply any MIGRATIONS not yet recorded as applied in the migrations table."
    from twitter_to_sqlite.migrations import MIGRATIONS

    if "migrations" not in db.table_names():
        db["migrations"].create({"name": str, "applied": str}, pk="name")
    applied_migrations = {
        m[0] for m in db.conn.execute("select name from migrations").fetchall()
    }
    for migration in MIGRATIONS:
        name = migration.__name__
        if name in applied_migrations:
            continue
        migration(db)
        # Record the migration so it is never run a second time
        db["migrations"].insert(
            {"name": name, "applied": datetime.datetime.utcnow().isoformat()}
        )


def session_for_auth(auth):
    "Build an authenticated OAuth1 requests session from a dict of Twitter credentials."
    return OAuth1Session(
        client_key=auth["api_key"],
        client_secret=auth["api_secret_key"],
        resource_owner_key=auth["access_token"],
        resource_owner_secret=auth["access_token_secret"],
    )


def fetch_user_list_chunks(
    session, user_id=None, screen_name=None, sleep=61, noun="followers"
):
    """Yield pages of user dicts from the followers/ or friends/ list endpoint.

    Follows cursor pagination until next_cursor is falsy, sleeping `sleep`
    seconds between requests to stay under the rate limit.
    """
    cursor = -1
    users = []
    while cursor:
        headers, body = fetch_user_list(session, cursor, user_id, screen_name, noun)
        yield body["users"]
        cursor = body["next_cursor"]
        if not cursor:
            break
        time.sleep(sleep)  # Rate limit = 15 per 15 minutes!


def fetch_user_list(session, cursor, user_id=None, screen_name=None, noun="followers"):
    "Fetch one page (up to 200 users) of {noun}/list.json; returns (headers, parsed JSON)."
    args = user_args(user_id, screen_name)
    args.update({"count": 200, "cursor": cursor})
    r = session.get(
        "https://api.twitter.com/1.1/{}/list.json?".format(noun)
        + urllib.parse.urlencode(args)
    )
    return r.headers, r.json()


def get_profile(db, session, user_id=None, screen_name=None):
    """Fetch one user's profile (the authenticated user if neither user_id nor
    screen_name is given), save it to the users table and return the dict."""
    if not (user_id or screen_name):
        profile = session.get(
            "https://api.twitter.com/1.1/account/verify_credentials.json"
        ).json()
    else:
        args = user_args(user_id, screen_name)
        url = "https://api.twitter.com/1.1/users/show.json"
        if args:
            url += "?" + urllib.parse.urlencode(args)
        profile = session.get(url).json()
    save_users(db, [profile])
    return profile


def fetch_timeline(
    session,
    url,
    db,
    args=None,
    sleep=1,
    stop_after=None,
    key=None,
    since_id=None,
    since=False,
    since_type=None,
    since_key=None,
):
    """Yield tweets from a timeline-style endpoint, paging backwards via max_id.

    Parameters:
        session - authenticated OAuth1 session
        url - the v1.1 API endpoint to page through
        db - database used to read/record since_ids bookkeeping
        args - extra query-string arguments for the endpoint
        sleep - seconds to pause between page requests
        stop_after - fetch at most this many tweets (single request)
        key - if set, tweets live under this key of the response JSON
        since_id - only fetch tweets newer than this id
        since - load since_id from the since_ids table instead (mutually
                exclusive with since_id)
        since_type / since_key - identify the since_ids row to read and update

    Sleeps 15s and retries (up to 4 times) on a rate-limit error response.
    """
    # See https://developer.twitter.com/en/docs/tweets/timelines/guides/working-with-timelines
    if since and since_id:
        raise click.ClickException("Use either --since or --since_id, not both")

    since_type_id = None
    last_since_id = None
    if since_type is not None:
        assert since_key is not None
        since_type_id = SINCE_ID_TYPES[since_type]
        # Figure out the last since_id in case we need it
        try:
            last_since_id = db.conn.execute(
                """
                select since_id from since_ids
                where type = ? and key = ?
                """,
                [since_type_id, since_key],
            ).fetchall()[0][0]
        except IndexError:
            # No previous since_id recorded for this (type, key)
            pass

    if since:
        # Load since_id from database
        since_id = last_since_id

    args = dict(args or {})
    args["count"] = 200
    if stop_after is not None:
        args["count"] = stop_after
    if since_id:
        args["since_id"] = since_id
    args["tweet_mode"] = "extended"
    min_seen_id = None
    num_rate_limit_errors = 0
    while True:
        if min_seen_id is not None:
            # Page backwards: request only tweets older than the oldest seen
            args["max_id"] = min_seen_id - 1
        response = session.get(url, params=args)
        tweets = response.json()
        if "errors" in tweets:
            # Was it a rate limit error? If so sleep and try again
            if RATE_LIMIT_ERROR_CODE == tweets["errors"][0]["code"]:
                num_rate_limit_errors += 1
                assert num_rate_limit_errors < 5, "More than 5 rate limit errors"
                print(
                    "Rate limit exceeded - will sleep 15s and try again {}".format(
                        repr(response.headers)
                    )
                )
                time.sleep(15)
                continue
            else:
                raise Exception(str(tweets["errors"]))
        if key is not None:
            tweets = tweets[key]
        if not tweets:
            break
        for tweet in tweets:
            yield tweet
        min_seen_id = min(t["id"] for t in tweets)
        max_seen_id = max(t["id"] for t in tweets)
        if last_since_id is not None:
            # Never move the recorded since_id backwards
            max_seen_id = max((last_since_id, max_seen_id))
            last_since_id = max_seen_id
        db["since_ids"].insert(
            {"type": since_type_id, "key": since_key, "since_id": max_seen_id,},
            replace=True,
        )
        if stop_after is not None:
            break
        time.sleep(sleep)


def fetch_user_timeline(
    session,
    db,
    user_id=None,
    screen_name=None,
    stop_after=None,
    since_id=None,
    since=False,
):
    "Yield tweets from a single user's timeline, tracking since_ids per user."
    args = user_args(user_id, screen_name)
    yield from fetch_timeline(
        session,
        "https://api.twitter.com/1.1/statuses/user_timeline.json",
        db,
        args,
        sleep=1,
        stop_after=stop_after,
        since_id=since_id,
        since_type="user",
        since_key="id:{}".format(user_id) if user_id else screen_name,
        since=since,
    )


def fetch_favorites(session, db, user_id=None, screen_name=None, stop_after=None):
    "Yield tweets the specified user has favorited."
    args = user_args(user_id, screen_name)
    # Rate limit 75/15 mins = 5/minute = every 12 seconds
    sleep = 12
    yield from fetch_timeline(
        session,
        "https://api.twitter.com/1.1/favorites/list.json",
        db,
        args,
        sleep=sleep,
        stop_after=stop_after,
    )


def user_args(user_id, screen_name):
    "Build query-string args identifying a user by id and/or screen name."
    args = {}
    if user_id:
        args["user_id"] = user_id
    if screen_name:
        args["screen_name"] = screen_name
    return args


def expand_entities(s, entities):
    "Replace t.co shortened links in s using the expanded_url values in entities."
    for _, ents in entities.items():
        for ent in ents:
            if "url" in ent:
                replacement = ent["expanded_url"] or ent["url"]
                s = s.replace(ent["url"], replacement)
    return s


def transform_user(user):
    """Mutate a user API dict in place ready for database insertion:
    parse created_at, expand t.co links in description/url, and drop the
    entities, status and redundant *_str keys."""
    user["created_at"] = parser.parse(user["created_at"])
    if user["description"] and "description" in user.get("entities", {}):
        user["description"] = expand_entities(
            user["description"], user["entities"]["description"]
        )
    if user["url"] and "url" in user.get("entities", {}):
        user["url"] = expand_entities(user["url"], user["entities"]["url"])
    user.pop("entities", None)
    user.pop("status", None)
    # *_str keys duplicate the numeric id columns - redundant in SQLite
    to_remove = [k for k in user if k.endswith("_str")]
    for key in to_remove:
        del user[key]


def transform_tweet(tweet):
    """Mutate a tweet API dict in place: expand links and unescape HTML in
    full_text, drop *_str and quoted_status_* duplicates, and normalize
    created_at to an ISO format string."""
    tweet["full_text"] = html.unescape(
        expand_entities(tweet["full_text"], tweet.pop("entities"))
    )
    to_remove = [k for k in tweet if k.endswith("_str")] + [
        "quoted_status_id",
        "quoted_status_permalink",
    ]
    for key in to_remove:
        if key in tweet:
            del tweet[key]
    tweet["created_at"] = parser.parse(tweet["created_at"]).isoformat()


def ensure_tables(db):
    "Create any core tables (plus their FTS tables and indexes) that do not yet exist."
    table_names = set(db.table_names())
    if "places" not in table_names:
        db["places"].create({"id": str}, pk="id")
    if "sources" not in table_names:
        db["sources"].create({"id": str, "name": str, "url": str}, pk="id")
    if "users" not in table_names:
        db["users"].create(
            {
                "id": int,
                "screen_name": str,
                "name": str,
                "description": str,
                "location": str,
            },
            pk="id",
        )
        db["users"].enable_fts(
            ["name", "screen_name", "description", "location"], create_triggers=True
        )
    if "tweets" not in table_names:
        db["tweets"].create(
            {
                "id": int,
                "user": int,
                "created_at": str,
                "full_text": str,
                "retweeted_status": int,
                "quoted_status": int,
                "place": str,
                "source": str,
            },
            pk="id",
            foreign_keys=(
                ("user", "users", "id"),
                ("place", "places", "id"),
                ("source", "sources", "id"),
            ),
        )
        db["tweets"].enable_fts(["full_text"], create_triggers=True)
        # Self-referential foreign keys added after creation
        db["tweets"].add_foreign_key("retweeted_status", "tweets")
        db["tweets"].add_foreign_key("quoted_status", "tweets")
    if "following" not in table_names:
        db["following"].create(
            {"followed_id": int, "follower_id": int, "first_seen": str},
            pk=("followed_id", "follower_id"),
            foreign_keys=(
                ("followed_id", "users", "id"),
                ("follower_id", "users", "id"),
            ),
        )
    # Ensure following has indexes
    following_indexes = {tuple(i.columns) for i in db["following"].indexes}
    if ("followed_id",) not in following_indexes:
        db["following"].create_index(["followed_id"])
    if ("follower_id",) not in following_indexes:
        db["following"].create_index(["follower_id"])

    # Tables for tracking --since
    if "since_ids" not in table_names:
        db["since_id_types"].create({"id": int, "name": str,}, pk="id")
        db["since_id_types"].insert_all(
            [{"id": id, "name": name} for name, id in SINCE_ID_TYPES.items()]
        )
        db["since_ids"].create(
            {"type": int, "key": str, "since_id": int},
            pk=("type", "key"),
            foreign_keys=(("type", "since_id_types", "id"),),
        )

    # Tables for recording history of user follower counts etc
    if "count_history" not in table_names:
        db["count_history_types"].create({"id": int, "name": str,}, pk="id")
        db["count_history_types"].insert_all(
            [{"id": id, "name": name} for name, id in COUNT_HISTORY_TYPES.items()]
        )
        db["count_history"].create(
            {"type": int, "user": int, "datetime": str, "count": int},
            pk=("type", "user", "datetime"),
            foreign_keys=(
                ("type", "count_history_types", "id"),
                ("user", "users", "id"),
            ),
        )


def save_tweets(db, tweets, favorited_by=None):
    """Save a list of tweet API dicts, plus their users, places, sources and media.

    Recurses first into nested quoted_status / retweeted_status tweets so the
    foreign keys resolve. If favorited_by is a user id, also records a row in
    the favorited_by table for each tweet.
    """
    ensure_tables(db)
    for tweet in tweets:
        transform_tweet(tweet)
        user = tweet.pop("user")
        transform_user(user)
        tweet["user"] = user["id"]
        # Replace the source HTML with the hashed id of a sources row
        tweet["source"] = extract_and_save_source(db, tweet["source"])
        if tweet.get("place"):
            db["places"].insert(tweet["place"], pk="id", alter=True, replace=True)
            tweet["place"] = tweet["place"]["id"]
        # extended_entities contains media
        extended_entities = tweet.pop("extended_entities", None)
        # Deal with nested retweeted_status / quoted_status
        nested = []
        for tweet_key in ("quoted_status", "retweeted_status"):
            if tweet.get(tweet_key):
                nested.append(tweet[tweet_key])
                tweet[tweet_key] = tweet[tweet_key]["id"]
        if nested:
            save_tweets(db, nested)
        db["users"].insert(user, pk="id", alter=True, replace=True)
        save_user_counts(db, user)
        table = db["tweets"].insert(tweet, pk="id", alter=True, replace=True)
        if favorited_by is not None:
            db["favorited_by"].insert(
                {"tweet": tweet["id"], "user": favorited_by},
                pk=("user", "tweet"),
                foreign_keys=("tweet", "user"),
                replace=True,
            )
        if extended_entities and extended_entities.get("media"):
            for media in extended_entities["media"]:
                # TODO: Remove this line when .m2m() grows alter=True
                db["media"].insert(media, pk="id", alter=True, replace=True)
                table.m2m("media", media, pk="id")
406def save_users(db, users, followed_id=None, follower_id=None): 407 assert not (followed_id and follower_id) 408 ensure_tables(db) 409 for user in users: 410 transform_user(user) 411 db["users"].insert_all(users, pk="id", alter=True, replace=True) 412 for user in users: 413 save_user_counts(db, user) 414 if followed_id or follower_id: 415 first_seen = datetime.datetime.utcnow().isoformat() 416 db["following"].insert_all( 417 ( 418 { 419 "followed_id": followed_id or user["id"], 420 "follower_id": follower_id or user["id"], 421 "first_seen": first_seen, 422 } 423 for user in users 424 ), 425 ignore=True, 426 ) 427 428 429def fetch_user_batches(session, ids_or_screen_names, use_ids=False, sleep=1): 430 # Yields lists of up to 70 users (tried 100 but got this error: 431 # # {'code': 18, 'message': 'Too many terms specified in query.'} ) 432 batches = [] 433 batch = [] 434 for id in ids_or_screen_names: 435 batch.append(id) 436 if len(batch) == 70: 437 batches.append(batch) 438 batch = [] 439 if batch: 440 batches.append(batch) 441 url = "https://api.twitter.com/1.1/users/lookup.json" 442 for batch in batches: 443 if use_ids: 444 args = {"user_id": ",".join(map(str, batch))} 445 else: 446 args = {"screen_name": ",".join(batch)} 447 users = session.get(url, params=args).json() 448 yield users 449 time.sleep(sleep) 450 451 452def fetch_status_batches(session, tweet_ids, sleep=1): 453 # Yields lists of up to 100 tweets 454 batches = [] 455 batch = [] 456 for id in tweet_ids: 457 batch.append(id) 458 if len(batch) == 100: 459 batches.append(batch) 460 batch = [] 461 if batch: 462 batches.append(batch) 463 url = "https://api.twitter.com/1.1/statuses/lookup.json" 464 for batch in batches: 465 args = {"id": ",".join(map(str, batch)), "tweet_mode": "extended"} 466 tweets = session.get(url, params=args).json() 467 yield tweets 468 time.sleep(sleep) 469 470 471def resolve_identifiers(db, identifiers, attach, sql): 472 if sql: 473 if attach: 474 for filepath in attach: 475 if 
":" in filepath: 476 alias, filepath = filepath.split(":", 1) 477 else: 478 alias = filepath.split("/")[-1].split(".")[0] 479 attach_sql = """ 480 ATTACH DATABASE '{}' AS [{}]; 481 """.format( 482 str(pathlib.Path(filepath).resolve()), alias 483 ) 484 db.conn.execute(attach_sql) 485 sql_identifiers = [r[0] for r in db.conn.execute(sql).fetchall()] 486 else: 487 sql_identifiers = [] 488 return list(identifiers) + sql_identifiers 489 490 491def fetch_and_save_list(db, session, identifier, identifier_is_id=False): 492 show_url = "https://api.twitter.com/1.1/lists/show.json" 493 args = {} 494 if identifier_is_id: 495 args["list_id"] = identifier 496 else: 497 screen_name, slug = identifier.split("/") 498 args.update({"owner_screen_name": screen_name, "slug": slug}) 499 # First fetch the list details 500 data = session.get(show_url, params=args).json() 501 list_id = data["id"] 502 del data["id_str"] 503 user = data.pop("user") 504 save_users(db, [user]) 505 data["user"] = user["id"] 506 data["created_at"] = parser.parse(data["created_at"]) 507 db["lists"].insert(data, pk="id", foreign_keys=("user",), replace=True) 508 # Now fetch the members 509 url = "https://api.twitter.com/1.1/lists/members.json" 510 cursor = -1 511 while cursor: 512 args.update({"count": 5000, "cursor": cursor}) 513 body = session.get(url, params=args).json() 514 users = body["users"] 515 save_users(db, users) 516 db["list_members"].insert_all( 517 ({"list": list_id, "user": user["id"]} for user in users), 518 pk=("list", "user"), 519 foreign_keys=("list", "user"), 520 replace=True, 521 ) 522 cursor = body["next_cursor"] 523 if not cursor: 524 break 525 time.sleep(1) # Rate limit = 900 per 15 minutes 526 527 528def cursor_paginate(session, url, args, key, page_size=200, sleep=None): 529 "Execute cursor pagination, yelding 'key' for each page" 530 args = dict(args) 531 args["page_size"] = page_size 532 cursor = -1 533 while cursor: 534 args["cursor"] = cursor 535 r = session.get(url, params=args) 536 
raise_if_error(r) 537 body = r.json() 538 yield body[key] 539 cursor = body["next_cursor"] 540 if not cursor: 541 break 542 if sleep is not None: 543 time.sleep(sleep) 544 545 546class TwitterApiError(Exception): 547 def __init__(self, headers, body): 548 self.headers = headers 549 self.body = body 550 551 def __repr__(self): 552 return "{}: {}".format(self.body, self.headers) 553 554 555def raise_if_error(r): 556 if "errors" in r.json(): 557 raise TwitterApiError(r.headers, r.json()["errors"]) 558 559 560def stream_filter(session, track=None, follow=None, locations=None, language=None): 561 session.stream = True 562 args = {"tweet_mode": "extended"} 563 for key, value in ( 564 ("track", track), 565 ("follow", follow), 566 ("locations", locations), 567 ("language", language), 568 ): 569 if value is None: 570 continue 571 if not isinstance(value, str): 572 value = ",".join(map(str, value)) 573 args[key] = value 574 while True: 575 response = session.post( 576 "https://stream.twitter.com/1.1/statuses/filter.json", params=args 577 ) 578 for line in response.iter_lines(chunk_size=10000): 579 if line.strip().startswith(b"{"): 580 tweet = json.loads(line) 581 # Only yield tweet if it has an 'id' and 'created_at' 582 # - otherwise it's probably a maintenance message, see 583 # https://developer.twitter.com/en/docs/tweets/filter-realtime/overview/statuses-filter 584 if "id" in tweet and "created_at" in tweet: 585 # 'Fix' weird tweets from streaming API 586 fix_streaming_tweet(tweet) 587 yield tweet 588 else: 589 print(tweet) 590 time.sleep(1) 591 592 593def fix_streaming_tweet(tweet): 594 if "extended_tweet" in tweet: 595 tweet.update(tweet.pop("extended_tweet")) 596 if "full_text" not in tweet: 597 tweet["full_text"] = tweet["text"] 598 if "retweeted_status" in tweet: 599 fix_streaming_tweet(tweet["retweeted_status"]) 600 if "quoted_status" in tweet: 601 fix_streaming_tweet(tweet["quoted_status"]) 602 603 604def user_ids_for_screen_names(db, screen_names): 605 sql = 
"select id from users where lower(screen_name) in ({})".format( 606 ", ".join(["?"] * len(screen_names)) 607 ) 608 return [ 609 r[0] for r in db.conn.execute(sql, [s.lower() for s in screen_names]).fetchall() 610 ] 611 612 613def read_archive_js(filepath): 614 "Open zip file, return (filename, content) for all .js" 615 zf = zipfile.ZipFile(filepath) 616 for zi in zf.filelist: 617 if zi.filename.endswith(".js"): 618 yield zi.filename, zf.open(zi.filename).read() 619 620 621def extract_and_save_source(db, source): 622 m = source_re.match(source) 623 details = m.groupdict() 624 return db["sources"].insert(details, hash_id="id", replace=True).last_pk 625 626 627def save_user_counts(db, user): 628 for type_name, type_id in COUNT_HISTORY_TYPES.items(): 629 previous_count = None 630 try: 631 previous_count = db.conn.execute( 632 """ 633 select count from count_history 634 where type = ? and user = ? 635 order by datetime desc limit 1 636 """, 637 [type_id, user["id"]], 638 ).fetchall()[0][0] 639 except IndexError: 640 pass 641 current_count = user["{}_count".format(type_name)] 642 if current_count != previous_count: 643 db["count_history"].insert( 644 { 645 "type": type_id, 646 "user": user["id"], 647 "datetime": datetime.datetime.utcnow().isoformat().split(".")[0] 648 + "+00:00", 649 "count": current_count, 650 }, 651 replace=True, 652 )