aboutsummaryrefslogblamecommitdiff
path: root/utils/twitter_import.py
blob: 76170bbfc4b4d57b61a805bf8477af978b259f12 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16















                                                     




                                  











                                                                        
                                           




                                                                         
                               




















































































                                                                                                       
import os
import re
import sys
import json
from config import conf
from datetime import datetime
from urllib.parse import urlparse

# Matches a t.co short link and captures its 10-character code.
# The dot is escaped so that only the literal host "t.co" matches; an
# unescaped "." would also accept any other character in that position.
TCORE = re.compile(r'https://t\.co/([0-9A-Za-z]{10})')

def procline(x):
    """Return ``x`` stripped of surrounding whitespace.

    If the stripped line begins with '#' or '[', a zero-width space
    (U+200B) is put in front of it.
    """
    stripped = x.strip()
    if stripped[:1] in ('#', '['):
        # Extremely hacky: hide the leading character behind a zero-width
        # space (presumably so it is not interpreted as markup - confirm).
        return '\u200b' + stripped
    return stripped

def htmlunescape(s):
    """Replace the entities ``&lt;``, ``&gt;`` and ``&amp;`` in ``s``.

    Only these three entities are handled; anything else is left as is.
    """
    # "&amp;" is deliberately handled last, so that "&amp;lt;" turns into
    # the literal text "&lt;" instead of being unescaped twice into "<".
    replacements = (
        ("&lt;", "<"),
        ("&gt;", ">"),
        ("&amp;", "&"),
    )
    for entity, char in replacements:
        s = s.replace(entity, char)
    return s

