Utilities for fetching data from the Twitter API and saving it to a SQLite database.
1import click
2import datetime
3import html
4import json
5import pathlib
6import re
7import sqlite3
8import time
9import urllib.parse
10import zipfile
11
12from dateutil import parser
13from requests import Session
14from requests.auth import AuthBase
15from requests_oauthlib import OAuth1Session
16import sqlite_utils
17
# Twitter v1.1 error code returned when a rate-limit window is exhausted.
RATE_LIMIT_ERROR_CODE = 88

# Numeric IDs for the since_ids.type column - one per timeline flavour.
SINCE_ID_TYPES = {"user": 1, "home": 2, "mentions": 3, "search": 4}

# Numeric IDs for the count_history.type column.
# favourites/statuses are deliberately omitted - they are uninteresting
# and really noisy in terms of writing new rows to count_history.
COUNT_HISTORY_TYPES = {"followers": 1, "friends": 2, "listed": 3}

# Parses the HTML anchor Twitter uses for a tweet's "source" field.
source_re = re.compile('<a href="(?P<url>.*?)".*?>(?P<name>.*?)</a>')
38
39
class UserDoesNotExist(click.ClickException):
    """Raised when the Twitter API reports that a user cannot be found."""

    def __init__(self, identifier):
        message = "User '{}' does not exist".format(identifier)
        super().__init__(message)
43
44
def open_database(db_path):
    """Open the SQLite database at db_path, migrating it if it already has tables."""
    database = sqlite_utils.Database(db_path)
    # A brand new (empty) database needs no migrations
    if database.tables:
        migrate(database)
    return database
51
52
def migrate(db):
    """Apply any twitter_to_sqlite migrations not yet recorded in the migrations table."""
    from twitter_to_sqlite.migrations import MIGRATIONS

    if "migrations" not in db.table_names():
        db["migrations"].create({"name": str, "applied": str}, pk="name")
    already_applied = {
        row[0] for row in db.conn.execute("select name from migrations").fetchall()
    }
    for migration_fn in MIGRATIONS:
        if migration_fn.__name__ in already_applied:
            continue
        migration_fn(db)
        # Record the migration so it never runs twice
        db["migrations"].insert(
            {
                "name": migration_fn.__name__,
                "applied": datetime.datetime.utcnow().isoformat(),
            }
        )
69
70
class BearerTokenAuth(AuthBase):
    """requests auth hook adding an app-only OAuth2 bearer token header."""

    def __init__(self, bearer_token):
        self.bearer_token = bearer_token

    def __call__(self, request):
        token = self.bearer_token
        request.headers["Authorization"] = "Bearer " + token
        return request
78
79
def session_for_auth(auth):
    """Build a requests session from an auth dict.

    A "bearer_token" key selects app-only auth; otherwise the four
    OAuth1 user keys are expected.
    """
    if "bearer_token" not in auth:
        # Full OAuth1 user authentication
        return OAuth1Session(
            client_key=auth["api_key"],
            client_secret=auth["api_secret_key"],
            resource_owner_key=auth["access_token"],
            resource_owner_secret=auth["access_token_secret"],
        )
    session = Session()
    session.auth = BearerTokenAuth(auth["bearer_token"])
    return session
92
93
def fetch_user_list_chunks(
    session, user_id=None, screen_name=None, sleep=61, noun="followers"
):
    """Yield pages of users from the cursored followers/friends list API.

    Sleeps between pages because these endpoints are rate limited to
    15 requests per 15 minute window.

    Fix: removed the unused `users = []` accumulator from the original.
    """
    cursor = -1
    while cursor:
        headers, body = fetch_user_list(session, cursor, user_id, screen_name, noun)
        yield body["users"]
        cursor = body["next_cursor"]
        if not cursor:
            break
        time.sleep(sleep)  # Rate limit = 15 per 15 minutes!
106
107
def fetch_user_list(session, cursor, user_id=None, screen_name=None, noun="followers"):
    """Fetch one page of the followers/friends list API.

    Returns (response headers, parsed JSON body).
    """
    params = user_args(user_id, screen_name)
    params.update({"count": 200, "cursor": cursor})
    url = "https://api.twitter.com/1.1/{}/list.json?".format(
        noun
    ) + urllib.parse.urlencode(params)
    response = session.get(url)
    return response.headers, response.json()
