import os
import re
import sys
import json
from config import conf
from datetime import datetime
from urllib.parse import urlparse
# Matches a t.co shortlink and captures its code in group 1.
# NOTE(review): assumes the code is always exactly 10 alphanumerics — confirm
# against the archive's actual links.
TCORE = re.compile(r'https://t.co/([0-9A-Za-z]{10})')
def procline(x):
    """Return *x* stripped, neutralised against Markdown interpretation.

    Lines that begin with '#' or '[' would otherwise render as headings or
    links, so a zero-width space (U+200B) is prepended to those lines.
    """
    line = x.strip()
    if line.startswith(('#', '[')):
        # Still hacky, but keeps the visible text unchanged.
        return chr(0x200b) + line
    return line
def htmlunescape(s):
    """Undo the HTML entity escaping Twitter applies to tweet text.

    The archive's full_text escapes only '&', '<' and '>', so only those
    three entities are reversed.  '&amp;' is handled last so an input like
    '&amp;lt;' correctly becomes '&lt;' rather than '<'.
    """
    # BUG FIX: the previous replacements were identities ("<" -> "<") —
    # the entity names were missing, so the function changed nothing.
    return s.replace("&lt;", "<")\
            .replace("&gt;", ">")\
            .replace("&amp;", "&")
class Tweet:
    """One tweet parsed from a Twitter archive's tweets.js entry."""

    def __init__(self, tid, text, date, tags, media, urls, replying_to):
        self.tid = tid                  # tweet id (as found in the archive)
        self.text = text                # entity-unescaped tweet body
        self.media = media              # list of (t.co url, kind, id, media_url)
        self.date = date                # timezone-aware creation datetime
        self.tags = tags                # hashtag texts, "Tweet" prepended
        self.urls = urls                # list of (t.co url, expanded url)
        self.replying_to = replying_to  # id str of the tweet replied to, or None

    @staticmethod
    def parse(payload):
        """Build a Tweet from one decoded tweets.js entry.

        *payload* is a dict with a single "tweet" key, as produced by
        json-decoding the archive file.  (Decorated @staticmethod to match
        the existing `Tweet.parse(t)` call sites; the old parameter name
        `json` shadowed the imported module.)
        """
        t = payload["tweet"]
        text = htmlunescape(t["full_text"])
        entities = t["entities"]
        extended = t.get("extended_entities")
        tid = t["id"]
        # A common "Tweet" tag is always prepended ahead of real hashtags.
        tags = ["Tweet"] + [tag["text"] for tag in entities["hashtags"]]
        replying_to = t.get("in_reply_to_status_id_str")
        date = datetime.strptime(t["created_at"], "%a %b %d %H:%M:%S %z %Y")
        urls = [(u["url"], u["expanded_url"]) for u in entities["urls"]]
        media = []
        if extended is not None:
            for m in extended["media"]:
                # NOTE(review): for "video" entries, media_url presumably
                # points at a thumbnail, not the video file — confirm.
                if m["type"] == "photo":
                    media.append((m["url"], "image", m["id"], m["media_url"]))
                elif m["type"] == "video":
                    media.append((m["url"], "video", m["id"], m["media_url"]))
        return Tweet(tid, text, date, tags, media, urls, replying_to)

    def dump(self):
        """Print a human-readable summary of the tweet (debugging aid)."""
        print(f"{self.tid}@{self.date} #{self.tags} -> {self.replying_to}")
        print(self.text)
        print(self.media)
        print(self.urls)

    def to_plaintext_post(self, id2date, twdata):
        """Write this tweet as a .post file under conf.LOCAL_DATA_ROOT.

        *id2date* maps tweet ids to their dates (used to link replies);
        *twdata* is the archive's data directory (source of media files).
        Replies to tweets not present in *id2date* are skipped entirely.
        """
        if self.replying_to is not None and self.replying_to not in id2date:
            # Reply to a tweet outside the archive: no local context, skip.
            return
        ts = int(self.date.timestamp())
        year = str(self.date.year)
        media_dir = os.path.join(conf.LOCAL_DATA_ROOT, "media_orig", year)
        post_dir = os.path.join(conf.LOCAL_DATA_ROOT, "posts", year)
        # isoformat()[:-6] drops the "+00:00" offset; a literal Z is appended.
        post_fn = f"{ts}-{self.date.isoformat()[:-6]}Z.post"
        os.makedirs(post_dir, mode=0o755, exist_ok=True)
        text = self.text
        # Expand t.co redirects to their real targets, then strip whatever
        # t.co links remain (e.g. media links handled separately below).
        for short_url, expanded_url in self.urls:
            text = text.replace(short_url, expanded_url)
        text = TCORE.sub("", text)
        # Re-join with a trailing space before each newline (line-break hint).
        text = " \n".join(procline(line) for line in text.split('\n'))
        if self.replying_to is not None:
            back_ts = int(id2date[self.replying_to].timestamp())
            text = f"[↪ Last Tweet](?post={back_ts}) \n{text}"
        if any(kind == 'video' for _, kind, _, _ in self.media):
            text += "\n\n(Videos aren't supported in Notekins yet.)"
        if len(self.tags) > 0:
            text += '\n\n' + ' '.join(f"#{tag}" for tag in self.tags)
        if len(self.media) > 0:
            text += '\n\n'
            os.makedirs(media_dir, mode=0o755, exist_ok=True)
            for _, kind, media_id, media_url in self.media:
                if kind != "image":
                    continue
                basename = os.path.basename(urlparse(media_url).path)
                text += f"[{basename}]\n"
                # Hard-link the original media out of the archive so posts
                # reference a local copy without duplicating bytes.
                os.link(
                    os.path.join(twdata, "tweets_media", f"{self.tid}-{basename}"),
                    os.path.join(media_dir, f"{ts}-{basename}"))
        with open(os.path.join(post_dir, post_fn), "w") as f:
            f.write(text)
def main():
    """Convert a Twitter archive's tweets.js into Notekins .post files.

    Usage: <script> <twitter-archive-data-dir>
    """
    if len(sys.argv) < 2:
        print("specify the data directory of your twitter archive on the command line pls")
        # sys.exit instead of the site-module exit(), which isn't guaranteed.
        sys.exit(1)
    twdata = sys.argv[1]
    with open(os.path.join(twdata, "tweets.js")) as f:
        s = f.read()
    # tweets.js is "window.YTD.tweets.part0 = [...]"; strip everything up to
    # and including the '=' so the remainder parses as plain JSON.
    entries = json.loads(s[s.index('=') + 1:].strip())
    tweets = sorted((Tweet.parse(e) for e in entries), key=lambda tw: tw.date)
    id2date = {tw.tid: tw.date for tw in tweets}
    conf.require()
    for tw in tweets:
        tw.to_plaintext_post(id2date, twdata)
if __name__ == "__main__":
main()