import click
import datetime
import html
import json
import pathlib
import re
import sqlite3
import time
import urllib.parse
import zipfile

from dateutil import parser
from requests_oauthlib import OAuth1Session
import sqlite_utils

# Twitter API error codes
RATE_LIMIT_ERROR_CODE = 88

SINCE_ID_TYPES = {
    "user": 1,
    "home": 2,
    "mentions": 3,
    "search": 4,
}
COUNT_HISTORY_TYPES = {
    "followers": 1,
    "friends": 2,
    "listed": 3,
    # Don't track these - they're uninteresting and really noisy in terms
    # of writing new rows to the count_history table:
    # "favourites": 4,
    # "statuses": 5,
}

source_re = re.compile('<a href="(?P<url>.*?)".*?>(?P<name>.*?)</a>')


class UserDoesNotExist(click.ClickException):
    def __init__(self, identifier):
        super().__init__("User '{}' does not exist".format(identifier))


def open_database(db_path):
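    "Open a sqlite-utils Database, running any pending migrations if it already has tables"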
    db = sqlite_utils.Database(db_path)
    # Only run migrations if this is an existing DB (has tables)
    if db.tables:
        migrate(db)
    return db


def migrate(db):
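    "Apply any migrations from twitter_to_sqlite.migrations that have not been applied yet"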
    from twitter_to_sqlite.migrations import MIGRATIONS

    if "migrations" not in db.table_names():
        db["migrations"].create({"name": str, "applied": str}, pk="name")
    applied_migrations = {
        m[0] for m in db.conn.execute("select name from migrations").fetchall()
    }
    for migration in MIGRATIONS:
        name = migration.__name__
        if name in applied_migrations:
            continue
        migration(db)
        db["migrations"].insert(
            {"name": name, "applied": datetime.datetime.utcnow().isoformat()}
        )


def session_for_auth(auth):
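    "Build an OAuth1Session from a dict of Twitter API credentials"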
    return OAuth1Session(
        client_key=auth["api_key"],
        client_secret=auth["api_secret_key"],
        resource_owner_key=auth["access_token"],
        resource_owner_secret=auth["access_token_secret"],
    )


def fetch_user_list_chunks(
    session, user_id=None, screen_name=None, sleep=61, noun="followers"
):
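    "Yield batches of users from the {noun}/list API, sleeping between requests to respect rate limits"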
    cursor = -1
    while cursor:
        headers, body = fetch_user_list(session, cursor, user_id, screen_name, noun)
        yield body["users"]
        cursor = body["next_cursor"]
        if not cursor:
            break
        time.sleep(sleep)  # Rate limit = 15 per 15 minutes!


def fetch_user_list(session, cursor, user_id=None, screen_name=None, noun="followers"):
    args = user_args(user_id, screen_name)
    args.update({"count": 200, "cursor": cursor})
    r = session.get(
        "https://api.twitter.com/1.1/{}/list.json?".format(noun)
        + urllib.parse.urlencode(args)
    )
    return r.headers, r.json()


def fetch_lists(db, session, user_id=None, screen_name=None):
    lists_url = "https://api.twitter.com/1.1/lists/ownerships.json"
    args = user_args(user_id, screen_name)
    args["count"] = 1000
    fetched_lists = []
    # For the moment we don't paginate
    for list_row in session.get(lists_url, params=args).json()["lists"]:
        del list_row["id_str"]
        user = list_row.pop("user")
        save_users(db, [user])
        list_row["user"] = user["id"]
        list_row["created_at"] = parser.parse(list_row["created_at"])
        fetched_lists.append(list_row)
    db["lists"].insert_all(fetched_lists, pk="id", foreign_keys=("user",), replace=True)
    return fetched_lists


def get_profile(db, session, user_id=None, screen_name=None):
    if not (user_id or screen_name):
        profile = session.get(
            "https://api.twitter.com/1.1/account/verify_credentials.json"
        ).json()
    else:
        args = user_args(user_id, screen_name)
        url = "https://api.twitter.com/1.1/users/show.json"
        if args:
            url += "?" + urllib.parse.urlencode(args)
        response = session.get(url)
        if response.status_code == 404:
            raise UserDoesNotExist(screen_name or user_id)
        profile = response.json()
    save_users(db, [profile])
    return profile


def fetch_timeline(
    session,
    url,
    db,
    args=None,
    sleep=1,
    stop_after=None,
    key=None,
    since_id=None,
    since=False,
    since_type=None,
    since_key=None,
):
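    """Yield tweets from a v1.1 timeline endpoint, paginating backwards using max_id.

    If since_type and since_key are provided, the newest tweet ID seen is
    recorded in the since_ids table so a later call with since=True can
    resume from that point.
    """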
    # See https://developer.twitter.com/en/docs/tweets/timelines/guides/working-with-timelines
    if since and since_id:
        raise click.ClickException("Use either --since or --since_id, not both")

    since_type_id = None
    last_since_id = None
    if since_type is not None:
        assert since_key is not None
        since_type_id = SINCE_ID_TYPES[since_type]
        # Figure out the last since_id in case we need it
        try:
            last_since_id = db.conn.execute(
                """
                select since_id from since_ids
                where type = ? and key = ?
                """,
                [since_type_id, since_key],
            ).fetchall()[0][0]
        except (IndexError, sqlite3.OperationalError):
            pass

    if since:
        # Load since_id from database
        since_id = last_since_id

    args = dict(args or {})
    args["count"] = 200
    if stop_after is not None:
        args["count"] = stop_after
    if since_id:
        args["since_id"] = since_id
    args["tweet_mode"] = "extended"
    min_seen_id = None
    num_rate_limit_errors = 0
    while True:
        if min_seen_id is not None:
            args["max_id"] = min_seen_id - 1
        response = session.get(url, params=args)
        tweets = response.json()
        if "errors" in tweets:
            # Was it a rate limit error? If so sleep and try again
            if tweets["errors"][0]["code"] == RATE_LIMIT_ERROR_CODE:
                num_rate_limit_errors += 1
                assert num_rate_limit_errors < 5, "More than 5 rate limit errors"
                print(
                    "Rate limit exceeded - will sleep 15s and try again {}".format(
                        repr(response.headers)
                    )
                )
                time.sleep(15)
                continue
            else:
                raise Exception(str(tweets["errors"]))
        if key is not None:
            tweets = tweets[key]
        if not tweets:
            break
        for tweet in tweets:
            yield tweet
        min_seen_id = min(t["id"] for t in tweets)
        max_seen_id = max(t["id"] for t in tweets)
        if last_since_id is not None:
            max_seen_id = max((last_since_id, max_seen_id))
            last_since_id = max_seen_id
        if since_type_id is not None and since_key is not None:
            db["since_ids"].insert(
                {
                    "type": since_type_id,
                    "key": since_key,
                    "since_id": max_seen_id,
                },
                replace=True,
            )
        if stop_after is not None:
            break
        time.sleep(sleep)


def fetch_user_timeline(
    session,
    db,
    user_id=None,
    screen_name=None,
    stop_after=None,
    since_id=None,
    since=False,
):
    args = user_args(user_id, screen_name)
    yield from fetch_timeline(
        session,
        "https://api.twitter.com/1.1/statuses/user_timeline.json",
        db,
        args,
        sleep=1,
        stop_after=stop_after,
        since_id=since_id,
        since_type="user",
        since_key="id:{}".format(user_id) if user_id else screen_name,
        since=since,
    )


def fetch_favorites(session, db, user_id=None, screen_name=None, stop_after=None):
    args = user_args(user_id, screen_name)
    # Rate limit 75/15 mins = 5/minute = every 12 seconds
    sleep = 12
    yield from fetch_timeline(
        session,
        "https://api.twitter.com/1.1/favorites/list.json",
        db,
        args,
        sleep=sleep,
        stop_after=stop_after,
    )


def user_args(user_id, screen_name):
    args = {}
    if user_id:
        args["user_id"] = user_id
    if screen_name:
        args["screen_name"] = screen_name
    return args


def expand_entities(s, entities):
    for _, ents in entities.items():
        for ent in ents:
            if "url" in ent:
                replacement = ent["expanded_url"] or ent["url"]
                s = s.replace(ent["url"], replacement)
    return s


def transform_user(user):
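    "Transform a user dict in place: parse created_at, expand t.co URLs, drop entities/status and *_str keys"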
    user["created_at"] = parser.parse(user["created_at"])
    if user["description"] and "description" in user.get("entities", {}):
        user["description"] = expand_entities(
            user["description"], user["entities"]["description"]
        )
    if user["url"] and "url" in user.get("entities", {}):
        user["url"] = expand_entities(user["url"], user["entities"]["url"])
    user.pop("entities", None)
    user.pop("status", None)
    to_remove = [k for k in user if k.endswith("_str")]
    for key in to_remove:
        del user[key]


def transform_tweet(tweet):
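    "Transform a tweet dict in place: expand entities in full_text, drop *_str keys, ISO-format created_at"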
    tweet["full_text"] = html.unescape(
        expand_entities(tweet["full_text"], tweet.pop("entities"))
    )
    to_remove = [k for k in tweet if k.endswith("_str")] + [
        "quoted_status_id",
        "quoted_status_permalink",
    ]
    for key in to_remove:
        if key in tweet:
            del tweet[key]
    tweet["created_at"] = parser.parse(tweet["created_at"]).isoformat()


def ensure_tables(db):
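    "Create any missing tables, including their full-text search indexes and foreign keys"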
    table_names = set(db.table_names())
    if "places" not in table_names:
        db["places"].create({"id": str}, pk="id")
    if "sources" not in table_names:
        db["sources"].create({"id": str, "name": str, "url": str}, pk="id")
    if "users" not in table_names:
        db["users"].create(
            {
                "id": int,
                "screen_name": str,
                "name": str,
                "description": str,
                "location": str,
            },
            pk="id",
        )
        db["users"].enable_fts(
            ["name", "screen_name", "description", "location"], create_triggers=True
        )
    if "tweets" not in table_names:
        db["tweets"].create(
            {
                "id": int,
                "user": int,
                "created_at": str,
                "full_text": str,
                "retweeted_status": int,
                "quoted_status": int,
                "place": str,
                "source": str,
            },
            pk="id",
            foreign_keys=(
                ("user", "users", "id"),
                ("place", "places", "id"),
                ("source", "sources", "id"),
            ),
        )
        db["tweets"].enable_fts(["full_text"], create_triggers=True)
        db["tweets"].add_foreign_key("retweeted_status", "tweets")
        db["tweets"].add_foreign_key("quoted_status", "tweets")
    if "following" not in table_names:
        db["following"].create(
            {"followed_id": int, "follower_id": int, "first_seen": str},
            pk=("followed_id", "follower_id"),
            foreign_keys=(
                ("followed_id", "users", "id"),
                ("follower_id", "users", "id"),
            ),
        )
    # Ensure following has indexes
    following_indexes = {tuple(i.columns) for i in db["following"].indexes}
    if ("followed_id",) not in following_indexes:
        db["following"].create_index(["followed_id"])
    if ("follower_id",) not in following_indexes:
        db["following"].create_index(["follower_id"])

    # Tables for tracking --since
    if "since_ids" not in table_names:
        db["since_id_types"].create(
            {
                "id": int,
                "name": str,
            },
            pk="id",
        )
        db["since_id_types"].insert_all(
            [{"id": id, "name": name} for name, id in SINCE_ID_TYPES.items()]
        )
        db["since_ids"].create(
            {"type": int, "key": str, "since_id": int},
            pk=("type", "key"),
            foreign_keys=(("type", "since_id_types", "id"),),
        )

    # Tables for recording history of user follower counts etc
    if "count_history" not in table_names:
        db["count_history_types"].create(
            {
                "id": int,
                "name": str,
            },
            pk="id",
        )
        db["count_history_types"].insert_all(
            [{"id": id, "name": name} for name, id in COUNT_HISTORY_TYPES.items()]
        )
        db["count_history"].create(
            {"type": int, "user": int, "datetime": str, "count": int},
            pk=("type", "user", "datetime"),
            foreign_keys=(
                ("type", "count_history_types", "id"),
                ("user", "users", "id"),
            ),
        )


def save_tweets(db, tweets, favorited_by=None):
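    """Save tweets plus their users, places, sources and media to the database.

    Nested retweeted_status / quoted_status tweets are saved recursively;
    if favorited_by is set, a favorited_by row is recorded for each tweet.
    """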
    ensure_tables(db)
    for tweet in tweets:
        transform_tweet(tweet)
        user = tweet.pop("user")
        transform_user(user)
        tweet["user"] = user["id"]
        tweet["source"] = extract_and_save_source(db, tweet["source"])
        if tweet.get("place"):
            db["places"].insert(tweet["place"], pk="id", alter=True, replace=True)
            tweet["place"] = tweet["place"]["id"]
        # extended_entities contains media
        extended_entities = tweet.pop("extended_entities", None)
        # Deal with nested retweeted_status / quoted_status
        nested = []
        for tweet_key in ("quoted_status", "retweeted_status"):
            if tweet.get(tweet_key):
                nested.append(tweet[tweet_key])
                tweet[tweet_key] = tweet[tweet_key]["id"]
        if nested:
            save_tweets(db, nested)
        db["users"].insert(user, pk="id", alter=True, replace=True)
        save_user_counts(db, user)
        table = db["tweets"].insert(tweet, pk="id", alter=True, replace=True)
        if favorited_by is not None:
            db["favorited_by"].insert(
                {"tweet": tweet["id"], "user": favorited_by},
                pk=("user", "tweet"),
                foreign_keys=("tweet", "user"),
                replace=True,
            )
        if extended_entities and extended_entities.get("media"):
            for media in extended_entities["media"]:
                # TODO: Remove this line when .m2m() grows alter=True
                db["media"].insert(media, pk="id", alter=True, replace=True)
                table.m2m("media", media, pk="id")


def save_users(db, users, followed_id=None, follower_id=None):
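    "Save users; if followed_id or follower_id is set, also record those relationships in the following table"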
    assert not (followed_id and follower_id)
    ensure_tables(db)
    for user in users:
        transform_user(user)
    db["users"].insert_all(users, pk="id", alter=True, replace=True)
    for user in users:
        save_user_counts(db, user)
    if followed_id or follower_id:
        first_seen = datetime.datetime.utcnow().isoformat()
        db["following"].insert_all(
            (
                {
                    "followed_id": followed_id or user["id"],
                    "follower_id": follower_id or user["id"],
                    "first_seen": first_seen,
                }
                for user in users
            ),
            ignore=True,
        )


def fetch_user_batches(session, ids_or_screen_names, use_ids=False, sleep=1):
    # Yields lists of up to 70 users (tried 100 but got this error:
    # {'code': 18, 'message': 'Too many terms specified in query.'} )
    batches = []
    batch = []
    for id in ids_or_screen_names:
        batch.append(id)
        if len(batch) == 70:
            batches.append(batch)
            batch = []
    if batch:
        batches.append(batch)
    url = "https://api.twitter.com/1.1/users/lookup.json"
    for batch in batches:
        if use_ids:
            args = {"user_id": ",".join(map(str, batch))}
        else:
            args = {"screen_name": ",".join(batch)}
        users = session.get(url, params=args).json()
        yield users
        time.sleep(sleep)


def fetch_status_batches(session, tweet_ids, sleep=1):
    # Yields lists of up to 100 tweets
    batches = []
    batch = []
    for id in tweet_ids:
        batch.append(id)
        if len(batch) == 100:
            batches.append(batch)
            batch = []
    if batch:
        batches.append(batch)
    url = "https://api.twitter.com/1.1/statuses/lookup.json"
    for batch in batches:
        args = {"id": ",".join(map(str, batch)), "tweet_mode": "extended"}
        tweets = session.get(url, params=args).json()
        yield tweets
        time.sleep(sleep)


def resolve_identifiers(db, identifiers, attach, sql):
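    """Combine identifiers passed on the command line with identifiers
    returned by an optional SQL query, attaching extra databases first."""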
    if sql:
        if attach:
            for filepath in attach:
                if ":" in filepath:
                    alias, filepath = filepath.split(":", 1)
                else:
                    alias = filepath.split("/")[-1].split(".")[0]
                attach_sql = """
                    ATTACH DATABASE '{}' AS [{}];
                """.format(
                    str(pathlib.Path(filepath).resolve()), alias
                )
                db.conn.execute(attach_sql)
        sql_identifiers = [r[0] for r in db.conn.execute(sql).fetchall()]
    else:
        sql_identifiers = []
    return list(identifiers) + sql_identifiers


def fetch_and_save_list(db, session, identifier, identifier_is_id=False):
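    "Fetch a list's details and its members, saving them to the lists and list_members tables"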
    show_url = "https://api.twitter.com/1.1/lists/show.json"
    args = {}
    if identifier_is_id:
        args["list_id"] = identifier
    else:
        screen_name, slug = identifier.split("/")
        args.update({"owner_screen_name": screen_name, "slug": slug})
    # First fetch the list details
    data = session.get(show_url, params=args).json()
    list_id = data["id"]
    del data["id_str"]
    user = data.pop("user")
    save_users(db, [user])
    data["user"] = user["id"]
    data["created_at"] = parser.parse(data["created_at"])
    db["lists"].insert(data, pk="id", foreign_keys=("user",), replace=True)
    # Now fetch the members
    url = "https://api.twitter.com/1.1/lists/members.json"
    cursor = -1
    while cursor:
        args.update({"count": 5000, "cursor": cursor})
        body = session.get(url, params=args).json()
        users = body["users"]
        save_users(db, users)
        db["list_members"].insert_all(
            ({"list": list_id, "user": user["id"]} for user in users),
            pk=("list", "user"),
            foreign_keys=("list", "user"),
            replace=True,
        )
        cursor = body["next_cursor"]
        if not cursor:
            break
        time.sleep(1)  # Rate limit = 900 per 15 minutes


def cursor_paginate(session, url, args, key, page_size=200, sleep=None):
    "Execute cursor pagination, yielding 'key' for each page"
    args = dict(args)
    # The Twitter API takes the page size in the "count" parameter
    args["count"] = page_size
    cursor = -1
    while cursor:
        args["cursor"] = cursor
        r = session.get(url, params=args)
        raise_if_error(r)
        body = r.json()
        yield body[key]
        cursor = body["next_cursor"]
        if not cursor:
            break
        if sleep is not None:
            time.sleep(sleep)


class TwitterApiError(Exception):
    def __init__(self, headers, body):
        self.headers = headers
        self.body = body

    def __repr__(self):
        return "{}: {}".format(self.body, self.headers)


def raise_if_error(r):
    body = r.json()
    if "errors" in body:
        raise TwitterApiError(r.headers, body["errors"])


def stream_filter(session, track=None, follow=None, locations=None, language=None):
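    """Yield tweets from the statuses/filter streaming API, reconnecting
    when the stream ends; maintenance messages are printed, not yielded."""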
    session.stream = True
    args = {"tweet_mode": "extended"}
    for key, value in (
        ("track", track),
        ("follow", follow),
        ("locations", locations),
        ("language", language),
    ):
        if value is None:
            continue
        if not isinstance(value, str):
            value = ",".join(map(str, value))
        args[key] = value
    while True:
        response = session.post(
            "https://stream.twitter.com/1.1/statuses/filter.json", params=args
        )
        for line in response.iter_lines(chunk_size=10000):
            if line.strip().startswith(b"{"):
                tweet = json.loads(line)
                # Only yield tweet if it has an 'id' and 'created_at'
                # - otherwise it's probably a maintenance message, see
                # https://developer.twitter.com/en/docs/tweets/filter-realtime/overview/statuses-filter
                if "id" in tweet and "created_at" in tweet:
                    # 'Fix' weird tweets from streaming API
                    fix_streaming_tweet(tweet)
                    yield tweet
                else:
                    print(tweet)
        time.sleep(1)


def fix_streaming_tweet(tweet):
    if "extended_tweet" in tweet:
        tweet.update(tweet.pop("extended_tweet"))
    if "full_text" not in tweet:
        tweet["full_text"] = tweet["text"]
    if "retweeted_status" in tweet:
        fix_streaming_tweet(tweet["retweeted_status"])
    if "quoted_status" in tweet:
        fix_streaming_tweet(tweet["quoted_status"])


def user_ids_for_screen_names(db, screen_names):
    sql = "select id from users where lower(screen_name) in ({})".format(
        ", ".join(["?"] * len(screen_names))
    )
    return [
        r[0] for r in db.conn.execute(sql, [s.lower() for s in screen_names]).fetchall()
    ]


def read_archive_js(filepath):
    "Open zip file, return (filename, content) for all .js"
    zf = zipfile.ZipFile(filepath)
    for zi in zf.filelist:
        if zi.filename.endswith(".js"):
            yield zi.filename, zf.open(zi.filename).read()


def extract_and_save_source(db, source):
    if not source:
        return None
    m = source_re.match(source)
    details = m.groupdict()
    return db["sources"].insert(details, hash_id="id", replace=True).last_pk


def save_user_counts(db, user):
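    "Track followers/friends/listed counts in count_history, inserting a row only when a count has changed"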
    for type_name, type_id in COUNT_HISTORY_TYPES.items():
        previous_count = None
        try:
            previous_count = db.conn.execute(
                """
                select count from count_history
                where type = ? and user = ?
                order by datetime desc limit 1
                """,
                [type_id, user["id"]],
            ).fetchall()[0][0]
        except IndexError:
            pass
        current_count = user["{}_count".format(type_name)]
        if current_count != previous_count:
            db["count_history"].insert(
                {
                    "type": type_id,
                    "user": user["id"],
                    "datetime": datetime.datetime.utcnow().isoformat().split(".")[0]
                    + "+00:00",
                    "count": current_count,
                },
                replace=True,
            )