116
117
def fetch_lists(db, session, user_id=None, screen_name=None):
    """Fetch lists owned by a user; save the lists and their owner, return the list rows."""
    args = user_args(user_id, screen_name)
    args["count"] = 1000
    lists_url = "https://api.twitter.com/1.1/lists/ownerships.json"
    rows = []
    # For the moment we don't paginate
    for row in session.get(lists_url, params=args).json()["lists"]:
        del row["id_str"]
        owner = row.pop("user")
        save_users(db, [owner])
        # Store just the foreign key to the owner's users row
        row["user"] = owner["id"]
        row["created_at"] = parser.parse(row["created_at"])
        rows.append(row)
    db["lists"].insert_all(rows, pk="id", foreign_keys=("user",), replace=True)
    return rows
133
134
def get_profile(db, session, user_id=None, screen_name=None):
    """Fetch a user profile (or the authenticated user's own), save and return it.

    Raises UserDoesNotExist when the API returns a 404 for the identifier.
    """
    if user_id or screen_name:
        args = user_args(user_id, screen_name)
        url = "https://api.twitter.com/1.1/users/show.json"
        if args:
            url += "?" + urllib.parse.urlencode(args)
        response = session.get(url)
        if response.status_code == 404:
            raise UserDoesNotExist(screen_name or user_id)
        profile = response.json()
    else:
        # No identifier supplied - fetch whoever the credentials belong to
        profile = session.get(
            "https://api.twitter.com/1.1/account/verify_credentials.json"
        ).json()
    save_users(db, [profile])
    return profile
151
152
def fetch_timeline(
    session,
    url,
    db,
    args=None,
    sleep=1,
    stop_after=None,
    key=None,
    since_id=None,
    since=False,
    since_type=None,
    since_key=None,
):
    """Yield tweets from a timeline-style 1.1 API endpoint, newest first.

    Pages backwards through the timeline by setting max_id to one below
    the smallest tweet ID seen so far.

    Parameters:
        session: authenticated requests session
        url: endpoint returning a list of tweets (possibly nested under `key`)
        db: sqlite-utils Database, used to read/write the since_ids table
        args: extra querystring arguments for the endpoint
        sleep: seconds to pause between page requests
        stop_after: request only this many tweets, then stop after one page
        key: when the JSON body is an object, the key holding the tweet list
        since_id: only fetch tweets newer than this ID
        since: load since_id from the since_ids table instead
            (mutually exclusive with since_id; needs since_type/since_key)
        since_type: one of SINCE_ID_TYPES ("user", "home", "mentions", "search")
        since_key: identifies this particular timeline within that type

    Raises click.ClickException if both since and since_id were supplied.
    """
    # See https://developer.twitter.com/en/docs/tweets/timelines/guides/working-with-timelines
    if since and since_id:
        raise click.ClickException("Use either --since or --since_id, not both")

    since_type_id = None
    last_since_id = None
    if since_type is not None:
        assert since_key is not None
        since_type_id = SINCE_ID_TYPES[since_type]
        # Figure out the last since_id in case we need it
        try:
            last_since_id = db.conn.execute(
                """
                select since_id from since_ids
                where type = ? and key = ?
            """,
                [since_type_id, since_key],
            ).fetchall()[0][0]
        except (IndexError, sqlite3.OperationalError):
            # No previous record, or the since_ids table does not exist yet
            pass

    if since:
        # Load since_id from database
        since_id = last_since_id

    args = dict(args or {})
    args["count"] = 200
    if stop_after is not None:
        args["count"] = stop_after
    if since_id:
        args["since_id"] = since_id
    args["tweet_mode"] = "extended"
    min_seen_id = None
    num_rate_limit_errors = 0
    while True:
        if min_seen_id is not None:
            # Page backwards: only ask for tweets older than everything seen
            args["max_id"] = min_seen_id - 1
        response = session.get(url, params=args)
        tweets = response.json()
        if "errors" in tweets:
            # Was it a rate limit error? If so sleep and try again
            if RATE_LIMIT_ERROR_CODE == tweets["errors"][0]["code"]:
                num_rate_limit_errors += 1
                assert num_rate_limit_errors < 5, "More than 5 rate limit errors"
                print(
                    "Rate limit exceeded - will sleep 15s and try again {}".format(
                        repr(response.headers)
                    )
                )
                time.sleep(15)
                continue
            else:
                raise Exception(str(tweets["errors"]))
        if key is not None:
            tweets = tweets[key]
        if not tweets:
            # Reached the end of the timeline
            break
        for tweet in tweets:
            yield tweet
        min_seen_id = min(t["id"] for t in tweets)
        max_seen_id = max(t["id"] for t in tweets)
        if last_since_id is not None:
            max_seen_id = max((last_since_id, max_seen_id))
            last_since_id = max_seen_id
        if since_type_id is not None and since_key is not None:
            # Record progress so a future --since run can resume from here
            db["since_ids"].insert(
                {
                    "type": since_type_id,
                    "key": since_key,
                    "since_id": max_seen_id,
                },
                replace=True,
            )
        if stop_after is not None:
            break
        time.sleep(sleep)
