# Chris Xiong 2024
# License: Expat (MIT)
'''
I/O facilities for the Monolith file used by the backend.
Structure of the Monolith file:
Element       Length in bytes  Notes
<Post>        (varies)
<␋>           1                Treated as part of the post that precedes it
<Post>        (varies)
<␋>           1
...
Post:
Element       Length in bytes  Notes
content       (varies)         utf8 string, null-terminated. HTML fragment.
date          8                seconds since unix epoch
media         (varies)         See below
ntags         1
tag[0]        (varies)         null-terminated utf8 string, excluding the hash prefix. HTML-escaped.
tag[1]        ..               ..
...
tag[ntags-1]  ..               ..
Media:
Element       Length in bytes  Notes
nmedia        1
<MediaIns.>   (varies)
<MediaIns.>   (varies)
...
MediaInstance:
Element       Length in bytes  Notes
type          1                'I' = image
-----------------type == 'I'-----------------
thumbnail     (varies)         null-terminated utf8 string, relative path to storage url
original      (varies)         ..
Index file (.idx):
00             07 08             0F
--<post#0 date>-- <post#0 end byte>
--<post#1 date>-- <post#1 end byte>
...
Page index file (.pdx):
00             07 08             0F
<page#0 ends at#> <page#1 ends at#>
...
Page #0 contains the latest posts. The final page always starts
at byte #0 in the monolith.
'''
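# Illustrative example of the post layout above (not taken from a real monolith):
# a post with content "<p>hi</p>", no media, a single tag "demo" and date
# 1700000000 is serialized as
#   b"<p>hi</p>\x00"                       content, null-terminated
#   + (1700000000).to_bytes(8, "little")   date
#   + b"\x00"                              nmedia = 0
#   + b"\x01" + b"demo\x00"                ntags = 1, tag[0]
#   + b"\x0b"                              trailing <␋> separator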
from enum import Enum
from mmap import mmap
from bisect import bisect_left
from config import conf
from datetime import datetime, timezone
MediaType = Enum("MediaType", ["IMAGE", "VIDEO"])
def consume_str(buf):
    # Read a null-terminated utf8 string; returns (string, index of the terminating null).
    nulp = buf.find(b'\0')
    if nulp < 0:
        raise ValueError("unterminated string")
    rets = buf[0 : nulp].decode("utf-8")
    return rets, nulp
def strbuf(s):
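    # Encode a string as null-terminated utf8 (the inverse of consume_str).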
return s.encode("utf-8") + b'\0'
class MediaInstance:
def __init__(self, type):
self.type = type
def __str__(self):
match self.type:
case MediaType.IMAGE:
return f"Image: {self.thumbnail} {self.original}\n"
case _:
return f"Unknown media\n"
    @staticmethod
    def consume(buf):
match buf[0 : 1]:
case b'I':
l = 1
b = buf[1:]
thmb, p = consume_str(b)
l += p + 1
b = b[p + 1:]
orig, p = consume_str(b)
l += p + 1
return MediaInstance.makeImage(thmb, orig), l
case _:
raise ValueError("Invalid media type")
def to_buf(self):
match self.type:
case MediaType.IMAGE:
return b'I' + strbuf(self.thumbnail) + strbuf(self.original)
case _:
raise ValueError("Unsupported media type")
def dump(self):
match self.type:
case MediaType.IMAGE:
print(f"({self.thumbnail}, {self.original})")
case _:
raise ValueError("Unsupported media type")
    @staticmethod
    def makeImage(thumb, orig):
r = MediaInstance(MediaType.IMAGE)
r.thumbnail = thumb
r.original = orig
return r
class Post:
'''
.content: utf-8 string
.date: int, secs since unix epoch
.media: list of MediaInstance
.tags: list of strings
'''
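    # Illustrative round trip: Post.from_buf(Post("<p>hi</p>", 1700000000, [],
    # ["demo"]).to_buf()) reconstructs an equivalent post (the trailing ␋ is ignored).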
def __init__(self, cont, date, media, tags):
self.content = cont
self.date = date
self.media = media
self.tags = tags
def __str__(self):
medias = "\n".join([str(m) for m in self.media])
tags = ",".join([f'"{t}"' for t in self.tags])
return f"{self.content}\n{self.date}\n{medias}\n[{tags}]"
    @staticmethod
    def from_buf(buf):
content, p = consume_str(buf)
buf = buf[p + 1 :]
date = int.from_bytes(buf[: 8], "little")
buf = buf[8 :]
media = []
nmedia = int.from_bytes(buf[: 1], "little")
buf = buf[1 :]
for i in range(0, nmedia):
m, l = MediaInstance.consume(buf)
media.append(m)
buf = buf[l :]
tags = []
ntags = int.from_bytes(buf[: 1], "little")
buf = buf[1 :]
for i in range(0, ntags):
t, p = consume_str(buf)
tags.append(t)
buf = buf[p + 1:]
return Post(content, date, media, tags)
def to_buf(self):
ret = strbuf(self.content)
ret += self.date.to_bytes(8, "little")
ret += len(self.media).to_bytes(1, "little")
for m in self.media:
ret += m.to_buf()
ret += len(self.tags).to_bytes(1, "little")
for t in self.tags:
ret += strbuf(t)
return ret + b'\x0b'
def dump(self):
print('=' * 40)
print(self.content)
print(datetime.fromtimestamp(self.date, tz=timezone.utc).isoformat())
for t in self.tags:
print(f"#{t} ", end='')
print("")
for m in self.media:
m.dump()
class Monolith:
def __init__(self, fn):
self.filename = fn
self.idxfn = f"{fn}.idx"
self.pdxfn = f"{fn}.pdx"
self.postranges = []
def _append_idxf(self, t, r):
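        # Append one 16-byte index record: post date and end offset, both little-endian.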
with open(self.idxfn, "ab") as f:
buf = t.to_bytes(8, "little") + \
r.to_bytes(8, "little")
if f.write(buf) != len(buf):
raise RuntimeError("write failure")
    def clear(self):
        with open(self.filename, "wb"): pass
        with open(self.idxfn, "wb"): pass
        with open(self.pdxfn, "wb"): pass
        self.postranges = []
def append(self, post):
with open(self.filename, "ab") as f:
postbuf = post.to_buf()
t = post.date
l = f.tell()
w = f.write(postbuf)
if w != len(postbuf):
raise RuntimeError("write failure")
r = l + w
self.postranges.append((t, l, r))
self._append_idxf(t, r)
# self.generate_page_index()
def load_index(self):
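        # Rebuild postranges from the .idx file: each record stores (date, end offset),
        # so each post's start offset is the previous record's end (the first starts at 0).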
with open(self.idxfn, "rb") as f:
last_r = 0
self.postranges = []
while True:
bs = f.read(16)
if len(bs) == 0: break
t = int.from_bytes(bs[0 : 8], "little")
l = last_r
r = int.from_bytes(bs[8 :16], "little")
self.postranges.append((t, l, r))
last_r = r
def write_index(self):
with open(self.idxfn, "wb") as f:
for (t, _, r) in self.postranges:
f.write(t.to_bytes(8, "little") + \
r.to_bytes(8, "little"))
def find_post(self, date):
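        # Binary search the date-sorted index; returns the position of an exact match,
        # or None if no post has this date.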
p = bisect_left(self.postranges, date, key=lambda p: p[0])
if p != len(self.postranges) and self.postranges[p][0] == date:
return p
return None
def find_nearby_posts(self, date, r=2):
p = bisect_left(self.postranges, date, key=lambda p: p[0])
left = max(p - r, 0)
right = min(p + r + 1, len(self.postranges))
return [t for (t, _, _) in self.postranges[left : right]]
def get_all_dates(self):
return [t for (t, _, _) in self.postranges]
def get_post(self, date):
p = self.find_post(date)
if p is None: return None
t, l, r = self.postranges[p]
with open(self.filename, "r+b") as f:
d = mmap(f.fileno(), 0)
post = Post.from_buf(d[l : r])
return post
def replace_post(self, date, post):
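        # Rewrite the post with the given date in place; if its size changes, the rest
        # of the monolith is shifted and all later index entries are offset accordingly.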
p = self.find_post(date)
if p is None: return None
t, l, r = self.postranges[p]
new_post_buf = post.to_buf()
dlen = len(new_post_buf) - (r - l)
with open(self.filename, "r+b") as f:
d = mmap(f.fileno(), 0)
mlength = len(d)
oldend = r
newend = l + len(new_post_buf)
            if dlen > 0:
                # Post grew: enlarge the mapping (and the underlying file) before
                # shifting the tail right.
                d.resize(mlength + dlen)
            if dlen != 0:
                # Shift everything after the old post to its new position.
                d.move(newend, oldend, mlength - oldend)
            if dlen < 0:
                # Post shrank: truncate only after the tail has been shifted left.
                d.resize(mlength + dlen)
d[l : newend] = new_post_buf
self.postranges[p] = (t, l, r + dlen)
for i in range(p + 1, len(self.postranges)):
t, l, r = self.postranges[i]
self.postranges[i] = (t, l + dlen, r + dlen)
self.write_index()
def generate_page_index(self):
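        # Page #0 holds the newest posts: walk the post list backwards in chunks of
        # POSTS_PER_PAGE, recording the monolith end offset of each page's newest post.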
posts_per_page = conf.POSTS_PER_PAGE
ranges = []
for ub in range(len(self.postranges), 0, -posts_per_page):
pr = ub - 1
_, _, r = self.postranges[pr]
ranges.append(r)
with open(self.pdxfn, "wb") as f:
for r in ranges:
f.write(r.to_bytes(8, "little"))
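if __name__ == "__main__":
    # Minimal usage sketch; the file name and post values are illustrative, and the
    # page index step assumes conf.POSTS_PER_PAGE is set by the backend configuration.
    m = Monolith("demo.monolith")
    m.clear()
    m.append(Post("<p>hello</p>", 1700000000, [], ["demo"]))
    m.append(Post("<p>world</p>", 1700000100,
                  [MediaInstance.makeImage("t/a.jpg", "o/a.jpg")], []))
    m.generate_page_index()
    m.load_index()
    for d in m.get_all_dates():
        m.get_post(d).dump()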