# Chris Xiong 2024
# License: Expat (MIT)
'''
The anatomy of a post:
--comment <= any line that starts with a double dash will be ignored
Post text (markdown)
#tag #tag #tag <= a line that starts with # makes it a tag line.
only the first of such lines is used.
[media] <= lines that are surrounded with [] are media lines.
[media] each contain a single file name inside the brackets.
[media] drop the files in LOCAL_DATA_ROOT/media_dropoff
'''
from mistune.plugins.formatting import strikethrough, superscript, subscript
from mistune.plugins.url import url
from mistune.plugins.ruby import ruby
from mistune.plugins.spoiler import spoiler
import mistune
from wand.image import Image
import tempfile
import os
import sys
import shutil
import hashlib
import mimetypes
from datetime import datetime, timezone
from mistune_emote import emote
from monolith import MediaInstance, Post
from config import conf
def file_digest(f, n):
    '''
    Hash the remaining contents of a binary file object.

    f: file-like object opened in binary mode.
    n: name of a hashlib algorithm (e.g. "sha256").

    Returns the hashlib hash object (call .hexdigest() / .digest() on it).
    '''
    if sys.version_info[:2] >= (3, 11):
        return hashlib.file_digest(f, n)
    # hashlib.file_digest only exists on 3.11+. On older interpreters feed
    # the hash in fixed-size chunks so large media files are never held in
    # memory in their entirety.
    h = hashlib.new(n)
    while chunk := f.read(65536):
        h.update(chunk)
    return h
'''
Takes an abolute path to a static image, generate a thumbnail for it if needed
Returns path to the thumbnail, relative to conf.LOCAL_DATA_ROOT
If a thumbnail isn't required, returns None
'''
def generate_thumbnail(file):
with tempfile.TemporaryDirectory() as dir:
outf = os.path.join(dir, "downsampled.webp")
dim = conf.THUMBNAIL_DIM
with Image(filename=file) as i:
if i.height <= dim and i.width <= dim and i.format.lower() != "png":
return None
if "exif:Orientation" in i.metadata:
o = int(i.metadata['exif:Orientation'][:1])
match o:
case 0: pass
case 1: pass
case 2: i.flop()
case 3: i.rotate(180)
case 4:
i.flop()
i.rotate(180)
case 5:
i.flip()
i.rotate(90)
case 6: i.rotate(90)
case 7:
i.flip()
i.rotate(270)
case 8: i.rotate(270)
s = dim / max(i.height, i.width)
i.resize(int(i.width * s), int(i.height * s), "lanczos2")
i.format = "webp"
i.save(filename=outf)
shas = None
with open(outf, "rb") as f:
d = file_digest(f, "sha256")
shas = d.hexdigest()
destdirp = os.path.join(shas[0:2], shas[2:4])
destdirp = os.path.join("media_thmb", destdirp)
destpath = os.path.join(destdirp, f"{shas}.webp")
destabsp = os.path.join(conf.LOCAL_DATA_ROOT, destpath)
os.makedirs(os.path.join(conf.LOCAL_DATA_ROOT, destdirp), 0o755, True)
if not os.path.isfile(destabsp):
shutil.move(outf, destabsp)
return destpath
def should_generate_thumbnail(file):
    '''
    Tell whether file is of a type thumbnails are generated for.

    The type is guessed from the file name by mimetypes; only PNG, JPEG and
    WebP images qualify.
    '''
    guessed_type, _encoding = mimetypes.guess_type(file)
    return guessed_type in ("image/png", "image/jpeg", "image/webp")
def process_body(text):
    '''
    Render the markdown body of a post.

    The renderer is created with escape=False, and the strikethrough, url,
    superscript, subscript, ruby, spoiler and emote plugins are enabled.
    Returns whatever the mistune Markdown instance returns for text.
    '''
    enabled_plugins = [strikethrough, url, superscript, subscript, ruby, spoiler, emote]
    markdown = mistune.Markdown(mistune.HTMLRenderer(escape=False), plugins=enabled_plugins)
    return markdown(text)