242
243
def fetch_user_timeline(
    session,
    db,
    user_id=None,
    screen_name=None,
    stop_after=None,
    since_id=None,
    since=False,
):
    """Yield tweets from one user's timeline, tracking a per-user since_id."""
    # The since_ids row is keyed by numeric ID when available, else screen name
    since_key = "id:{}".format(user_id) if user_id else screen_name
    yield from fetch_timeline(
        session,
        "https://api.twitter.com/1.1/statuses/user_timeline.json",
        db,
        user_args(user_id, screen_name),
        sleep=1,
        stop_after=stop_after,
        since_id=since_id,
        since_type="user",
        since_key=since_key,
        since=since,
    )
266
267
def fetch_favorites(session, db, user_id=None, screen_name=None, stop_after=None):
    """Yield tweets the user has favorited, paced to respect the favorites rate limit."""
    # Rate limit 75/15 mins = 5/minute = every 12 seconds
    yield from fetch_timeline(
        session,
        "https://api.twitter.com/1.1/favorites/list.json",
        db,
        user_args(user_id, screen_name),
        sleep=12,
        stop_after=stop_after,
    )
280
281
def user_args(user_id, screen_name):
    """Return request params for whichever of user_id / screen_name is truthy."""
    candidates = (("user_id", user_id), ("screen_name", screen_name))
    return {key: value for key, value in candidates if value}
289
290
def expand_entities(s, entities):
    """Replace t.co shortened URLs in s with their expanded form.

    entities is the Twitter "entities" mapping: each value is a list of
    entity dicts, and any entity carrying a "url" key gets substituted.

    Fix: use .get() for expanded_url - some entities carry "url" without
    an "expanded_url" key, which previously raised KeyError.
    """
    for ents in entities.values():
        for ent in ents:
            if "url" in ent:
                # Fall back to the original URL when no expansion exists
                replacement = ent.get("expanded_url") or ent["url"]
                s = s.replace(ent["url"], replacement)
    return s
298
299
def transform_user(user):
    """Mutate a raw API user dict in place, ready for the users table."""
    user["created_at"] = parser.parse(user["created_at"])
    entities = user.get("entities", {})
    if user["description"] and "description" in entities:
        user["description"] = expand_entities(
            user["description"], entities["description"]
        )
    if user["url"] and "url" in entities:
        user["url"] = expand_entities(user["url"], entities["url"])
    user.pop("entities", None)
    user.pop("status", None)
    # Drop the redundant string copies of numeric ID columns
    for key in [k for k in user if k.endswith("_str")]:
        del user[key]
313
314
def transform_tweet(tweet):
    """Mutate a raw API tweet dict in place, ready for the tweets table."""
    entities = tweet.pop("entities")
    tweet["full_text"] = html.unescape(expand_entities(tweet["full_text"], entities))
    # Drop redundant *_str columns plus the quoted-status extras
    doomed = [k for k in tweet if k.endswith("_str")]
    doomed += ["quoted_status_id", "quoted_status_permalink"]
    for key in doomed:
        if key in tweet:
            del tweet[key]
    tweet["created_at"] = parser.parse(tweet["created_at"]).isoformat()
