aboutsummaryrefslogtreecommitdiff
path: root/utils/twitter_import.py
blob: 76170bbfc4b4d57b61a805bf8477af978b259f12 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import os
import re
import sys
import json
from config import conf
from datetime import datetime
from urllib.parse import urlparse

# Matches Twitter's t.co short links; group 1 is the 10-character slug.
# The '.' in 't.co' must be escaped, otherwise it matches any character.
TCORE = re.compile(r'https://t\.co/([0-9A-Za-z]{10})')

def procline(x):
    """Strip a line and defuse Markdown-significant leading characters.

    Lines beginning with '#' (heading) or '[' (link) get a zero-width
    space prepended so a Markdown renderer treats them as plain text.
    """
    stripped = x.strip()
    if stripped[:1] in ('#', '['):
        return "\u200b" + stripped
    return stripped

def htmlunescape(s):
    """Undo the minimal HTML escaping found in Twitter archive text.

    Only &lt;, &gt; and &amp; are handled; &amp; is replaced last so
    that doubly-escaped sequences are not over-expanded.
    """
    for entity, char in (("&lt;", "<"), ("&gt;", ">"), ("&amp;", "&")):
        s = s.replace(entity, char)
    return s

class Tweet:
    """One tweet from a Twitter data export, convertible to a Notekins post."""

    def __init__(self, tid, text, date, tags, media, urls, replying_to):
        self.tid = tid                  # tweet id as stored in the archive JSON
        self.text = text                # full text, already HTML-unescaped
        self.media = media              # list of (t.co url, kind, media id, media url)
        self.date = date                # aware datetime parsed from created_at
        self.tags = tags                # hashtag texts, with "Tweet" prepended
        self.urls = urls                # list of (t.co url, expanded url)
        self.replying_to = replying_to  # id of the tweet replied to, or None

    @staticmethod
    def parse(obj):
        """Build a Tweet from one entry of the archive's tweets.js array.

        `obj` is a dict of the shape {"tweet": {...}} as found in the
        Twitter export.  Media entries other than photos and videos
        (e.g. animated GIFs) are dropped.
        """
        t = obj["tweet"]
        text = htmlunescape(t["full_text"])
        entities = t["entities"]
        extended = t.get("extended_entities")
        tags = ["Tweet"] + [tag["text"] for tag in entities["hashtags"]]
        replying_to = t.get("in_reply_to_status_id_str")
        date = datetime.strptime(t["created_at"], "%a %b %d %H:%M:%S %z %Y")
        urls = [(u["url"], u["expanded_url"]) for u in entities["urls"]]
        media = []
        if extended is not None:
            for m in extended["media"]:
                if m["type"] == "photo":
                    media.append((m["url"], "image", m["id"], m["media_url"]))
                elif m["type"] == "video":
                    media.append((m["url"], "video", m["id"], m["media_url"]))
        return Tweet(t["id"], text, date, tags, media, urls, replying_to)

    def dump(self):
        """Print a debug summary of the tweet to stdout."""
        print(f"{self.tid}@{self.date} #{self.tags} -> {self.replying_to}")
        print(self.text)
        print(self.media)
        print(self.urls)

    def to_plaintext_post(self, id2date, twdata):
        """Write this tweet as a Notekins .post file and hard-link its images.

        Replies whose parent is not in `id2date` (replies to other
        people's tweets) are skipped entirely.  `twdata` is the archive's
        data directory, used to locate tweets_media/.
        """
        if self.replying_to is not None and self.replying_to not in id2date:
            return
        ts = int(self.date.timestamp())
        year = str(self.date.year)
        media_dir = os.path.join(conf.LOCAL_DATA_ROOT, "media_orig", year)
        post_dir = os.path.join(conf.LOCAL_DATA_ROOT, "posts", year)
        # isoformat() ends with the UTC offset (+00:00); drop it, append Z
        post_fn = f"{ts}-{self.date.isoformat()[:-6]}Z.post"
        os.makedirs(post_dir, mode=0o755, exist_ok=True)
        text = self.text
        # expand known t.co links, then strip the remaining (media) t.co links
        for short, expanded in self.urls:
            text = text.replace(short, expanded)
        text = TCORE.sub("", text)
        # two trailing spaces force a Markdown hard line break
        text = "  \n".join(procline(line) for line in text.split('\n'))
        if self.replying_to is not None:
            text = f"[↪ Last Tweet](?post={int(id2date[self.replying_to].timestamp())})  \n{text}"
        if any(m[1] == 'video' for m in self.media):
            text += "\n\n(Videos aren't supported in Notekins yet.)"
        if len(self.tags) > 0:
            text += '\n\n' + ' '.join(f"#{t}" for t in self.tags)
        if len(self.media) > 0:
            text += '\n\n'
            os.makedirs(media_dir, mode=0o755, exist_ok=True)
        for _, kind, mid, murl in self.media:
            if kind != "image":
                continue
            bn = os.path.basename(urlparse(murl).path)
            text += f"[{bn}]\n"
            # hard-link the archived image into the yearly media directory
            os.link(os.path.join(twdata, "tweets_media", f"{self.tid}-{bn}"),
                    os.path.join(media_dir, f"{ts}-{bn}"))
        with open(os.path.join(post_dir, post_fn), "w") as f:
            f.write(text)

def main():
    """Convert a Twitter archive's tweets into Notekins post files.

    Usage: twitter_import.py <archive-data-dir>

    Reads <dir>/tweets.js, parses every tweet, then writes each one as a
    plaintext post (oldest first, so reply links can resolve).
    """
    if len(sys.argv) < 2:
        print("specify the data directory of your twitter archive on the command line pls")
        sys.exit(1)
    twdata = sys.argv[1]
    with open(os.path.join(twdata, "tweets.js"), encoding="utf-8") as f:
        s = f.read()
    # tweets.js is "window.YTD.tweets.part0 = [ ... ]"; strip the JS prefix
    o = json.loads(s[s.index('=') + 1:].strip())
    tweets = sorted((Tweet.parse(t) for t in o), key=lambda x: x.date)
    # map tweet id -> date so replies can link back to the earlier post
    id2date = {t.tid: t.date for t in tweets}
    conf.require()
    for t in tweets:
        t.to_plaintext_post(id2date, twdata)

if __name__ == "__main__":
    main()