# Chris Xiong 2024 # License: Expat (MIT) ''' I/O facilities for the Monolith file used by the backend. Structure of the Monolith file: Element Length in byte Notes (varies) <␋> 1 Treated as part of the post that preceeds it (varies) <␋> 1 ... Post: Element Length in byte Notes content (varies) utf8 string, null-terminated. HTML fragment. date 8 seconds since unix epoch media (varies) See below ntags 1 tag[0] (varies) null-terminated utf8 string, excluding the hash prefix. HTML-escaped. tag[1] .. .. ... tag[ntags-1].. .. Media: Element Length in byte Notes nmedia 1 (varies) (varies) ... MediaInstance: Element Length in byte Notes type 1 'I' = image -----------------type == 'I'----------------- thumbnail (varies) null-terminated utf8 string, relative path to storage url original (varies) .. Index file (.idx) 00 08 09 0F ---- ---- ... Page index file (.pdx) 00 08 09 0F ... Page #0 contains the latest posts. The final page always starts at byte #0 in the monolith. ''' from enum import Enum from mmap import mmap from bisect import bisect_left from config import conf from datetime import datetime, timezone MediaType = Enum("MediaType", ["IMAGE", "VIDEO"]) def consume_str(buf): nulp = buf.find(b'\0') rets = buf[0 : nulp].decode("utf-8") return rets, nulp def strbuf(s): return s.encode("utf-8") + b'\0' class MediaInstance: def __init__(self, type): self.type = type def __str__(self): match self.type: case MediaType.IMAGE: return f"Image: {self.thumbnail} {self.original}\n" case _: return f"Unknown media\n" def consume(buf): match buf[0 : 1]: case b'I': l = 1 b = buf[1:] thmb, p = consume_str(b) l += p + 1 b = b[p + 1:] orig, p = consume_str(b) l += p + 1 return MediaInstance.makeImage(thmb, orig), l case _: raise ValueError("Invalid media type") def to_buf(self): match self.type: case MediaType.IMAGE: return b'I' + strbuf(self.thumbnail) + strbuf(self.original) case _: raise ValueError("Unsupported media type") def dump(self): match self.type: case MediaType.IMAGE: print(f"({self.thumbnail}, {self.original})") case _: raise ValueError("Unsupported media type") def makeImage(thumb, orig): r = MediaInstance(MediaType.IMAGE) r.thumbnail = thumb r.original = orig return r class Post: ''' .content: utf-8 string .date: int, secs since unix epoch .media: list of MediaInstance .tags: list of strings ''' def __init__(self, cont, date, media, tags): self.content = cont self.date = date self.media = media self.tags = tags def __str__(self): medias = "\n".join([str(m) for m in self.media]) tags = ",".join([f'"{t}"' for t in self.tags]) return f"{self.content}\n{self.date}\n{medias}\n[{tags}]" def from_buf(buf): content, p = consume_str(buf) buf = buf[p + 1 :] date = int.from_bytes(buf[: 8], "little") buf = buf[8 :] media = [] nmedia = int.from_bytes(buf[: 1], "little") buf = buf[1 :] for i in range(0, nmedia): m, l = MediaInstance.consume(buf) media.append(m) buf = buf[l :] tags = [] ntags = int.from_bytes(buf[: 1], "little") buf = buf[1 :] for i in range(0, ntags): t, p = consume_str(buf) tags.append(t) buf = buf[p + 1:] return Post(content, date, media, tags) def to_buf(self): ret = strbuf(self.content) ret += self.date.to_bytes(8, "little") ret += len(self.media).to_bytes(1, "little") for m in self.media: ret += m.to_buf() ret += len(self.tags).to_bytes(1, "little") for t in self.tags: ret += strbuf(t) return ret + b'\x0b' def dump(self): print('=' * 40) print(self.content) print(datetime.fromtimestamp(self.date, tz=timezone.utc).isoformat()) for t in self.tags: print(f"#{t} ", end='') print("") for m in self.media: m.dump() class Monolith: def __init__(self, fn): self.filename = fn self.idxfn = f"{fn}.idx" self.pdxfn = f"{fn}.pdx" self.postranges = [] def _append_idxf(self, t, r): with open(self.idxfn, "ab") as f: buf = t.to_bytes(8, "little") + \ r.to_bytes(8, "little") if f.write(buf) != len(buf): raise RuntimeError("write failure") def clear(self): with open(self.filename, "wb"): pass with open(self.idxfn, "wb"): pass with open(self.pdxfn, "wb"): pass def append(self, post): with open(self.filename, "ab") as f: postbuf = post.to_buf() t = post.date l = f.tell() w = f.write(postbuf) if w != len(postbuf): raise RuntimeError("write failure") r = l + w self.postranges.append((t, l, r)) self._append_idxf(t, r) # self.generate_page_index() def load_index(self): with open(self.idxfn, "rb") as f: last_r = 0 self.postranges = [] while True: bs = f.read(16) if len(bs) == 0: break t = int.from_bytes(bs[0 : 8], "little") l = last_r r = int.from_bytes(bs[8 :16], "little") self.postranges.append((t, l, r)) last_r = r def write_index(self): with open(self.idxfn, "wb") as f: for (t, _, r) in self.postranges: f.write(t.to_bytes(8, "little") + \ r.to_bytes(8, "little")) def find_post(self, date): p = bisect_left(self.postranges, date, key=lambda p: p[0]) if p != len(self.postranges) and self.postranges[p][0] == date: return p return None def find_nearby_posts(self, date, r=2): p = bisect_left(self.postranges, date, key=lambda p: p[0]) left = max(p - r, 0) right = min(p + r + 1, len(self.postranges)) return [t for (t, _, _) in self.postranges[left : right]] def get_all_dates(self): return [t for (t, _, _) in self.postranges] def get_post(self, date): p = self.find_post(date) if p is None: return None t, l, r = self.postranges[p] with open(self.filename, "r+b") as f: d = mmap(f.fileno(), 0) post = Post.from_buf(d[l : r]) return post def replace_post(self, date, post): p = self.find_post(date) if p is None: return None t, l, r = self.postranges[p] new_post_buf = post.to_buf() dlen = len(new_post_buf) - (r - l) with open(self.filename, "r+b") as f: d = mmap(f.fileno(), 0) mlength = len(d) oldend = r newend = l + len(new_post_buf) if dlen > 0: d.resize(mlength + dlen) if dlen != 0: d.move(newend, oldend, mlength - oldend) if dlen < 0: d.resize(mlength + dlen) d[l : newend] = new_post_buf self.postranges[p] = (t, l, r + dlen) for i in range(p + 1, len(self.postranges)): t, l, r = self.postranges[i] self.postranges[i] = (t, l + dlen, r + dlen) self.write_index() def generate_page_index(self): posts_per_page = conf.POSTS_PER_PAGE ranges = [] for ub in range(len(self.postranges), 0, -posts_per_page): pr = ub - 1 _, _, r = self.postranges[pr] ranges.append(r) with open(self.pdxfn, "wb") as f: for r in ranges: f.write(r.to_bytes(8, "little"))