327
328
def ensure_tables(db):
    """Create any missing tables (with FTS, foreign keys and indexes).

    Idempotent: each table is only created if it does not already exist.
    Creation order matters - users/places/sources come before tweets so
    the tweets foreign keys can reference them.
    """
    table_names = set(db.table_names())
    if "places" not in table_names:
        db["places"].create({"id": str}, pk="id")
    if "sources" not in table_names:
        # id is a content hash assigned by extract_and_save_source()
        db["sources"].create({"id": str, "name": str, "url": str}, pk="id")
    if "users" not in table_names:
        db["users"].create(
            {
                "id": int,
                "screen_name": str,
                "name": str,
                "description": str,
                "location": str,
            },
            pk="id",
        )
        db["users"].enable_fts(
            ["name", "screen_name", "description", "location"], create_triggers=True
        )
    if "tweets" not in table_names:
        db["tweets"].create(
            {
                "id": int,
                "user": int,
                "created_at": str,
                "full_text": str,
                "retweeted_status": int,
                "quoted_status": int,
                "place": str,
                "source": str,
            },
            pk="id",
            foreign_keys=(
                ("user", "users", "id"),
                ("place", "places", "id"),
                ("source", "sources", "id"),
            ),
        )
        db["tweets"].enable_fts(["full_text"], create_triggers=True)
        # Self-referencing foreign keys for retweets and quote tweets
        db["tweets"].add_foreign_key("retweeted_status", "tweets")
        db["tweets"].add_foreign_key("quoted_status", "tweets")
    if "following" not in table_names:
        db["following"].create(
            {"followed_id": int, "follower_id": int, "first_seen": str},
            pk=("followed_id", "follower_id"),
            foreign_keys=(
                ("followed_id", "users", "id"),
                ("follower_id", "users", "id"),
            ),
        )
    # Ensure following has indexes
    following_indexes = {tuple(i.columns) for i in db["following"].indexes}
    if ("followed_id",) not in following_indexes:
        db["following"].create_index(["followed_id"])
    if ("follower_id",) not in following_indexes:
        db["following"].create_index(["follower_id"])

    # Tables for tracking --since
    if "since_ids" not in table_names:
        db["since_id_types"].create(
            {
                "id": int,
                "name": str,
            },
            pk="id",
        )
        # Seed the lookup table from the module-level constant
        db["since_id_types"].insert_all(
            [{"id": id, "name": name} for name, id in SINCE_ID_TYPES.items()]
        )
        db["since_ids"].create(
            {"type": int, "key": str, "since_id": int},
            pk=("type", "key"),
            foreign_keys=(("type", "since_id_types", "id"),),
        )

    # Tables for recording history of user follower counts etc
    if "count_history" not in table_names:
        db["count_history_types"].create(
            {
                "id": int,
                "name": str,
            },
            pk="id",
        )
        # Seed the lookup table from the module-level constant
        db["count_history_types"].insert_all(
            [{"id": id, "name": name} for name, id in COUNT_HISTORY_TYPES.items()]
        )
        db["count_history"].create(
            {"type": int, "user": int, "datetime": str, "count": int},
            pk=("type", "user", "datetime"),
            foreign_keys=(
                ("type", "count_history_types", "id"),
                ("user", "users", "id"),
            ),
        )
