import click
import datetime
import html
import json
import pathlib
import re
import sqlite3
import time
import urllib.parse
import zipfile

from dateutil import parser
from requests import Session
from requests.auth import AuthBase
from requests_oauthlib import OAuth1Session
import sqlite_utils

# Twitter API error codes
RATE_LIMIT_ERROR_CODE = 88

SINCE_ID_TYPES = {
    "user": 1,
    "home": 2,
    "mentions": 3,
    "search": 4,
}
COUNT_HISTORY_TYPES = {
    "followers": 1,
    "friends": 2,
    "listed": 3,
    # Don't track these - they're uninteresting and really noisy in terms
    # of writing new rows to the count_history table:
    # "favourites": 4,
    # "statuses": 5,
}

source_re = re.compile('<a href="(?P<url>.*?)".*?>(?P<name>.*?)</a>')


class UserDoesNotExist(click.ClickException):
    def __init__(self, identifier):
        super().__init__("User '{}' does not exist".format(identifier))


def open_database(db_path):
    db = sqlite_utils.Database(db_path)
    # Only run migrations if this is an existing DB (has tables)
    if db.tables:
        migrate(db)
    return db


def migrate(db):
    from twitter_to_sqlite.migrations import MIGRATIONS

    if "migrations" not in db.table_names():
        db["migrations"].create({"name": str, "applied": str}, pk="name")
    applied_migrations = {
        m[0] for m in db.conn.execute("select name from migrations").fetchall()
    }
    for migration in MIGRATIONS:
        name = migration.__name__
        if name in applied_migrations:
            continue
        migration(db)
        db["migrations"].insert(
            {"name": name, "applied": datetime.datetime.utcnow().isoformat()}
        )


class BearerTokenAuth(AuthBase):
    def __init__(self, bearer_token):
        self.bearer_token = bearer_token

    def __call__(self, r):
        r.headers["Authorization"] = "Bearer " + self.bearer_token
        return r


def session_for_auth(auth):
    if "bearer_token" in auth:
        session = Session()
        session.auth = BearerTokenAuth(auth["bearer_token"])
        return session
    else:
        return OAuth1Session(
            client_key=auth["api_key"],
            client_secret=auth["api_secret_key"],
            resource_owner_key=auth["access_token"],
            resource_owner_secret=auth["access_token_secret"],
        )


def fetch_user_list_chunks(
    session, user_id=None, screen_name=None, sleep=61, noun="followers"
):
    cursor = -1
    users = []
    while cursor:
        headers, body = fetch_user_list(session, cursor, user_id, screen_name, noun)
        yield body["users"]
        cursor = body["next_cursor"]
        if not cursor:
            break
        time.sleep(sleep)  # Rate limit = 15 per 15 minutes!
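

# A minimal usage sketch, not part of the original module: stream follower
# batches for an account and persist each one. The auth.json path and the
# screen name are hypothetical placeholders.
#
#     auth = json.load(open("auth.json"))
#     db = open_database("twitter.db")
#     session = session_for_auth(auth)
#     for batch in fetch_user_list_chunks(session, screen_name="example_user"):
#         save_users(db, batch)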


def fetch_user_list(session, cursor, user_id=None, screen_name=None, noun="followers"):
    args = user_args(user_id, screen_name)
    args.update({"count": 200, "cursor": cursor})
    r = session.get(
        "https://api.twitter.com/1.1/{}/list.json?".format(noun)
        + urllib.parse.urlencode(args)
    )
    return r.headers, r.json()


def fetch_lists(db, session, user_id=None, screen_name=None):
    lists_url = "https://api.twitter.com/1.1/lists/ownerships.json"
    args = user_args(user_id, screen_name)
    args["count"] = 1000
    fetched_lists = []
    # For the moment we don't paginate
    for list_row in session.get(lists_url, params=args).json()["lists"]:
        del list_row["id_str"]
        user = list_row.pop("user")
        save_users(db, [user])
        list_row["user"] = user["id"]
        list_row["created_at"] = parser.parse(list_row["created_at"])
        fetched_lists.append(list_row)
    db["lists"].insert_all(fetched_lists, pk="id", foreign_keys=("user",), replace=True)
    return fetched_lists


def get_profile(db, session, user_id=None, screen_name=None):
    if not (user_id or screen_name):
        profile = session.get(
            "https://api.twitter.com/1.1/account/verify_credentials.json"
        ).json()
    else:
        args = user_args(user_id, screen_name)
        url = "https://api.twitter.com/1.1/users/show.json"
        if args:
            url += "?" + urllib.parse.urlencode(args)
        response = session.get(url)
        if response.status_code == 404:
            raise UserDoesNotExist(screen_name or user_id)
        profile = response.json()
    save_users(db, [profile])
    return profile


def fetch_timeline(
    session,
    url,
    db,
    args=None,
    sleep=1,
    stop_after=None,
    key=None,
    since_id=None,
    since=False,
    since_type=None,
    since_key=None,
):
    # See https://developer.twitter.com/en/docs/tweets/timelines/guides/working-with-timelines
    if since and since_id:
        raise click.ClickException("Use either --since or --since_id, not both")

    since_type_id = None
    last_since_id = None
    if since_type is not None:
        assert since_key is not None
        since_type_id = SINCE_ID_TYPES[since_type]
        # Figure out the last since_id in case we need it
        try:
            last_since_id = db.conn.execute(
                """
                select since_id from since_ids
                where type = ? and key = ?
                """,
                [since_type_id, since_key],
            ).fetchall()[0][0]
        except (IndexError, sqlite3.OperationalError):
            pass

    if since:
        # Load since_id from database
        since_id = last_since_id

    args = dict(args or {})
    args["count"] = 200
    if stop_after is not None:
        args["count"] = stop_after
    if since_id:
        args["since_id"] = since_id
    args["tweet_mode"] = "extended"
    min_seen_id = None
    num_rate_limit_errors = 0
    while True:
        if min_seen_id is not None:
            args["max_id"] = min_seen_id - 1
        response = session.get(url, params=args)
        tweets = response.json()
        if "errors" in tweets:
            # Was it a rate limit error? If so sleep and try again
            if RATE_LIMIT_ERROR_CODE == tweets["errors"][0]["code"]:
                num_rate_limit_errors += 1
                assert num_rate_limit_errors < 5, "More than 5 rate limit errors"
                print(
                    "Rate limit exceeded - will sleep 15s and try again {}".format(
                        repr(response.headers)
                    )
                )
                time.sleep(15)
                continue
            else:
                raise Exception(str(tweets["errors"]))
        if key is not None:
            tweets = tweets[key]
        if not tweets:
            break
        for tweet in tweets:
            yield tweet
        min_seen_id = min(t["id"] for t in tweets)
        max_seen_id = max(t["id"] for t in tweets)
        if last_since_id is not None:
            max_seen_id = max((last_since_id, max_seen_id))
            last_since_id = max_seen_id
        if since_type_id is not None and since_key is not None:
            db["since_ids"].insert(
                {
                    "type": since_type_id,
                    "key": since_key,
                    "since_id": max_seen_id,
                },
                replace=True,
            )
        if stop_after is not None:
            break
        time.sleep(sleep)
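

# Hedged sketch of calling fetch_timeline() directly (the wrappers below are
# the usual entry points). The mentions endpoint is a real v1.1 URL; `db` and
# `session` are assumed to come from open_database() and session_for_auth().
#
#     for tweet in fetch_timeline(
#         session,
#         "https://api.twitter.com/1.1/statuses/mentions_timeline.json",
#         db,
#         since_type="mentions",
#         since_key="mentions",
#         since=True,
#     ):
#         save_tweets(db, [tweet])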


def fetch_user_timeline(
    session,
    db,
    user_id=None,
    screen_name=None,
    stop_after=None,
    since_id=None,
    since=False,
):
    args = user_args(user_id, screen_name)
    yield from fetch_timeline(
        session,
        "https://api.twitter.com/1.1/statuses/user_timeline.json",
        db,
        args,
        sleep=1,
        stop_after=stop_after,
        since_id=since_id,
        since_type="user",
        since_key="id:{}".format(user_id) if user_id else screen_name,
        since=since,
    )


def fetch_favorites(session, db, user_id=None, screen_name=None, stop_after=None):
    args = user_args(user_id, screen_name)
    # Rate limit 75/15 mins = 5/minute = every 12 seconds
    sleep = 12
    yield from fetch_timeline(
        session,
        "https://api.twitter.com/1.1/favorites/list.json",
        db,
        args,
        sleep=sleep,
        stop_after=stop_after,
    )


def user_args(user_id, screen_name):
    args = {}
    if user_id:
        args["user_id"] = user_id
    if screen_name:
        args["screen_name"] = screen_name
    return args


def expand_entities(s, entities):
    for _, ents in entities.items():
        for ent in ents:
            if "url" in ent:
                replacement = ent["expanded_url"] or ent["url"]
                s = s.replace(ent["url"], replacement)
    return s


def transform_user(user):
    user["created_at"] = parser.parse(user["created_at"])
    if user["description"] and "description" in user.get("entities", {}):
        user["description"] = expand_entities(
            user["description"], user["entities"]["description"]
        )
    if user["url"] and "url" in user.get("entities", {}):
        user["url"] = expand_entities(user["url"], user["entities"]["url"])
    user.pop("entities", None)
    user.pop("status", None)
    to_remove = [k for k in user if k.endswith("_str")]
    for key in to_remove:
        del user[key]


def transform_tweet(tweet):
    tweet["full_text"] = html.unescape(
        expand_entities(tweet["full_text"], tweet.pop("entities"))
    )
    to_remove = [k for k in tweet if k.endswith("_str")] + [
        "quoted_status_id",
        "quoted_status_permalink",
    ]
    for key in to_remove:
        if key in tweet:
            del tweet[key]
    tweet["created_at"] = parser.parse(tweet["created_at"]).isoformat()
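

# Illustration, with made-up data, of what expand_entities() above does to
# tweet and profile text: each t.co wrapper is replaced by its expanded target.
#
#     s = "reading https://t.co/abc123"
#     entities = {
#         "urls": [
#             {"url": "https://t.co/abc123", "expanded_url": "https://example.com/post"}
#         ]
#     }
#     expand_entities(s, entities)
#     # -> "reading https://example.com/post"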


def ensure_tables(db):
    table_names = set(db.table_names())
    if "places" not in table_names:
        db["places"].create({"id": str}, pk="id")
    if "sources" not in table_names:
        db["sources"].create({"id": str, "name": str, "url": str}, pk="id")
    if "users" not in table_names:
        db["users"].create(
            {
                "id": int,
                "screen_name": str,
                "name": str,
                "description": str,
                "location": str,
            },
            pk="id",
        )
        db["users"].enable_fts(
            ["name", "screen_name", "description", "location"], create_triggers=True
        )
    if "tweets" not in table_names:
        db["tweets"].create(
            {
                "id": int,
                "user": int,
                "created_at": str,
                "full_text": str,
                "retweeted_status": int,
                "quoted_status": int,
                "place": str,
                "source": str,
            },
            pk="id",
            foreign_keys=(
                ("user", "users", "id"),
                ("place", "places", "id"),
                ("source", "sources", "id"),
            ),
        )
        db["tweets"].enable_fts(["full_text"], create_triggers=True)
        db["tweets"].add_foreign_key("retweeted_status", "tweets")
        db["tweets"].add_foreign_key("quoted_status", "tweets")
    if "following" not in table_names:
        db["following"].create(
            {"followed_id": int, "follower_id": int, "first_seen": str},
            pk=("followed_id", "follower_id"),
            foreign_keys=(
                ("followed_id", "users", "id"),
                ("follower_id", "users", "id"),
            ),
        )
    # Ensure following has indexes
    following_indexes = {tuple(i.columns) for i in db["following"].indexes}
    if ("followed_id",) not in following_indexes:
        db["following"].create_index(["followed_id"])
    if ("follower_id",) not in following_indexes:
        db["following"].create_index(["follower_id"])

    # Tables for tracking --since
    if "since_ids" not in table_names:
        db["since_id_types"].create(
            {
                "id": int,
                "name": str,
            },
            pk="id",
        )
        db["since_id_types"].insert_all(
            [{"id": id, "name": name} for name, id in SINCE_ID_TYPES.items()]
        )
        db["since_ids"].create(
            {"type": int, "key": str, "since_id": int},
            pk=("type", "key"),
            foreign_keys=(("type", "since_id_types", "id"),),
        )

    # Tables for recording history of user follower counts etc
    if "count_history" not in table_names:
        db["count_history_types"].create(
            {
                "id": int,
                "name": str,
            },
            pk="id",
        )
        db["count_history_types"].insert_all(
            [{"id": id, "name": name} for name, id in COUNT_HISTORY_TYPES.items()]
        )
        db["count_history"].create(
            {"type": int, "user": int, "datetime": str, "count": int},
            pk=("type", "user", "datetime"),
            foreign_keys=(
                ("type", "count_history_types", "id"),
                ("user", "users", "id"),
            ),
        )


def save_tweets(db, tweets, favorited_by=None):
    ensure_tables(db)
    for tweet in tweets:
        transform_tweet(tweet)
        user = tweet.pop("user")
        transform_user(user)
        tweet["user"] = user["id"]
        tweet["source"] = extract_and_save_source(db, tweet["source"])
        if tweet.get("place"):
            db["places"].insert(tweet["place"], pk="id", alter=True, replace=True)
            tweet["place"] = tweet["place"]["id"]
        # extended_entities contains media
        extended_entities = tweet.pop("extended_entities", None)
        # Deal with nested retweeted_status / quoted_status
        nested = []
        for tweet_key in ("quoted_status", "retweeted_status"):
            if tweet.get(tweet_key):
                nested.append(tweet[tweet_key])
                tweet[tweet_key] = tweet[tweet_key]["id"]
        if nested:
            save_tweets(db, nested)
        db["users"].insert(user, pk="id", alter=True, replace=True)
        save_user_counts(db, user)
        table = db["tweets"].insert(tweet, pk="id", alter=True, replace=True)
        if favorited_by is not None:
            db["favorited_by"].insert(
                {"tweet": tweet["id"], "user": favorited_by},
                pk=("user", "tweet"),
                foreign_keys=("tweet", "user"),
                replace=True,
            )
        if extended_entities and extended_entities.get("media"):
            for media in extended_entities["media"]:
                # TODO: Remove this line when .m2m() grows alter=True
                db["media"].insert(media, pk="id", alter=True, replace=True)
                table.m2m("media", media, pk="id")
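

# Sketch of the usual persistence loop (variable names are hypothetical):
# collect tweets from one of the fetch_* generators above, then hand the list
# to save_tweets(), which creates any missing tables via ensure_tables().
#
#     tweets = list(fetch_user_timeline(session, db, screen_name="example_user"))
#     save_tweets(db, tweets)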


def save_users(db, users, followed_id=None, follower_id=None):
    assert not (followed_id and follower_id)
    ensure_tables(db)
    for user in users:
        transform_user(user)
    db["users"].insert_all(users, pk="id", alter=True, replace=True)
    for user in users:
        save_user_counts(db, user)
    if followed_id or follower_id:
        first_seen = datetime.datetime.utcnow().isoformat()
        db["following"].insert_all(
            (
                {
                    "followed_id": followed_id or user["id"],
                    "follower_id": follower_id or user["id"],
                    "first_seen": first_seen,
                }
                for user in users
            ),
            ignore=True,
        )


def fetch_user_batches(session, ids_or_screen_names, use_ids=False, sleep=1):
    # Yields lists of up to 70 users (tried 100 but got this error:
    # {'code': 18, 'message': 'Too many terms specified in query.'} )
    batches = []
    batch = []
    for id in ids_or_screen_names:
        batch.append(id)
        if len(batch) == 70:
            batches.append(batch)
            batch = []
    if batch:
        batches.append(batch)
    url = "https://api.twitter.com/1.1/users/lookup.json"
    for batch in batches:
        if use_ids:
            args = {"user_id": ",".join(map(str, batch))}
        else:
            args = {"screen_name": ",".join(batch)}
        users = session.get(url, params=args).json()
        yield users
        time.sleep(sleep)


def fetch_status_batches(session, tweet_ids, sleep=1):
    # Yields lists of up to 100 tweets
    batches = []
    batch = []
    for id in tweet_ids:
        batch.append(id)
        if len(batch) == 100:
            batches.append(batch)
            batch = []
    if batch:
        batches.append(batch)
    url = "https://api.twitter.com/1.1/statuses/lookup.json"
    for batch in batches:
        args = {"id": ",".join(map(str, batch)), "tweet_mode": "extended"}
        tweets = session.get(url, params=args).json()
        yield tweets
        time.sleep(sleep)


def resolve_identifiers(db, identifiers, attach, sql):
    if sql:
        if attach:
            for filepath in attach:
                if ":" in filepath:
                    alias, filepath = filepath.split(":", 1)
                else:
                    alias = filepath.split("/")[-1].split(".")[0]
                attach_sql = """
                    ATTACH DATABASE '{}' AS [{}];
                """.format(
                    str(pathlib.Path(filepath).resolve()), alias
                )
                db.conn.execute(attach_sql)
        sql_identifiers = [r[0] for r in db.conn.execute(sql).fetchall()]
    else:
        sql_identifiers = []
    return list(identifiers) + sql_identifiers


def fetch_and_save_list(db, session, identifier, identifier_is_id=False):
    show_url = "https://api.twitter.com/1.1/lists/show.json"
    args = {}
    if identifier_is_id:
        args["list_id"] = identifier
    else:
        screen_name, slug = identifier.split("/")
        args.update({"owner_screen_name": screen_name, "slug": slug})
    # First fetch the list details
    data = session.get(show_url, params=args).json()
    list_id = data["id"]
    del data["id_str"]
    user = data.pop("user")
    save_users(db, [user])
    data["user"] = user["id"]
    data["created_at"] = parser.parse(data["created_at"])
    db["lists"].insert(data, pk="id", foreign_keys=("user",), replace=True)
    # Now fetch the members
    url = "https://api.twitter.com/1.1/lists/members.json"
    cursor = -1
    while cursor:
        args.update({"count": 5000, "cursor": cursor})
        body = session.get(url, params=args).json()
        users = body["users"]
        save_users(db, users)
        db["list_members"].insert_all(
            ({"list": list_id, "user": user["id"]} for user in users),
            pk=("list", "user"),
            foreign_keys=("list", "user"),
            replace=True,
        )
        cursor = body["next_cursor"]
        if not cursor:
            break
        time.sleep(1)  # Rate limit = 900 per 15 minutes


def cursor_paginate(session, url, args, key, page_size=200, sleep=None):
    "Execute cursor pagination, yielding 'key' for each page"
    args = dict(args)
    args["page_size"] = page_size
    cursor = -1
    while cursor:
        args["cursor"] = cursor
        r = session.get(url, params=args)
        raise_if_error(r)
        body = r.json()
        yield body[key]
        cursor = body["next_cursor"]
        if not cursor:
            break
        if sleep is not None:
            time.sleep(sleep)


class TwitterApiError(Exception):
    def __init__(self, headers, body):
        self.headers = headers
        self.body = body

    def __repr__(self):
        return "{}: {}".format(self.body, self.headers)


def raise_if_error(r):
    if "errors" in r.json():
        raise TwitterApiError(r.headers, r.json()["errors"])


def stream_filter(session, track=None, follow=None, locations=None, language=None):
    session.stream = True
    args = {"tweet_mode": "extended"}
    for key, value in (
        ("track", track),
        ("follow", follow),
        ("locations", locations),
        ("language", language),
    ):
        if value is None:
            continue
        if not isinstance(value, str):
            value = ",".join(map(str, value))
        args[key] = value
    while True:
        response = session.post(
            "https://stream.twitter.com/1.1/statuses/filter.json", params=args
        )
        for line in response.iter_lines(chunk_size=10000):
            if line.strip().startswith(b"{"):
                tweet = json.loads(line)
                # Only yield tweet if it has an 'id' and 'created_at'
                # - otherwise it's probably a maintenance message, see
                # https://developer.twitter.com/en/docs/tweets/filter-realtime/overview/statuses-filter
                if "id" in tweet and "created_at" in tweet:
                    # 'Fix' weird tweets from streaming API
                    fix_streaming_tweet(tweet)
                    yield tweet
                else:
                    print(tweet)
        time.sleep(1)


def fix_streaming_tweet(tweet):
    if "extended_tweet" in tweet:
        tweet.update(tweet.pop("extended_tweet"))
    if "full_text" not in tweet:
        tweet["full_text"] = tweet["text"]
    if "retweeted_status" in tweet:
        fix_streaming_tweet(tweet["retweeted_status"])
    if "quoted_status" in tweet:
        fix_streaming_tweet(tweet["quoted_status"])


def user_ids_for_screen_names(db, screen_names):
    sql = "select id from users where lower(screen_name) in ({})".format(
        ", ".join(["?"] * len(screen_names))
    )
    return [
        r[0] for r in db.conn.execute(sql, [s.lower() for s in screen_names]).fetchall()
    ]


def read_archive_js(filepath):
    "Open zip file, return (filename, content) for all .js"
    zf = zipfile.ZipFile(filepath)
    for zi in zf.filelist:
        # Ignore files in an assets dir -- these are for Twitter's archive
        # browser thingie -- and only use final filenames since some archives
        # appear to put data in a data/ subdir, which can screw up the filename
        # -> importer mapping.
        if zi.filename.endswith(".js") and not zi.filename.startswith("assets/"):
            yield pathlib.Path(zi.filename).name, zf.open(zi.filename).read()


def extract_and_save_source(db, source):
    if not source:
        return None
    m = source_re.match(source)
    details = m.groupdict()
    return db["sources"].insert(details, hash_id="id", replace=True).last_pk


def save_user_counts(db, user):
    for type_name, type_id in COUNT_HISTORY_TYPES.items():
        previous_count = None
        try:
            previous_count = db.conn.execute(
                """
                select count from count_history
                where type = ? and user = ?
                order by datetime desc limit 1
                """,
                [type_id, user["id"]],
            ).fetchall()[0][0]
        except IndexError:
            pass
        current_count = user["{}_count".format(type_name)]
        if current_count != previous_count:
            db["count_history"].insert(
                {
                    "type": type_id,
                    "user": user["id"],
                    "datetime": datetime.datetime.utcnow().isoformat().split(".")[0]
                    + "+00:00",
                    "count": current_count,
                },
                replace=True,
            )
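

# End-to-end streaming sketch (the track terms are placeholders): filter the
# statuses/filter stream for keywords and save each matching tweet as it
# arrives. This needs OAuth1 user credentials rather than a bearer token.
#
#     session = session_for_auth(auth)
#     for tweet in stream_filter(session, track=["sqlite", "datasette"]):
#         save_tweets(db, [tweet])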