this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

1import click 2import datetime 3import html 4import json 5import pathlib 6import re 7import sqlite3 8import time 9import urllib.parse 10import zipfile 11 12from dateutil import parser 13from requests_oauthlib import OAuth1Session 14import sqlite_utils 15 16# Twitter API error codes 17RATE_LIMIT_ERROR_CODE = 88 18 19SINCE_ID_TYPES = { 20 "user": 1, 21 "home": 2, 22 "mentions": 3, 23 "search": 4, 24} 25COUNT_HISTORY_TYPES = { 26 "followers": 1, 27 "friends": 2, 28 "listed": 3, 29 # Don't track these - they're uninteresting and really noisy in terms 30 # of writing new rows to the count_history table: 31 # "favourites": 4, 32 # "statuses": 5, 33} 34 35source_re = re.compile('<a href="(?P<url>.*?)".*?>(?P<name>.*?)</a>') 36 37 38class UserDoesNotExist(click.ClickException): 39 def __init__(self, identifier): 40 super().__init__("User '{}' does not exist".format(identifier)) 41 42 43def open_database(db_path): 44 db = sqlite_utils.Database(db_path) 45 # Only run migrations if this is an existing DB (has tables) 46 if db.tables: 47 migrate(db) 48 return db 49 50 51def migrate(db): 52 from twitter_to_sqlite.migrations import MIGRATIONS 53 54 if "migrations" not in db.table_names(): 55 db["migrations"].create({"name": str, "applied": str}, pk="name") 56 applied_migrations = { 57 m[0] for m in db.conn.execute("select name from migrations").fetchall() 58 } 59 for migration in MIGRATIONS: 60 name = migration.__name__ 61 if name in applied_migrations: 62 continue 63 migration(db) 64 db["migrations"].insert( 65 {"name": name, "applied": datetime.datetime.utcnow().isoformat()} 66 ) 67 68 69def session_for_auth(auth): 70 return OAuth1Session( 71 client_key=auth["api_key"], 72 client_secret=auth["api_secret_key"], 73 resource_owner_key=auth["access_token"], 74 resource_owner_secret=auth["access_token_secret"], 75 ) 76 77 78def fetch_user_list_chunks( 79 session, user_id=None, screen_name=None, sleep=61, noun="followers" 80): 81 cursor = -1 82 users = [] 83 while cursor: 84 headers, body = fetch_user_list(session, cursor, user_id, screen_name, noun) 85 yield body["users"] 86 cursor = body["next_cursor"] 87 if not cursor: 88 break 89 time.sleep(sleep) # Rate limit = 15 per 15 minutes! 90 91 92def fetch_user_list(session, cursor, user_id=None, screen_name=None, noun="followers"): 93 args = user_args(user_id, screen_name) 94 args.update({"count": 200, "cursor": cursor}) 95 r = session.get( 96 "https://api.twitter.com/1.1/{}/list.json?".format(noun) 97 + urllib.parse.urlencode(args) 98 ) 99 return r.headers, r.json() 100 101 102def fetch_lists(db, session, user_id=None, screen_name=None): 103 lists_url = "https://api.twitter.com/1.1/lists/ownerships.json" 104 args = user_args(user_id, screen_name) 105 args["count"] = 1000 106 fetched_lists = [] 107 # For the moment we don't paginate 108 for list_row in session.get(lists_url, params=args).json()["lists"]: 109 del list_row["id_str"] 110 user = list_row.pop("user") 111 save_users(db, [user]) 112 list_row["user"] = user["id"] 113 list_row["created_at"] = parser.parse(list_row["created_at"]) 114 fetched_lists.append(list_row) 115 db["lists"].insert_all(fetched_lists, pk="id", foreign_keys=("user",), replace=True) 116 return fetched_lists 117 118 119def get_profile(db, session, user_id=None, screen_name=None): 120 if not (user_id or screen_name): 121 profile = session.get( 122 "https://api.twitter.com/1.1/account/verify_credentials.json" 123 ).json() 124 else: 125 args = user_args(user_id, screen_name) 126 url = "https://api.twitter.com/1.1/users/show.json" 127 if args: 128 url += "?" + urllib.parse.urlencode(args) 129 response = session.get(url) 130 if response.status_code == 404: 131 raise UserDoesNotExist(screen_name or user_id) 132 profile = response.json() 133 save_users(db, [profile]) 134 return profile 135 136 137def fetch_timeline( 138 session, 139 url, 140 db, 141 args=None, 142 sleep=1, 143 stop_after=None, 144 key=None, 145 since_id=None, 146 since=False, 147 since_type=None, 148 since_key=None, 149): 150 # See https://developer.twitter.com/en/docs/tweets/timelines/guides/working-with-timelines 151 if since and since_id: 152 raise click.ClickException("Use either --since or --since_id, not both") 153 154 since_type_id = None 155 last_since_id = None 156 if since_type is not None: 157 assert since_key is not None 158 since_type_id = SINCE_ID_TYPES[since_type] 159 # Figure out the last since_id in case we need it 160 try: 161 last_since_id = db.conn.execute( 162 """ 163 select since_id from since_ids 164 where type = ? and key = ? 165 """, 166 [since_type_id, since_key], 167 ).fetchall()[0][0] 168 except (IndexError, sqlite3.OperationalError): 169 pass 170 171 if since: 172 # Load since_id from database 173 since_id = last_since_id 174 175 args = dict(args or {}) 176 args["count"] = 200 177 if stop_after is not None: 178 args["count"] = stop_after 179 if since_id: 180 args["since_id"] = since_id 181 args["tweet_mode"] = "extended" 182 min_seen_id = None 183 num_rate_limit_errors = 0 184 while True: 185 if min_seen_id is not None: 186 args["max_id"] = min_seen_id - 1 187 response = session.get(url, params=args) 188 tweets = response.json() 189 if "errors" in tweets: 190 # Was it a rate limit error? If so sleep and try again 191 if RATE_LIMIT_ERROR_CODE == tweets["errors"][0]["code"]: 192 num_rate_limit_errors += 1 193 assert num_rate_limit_errors < 5, "More than 5 rate limit errors" 194 print( 195 "Rate limit exceeded - will sleep 15s and try again {}".format( 196 repr(response.headers) 197 ) 198 ) 199 time.sleep(15) 200 continue 201 else: 202 raise Exception(str(tweets["errors"])) 203 if key is not None: 204 tweets = tweets[key] 205 if not tweets: 206 break 207 for tweet in tweets: 208 yield tweet 209 min_seen_id = min(t["id"] for t in tweets) 210 max_seen_id = max(t["id"] for t in tweets) 211 if last_since_id is not None: 212 max_seen_id = max((last_since_id, max_seen_id)) 213 last_since_id = max_seen_id 214 if since_type_id is not None and since_key is not None: 215 db["since_ids"].insert( 216 { 217 "type": since_type_id, 218 "key": since_key, 219 "since_id": max_seen_id, 220 }, 221 replace=True, 222 ) 223 if stop_after is not None: 224 break 225 time.sleep(sleep) 226 227 228def fetch_user_timeline( 229 session, 230 db, 231 user_id=None, 232 screen_name=None, 233 stop_after=None, 234 since_id=None, 235 since=False, 236): 237 args = user_args(user_id, screen_name) 238 yield from fetch_timeline( 239 session, 240 "https://api.twitter.com/1.1/statuses/user_timeline.json", 241 db, 242 args, 243 sleep=1, 244 stop_after=stop_after, 245 since_id=since_id, 246 since_type="user", 247 since_key="id:{}".format(user_id) if user_id else screen_name, 248 since=since, 249 ) 250 251 252def fetch_favorites(session, db, user_id=None, screen_name=None, stop_after=None): 253 args = user_args(user_id, screen_name) 254 # Rate limit 75/15 mins = 5/minute = every 12 seconds 255 sleep = 12 256 yield from fetch_timeline( 257 session, 258 "https://api.twitter.com/1.1/favorites/list.json", 259 db, 260 args, 261 sleep=sleep, 262 stop_after=stop_after, 263 ) 264 265 266def user_args(user_id, screen_name): 267 args = {} 268 if user_id: 269 args["user_id"] = user_id 270 if screen_name: 271 args["screen_name"] = screen_name 272 return args 273 274 275def expand_entities(s, entities): 276 for _, ents in entities.items(): 277 for ent in ents: 278 if "url" in ent: 279 replacement = ent["expanded_url"] or ent["url"] 280 s = s.replace(ent["url"], replacement) 281 return s 282 283 284def transform_user(user): 285 user["created_at"] = parser.parse(user["created_at"]) 286 if user["description"] and "description" in user.get("entities", {}): 287 user["description"] = expand_entities( 288 user["description"], user["entities"]["description"] 289 ) 290 if user["url"] and "url" in user.get("entities", {}): 291 user["url"] = expand_entities(user["url"], user["entities"]["url"]) 292 user.pop("entities", None) 293 user.pop("status", None) 294 to_remove = [k for k in user if k.endswith("_str")] 295 for key in to_remove: 296 del user[key] 297 298 299def transform_tweet(tweet): 300 tweet["full_text"] = html.unescape( 301 expand_entities(tweet["full_text"], tweet.pop("entities")) 302 ) 303 to_remove = [k for k in tweet if k.endswith("_str")] + [ 304 "quoted_status_id", 305 "quoted_status_permalink", 306 ] 307 for key in to_remove: 308 if key in tweet: 309 del tweet[key] 310 tweet["created_at"] = parser.parse(tweet["created_at"]).isoformat() 311 312 313def ensure_tables(db): 314 table_names = set(db.table_names()) 315 if "places" not in table_names: 316 db["places"].create({"id": str}, pk="id") 317 if "sources" not in table_names: 318 db["sources"].create({"id": str, "name": str, "url": str}, pk="id") 319 if "users" not in table_names: 320 db["users"].create( 321 { 322 "id": int, 323 "screen_name": str, 324 "name": str, 325 "description": str, 326 "location": str, 327 }, 328 pk="id", 329 ) 330 db["users"].enable_fts( 331 ["name", "screen_name", "description", "location"], create_triggers=True 332 ) 333 if "tweets" not in table_names: 334 db["tweets"].create( 335 { 336 "id": int, 337 "user": int, 338 "created_at": str, 339 "full_text": str, 340 "retweeted_status": int, 341 "quoted_status": int, 342 "place": str, 343 "source": str, 344 }, 345 pk="id", 346 foreign_keys=( 347 ("user", "users", "id"), 348 ("place", "places", "id"), 349 ("source", "sources", "id"), 350 ), 351 ) 352 db["tweets"].enable_fts(["full_text"], create_triggers=True) 353 db["tweets"].add_foreign_key("retweeted_status", "tweets") 354 db["tweets"].add_foreign_key("quoted_status", "tweets") 355 if "following" not in table_names: 356 db["following"].create( 357 {"followed_id": int, "follower_id": int, "first_seen": str}, 358 pk=("followed_id", "follower_id"), 359 foreign_keys=( 360 ("followed_id", "users", "id"), 361 ("follower_id", "users", "id"), 362 ), 363 ) 364 # Ensure following has indexes 365 following_indexes = {tuple(i.columns) for i in db["following"].indexes} 366 if ("followed_id",) not in following_indexes: 367 db["following"].create_index(["followed_id"]) 368 if ("follower_id",) not in following_indexes: 369 db["following"].create_index(["follower_id"]) 370 371 # Tables for tracking --since 372 if "since_ids" not in table_names: 373 db["since_id_types"].create( 374 { 375 "id": int, 376 "name": str, 377 }, 378 pk="id", 379 ) 380 db["since_id_types"].insert_all( 381 [{"id": id, "name": name} for name, id in SINCE_ID_TYPES.items()] 382 ) 383 db["since_ids"].create( 384 {"type": int, "key": str, "since_id": int}, 385 pk=("type", "key"), 386 foreign_keys=(("type", "since_id_types", "id"),), 387 ) 388 389 # Tables for recording history of user follower counts etc 390 if "count_history" not in table_names: 391 db["count_history_types"].create( 392 { 393 "id": int, 394 "name": str, 395 }, 396 pk="id", 397 ) 398 db["count_history_types"].insert_all( 399 [{"id": id, "name": name} for name, id in COUNT_HISTORY_TYPES.items()] 400 ) 401 db["count_history"].create( 402 {"type": int, "user": int, "datetime": str, "count": int}, 403 pk=("type", "user", "datetime"), 404 foreign_keys=( 405 ("type", "count_history_types", "id"), 406 ("user", "users", "id"), 407 ), 408 ) 409 410 411def save_tweets(db, tweets, favorited_by=None): 412 ensure_tables(db) 413 for tweet in tweets: 414 transform_tweet(tweet) 415 user = tweet.pop("user") 416 transform_user(user) 417 tweet["user"] = user["id"] 418 tweet["source"] = extract_and_save_source(db, tweet["source"]) 419 if tweet.get("place"): 420 db["places"].insert(tweet["place"], pk="id", alter=True, replace=True) 421 tweet["place"] = tweet["place"]["id"] 422 # extended_entities contains media 423 extended_entities = tweet.pop("extended_entities", None) 424 # Deal with nested retweeted_status / quoted_status 425 nested = [] 426 for tweet_key in ("quoted_status", "retweeted_status"): 427 if tweet.get(tweet_key): 428 nested.append(tweet[tweet_key]) 429 tweet[tweet_key] = tweet[tweet_key]["id"] 430 if nested: 431 save_tweets(db, nested) 432 db["users"].insert(user, pk="id", alter=True, replace=True) 433 save_user_counts(db, user) 434 table = db["tweets"].insert(tweet, pk="id", alter=True, replace=True) 435 if favorited_by is not None: 436 db["favorited_by"].insert( 437 {"tweet": tweet["id"], "user": favorited_by}, 438 pk=("user", "tweet"), 439 foreign_keys=("tweet", "user"), 440 replace=True, 441 ) 442 if extended_entities and extended_entities.get("media"): 443 for media in extended_entities["media"]: 444 # TODO: Remove this line when .m2m() grows alter=True 445 db["media"].insert(media, pk="id", alter=True, replace=True) 446 table.m2m("media", media, pk="id") 447 448 449def save_users(db, users, followed_id=None, follower_id=None): 450 assert not (followed_id and follower_id) 451 ensure_tables(db) 452 for user in users: 453 transform_user(user) 454 db["users"].insert_all(users, pk="id", alter=True, replace=True) 455 for user in users: 456 save_user_counts(db, user) 457 if followed_id or follower_id: 458 first_seen = datetime.datetime.utcnow().isoformat() 459 db["following"].insert_all( 460 ( 461 { 462 "followed_id": followed_id or user["id"], 463 "follower_id": follower_id or user["id"], 464 "first_seen": first_seen, 465 } 466 for user in users 467 ), 468 ignore=True, 469 ) 470 471 472def fetch_user_batches(session, ids_or_screen_names, use_ids=False, sleep=1): 473 # Yields lists of up to 70 users (tried 100 but got this error: 474 # # {'code': 18, 'message': 'Too many terms specified in query.'} ) 475 batches = [] 476 batch = [] 477 for id in ids_or_screen_names: 478 batch.append(id) 479 if len(batch) == 70: 480 batches.append(batch) 481 batch = [] 482 if batch: 483 batches.append(batch) 484 url = "https://api.twitter.com/1.1/users/lookup.json" 485 for batch in batches: 486 if use_ids: 487 args = {"user_id": ",".join(map(str, batch))} 488 else: 489 args = {"screen_name": ",".join(batch)} 490 users = session.get(url, params=args).json() 491 yield users 492 time.sleep(sleep) 493 494 495def fetch_status_batches(session, tweet_ids, sleep=1): 496 # Yields lists of up to 100 tweets 497 batches = [] 498 batch = [] 499 for id in tweet_ids: 500 batch.append(id) 501 if len(batch) == 100: 502 batches.append(batch) 503 batch = [] 504 if batch: 505 batches.append(batch) 506 url = "https://api.twitter.com/1.1/statuses/lookup.json" 507 for batch in batches: 508 args = {"id": ",".join(map(str, batch)), "tweet_mode": "extended"} 509 tweets = session.get(url, params=args).json() 510 yield tweets 511 time.sleep(sleep) 512 513 514def resolve_identifiers(db, identifiers, attach, sql): 515 if sql: 516 if attach: 517 for filepath in attach: 518 if ":" in filepath: 519 alias, filepath = filepath.split(":", 1) 520 else: 521 alias = filepath.split("/")[-1].split(".")[0] 522 attach_sql = """ 523 ATTACH DATABASE '{}' AS [{}]; 524 """.format( 525 str(pathlib.Path(filepath).resolve()), alias 526 ) 527 db.conn.execute(attach_sql) 528 sql_identifiers = [r[0] for r in db.conn.execute(sql).fetchall()] 529 else: 530 sql_identifiers = [] 531 return list(identifiers) + sql_identifiers 532 533 534def fetch_and_save_list(db, session, identifier, identifier_is_id=False): 535 show_url = "https://api.twitter.com/1.1/lists/show.json" 536 args = {} 537 if identifier_is_id: 538 args["list_id"] = identifier 539 else: 540 screen_name, slug = identifier.split("/") 541 args.update({"owner_screen_name": screen_name, "slug": slug}) 542 # First fetch the list details 543 data = session.get(show_url, params=args).json() 544 list_id = data["id"] 545 del data["id_str"] 546 user = data.pop("user") 547 save_users(db, [user]) 548 data["user"] = user["id"] 549 data["created_at"] = parser.parse(data["created_at"]) 550 db["lists"].insert(data, pk="id", foreign_keys=("user",), replace=True) 551 # Now fetch the members 552 url = "https://api.twitter.com/1.1/lists/members.json" 553 cursor = -1 554 while cursor: 555 args.update({"count": 5000, "cursor": cursor}) 556 body = session.get(url, params=args).json() 557 users = body["users"] 558 save_users(db, users) 559 db["list_members"].insert_all( 560 ({"list": list_id, "user": user["id"]} for user in users), 561 pk=("list", "user"), 562 foreign_keys=("list", "user"), 563 replace=True, 564 ) 565 cursor = body["next_cursor"] 566 if not cursor: 567 break 568 time.sleep(1) # Rate limit = 900 per 15 minutes 569 570 571def cursor_paginate(session, url, args, key, page_size=200, sleep=None): 572 "Execute cursor pagination, yelding 'key' for each page" 573 args = dict(args) 574 args["page_size"] = page_size 575 cursor = -1 576 while cursor: 577 args["cursor"] = cursor 578 r = session.get(url, params=args) 579 raise_if_error(r) 580 body = r.json() 581 yield body[key] 582 cursor = body["next_cursor"] 583 if not cursor: 584 break 585 if sleep is not None: 586 time.sleep(sleep) 587 588 589class TwitterApiError(Exception): 590 def __init__(self, headers, body): 591 self.headers = headers 592 self.body = body 593 594 def __repr__(self): 595 return "{}: {}".format(self.body, self.headers) 596 597 598def raise_if_error(r): 599 if "errors" in r.json(): 600 raise TwitterApiError(r.headers, r.json()["errors"]) 601 602 603def stream_filter(session, track=None, follow=None, locations=None, language=None): 604 session.stream = True 605 args = {"tweet_mode": "extended"} 606 for key, value in ( 607 ("track", track), 608 ("follow", follow), 609 ("locations", locations), 610 ("language", language), 611 ): 612 if value is None: 613 continue 614 if not isinstance(value, str): 615 value = ",".join(map(str, value)) 616 args[key] = value 617 while True: 618 response = session.post( 619 "https://stream.twitter.com/1.1/statuses/filter.json", params=args 620 ) 621 for line in response.iter_lines(chunk_size=10000): 622 if line.strip().startswith(b"{"): 623 tweet = json.loads(line) 624 # Only yield tweet if it has an 'id' and 'created_at' 625 # - otherwise it's probably a maintenance message, see 626 # https://developer.twitter.com/en/docs/tweets/filter-realtime/overview/statuses-filter 627 if "id" in tweet and "created_at" in tweet: 628 # 'Fix' weird tweets from streaming API 629 fix_streaming_tweet(tweet) 630 yield tweet 631 else: 632 print(tweet) 633 time.sleep(1) 634 635 636def fix_streaming_tweet(tweet): 637 if "extended_tweet" in tweet: 638 tweet.update(tweet.pop("extended_tweet")) 639 if "full_text" not in tweet: 640 tweet["full_text"] = tweet["text"] 641 if "retweeted_status" in tweet: 642 fix_streaming_tweet(tweet["retweeted_status"]) 643 if "quoted_status" in tweet: 644 fix_streaming_tweet(tweet["quoted_status"]) 645 646 647def user_ids_for_screen_names(db, screen_names): 648 sql = "select id from users where lower(screen_name) in ({})".format( 649 ", ".join(["?"] * len(screen_names)) 650 ) 651 return [ 652 r[0] for r in db.conn.execute(sql, [s.lower() for s in screen_names]).fetchall() 653 ] 654 655 656def read_archive_js(filepath): 657 "Open zip file, return (filename, content) for all .js" 658 zf = zipfile.ZipFile(filepath) 659 for zi in zf.filelist: 660 if zi.filename.endswith(".js"): 661 yield zi.filename, zf.open(zi.filename).read() 662 663 664def extract_and_save_source(db, source): 665 if not source: 666 return None 667 m = source_re.match(source) 668 details = m.groupdict() 669 return db["sources"].insert(details, hash_id="id", replace=True).last_pk 670 671 672def save_user_counts(db, user): 673 for type_name, type_id in COUNT_HISTORY_TYPES.items(): 674 previous_count = None 675 try: 676 previous_count = db.conn.execute( 677 """ 678 select count from count_history 679 where type = ? and user = ? 680 order by datetime desc limit 1 681 """, 682 [type_id, user["id"]], 683 ).fetchall()[0][0] 684 except IndexError: 685 pass 686 current_count = user["{}_count".format(type_name)] 687 if current_count != previous_count: 688 db["count_history"].insert( 689 { 690 "type": type_id, 691 "user": user["id"], 692 "datetime": datetime.datetime.utcnow().isoformat().split(".")[0] 693 + "+00:00", 694 "count": current_count, 695 }, 696 replace=True, 697 )