"""Convert a Twitter data-archive export into per-tweet plaintext post files.

Reads ``tweets.js`` from the archive directory given on the command line,
parses each tweet, and writes one ``.post`` file per tweet under
``conf.LOCAL_DATA_ROOT/posts/<year>/``, hard-linking image media into
``conf.LOCAL_DATA_ROOT/media_orig/<year>/``.
"""

import os
import re
import sys
import json
from datetime import datetime
from urllib.parse import urlparse

from config import conf

# Matches Twitter's t.co short links so they can be stripped from post text.
TCORE = re.compile(r'https://t.co/([0-9A-Za-z]{10})')


def procline(x):
    """Neutralize lines that would be misread as markdown/post syntax.

    Lines starting with '#' (heading/tag) or '[' (link) get a zero-width
    space prepended so the renderer treats them as plain text.
    """
    r = x.strip()
    if r.startswith('#') or r.startswith('['):
        r = chr(0x200b) + r  # extremely hacky
    return r


def htmlunescape(s):
    """Undo the HTML escaping Twitter applies to ``full_text``.

    The archive escapes only ``&``, ``<`` and ``>``; ``&amp;`` is replaced
    last so that e.g. ``&amp;lt;`` correctly becomes the literal ``&lt;``.
    """
    return s.replace("&lt;", "<")\
            .replace("&gt;", ">")\
            .replace("&amp;", "&")


class Tweet:
    """One tweet from the archive, reduced to the fields the exporter needs."""

    def __init__(self, tid, text, date, tags, media, urls, replying_to):
        self.tid = tid                  # tweet id (string)
        self.text = text                # unescaped full text
        self.media = media              # list of (t.co url, kind, id, media_url)
        self.date = date                # timezone-aware datetime
        self.tags = tags                # hashtag names, always prefixed with "Tweet"
        self.urls = urls                # list of (t.co url, expanded url)
        self.replying_to = replying_to  # parent tweet id string, or None

    @staticmethod
    def parse(data):
        """Build a Tweet from one entry of the ``tweets.js`` JSON array."""
        t = data["tweet"]
        text = htmlunescape(t["full_text"])
        e = t["entities"]
        ee = t.get("extended_entities")
        tid = t["id"]
        # Every exported post is tagged "Tweet" in addition to its own hashtags.
        tags = ["Tweet"] + [tag["text"] for tag in e["hashtags"]]
        replying_to = t.get("in_reply_to_status_id_str")
        date = datetime.strptime(t["created_at"], "%a %b %d %H:%M:%S %z %Y")
        urls = [(u["url"], u["expanded_url"]) for u in e["urls"]]
        media = []
        if ee is not None:
            for m in ee["media"]:
                if m["type"] == "photo":
                    media.append((m["url"], "image", m["id"], m["media_url"]))
                elif m["type"] == "video":
                    media.append((m["url"], "video", m["id"], m["media_url"]))
        return Tweet(tid, text, date, tags, media, urls, replying_to)

    def dump(self):
        """Print a debug summary of this tweet to stdout."""
        print(f"{self.tid}@{self.date} #{self.tags} -> {self.replying_to}")
        print(self.text)
        print(self.media)
        print(self.urls)

    def to_plaintext_post(self, id2date, twdata):
        """Write this tweet as a ``.post`` file and hard-link its images.

        id2date -- maps tweet id -> datetime for every tweet in the archive,
                   used to link replies to their parent post.
        twdata  -- path to the archive's data directory (contains tweets_media/).

        Replies whose parent is not in the archive are skipped entirely.
        """
        if self.replying_to is not None and self.replying_to not in id2date:
            return
        ts = int(self.date.timestamp())
        year = str(self.date.year)
        media_dir = os.path.join(conf.LOCAL_DATA_ROOT, "media_orig", year)
        post_dir = os.path.join(conf.LOCAL_DATA_ROOT, "posts", year)
        # isoformat()[:-6] drops the "+00:00" offset; 'Z' is appended instead.
        post_fn = f"{ts}-{self.date.isoformat()[:-6]}Z.post"
        os.makedirs(post_dir, 0o755, True)

        # Expand t.co links in-place, then strip any leftover bare t.co links
        # (those pointing at attached media).
        text = self.text
        for short, expanded in self.urls:
            text = text.replace(short, expanded)
        text = TCORE.sub("", text)
        # " \n" is a markdown hard line break.
        text = " \n".join(procline(line) for line in text.split('\n'))

        if self.replying_to is not None:
            parent_ts = int(id2date[self.replying_to].timestamp())
            text = f"[↪ Last Tweet](?post={parent_ts}) \n{text}"
        if any(m[1] == 'video' for m in self.media):
            text += "\n\n(Videos aren't supported in Notekins yet.)"
        if len(self.tags) > 0:
            text += '\n\n' + ' '.join(f"#{t}" for t in self.tags)
        if len(self.media) > 0:
            text += '\n\n'
            os.makedirs(media_dir, 0o755, True)
            for _, mtype, mid, murl in self.media:
                if mtype != "image":
                    continue
                bn = os.path.basename(urlparse(murl).path)
                text += f"[{bn}]\n"
                # NOTE(review): os.link raises FileExistsError if the script
                # is re-run over the same archive — confirm whether reruns
                # should overwrite instead.
                os.link(os.path.join(twdata, "tweets_media", f"{self.tid}-{bn}"),
                        os.path.join(media_dir, f"{ts}-{bn}"))

        with open(os.path.join(post_dir, post_fn), "w") as f:
            f.write(text)


def main():
    """Entry point: parse the archive given as argv[1] and export every tweet."""
    if len(sys.argv) < 2:
        print("specify the data directory of your twitter archive on the command line pls")
        sys.exit(1)
    twdata = sys.argv[1]
    with open(os.path.join(twdata, "tweets.js")) as f:
        s = f.read()
    # tweets.js is "window.YTD.tweets.part0 = [...]"; strip the JS assignment.
    o = json.loads(s[s.index('=') + 1:].strip())
    tweets = sorted((Tweet.parse(t) for t in o), key=lambda x: x.date)
    id2date = {t.tid: t.date for t in tweets}
    conf.require()
    for t in tweets:
        t.to_plaintext_post(id2date, twdata)


if __name__ == "__main__":
    main()