aboutsummaryrefslogtreecommitdiff
path: root/utils/postutil.py
blob: 1ac32c3cc14f5bfb787fb2898206f6929023999f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# Chris Xiong 2024
# License: Expat (MIT)

'''
The anatomy of a post:

--comment                     <= any line that starts with a double dash will be ignored
Post text (markdown)

#tag #tag #tag                <= a line that starts with # makes it a tag line.
                                 only the first of such lines is used.

[media]                       <= lines that are surrounded with [] are media lines.
[media]                          each contains a single file name inside the brackets.
[media]                          drop the files in LOCAL_DATA_ROOT/media_dropoff
'''

from mistune.plugins.formatting import strikethrough, superscript, subscript
from mistune.plugins.url import url
from mistune.plugins.ruby import ruby
from mistune.plugins.spoiler import spoiler
import mistune

from wand.image import Image

import tempfile
import os
import sys
import shutil
import hashlib
import mimetypes
from datetime import datetime, timezone

from mistune_emote import emote
from monolith import MediaInstance, Post
from config import conf

def file_digest(f, n):
    '''
    Hash the remaining contents of the binary file object f with algorithm n.

    f: file-like object opened for reading in binary mode
    n: hashlib algorithm name (e.g. "sha256")

    Returns the hashlib hash object; call .hexdigest() / .digest() on it.
    '''
    if sys.version_info[:2] >= (3, 11):
        return hashlib.file_digest(f, n)
    # hashlib.file_digest only exists on Python 3.11+. On older interpreters
    # feed the hash in fixed-size chunks so that large media files are never
    # held in memory in their entirety.
    h = hashlib.new(n)
    while chunk := f.read(65536):
        h.update(chunk)
    return h

'''
Takes an absolute path to a static image and generates a thumbnail for it if needed
Returns path to the thumbnail, relative to conf.LOCAL_DATA_ROOT

If a thumbnail isn't required, returns None
'''
def generate_thumbnail(file):
    with tempfile.TemporaryDirectory() as dir:
        outf = os.path.join(dir, "downsampled.webp")
        dim = conf.THUMBNAIL_DIM
        with Image(filename=file) as i:
            if i.height <= dim and i.width <= dim and i.format.lower() != "png":
                return None
            if "exif:Orientation" in i.metadata:
                o = int(i.metadata['exif:Orientation'][:1])
                match o:
                    case 0: pass
                    case 1: pass
                    case 2: i.flop()
                    case 3: i.rotate(180)
                    case 4:
                        i.flop()
                        i.rotate(180)
                    case 5:
                        i.flip()
                        i.rotate(90)
                    case 6: i.rotate(90)
                    case 7:
                        i.flip()
                        i.rotate(270)
                    case 8: i.rotate(270)
            s = dim / max(i.height, i.width)
            i.resize(int(i.width * s), int(i.height * s), "lanczos2")
            i.format = "webp"
            i.save(filename=outf)
            shas = None
            with open(outf, "rb") as f:
                d = file_digest(f, "sha256")
                shas = d.hexdigest()
            destdirp = os.path.join(shas[0:2], shas[2:4])
            destdirp = os.path.join("media_thmb", destdirp)
            destpath = os.path.join(destdirp, f"{shas}.webp")
            destabsp = os.path.join(conf.LOCAL_DATA_ROOT, destpath)
            os.makedirs(os.path.join(conf.LOCAL_DATA_ROOT, destdirp), 0o755, True)
            if not os.path.isfile(destabsp):
                shutil.move(outf, destabsp)
            return destpath

def should_generate_thumbnail(file):
    '''
    Tell whether file is of a type that thumbnails are generated for.

    The decision is made from the MIME type guessed from the file name only;
    PNG, JPEG and WebP images qualify.
    '''
    guessed_type, _encoding = mimetypes.guess_type(file)
    return guessed_type in ("image/png", "image/jpeg", "image/webp")

def process_body(text):
    '''
    Render the markdown source of a post body and return the rendered result.

    Raw HTML in the source is passed through unescaped (escape=False).
    '''
    enabled_plugins = [strikethrough, url, superscript, subscript, ruby, spoiler, emote]
    html_renderer = mistune.HTMLRenderer(escape=False)
    markdown = mistune.Markdown(html_renderer, plugins=enabled_plugins)
    return markdown(text)

'''
move file at absolute path fn to conf.LOCAL_DATA_ROOT/<dirn>/<year>/destfilename
destfilename is the return value of dfnf(ts, datetime.fromtimestamp(ts, tz=timezone.utc), fn)

returns path to the destination file relative to conf.LOCAL_DATA_ROOT
'''
def move_file(fn, ts, dirn, dfnf):
    '''
    Move the file at absolute path fn into conf.LOCAL_DATA_ROOT/<dirn>/<year>/.

    ts:   unix timestamp; its UTC year selects the <year> directory
    dirn: top-level directory name below conf.LOCAL_DATA_ROOT
    dfnf: callable (ts, utc_datetime, fn) -> destination file name

    The year directory is created if missing.
    Returns the destination path relative to conf.LOCAL_DATA_ROOT.
    '''
    when = datetime.fromtimestamp(ts, tz=timezone.utc)
    dest_name = dfnf(ts, when, fn)
    year_dir = os.path.join(dirn, str(when.year))
    relative_dest = os.path.join(year_dir, dest_name)
    os.makedirs(os.path.join(conf.LOCAL_DATA_ROOT, year_dir), mode=0o755, exist_ok=True)
    shutil.move(fn, os.path.join(conf.LOCAL_DATA_ROOT, relative_dest))
    return relative_dest

