import click
import datetime
import html
import json
import pathlib
import re
import time
import urllib.parse
import zipfile

from dateutil import parser
from requests_oauthlib import OAuth1Session
import sqlite_utils

# Twitter API error codes
RATE_LIMIT_ERROR_CODE = 88

SINCE_ID_TYPES = {
    "user": 1,
    "home": 2,
    "mentions": 3,
    "search": 4,
}
COUNT_HISTORY_TYPES = {
    "followers": 1,
    "friends": 2,
    "listed": 3,
    # Don't track these - they're uninteresting and really noisy in terms
    # of writing new rows to the count_history table:
    # "favourites": 4,
    # "statuses": 5,
}

# Extracts url and name from the HTML anchor Twitter uses for a tweet's
# "source", e.g. <a href="http://example.com" rel="nofollow">Some App</a>
source_re = re.compile('<a href="(?P<url>.*?)".*?>(?P<name>.*?)</a>')


def open_database(db_path):
    "Open the SQLite database at db_path, applying migrations if it already has tables."
    db = sqlite_utils.Database(db_path)
    # Only run migrations if this is an existing DB (has tables)
    if db.tables:
        migrate(db)
    return db


def migrate(db):
    "Apply any migrations from twitter_to_sqlite.migrations that have not yet been run."
    from twitter_to_sqlite.migrations import MIGRATIONS

    if "migrations" not in db.table_names():
        db["migrations"].create({"name": str, "applied": str}, pk="name")
    applied_migrations = {
        m[0] for m in db.conn.execute("select name from migrations").fetchall()
    }
    for migration in MIGRATIONS:
        name = migration.__name__
        if name in applied_migrations:
            continue
        migration(db)
        db["migrations"].insert(
            {"name": name, "applied": datetime.datetime.utcnow().isoformat()}
        )

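# Illustrative usage (a sketch, not executed on import), assuming a
# hypothetical "twitter.db" path:
#
#     db = open_database("twitter.db")
#
# For an existing database this applies any new functions listed in
# twitter_to_sqlite.migrations.MIGRATIONS exactly once each, recording
# them by name in the migrations table.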

def session_for_auth(auth):
    "Return an OAuth1Session configured from a credentials dict."
    return OAuth1Session(
        client_key=auth["api_key"],
        client_secret=auth["api_secret_key"],
        resource_owner_key=auth["access_token"],
        resource_owner_secret=auth["access_token_secret"],
    )

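# Illustrative usage (sketch): a JSON file holding the four keys read
# above can be loaded and turned into a session like this (auth.json is
# a hypothetical path):
#
#     auth = json.load(open("auth.json"))
#     session = session_for_auth(auth)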

def fetch_user_list_chunks(
    session, user_id=None, screen_name=None, sleep=61, noun="followers"
):
    "Yield lists of users from a cursored followers/friends list endpoint."
    cursor = -1
    while cursor:
        headers, body = fetch_user_list(session, cursor, user_id, screen_name, noun)
        yield body["users"]
        cursor = body["next_cursor"]
        if not cursor:
            break
        time.sleep(sleep)  # Rate limit = 15 per 15 minutes!


def fetch_user_list(session, cursor, user_id=None, screen_name=None, noun="followers"):
    "Fetch a single page of users; returns (response headers, decoded JSON body)."
    args = user_args(user_id, screen_name)
    args.update({"count": 200, "cursor": cursor})
    r = session.get(
        "https://api.twitter.com/1.1/{}/list.json?".format(noun)
        + urllib.parse.urlencode(args)
    )
    return r.headers, r.json()

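# Illustrative usage (sketch, hypothetical screen name): each chunk is a
# list of up to 200 hydrated user dicts, with a sleep between pages to
# stay inside the 15 requests / 15 minutes rate limit:
#
#     for chunk in fetch_user_list_chunks(session, screen_name="example"):
#         save_users(db, chunk)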

def get_profile(db, session, user_id=None, screen_name=None):
    "Fetch, save and return a user profile - the authenticated user if no identifier is given."
    if not (user_id or screen_name):
        profile = session.get(
            "https://api.twitter.com/1.1/account/verify_credentials.json"
        ).json()
    else:
        args = user_args(user_id, screen_name)
        url = "https://api.twitter.com/1.1/users/show.json"
        if args:
            url += "?" + urllib.parse.urlencode(args)
        profile = session.get(url).json()
    save_users(db, [profile])
    return profile


def fetch_timeline(
    session,
    url,
    db,
    args=None,
    sleep=1,
    stop_after=None,
    key=None,
    since_id=None,
    since=False,
    since_type=None,
    since_key=None,
):
    "Yield tweets from a timeline endpoint, paginating with max_id and optionally tracking since_ids."
    # See https://developer.twitter.com/en/docs/tweets/timelines/guides/working-with-timelines
    if since and since_id:
        raise click.ClickException("Use either --since or --since_id, not both")

    since_type_id = None
    last_since_id = None
    if since_type is not None:
        assert since_key is not None
        since_type_id = SINCE_ID_TYPES[since_type]
        # Figure out the last since_id in case we need it
        try:
            last_since_id = db.conn.execute(
                """
                select since_id from since_ids
                where type = ? and key = ?
                """,
                [since_type_id, since_key],
            ).fetchall()[0][0]
        except IndexError:
            pass

    if since:
        # Load since_id from database
        since_id = last_since_id

    args = dict(args or {})
    args["count"] = 200
    if stop_after is not None:
        args["count"] = stop_after
    if since_id:
        args["since_id"] = since_id
    args["tweet_mode"] = "extended"
    min_seen_id = None
    num_rate_limit_errors = 0
    while True:
        if min_seen_id is not None:
            args["max_id"] = min_seen_id - 1
        response = session.get(url, params=args)
        tweets = response.json()
        if "errors" in tweets:
            # Was it a rate limit error? If so sleep and try again
            if tweets["errors"][0]["code"] == RATE_LIMIT_ERROR_CODE:
                num_rate_limit_errors += 1
                assert num_rate_limit_errors < 5, "More than 5 rate limit errors"
                print(
                    "Rate limit exceeded - will sleep 15s and try again {}".format(
                        repr(response.headers)
                    )
                )
                time.sleep(15)
                continue
            else:
                raise Exception(str(tweets["errors"]))
        if key is not None:
            tweets = tweets[key]
        if not tweets:
            break
        for tweet in tweets:
            yield tweet
        min_seen_id = min(t["id"] for t in tweets)
        max_seen_id = max(t["id"] for t in tweets)
        if last_since_id is not None:
            max_seen_id = max((last_since_id, max_seen_id))
            last_since_id = max_seen_id
        # Only record progress when a since_type/since_key was provided -
        # otherwise we would write junk rows with type = None
        if since_type_id is not None and since_key is not None:
            db["since_ids"].insert(
                {"type": since_type_id, "key": since_key, "since_id": max_seen_id},
                replace=True,
            )
        if stop_after is not None:
            break
        time.sleep(sleep)

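# Illustrative usage (sketch): fetch mentions while recording progress in
# the since_ids table, so a later call with since=True picks up where the
# previous run finished. The since_key value here is hypothetical:
#
#     url = "https://api.twitter.com/1.1/statuses/mentions_timeline.json"
#     for tweet in fetch_timeline(
#         session, url, db, since_type="mentions", since_key="default", since=True
#     ):
#         save_tweets(db, [tweet])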

def fetch_user_timeline(
    session,
    db,
    user_id=None,
    screen_name=None,
    stop_after=None,
    since_id=None,
    since=False,
):
    args = user_args(user_id, screen_name)
    yield from fetch_timeline(
        session,
        "https://api.twitter.com/1.1/statuses/user_timeline.json",
        db,
        args,
        sleep=1,
        stop_after=stop_after,
        since_id=since_id,
        since_type="user",
        since_key="id:{}".format(user_id) if user_id else screen_name,
        since=since,
    )


def fetch_favorites(session, db, user_id=None, screen_name=None, stop_after=None):
    args = user_args(user_id, screen_name)
    # Rate limit 75/15 mins = 5/minute = every 12 seconds
    sleep = 12
    yield from fetch_timeline(
        session,
        "https://api.twitter.com/1.1/favorites/list.json",
        db,
        args,
        sleep=sleep,
        stop_after=stop_after,
    )


def user_args(user_id, screen_name):
    "Build the user_id/screen_name arguments dict accepted by most endpoints."
    args = {}
    if user_id:
        args["user_id"] = user_id
    if screen_name:
        args["screen_name"] = screen_name
    return args


def expand_entities(s, entities):
    "Replace shortened t.co URLs in s with their expanded versions from entities."
    for _, ents in entities.items():
        for ent in ents:
            if "url" in ent:
                replacement = ent["expanded_url"] or ent["url"]
                s = s.replace(ent["url"], replacement)
    return s

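# Example of what expand_entities does (values are illustrative): given
# text containing a wrapped link and an entities dict from the API,
#
#     s = "Check this https://t.co/abc"
#     entities = {"urls": [{"url": "https://t.co/abc",
#                           "expanded_url": "https://example.com/post"}]}
#     expand_entities(s, entities)
#     # -> "Check this https://example.com/post"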

def transform_user(user):
    "Mutate a user dict in place: parse created_at, expand entity URLs, drop noisy keys."
    user["created_at"] = parser.parse(user["created_at"])
    if user["description"] and "description" in user.get("entities", {}):
        user["description"] = expand_entities(
            user["description"], user["entities"]["description"]
        )
    if user["url"] and "url" in user.get("entities", {}):
        user["url"] = expand_entities(user["url"], user["entities"]["url"])
    user.pop("entities", None)
    user.pop("status", None)
    to_remove = [k for k in user if k.endswith("_str")]
    for key in to_remove:
        del user[key]


def transform_tweet(tweet):
    "Mutate a tweet dict in place: expand entities, unescape HTML, drop *_str duplicates."
    tweet["full_text"] = html.unescape(
        expand_entities(tweet["full_text"], tweet.pop("entities"))
    )
    to_remove = [k for k in tweet if k.endswith("_str")] + [
        "quoted_status_id",
        "quoted_status_permalink",
    ]
    for key in to_remove:
        if key in tweet:
            del tweet[key]
    tweet["created_at"] = parser.parse(tweet["created_at"]).isoformat()

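# Sketch of the transformation (field values are illustrative): a raw API
# tweet such as
#
#     {"id": 1, "full_text": "Hi &amp; bye https://t.co/x", "entities": {...},
#      "created_at": "Wed Oct 10 20:19:24 +0000 2018", "id_str": "1", ...}
#
# comes out with entities expanded into full_text, HTML entities unescaped,
# *_str duplicates dropped and created_at as an ISO 8601 string:
#
#     {"id": 1, "full_text": "Hi & bye https://example.com/x",
#      "created_at": "2018-10-10T20:19:24+00:00", ...}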

def ensure_tables(db):
    table_names = set(db.table_names())
    if "places" not in table_names:
        db["places"].create({"id": str}, pk="id")
    if "sources" not in table_names:
        db["sources"].create({"id": str, "name": str, "url": str}, pk="id")
    if "users" not in table_names:
        db["users"].create(
            {
                "id": int,
                "screen_name": str,
                "name": str,
                "description": str,
                "location": str,
            },
            pk="id",
        )
        db["users"].enable_fts(
            ["name", "screen_name", "description", "location"], create_triggers=True
        )
    if "tweets" not in table_names:
        db["tweets"].create(
            {
                "id": int,
                "user": int,
                "created_at": str,
                "full_text": str,
                "retweeted_status": int,
                "quoted_status": int,
                "place": str,
                "source": str,
            },
            pk="id",
            foreign_keys=(
                ("user", "users", "id"),
                ("place", "places", "id"),
                ("source", "sources", "id"),
            ),
        )
        db["tweets"].enable_fts(["full_text"], create_triggers=True)
        db["tweets"].add_foreign_key("retweeted_status", "tweets")
        db["tweets"].add_foreign_key("quoted_status", "tweets")
    if "following" not in table_names:
        db["following"].create(
            {"followed_id": int, "follower_id": int, "first_seen": str},
            pk=("followed_id", "follower_id"),
            foreign_keys=(
                ("followed_id", "users", "id"),
                ("follower_id", "users", "id"),
            ),
        )
    # Ensure following has indexes
    following_indexes = {tuple(i.columns) for i in db["following"].indexes}
    if ("followed_id",) not in following_indexes:
        db["following"].create_index(["followed_id"])
    if ("follower_id",) not in following_indexes:
        db["following"].create_index(["follower_id"])

    # Tables for tracking --since
    if "since_ids" not in table_names:
        db["since_id_types"].create({"id": int, "name": str}, pk="id")
        db["since_id_types"].insert_all(
            [{"id": id, "name": name} for name, id in SINCE_ID_TYPES.items()]
        )
        db["since_ids"].create(
            {"type": int, "key": str, "since_id": int},
            pk=("type", "key"),
            foreign_keys=(("type", "since_id_types", "id"),),
        )

    # Tables for recording history of user follower counts etc
    if "count_history" not in table_names:
        db["count_history_types"].create({"id": int, "name": str}, pk="id")
        db["count_history_types"].insert_all(
            [{"id": id, "name": name} for name, id in COUNT_HISTORY_TYPES.items()]
        )
        db["count_history"].create(
            {"type": int, "user": int, "datetime": str, "count": int},
            pk=("type", "user", "datetime"),
            foreign_keys=(
                ("type", "count_history_types", "id"),
                ("user", "users", "id"),
            ),
        )


def save_tweets(db, tweets, favorited_by=None):
    "Save a list of API tweet objects, plus their users, places, sources and media."
    ensure_tables(db)
    for tweet in tweets:
        transform_tweet(tweet)
        user = tweet.pop("user")
        transform_user(user)
        tweet["user"] = user["id"]
        tweet["source"] = extract_and_save_source(db, tweet["source"])
        if tweet.get("place"):
            db["places"].insert(tweet["place"], pk="id", alter=True, replace=True)
            tweet["place"] = tweet["place"]["id"]
        # extended_entities contains media
        extended_entities = tweet.pop("extended_entities", None)
        # Deal with nested retweeted_status / quoted_status
        nested = []
        for tweet_key in ("quoted_status", "retweeted_status"):
            if tweet.get(tweet_key):
                nested.append(tweet[tweet_key])
                tweet[tweet_key] = tweet[tweet_key]["id"]
        if nested:
            save_tweets(db, nested)
        db["users"].insert(user, pk="id", alter=True, replace=True)
        save_user_counts(db, user)
        table = db["tweets"].insert(tweet, pk="id", alter=True, replace=True)
        if favorited_by is not None:
            db["favorited_by"].insert(
                {"tweet": tweet["id"], "user": favorited_by},
                pk=("user", "tweet"),
                foreign_keys=("tweet", "user"),
                replace=True,
            )
        if extended_entities and extended_entities.get("media"):
            for media in extended_entities["media"]:
                # TODO: Remove this line when .m2m() grows alter=True
                db["media"].insert(media, pk="id", alter=True, replace=True)
                table.m2m("media", media, pk="id")

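# Illustrative usage (sketch, hypothetical tweet IDs): save_tweets()
# expects full API tweet objects and handles nested retweets/quotes for
# each one; pass favorited_by=<user id> to also record favorited_by rows:
#
#     tweets = session.get(
#         "https://api.twitter.com/1.1/statuses/lookup.json",
#         params={"id": "123,456", "tweet_mode": "extended"},
#     ).json()
#     save_tweets(db, tweets)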

def save_users(db, users, followed_id=None, follower_id=None):
    "Save a list of API user objects, optionally recording a following relationship."
    assert not (followed_id and follower_id)
    ensure_tables(db)
    for user in users:
        transform_user(user)
    db["users"].insert_all(users, pk="id", alter=True, replace=True)
    for user in users:
        save_user_counts(db, user)
    if followed_id or follower_id:
        first_seen = datetime.datetime.utcnow().isoformat()
        db["following"].insert_all(
            (
                {
                    "followed_id": followed_id or user["id"],
                    "follower_id": follower_id or user["id"],
                    "first_seen": first_seen,
                }
                for user in users
            ),
            ignore=True,
        )


def fetch_user_batches(session, ids_or_screen_names, use_ids=False, sleep=1):
    # Yields lists of up to 70 users - tried 100 but got this error:
    # {'code': 18, 'message': 'Too many terms specified in query.'}
    batches = []
    batch = []
    for id in ids_or_screen_names:
        batch.append(id)
        if len(batch) == 70:
            batches.append(batch)
            batch = []
    if batch:
        batches.append(batch)
    url = "https://api.twitter.com/1.1/users/lookup.json"
    for batch in batches:
        if use_ids:
            args = {"user_id": ",".join(map(str, batch))}
        else:
            args = {"screen_name": ",".join(batch)}
        users = session.get(url, params=args).json()
        yield users
        time.sleep(sleep)


def fetch_status_batches(session, tweet_ids, sleep=1):
    # Yields lists of up to 100 tweets
    batches = []
    batch = []
    for id in tweet_ids:
        batch.append(id)
        if len(batch) == 100:
            batches.append(batch)
            batch = []
    if batch:
        batches.append(batch)
    url = "https://api.twitter.com/1.1/statuses/lookup.json"
    for batch in batches:
        args = {"id": ",".join(map(str, batch)), "tweet_mode": "extended"}
        tweets = session.get(url, params=args).json()
        yield tweets
        time.sleep(sleep)

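# Illustrative usage (sketch, hypothetical IDs): both batch helpers yield
# lists of hydrated objects, suitable for feeding straight into the save
# functions above:
#
#     for batch in fetch_user_batches(session, [12, 34, 56], use_ids=True):
#         save_users(db, batch)
#     for batch in fetch_status_batches(session, [123, 456]):
#         save_tweets(db, batch)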

def resolve_identifiers(db, identifiers, attach, sql):
    "Combine literal identifiers with identifiers returned by an optional SQL query."
    if sql:
        if attach:
            for filepath in attach:
                if ":" in filepath:
                    alias, filepath = filepath.split(":", 1)
                else:
                    alias = filepath.split("/")[-1].split(".")[0]
                attach_sql = """
                    ATTACH DATABASE '{}' AS [{}];
                """.format(
                    str(pathlib.Path(filepath).resolve()), alias
                )
                db.conn.execute(attach_sql)
        sql_identifiers = [r[0] for r in db.conn.execute(sql).fetchall()]
    else:
        sql_identifiers = []
    return list(identifiers) + sql_identifiers

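# Illustrative usage (sketch): this backs the CLI's --sql/--attach options.
# A hypothetical call combining a literal identifier with IDs pulled from
# an attached database (alias:path format, as parsed above):
#
#     ids = resolve_identifiers(
#         db,
#         ["12345"],
#         attach=["other:/path/to/other.db"],
#         sql="select id from other.users",
#     )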

def fetch_and_save_list(db, session, identifier, identifier_is_id=False):
    "Fetch a list's details and members, saving them to the lists and list_members tables."
    show_url = "https://api.twitter.com/1.1/lists/show.json"
    args = {}
    if identifier_is_id:
        args["list_id"] = identifier
    else:
        screen_name, slug = identifier.split("/")
        args.update({"owner_screen_name": screen_name, "slug": slug})
    # First fetch the list details
    data = session.get(show_url, params=args).json()
    list_id = data["id"]
    del data["id_str"]
    user = data.pop("user")
    save_users(db, [user])
    data["user"] = user["id"]
    data["created_at"] = parser.parse(data["created_at"])
    db["lists"].insert(data, pk="id", foreign_keys=("user",), replace=True)
    # Now fetch the members
    url = "https://api.twitter.com/1.1/lists/members.json"
    cursor = -1
    while cursor:
        args.update({"count": 5000, "cursor": cursor})
        body = session.get(url, params=args).json()
        users = body["users"]
        save_users(db, users)
        db["list_members"].insert_all(
            ({"list": list_id, "user": user["id"]} for user in users),
            pk=("list", "user"),
            foreign_keys=("list", "user"),
            replace=True,
        )
        cursor = body["next_cursor"]
        if not cursor:
            break
        time.sleep(1)  # Rate limit = 900 per 15 minutes

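# Illustrative usage (sketch, hypothetical list): identifiers are either
# "owner_screen_name/slug" strings or numeric list IDs:
#
#     fetch_and_save_list(db, session, "example/my-list")
#     fetch_and_save_list(db, session, 123456, identifier_is_id=True)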

def cursor_paginate(session, url, args, key, page_size=200, sleep=None):
    "Execute cursor pagination, yielding body[key] for each page"
    args = dict(args)
    # Twitter's cursored endpoints take a "count" parameter for page size
    args["count"] = page_size
    cursor = -1
    while cursor:
        args["cursor"] = cursor
        r = session.get(url, params=args)
        raise_if_error(r)
        body = r.json()
        yield body[key]
        cursor = body["next_cursor"]
        if not cursor:
            break
        if sleep is not None:
            time.sleep(sleep)

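# Illustrative usage (sketch, hypothetical screen name): paginating list
# memberships, sleeping one second per page to stay inside the endpoint's
# rate limit:
#
#     for page in cursor_paginate(
#         session,
#         "https://api.twitter.com/1.1/lists/memberships.json",
#         {"screen_name": "example"},
#         "lists",
#         sleep=1,
#     ):
#         ...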

class TwitterApiError(Exception):
    def __init__(self, headers, body):
        self.headers = headers
        self.body = body

    def __repr__(self):
        return "{}: {}".format(self.body, self.headers)


def raise_if_error(r):
    "Raise TwitterApiError if the response body contains an errors key."
    body = r.json()
    if "errors" in body:
        raise TwitterApiError(r.headers, body["errors"])


def stream_filter(session, track=None, follow=None, locations=None, language=None):
    "Yield tweets from the streaming statuses/filter endpoint, reconnecting on disconnect."
    session.stream = True
    args = {"tweet_mode": "extended"}
    for key, value in (
        ("track", track),
        ("follow", follow),
        ("locations", locations),
        ("language", language),
    ):
        if value is None:
            continue
        if not isinstance(value, str):
            value = ",".join(map(str, value))
        args[key] = value
    while True:
        response = session.post(
            "https://stream.twitter.com/1.1/statuses/filter.json", params=args
        )
        for line in response.iter_lines(chunk_size=10000):
            if line.strip().startswith(b"{"):
                tweet = json.loads(line)
                # Only yield tweet if it has an 'id' and 'created_at'
                # - otherwise it's probably a maintenance message, see
                # https://developer.twitter.com/en/docs/tweets/filter-realtime/overview/statuses-filter
                if "id" in tweet and "created_at" in tweet:
                    # 'Fix' weird tweets from streaming API
                    fix_streaming_tweet(tweet)
                    yield tweet
                else:
                    print(tweet)
        time.sleep(1)

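# Illustrative usage (sketch, hypothetical track terms): stream_filter()
# yields tweets indefinitely, so callers typically wrap it in save logic:
#
#     for tweet in stream_filter(session, track=["python", "sqlite"]):
#         save_tweets(db, [tweet])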

def fix_streaming_tweet(tweet):
    if "extended_tweet" in tweet:
        tweet.update(tweet.pop("extended_tweet"))
    if "full_text" not in tweet:
        tweet["full_text"] = tweet["text"]
    if "retweeted_status" in tweet:
        fix_streaming_tweet(tweet["retweeted_status"])
    if "quoted_status" in tweet:
        fix_streaming_tweet(tweet["quoted_status"])


def user_ids_for_screen_names(db, screen_names):
    sql = "select id from users where lower(screen_name) in ({})".format(
        ", ".join(["?"] * len(screen_names))
    )
    return [
        r[0] for r in db.conn.execute(sql, [s.lower() for s in screen_names]).fetchall()
    ]


def read_archive_js(filepath):
    "Open zip file, yield (filename, content) for each .js file inside"
    zf = zipfile.ZipFile(filepath)
    for zi in zf.filelist:
        if zi.filename.endswith(".js"):
            yield zi.filename, zf.open(zi.filename).read()


def extract_and_save_source(db, source):
    "Save a tweet's source HTML snippet to the sources table, returning its hash ID."
    m = source_re.match(source)
    details = m.groupdict()
    return db["sources"].insert(details, hash_id="id", replace=True).last_pk

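# Example (illustrative values): a source string such as
# '<a href="https://mobile.twitter.com" rel="nofollow">Twitter Web App</a>'
# becomes a {"url": ..., "name": ...} row in sources, keyed by a hash of
# its contents; the returned hash ID is what gets stored on the tweet.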

def save_user_counts(db, user):
    "Record followers/friends/listed counts in count_history whenever they change."
    for type_name, type_id in COUNT_HISTORY_TYPES.items():
        previous_count = None
        try:
            previous_count = db.conn.execute(
                """
                select count from count_history
                where type = ? and user = ?
                order by datetime desc limit 1
                """,
                [type_id, user["id"]],
            ).fetchall()[0][0]
        except IndexError:
            pass
        current_count = user["{}_count".format(type_name)]
        if current_count != previous_count:
            db["count_history"].insert(
                {
                    "type": type_id,
                    "user": user["id"],
                    "datetime": datetime.datetime.utcnow().isoformat().split(".")[0]
                    + "+00:00",
                    "count": current_count,
                },
                replace=True,
            )
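

# Illustrative query (sketch, hypothetical user ID): count_history makes it
# easy to read follower counts over time for charting:
#
#     rows = db.conn.execute(
#         "select datetime, count from count_history "
#         "where type = ? and user = ? order by datetime",
#         [COUNT_HISTORY_TYPES["followers"], 12345],
#     ).fetchall()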