"""Import posts from a Twitter archive data directory into Notekins.

Reads ``tweets.js`` from the archive, converts every tweet into a
plaintext ``.post`` file under ``conf.LOCAL_DATA_ROOT`` and hard-links
photo attachments into the ``media_orig`` tree.
"""

import os
import re
import sys
import json
from datetime import datetime
from urllib.parse import urlparse

from config import conf

# A t.co short link as Twitter embeds them in tweet text.
# Dots are escaped so only the literal host "t.co" matches.
TCORE = re.compile(r'https://t\.co/([0-9A-Za-z]{10})')


def procline(x):
    """Return *x* stripped, prefixed with a zero-width space when it would
    otherwise start with '#' or '[' so the post renderer does not mistake
    it for a tag or link (extremely hacky, but effective)."""
    r = x.strip()
    if r.startswith(('#', '[')):
        r = chr(0x200b) + r  # U+200B ZERO WIDTH SPACE
    return r


class Tweet:
    """One tweet extracted from the archive's tweets.js."""

    def __init__(self, tid, text, date, tags, media, urls, replying_to):
        # tid: tweet id string. date: timezone-aware datetime.
        # tags: hashtag texts. media: (t.co url, kind, id, media url) tuples.
        # urls: (t.co url, expanded url) tuples.
        # replying_to: id of the tweet this one replies to, or None.
        self.tid = tid
        self.text = text
        self.media = media
        self.date = date
        self.tags = tags
        self.urls = urls
        self.replying_to = replying_to

    @staticmethod
    def parse(json):
        """Build a Tweet from one entry of the tweets.js array."""
        t = json["tweet"]
        text = t["full_text"]
        e = t["entities"]
        ee = t.get("extended_entities")
        tid = t["id"]
        tags = [tag["text"] for tag in e["hashtags"]]
        # Present only on replies; None otherwise.
        replying_to = t.get("in_reply_to_status_id_str")
        urls = [(u["url"], u["expanded_url"]) for u in e["urls"]]
        media = []
        # created_at looks like "Sat Jul 27 21:42:39 -0400 2024".
        date = datetime.strptime(t["created_at"], "%a %b %d %H:%M:%S %z %Y")
        if ee is not None:
            for m in ee["media"]:
                if m["type"] == "photo":
                    media.append((m["url"], "image", m["id"], m["media_url"]))
                elif m["type"] == "video":
                    media.append((m["url"], "video", m["id"], m["media_url"]))
        return Tweet(tid, text, date, tags, media, urls, replying_to)

    def dump(self):
        """Print a debugging summary of this tweet."""
        print(f"{self.tid}@{self.date} #{self.tags} -> {self.replying_to}")
        print(self.text)
        print(self.media)
        print(self.urls)

    def to_plaintext_post(self, id2date, twdata):
        """Write this tweet out as a Notekins plaintext post.

        id2date maps tweet id -> datetime for every tweet being imported;
        replies to tweets outside the archive are skipped entirely.
        twdata is the archive data directory (media files are hard-linked
        from its tweets_media subdirectory).
        """
        if self.replying_to is not None and self.replying_to not in id2date:
            return  # reply to a tweet we don't have: not importable
        ts = int(self.date.timestamp())
        media_dir = os.path.join(conf.LOCAL_DATA_ROOT, "media_orig", str(self.date.year))
        post_dir = os.path.join(conf.LOCAL_DATA_ROOT, "posts", str(self.date.year))
        post_fn = f"{ts}-{self.date.isoformat()[:-6]}Z.post"
        os.makedirs(post_dir, 0o755, True)
        text = self.text
        # Replace t.co links with their expanded targets; any t.co link
        # left afterwards pointed at attached media, so drop it.
        for u, ru in self.urls:
            text = text.replace(u, ru)
        text = TCORE.sub("", text)
        text = " \n".join([procline(ln) for ln in text.split('\n')])
        if self.replying_to is not None:
            text = f"[↪ Last Tweet](?post={int(id2date[self.replying_to].timestamp())}) \n{text}"
        if any(m[1] == 'video' for m in self.media):
            text += "\n\n(Videos aren't supported in Notekins yet.)"
        if len(self.tags) > 0:
            text += '\n\n' + ' '.join([f"#{tag}" for tag in self.tags])
        if len(self.media) > 0:
            text += '\n\n'
            os.makedirs(media_dir, 0o755, True)
            for _, kind, mid, murl in self.media:
                if kind != "image":
                    continue
                bn = os.path.basename(urlparse(murl).path)
                text += f"[{bn}]\n"
                # Hard-link the archived media file into the media tree.
                os.link(os.path.join(twdata, "tweets_media", f"{self.tid}-{bn}"),
                        os.path.join(media_dir, f"{ts}-{bn}"))
        # Tweets are Unicode-heavy; force UTF-8 rather than the platform default.
        with open(os.path.join(post_dir, post_fn), "w", encoding="utf-8") as f:
            f.write(text)


def main():
    if len(sys.argv) < 2:
        print("specify the data directory of your twitter archive on the command line pls")
        exit(1)
    twdata = sys.argv[1]
    with open(os.path.join(twdata, "tweets.js"), encoding="utf-8") as f:
        s = f.read()
    # tweets.js is "window.YTD.tweets.part0 = [...]"; strip the assignment.
    o = json.loads(s[s.index('=') + 1:].strip())
    tweets = sorted((Tweet.parse(t) for t in o), key=lambda x: x.date)
    id2date = {t.tid: t.date for t in tweets}
    conf.require()
    for t in tweets:
        t.to_plaintext_post(id2date, twdata)


if __name__ == "__main__":
    main()