425
426
def save_tweets(db, tweets, favorited_by=None):
    """Save a list of raw API tweet dicts (and their users/places/media).

    Mutates each tweet via transform_tweet/transform_user. Nested
    retweeted_status / quoted_status tweets are saved first (recursively)
    and replaced by their IDs. If favorited_by is a user ID, a
    favorited_by row is recorded for every tweet.
    """
    ensure_tables(db)
    for tweet in tweets:
        transform_tweet(tweet)
        user = tweet.pop("user")
        transform_user(user)
        # Replace nested objects with foreign key values
        tweet["user"] = user["id"]
        tweet["source"] = extract_and_save_source(db, tweet["source"])
        if tweet.get("place"):
            db["places"].insert(tweet["place"], pk="id", alter=True, replace=True)
            tweet["place"] = tweet["place"]["id"]
        # extended_entities contains media
        extended_entities = tweet.pop("extended_entities", None)
        # Deal with nested retweeted_status / quoted_status
        nested = []
        for tweet_key in ("quoted_status", "retweeted_status"):
            if tweet.get(tweet_key):
                nested.append(tweet[tweet_key])
                tweet[tweet_key] = tweet[tweet_key]["id"]
        if nested:
            # Save referenced tweets before the tweet that points at them
            save_tweets(db, nested)
        db["users"].insert(user, pk="id", alter=True, replace=True)
        save_user_counts(db, user)
        table = db["tweets"].insert(tweet, pk="id", alter=True, replace=True)
        if favorited_by is not None:
            db["favorited_by"].insert(
                {"tweet": tweet["id"], "user": favorited_by},
                pk=("user", "tweet"),
                foreign_keys=("tweet", "user"),
                replace=True,
            )
        if extended_entities and extended_entities.get("media"):
            for media in extended_entities["media"]:
                # TODO: Remove this line when .m2m() grows alter=True
                db["media"].insert(media, pk="id", alter=True, replace=True)
                table.m2m("media", media, pk="id")
463
464
def save_users(db, users, followed_id=None, follower_id=None):
    """Save a list of raw API user dicts, optionally recording follow edges.

    Pass at most one of followed_id / follower_id: each saved user is then
    linked to that ID in the following table.
    """
    assert not (followed_id and follower_id)
    ensure_tables(db)
    for user in users:
        transform_user(user)
    db["users"].insert_all(users, pk="id", alter=True, replace=True)
    for user in users:
        save_user_counts(db, user)
    if followed_id or follower_id:
        first_seen = datetime.datetime.utcnow().isoformat()
        rows = [
            {
                "followed_id": followed_id or user["id"],
                "follower_id": follower_id or user["id"],
                "first_seen": first_seen,
            }
            for user in users
        ]
        # ignore=True keeps the original first_seen for known relationships
        db["following"].insert_all(rows, ignore=True)
486
487
def fetch_user_batches(session, ids_or_screen_names, use_ids=False, sleep=1):
    """Yield lists of user dicts from users/lookup.json.

    Identifiers are sent in batches of up to 70 - 100 previously triggered
    {'code': 18, 'message': 'Too many terms specified in query.'}.

    Fix: the loop variable no longer shadows the builtin `id`.
    """
    url = "https://api.twitter.com/1.1/users/lookup.json"
    batches = []
    batch = []
    for identifier in ids_or_screen_names:
        batch.append(identifier)
        if len(batch) == 70:
            batches.append(batch)
            batch = []
    if batch:
        batches.append(batch)
    for batch in batches:
        if use_ids:
            args = {"user_id": ",".join(map(str, batch))}
        else:
            args = {"screen_name": ",".join(batch)}
        yield session.get(url, params=args).json()
        time.sleep(sleep)
509
510
def fetch_status_batches(session, tweet_ids, sleep=1):
    """Yield lists of fully-hydrated tweets from statuses/lookup.json.

    Tweet IDs are sent in batches of up to 100 (the endpoint maximum).

    Fix: the loop variable no longer shadows the builtin `id`
    (consistent with fetch_user_batches).
    """
    url = "https://api.twitter.com/1.1/statuses/lookup.json"
    batches = []
    batch = []
    for tweet_id in tweet_ids:
        batch.append(tweet_id)
        if len(batch) == 100:
            batches.append(batch)
            batch = []
    if batch:
        batches.append(batch)
    for batch in batches:
        args = {"id": ",".join(map(str, batch)), "tweet_mode": "extended"}
        yield session.get(url, params=args).json()
        time.sleep(sleep)