class Tweet:
    """One tweet taken from a Twitter archive, convertible to a post file.

    Attributes (all set verbatim from the constructor arguments):
        tid:         the tweet's id.
        text:        the tweet text.
        date:        creation time as a ``datetime``.
        tags:        list of tag names (without the leading '#').
        media:       list of ``(short_url, kind, media_id, media_url)``
                     tuples, where ``kind`` is "image" or "video".
        urls:        list of ``(short_url, expanded_url)`` tuples.
        replying_to: id of the tweet this one replies to, or ``None``.
    """

    def __init__(self, tid, text, date, tags, media, urls, replying_to):
        self.tid = tid
        self.text = text
        self.media = media
        self.date = date
        self.tags = tags
        self.urls = urls
        self.replying_to = replying_to

    @staticmethod
    def parse(json):
        """Build a ``Tweet`` from one entry of the archive's tweet list.

        ``json`` must be a mapping with a "tweet" object that provides
        "full_text", "id", "created_at" and "entities" (with "hashtags"
        and "urls"). "extended_entities" and "in_reply_to_status_id_str"
        are optional.

        Declared as a static method because it takes no instance; it can
        be called as ``Tweet.parse(entry)`` exactly as before.
        """
        t = json["tweet"]
        text = htmlunescape(t["full_text"])
        entities = t["entities"]
        extended = t.get("extended_entities")
        tid = t["id"]
        # Every imported post carries the "Tweet" tag ahead of its own hashtags.
        tags = ["Tweet"] + [tag["text"] for tag in entities["hashtags"]]
        replying_to = t.get("in_reply_to_status_id_str")
        date = datetime.strptime(t["created_at"], "%a %b %d %H:%M:%S %z %Y")
        urls = [(u["url"], u["expanded_url"]) for u in entities["urls"]]
        media = []
        if extended is not None:
            # Archive media type -> kind stored on the Tweet. Any other
            # media type is skipped.
            kinds = {"photo": "image", "video": "video"}
            for m in extended["media"]:
                kind = kinds.get(m["type"])
                if kind is None:
                    continue
                media.append((m["url"], kind, m["id"], m["media_url"]))
        return Tweet(tid, text, date, tags, media, urls, replying_to)

    def dump(self):
        """Print the tweet's fields to stdout for debugging."""
        print(f"{self.tid}@{self.date} #{self.tags} -> {self.replying_to}")
        print(self.text)
        print(self.media)
        print(self.urls)

    def to_plaintext_post(self, id2date, twdata):
        """Write this tweet as a ``.post`` file and hard-link its images.

        Args:
            id2date: mapping from tweet id to that tweet's ``datetime``;
                used to resolve the tweet being replied to.
            twdata: data directory of the Twitter archive; images are
                taken from its "tweets_media" subdirectory.

        A reply whose target is not in ``id2date`` is skipped entirely
        (nothing is written). Otherwise the post goes to
        ``<LOCAL_DATA_ROOT>/posts/<year>/`` and each image is linked into
        ``<LOCAL_DATA_ROOT>/media_orig/<year>/``.
        """
        if self.replying_to is not None and self.replying_to not in id2date:
            return
        ts = int(self.date.timestamp())
        year = str(self.date.year)
        media_dir = os.path.join(conf.LOCAL_DATA_ROOT, "media_orig", year)
        post_dir = os.path.join(conf.LOCAL_DATA_ROOT, "posts", year)
        # isoformat()[:-6] drops the trailing 6-character UTC offset; "Z" is
        # appended in its place (assumes the offset is +00:00 - confirm).
        post_fn = f"{ts}-{self.date.isoformat()[:-6]}Z.post"
        os.makedirs(post_dir, mode=0o755, exist_ok=True)

        text = self.text
        # Expand shortened links, then drop any t.co links that remain.
        for short_url, expanded_url in self.urls:
            text = text.replace(short_url, expanded_url)
        text = TCORE.sub("", text)
        text = "  \n".join(procline(line) for line in text.split('\n'))
        if self.replying_to is not None:
            text = f"[↪ Last Tweet](?post={int(id2date[self.replying_to].timestamp())})  \n{text}"
        if any(kind == 'video' for _, kind, _, _ in self.media):
            text += "\n\n(Videos aren't supported in Notekins yet.)"
        if self.tags:
            text += '\n\n' + ' '.join(f"#{tag}" for tag in self.tags)
        if self.media:
            text += '\n\n'
            os.makedirs(media_dir, mode=0o755, exist_ok=True)
        for _, kind, _media_id, media_url in self.media:
            if kind != "image":
                continue
            bn = os.path.basename(urlparse(media_url).path)
            text += f"[{bn}]\n"
            src = os.path.join(twdata, "tweets_media", f"{self.tid}-{bn}")
            dst = os.path.join(media_dir, f"{ts}-{bn}")
            try:
                os.link(src, dst)
            except FileExistsError:
                # The destination is already there (e.g. from an earlier
                # run); keep it so the import can be repeated.
                pass
        # The text contains non-ASCII characters (U+200B, "↪"), so the
        # encoding is fixed rather than left to the platform default.
        with open(os.path.join(post_dir, post_fn), "w", encoding="utf-8") as f:
            f.write(text)

def main():
    """Import every tweet of a Twitter archive as a post.

    The archive's data directory is taken from the first command-line
    argument. Its "tweets.js" is read, everything up to and including the
    first '=' is discarded, and the remainder is parsed as JSON. Tweets
    are processed in chronological order.

    Exits with status 1 if no directory was given.
    """
    if len(sys.argv) < 2:
        print("specify the data directory of your twitter archive on the command line pls",
              file=sys.stderr)
        # sys.exit rather than the bare exit(), which is only injected by
        # the site module and is not guaranteed to exist.
        sys.exit(1)
    twdata = sys.argv[1]
    with open(os.path.join(twdata, "tweets.js"), encoding="utf-8") as f:
        s = f.read()
    # The file is a JavaScript assignment; keep only the right-hand side.
    o = json.loads(s[s.index('=') + 1:].strip())
    tweets = sorted((Tweet.parse(t) for t in o), key=lambda x: x.date)
    # Lets a reply look up the timestamp of the tweet it answers.
    id2date = {t.tid: t.date for t in tweets}
    conf.require()
    for t in tweets:
        t.to_plaintext_post(id2date, twdata)

# Run the import only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()