def move_post(fn, ts):
    '''
    Move the raw post at absolute path fn to posts/<year>/ and return its new
    path relative to conf.LOCAL_DATA_ROOT.

    The file is named "<ts>-<UTC ISO 8601 time>Z.post".
    '''
    def post_filename(stamp, moment, _source):
        # isoformat() of the UTC-aware datetime ends in "+00:00";
        # drop those 6 characters and append "Z" instead.
        utc_text = moment.isoformat()[:-6]
        return f"{stamp}-{utc_text}Z.post"

    return move_file(fn, ts, "posts", post_filename)

def move_media(fn, ts):
    '''
    Move the media file at absolute path fn to media_orig/<year>/ and return
    its new path relative to conf.LOCAL_DATA_ROOT.

    The file keeps its base name, prefixed with "<ts>-".
    '''
    def media_filename(stamp, _moment, source):
        original_name = os.path.basename(source)
        return f"{stamp}-{original_name}"

    return move_file(fn, ts, "media_orig", media_filename)

'''
Reads and processes a post from fn.
fn must be a path relative to conf.LOCAL_DATA_ROOT
pointing to a raw post input (e.g. posts/2024/xxxxxx-yyyy-mm-ddThh:mm:ssZ.post)
If is_updating is False, assumes all media is present in the media_dropoff folder.

Returns a Post struct for that post.
'''
def process_post(fn, is_updating):
    '''
    Read the raw post at conf.LOCAL_DATA_ROOT/<fn> and build a Post from it.

    fn:          path relative to conf.LOCAL_DATA_ROOT; the base name must start
                 with "<unix timestamp>-"
                 (e.g. posts/2024/xxxxxx-yyyy-mm-ddThh:mm:ssZ.post)
    is_updating: if False, every referenced media file must be present in
                 media_dropoff. If True, media missing from media_dropoff is
                 expected at media_orig/<year>/<ts>-<name> instead.

    Media found in media_dropoff is moved to media_orig/<year>/, and a
    thumbnail is generated for supported image types.

    Raises ValueError if the base name of fn has no "<integer>-" prefix, and
    FileNotFoundError if a referenced media file cannot be found.
    Returns Post(rendered_body, ts, media, tags).
    '''
    fbasen = os.path.basename(fn)
    ts = int(fbasen[:fbasen.index('-')])

    body_lines = []
    media_str = []
    tagline = None
    # Posts are read as UTF-8 explicitly so that parsing does not depend on
    # the locale of the machine running the script.
    with open(os.path.join(conf.LOCAL_DATA_ROOT, fn), "r", encoding="utf-8") as f:
        for l in f:
            line = l.strip()
            if line.startswith("--"):
                continue
            if line.startswith('[') and line.endswith(']'):
                media_str.append(line[1:-1])
            elif line.startswith('#'):
                # only the first tag line is used
                if tagline is None:
                    tagline = line
            elif not media_str and tagline is None:
                # body text ends at the first tag line or media line
                body_lines.append(l)

    rendered_body = process_body("".join(body_lines))

    tags = []
    if tagline is not None:
        # Split on any whitespace (so tabs and repeated spaces also separate
        # tags) and drop a bare "#", which would yield an empty tag.
        tags = [t[1:] for t in tagline.split() if t.startswith('#') and len(t) > 1]

    # Directory that already-archived media of this post lives in.
    year_dir = os.path.join("media_orig", str(datetime.fromtimestamp(ts, tz=timezone.utc).year))

    media = []
    for m in media_str:
        dropoffa = os.path.join(conf.LOCAL_DATA_ROOT, "media_dropoff", m)
        if os.path.isfile(dropoffa):
            destm = move_media(dropoffa, ts)
        elif not is_updating:
            raise FileNotFoundError(f"{dropoffa} does not exist.")
        else:
            # Updating and nothing new dropped off: fall back to the location
            # the file would have been moved to earlier.
            destm = os.path.join(year_dir, f"{ts}-{os.path.basename(m)}")
        if not os.path.isfile(os.path.join(conf.LOCAL_DATA_ROOT, destm)):
            raise FileNotFoundError(f"Cannot find original media ({destm})")
        thumbnail = None
        if should_generate_thumbnail(destm):
            thumbnail = generate_thumbnail(os.path.join(conf.LOCAL_DATA_ROOT, destm))
        if thumbnail is None:
            # no separate thumbnail: the original doubles as its own thumbnail
            thumbnail = destm
        media.append(MediaInstance.makeImage(thumbnail, destm))

    return Post(rendered_body, ts, media, tags)