528
529
def resolve_identifiers(db, identifiers, attach, sql):
    """Combine explicit identifiers with extras pulled from an optional SQL query.

    attach is an iterable of "alias:path" (or bare path) SQLite files to
    ATTACH before running sql; the query's first column supplies the extra
    identifiers.
    """
    if not sql:
        return list(identifiers)
    for filepath in attach or []:
        if ":" in filepath:
            alias, filepath = filepath.split(":", 1)
        else:
            # Derive an alias from the filename stem
            alias = filepath.split("/")[-1].split(".")[0]
        attach_sql = """
            ATTACH DATABASE '{}' AS [{}];
        """.format(
            str(pathlib.Path(filepath).resolve()), alias
        )
        db.conn.execute(attach_sql)
    extra = [row[0] for row in db.conn.execute(sql).fetchall()]
    return list(identifiers) + extra
548
549
def fetch_and_save_list(db, session, identifier, identifier_is_id=False):
    """Fetch a Twitter list's metadata and all of its members, saving everything.

    identifier is either a numeric list ID (identifier_is_id=True) or a
    "screen_name/slug" string. Saves the list row and its owner, then every
    member user plus the list/user pairs in list_members.
    """
    show_url = "https://api.twitter.com/1.1/lists/show.json"
    args = {}
    if identifier_is_id:
        args["list_id"] = identifier
    else:
        screen_name, slug = identifier.split("/")
        args.update({"owner_screen_name": screen_name, "slug": slug})
    # First fetch the list details
    data = session.get(show_url, params=args).json()
    list_id = data["id"]
    del data["id_str"]
    user = data.pop("user")
    save_users(db, [user])
    # Replace the nested owner with a foreign key to users
    data["user"] = user["id"]
    data["created_at"] = parser.parse(data["created_at"])
    db["lists"].insert(data, pk="id", foreign_keys=("user",), replace=True)
    # Now fetch the members
    url = "https://api.twitter.com/1.1/lists/members.json"
    cursor = -1
    while cursor:
        args.update({"count": 5000, "cursor": cursor})
        body = session.get(url, params=args).json()
        users = body["users"]
        save_users(db, users)
        db["list_members"].insert_all(
            ({"list": list_id, "user": user["id"]} for user in users),
            pk=("list", "user"),
            foreign_keys=("list", "user"),
            replace=True,
        )
        cursor = body["next_cursor"]
        if not cursor:
            # A cursor of 0 means there are no more pages
            break
        time.sleep(1)  # Rate limit = 900 per 15 minutes
585
586
def cursor_paginate(session, url, args, key, page_size=200, sleep=None):
    """Execute cursor pagination, yielding body[key] for each page.

    Follows next_cursor until the API returns 0, optionally sleeping
    between requests. (Fixed docstring typo: "yelding".)
    """
    params = dict(args)
    params["page_size"] = page_size
    cursor = -1
    while cursor:
        params["cursor"] = cursor
        response = session.get(url, params=params)
        raise_if_error(response)
        body = response.json()
        yield body[key]
        cursor = body["next_cursor"]
        if not cursor:
            break
        if sleep is not None:
            time.sleep(sleep)
603
604
class TwitterApiError(Exception):
    """An error payload returned by the Twitter API, with the response headers.

    Fix: the original never called Exception.__init__, so str(error) was
    always empty when the exception surfaced in a traceback.
    """

    def __init__(self, headers, body):
        # Pass body to Exception so str(error) is informative
        super().__init__(body)
        self.headers = headers
        self.body = body

    def __repr__(self):
        return "{}: {}".format(self.body, self.headers)
612
613
def raise_if_error(r):
    """Raise TwitterApiError if the parsed response body contains an errors key.

    Fix: parse the JSON body once instead of calling r.json() twice.
    """
    body = r.json()
    if "errors" in body:
        raise TwitterApiError(r.headers, body["errors"])