def move_file(fn, ts, dirn, dfnf):
    '''
    Move the file at absolute path fn to
    conf.LOCAL_DATA_ROOT/<dirn>/<year>/<destfilename>.

    <year> is the UTC year of the timestamp ts. <destfilename> is the return
    value of dfnf(ts, datetime.fromtimestamp(ts, tz=timezone.utc), fn).
    The <dirn>/<year> directory is created if it does not exist yet.

    Returns the path to the destination file relative to conf.LOCAL_DATA_ROOT.
    '''
    when = datetime.fromtimestamp(ts, tz=timezone.utc)
    year_dir = os.path.join(dirn, str(when.year))
    rel_dest = os.path.join(year_dir, dfnf(ts, when, fn))
    data_root = conf.LOCAL_DATA_ROOT
    os.makedirs(os.path.join(data_root, year_dir), mode=0o755, exist_ok=True)
    shutil.move(fn, os.path.join(data_root, rel_dest))
    return rel_dest
def move_post(fn, ts):
    '''
    Move the raw post at absolute path fn into posts/<year>/.

    The destination file name is built from ts and the ISO timestamp handed
    over by move_file, with its last six characters (the UTC offset) replaced
    by "Z", plus a ".post" extension.
    Returns the destination path as returned by move_file.
    '''
    def post_filename(ts, d, fn):
        return f"{ts}-{d.isoformat()[:-6]}Z.post"

    return move_file(fn, ts, "posts", post_filename)
def move_media(fn, ts):
    '''
    Move the media file at absolute path fn into media_orig/<year>/.

    The destination file name is the original base name prefixed with
    "<ts>-". Returns the destination path as returned by move_file.
    '''
    def media_filename(ts, d, fn):
        return f"{ts}-{os.path.basename(fn)}"

    return move_file(fn, ts, "media_orig", media_filename)
def process_post(fn, is_updating):
    '''
    Read and process a post from fn.

    fn: path relative to conf.LOCAL_DATA_ROOT pointing to a raw post input
        (e.g. posts/2024/xxxxxx-yyyy-mm-ddThh:mm:ssZ.post). The integer in
        front of the first '-' of the file name is used as the post timestamp.
    is_updating: if False, all media is assumed to be present in the
        media_dropoff folder. If True, media that is not in media_dropoff is
        expected at media_orig/<year>/<ts>-<name> instead.

    Returns a Post struct for that post.
    Raises FileNotFoundError if a media file referenced by the post cannot be
    found.
    '''
    fbasen = os.path.basename(fn)
    ts = int(fbasen[:fbasen.index('-')])

    body_lines = []
    media_str = []
    tagline = None
    # Read posts as UTF-8 explicitly so the result does not depend on the
    # locale of the machine doing the processing.
    with open(os.path.join(conf.LOCAL_DATA_ROOT, fn), "r", encoding="utf-8") as f:
        for l in f:
            line = l.strip()
            if line.startswith("--"):
                continue
            if line.startswith('[') and line.endswith(']'):
                media_str.append(line[1:-1])
            elif line.startswith('#'):
                # only the first tag line counts
                if tagline is None:
                    tagline = line
            elif not media_str and tagline is None:
                # the body ends at the first tag or media line
                body_lines.append(l)
    rendered_body = process_body("".join(body_lines))

    tags = []
    if tagline is not None:
        # Split on any whitespace and skip a bare '#', which would otherwise
        # yield an empty tag.
        tags = [t[1:] for t in tagline.split() if len(t) > 1 and t.startswith('#')]

    media = []
    for m in media_str:
        dropoffa = os.path.join(conf.LOCAL_DATA_ROOT, "media_dropoff", m)
        if os.path.isfile(dropoffa):
            destm = move_media(dropoffa, ts)
        elif not is_updating:
            raise FileNotFoundError(f"{dropoffa} does not exist.")
        else:
            # Updating without a fresh drop-off: fall back to the path
            # move_media would have produced for this file.
            d = datetime.fromtimestamp(ts, tz=timezone.utc)
            destm = os.path.join("media_orig", str(d.year), f"{ts}-{os.path.basename(m)}")
            if not os.path.isfile(os.path.join(conf.LOCAL_DATA_ROOT, destm)):
                raise FileNotFoundError(f"Cannot find original media ({destm})")
        thumbnail = None
        if should_generate_thumbnail(destm):
            thumbnail = generate_thumbnail(os.path.join(conf.LOCAL_DATA_ROOT, destm))
        # no thumbnail generated: the original doubles as its own thumbnail
        if thumbnail is None:
            thumbnail = destm
        media.append(MediaInstance.makeImage(thumbnail, destm))
    return Post(rendered_body, ts, media, tags)