Diffstat (limited to 'utils')
-rw-r--r--  utils/config.py           2
-rw-r--r--  utils/twitter_import.py   119
2 files changed, 120 insertions, 1 deletion
diff --git a/utils/config.py b/utils/config.py
index 05c1f9d..43d3683 100644
--- a/utils/config.py
+++ b/utils/config.py
@@ -58,7 +58,7 @@ class config:
         return self.d[k]
     def require(self):
         if self.f is None:
-            print("This operation requires a configuration file, but none can be found.")
+            print("This operation requires a configuration file, but none was found.")
             exit(1)
 
 conf = config()
diff --git a/utils/twitter_import.py b/utils/twitter_import.py
new file mode 100644
index 0000000..90d0c7c
--- /dev/null
+++ b/utils/twitter_import.py
@@ -0,0 +1,119 @@
+import os
+import re
+import sys
+import json
+from config import conf
+from datetime import datetime
+from urllib.parse import urlparse
+
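+# Matches t.co short links in tweet text; the 10-character slug is the capture group.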
+TCORE = re.compile(r'https://t.co/([0-9A-Za-z]{10})')
+
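+# A leading '#' or '[' would be parsed as a Markdown heading or link, so defuse it with a zero-width space.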
+def procline(x):
+    r = x.strip()
+    if r.startswith('#') or r.startswith('['):
+        r = chr(0x200b) + r # extremely hacky
+    return r
+
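+# One tweet from the archive, reduced to the fields the importer needs.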
+class Tweet:
+    def __init__(self, tid, text, date, tags, media, urls, replying_to):
+        self.tid = tid
+        self.text = text
+        self.media = media
+        self.date = date
+        self.tags = tags
+        self.urls = urls
+        self.replying_to = replying_to
+
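+    # Build a Tweet from one entry of the archive's tweet list.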
+    @staticmethod
+    def parse(obj):
+        t = obj["tweet"]
+        text = t["full_text"]
+        e = t["entities"]
+        ee = t.get("extended_entities")
+        tid = t["id"]
+        tags = [tag["text"] for tag in e["hashtags"]]
+        replying_to = t.get("in_reply_to_status_id_str")
+        urls = []
+        media = []
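+        # created_at looks like "Tue Mar 21 20:50:14 +0000 2006"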
+        date = datetime.strptime(t["created_at"], "%a %b %d %H:%M:%S %z %Y")
+        for u in e["urls"]:
+            urls.append((u["url"], u["expanded_url"]))
+        if ee is not None:
+            for m in ee["media"]:
+                if m["type"] == "photo":
+                    media.append((m["url"], "image", m["id"], m["media_url"]))
+                elif m["type"] == "video":
+                    media.append((m["url"], "video", m["id"], m["media_url"]))
+        return Tweet(tid, text, date, tags, media, urls, replying_to)
+
+    def dump(self):
+        print(f"{self.tid}@{self.date} #{self.tags} -> {self.replying_to}")
+        print(self.text)
+        print(self.media)
+        print(self.urls)
+
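+    # Write this tweet out as a Notekins .post file and hard-link its media.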
+    def to_plaintext_post(self, id2date, twdata):
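+        # Skip replies whose parent isn't in the archive (i.e. replies to other people).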
+        if self.replying_to is not None and self.replying_to not in id2date:
+            return
+        ts = int(self.date.timestamp())
+        media_dir = os.path.join(conf.LOCAL_DATA_ROOT, "media_orig", str(self.date.year))
+        post_dir = os.path.join(conf.LOCAL_DATA_ROOT, "posts", str(self.date.year))
+        post_fn = f"{ts}-{self.date.isoformat()[:-6]}Z.post"
+        os.makedirs(post_dir, 0o755, exist_ok=True)
+        text = self.text
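+        # Swap t.co wrappers for their expanded URLs; leftover t.co links point at media and are dropped.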
+        for u, ru in self.urls:
+            text = text.replace(u, ru)
+        text = TCORE.sub("", text)
+        text = " \n".join([procline(t) for t in text.split('\n')])
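+        # Replies link back to the parent post via its timestamp-based post ID.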
+        if self.replying_to is not None:
+            text = f"[↪ Last Tweet](?post={int(id2date[self.replying_to].timestamp())}) \n{text}"
+        has_video = any(m[1] == 'video' for m in self.media)
+        if has_video:
+            text += "\n\n(Videos aren't supported in Notekins yet.)"
+        if len(self.tags) > 0:
+            text += '\n\n' + ' '.join([f"#{t}" for t in self.tags])
+        if len(self.media) > 0:
+            text += '\n\n'
+            os.makedirs(media_dir, 0o755, exist_ok=True)
+            for _, mtype, _mid, u in self.media:
+                if mtype != "image":
+                    continue
+                url = urlparse(u)
+                bn = os.path.basename(url.path)
+                text += f"[{bn}]\n"
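+                # Hard-link the original file from the archive into the media tree.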
+                os.link(os.path.join(twdata, "tweets_media", f"{self.tid}-{bn}"),
+                        os.path.join(media_dir, f"{ts}-{bn}"))
+        with open(os.path.join(post_dir, post_fn), "w") as f:
+            f.write(text)
+
+def main():
+    o = None
+    if len(sys.argv) < 2:
+        print("Specify the data directory of your Twitter archive on the command line.")
+        exit(1)
+    twdata = sys.argv[1]
+    with open(os.path.join(twdata, "tweets.js")) as f:
+        s = f.read()
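+        # tweets.js is a JS assignment ("window.YTD.tweets.part0 = [ ... ]"); keep only the array.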
+        o = json.loads(s[s.index('=') + 1:].strip())
+    tweets = []
+    for t in o:
+        tweets.append(Tweet.parse(t))
+    tweets = sorted(tweets, key=lambda x: x.date)
+    id2date = {t.tid: t.date for t in tweets}
+    conf.require()
+    for t in tweets:
+        t.to_plaintext_post(id2date, twdata)
+
+if __name__ == "__main__":
+    main()