617
618
def stream_filter(session, track=None, follow=None, locations=None, language=None):
    """Yield tweets from the streaming statuses/filter API, reconnecting forever.

    track/follow/locations/language may each be a string or an iterable of
    values; iterables are joined with commas as the API expects. Non-tweet
    payloads (maintenance messages) are printed rather than yielded.
    """
    session.stream = True
    args = {"tweet_mode": "extended"}
    for key, value in (
        ("track", track),
        ("follow", follow),
        ("locations", locations),
        ("language", language),
    ):
        if value is None:
            continue
        if not isinstance(value, str):
            value = ",".join(map(str, value))
        args[key] = value
    while True:
        response = session.post(
            "https://stream.twitter.com/1.1/statuses/filter.json", params=args
        )
        for line in response.iter_lines(chunk_size=10000):
            # Keep-alive newlines and partial noise are skipped
            if line.strip().startswith(b"{"):
                tweet = json.loads(line)
                # Only yield tweet if it has an 'id' and 'created_at'
                # - otherwise it's probably a maintenance message, see
                # https://developer.twitter.com/en/docs/tweets/filter-realtime/overview/statuses-filter
                if "id" in tweet and "created_at" in tweet:
                    # 'Fix' weird tweets from streaming API
                    fix_streaming_tweet(tweet)
                    yield tweet
                else:
                    print(tweet)
        # Stream dropped - pause briefly before reconnecting
        time.sleep(1)
650
651
def fix_streaming_tweet(tweet):
    """Normalize a streaming-API tweet (and any nested tweets) to REST shape."""
    # Streaming payloads hide untruncated content inside extended_tweet
    if "extended_tweet" in tweet:
        tweet.update(tweet.pop("extended_tweet"))
    # REST "extended" mode calls the text full_text; fall back to text
    if "full_text" not in tweet:
        tweet["full_text"] = tweet["text"]
    for nested_key in ("retweeted_status", "quoted_status"):
        if nested_key in tweet:
            fix_streaming_tweet(tweet[nested_key])
661
662
def user_ids_for_screen_names(db, screen_names):
    """Look up numeric user IDs for screen names already saved in the users table.

    Matching is case-insensitive.
    """
    placeholders = ", ".join(["?"] * len(screen_names))
    query = "select id from users where lower(screen_name) in ({})".format(
        placeholders
    )
    params = [name.lower() for name in screen_names]
    return [row[0] for row in db.conn.execute(query, params).fetchall()]
670
671
def read_archive_js(filepath):
    """Open a Twitter archive zip, yielding (filename, bytes) for each .js payload.

    Files under assets/ belong to the archive's bundled web viewer and are
    skipped; only the final path component is yielded because some archives
    nest payloads in a data/ subdirectory, which would otherwise break the
    filename -> importer mapping.

    Fix: the zip handle is now closed via a context manager (it leaked),
    and zf.read() avoids leaving per-member file objects open.
    """
    with zipfile.ZipFile(filepath) as zf:
        for zi in zf.filelist:
            if zi.filename.endswith(".js") and not zi.filename.startswith("assets/"):
                yield pathlib.Path(zi.filename).name, zf.read(zi.filename)
682
683
def extract_and_save_source(db, source):
    """Save a tweet's source (client application) to the sources table.

    Returns the hash-based sources.id, or None when the tweet has no source.

    Fix: historic tweets can carry a bare source string rather than an
    <a> element; previously m.groupdict() raised AttributeError on those.
    """
    if not source:
        return None
    m = source_re.match(source)
    if m is None:
        # Not an anchor element - store the raw string as the name
        details = {"url": None, "name": source}
    else:
        details = m.groupdict()
    return db["sources"].insert(details, hash_id="id", replace=True).last_pk
690
691
def save_user_counts(db, user):
    """Append a count_history row for each tracked count type that changed.

    Only writes a row when the latest recorded value differs from the
    user's current count, keeping the history table compact.
    """
    for type_name, type_id in COUNT_HISTORY_TYPES.items():
        current_count = user["{}_count".format(type_name)]
        rows = db.conn.execute(
            """
            select count from count_history
            where type = ? and user = ?
            order by datetime desc limit 1
        """,
            [type_id, user["id"]],
        ).fetchall()
        # No prior history leaves previous_count as None
        previous_count = rows[0][0] if rows else None
        if current_count == previous_count:
            continue
        db["count_history"].insert(
            {
                "type": type_id,
                "user": user["id"],
                # Second-resolution UTC timestamp with an explicit offset
                "datetime": datetime.datetime.utcnow().isoformat().split(".")[0]
                + "+00:00",
                "count": current_count,
            },
            replace=True,
        )