# Chris Xiong 2024
# License: Expat (MIT)
import os
import sys
import time
import shutil
import tempfile
import subprocess
from datetime import datetime, timezone
import postutil
from atomgen import gen_atom
from monolith import Monolith, MediaType
from config import conf
def _editor_command():
    """Return the editor invocation as an argv list (without the file name)."""
    import shlex  # function-scope import keeps this block self-contained

    for var in ("EDITOR", "VISUAL"):
        value = os.environ.get(var, "")
        if not value.strip():
            continue
        # A value that names an executable as-is (possibly a path containing
        # spaces) is used verbatim, exactly as before.
        if shutil.which(value):
            return [value]
        # Otherwise treat it as a command line, e.g. "code --wait".
        return shlex.split(value)
    # Neither variable is set: use the conventional last-resort editor.
    return ["vi"]


def edit_file(fn):
    """Launch an editor to edit the given file and wait for it to exit.

    The editor is taken from $EDITOR, then $VISUAL, and finally defaults to
    "vi". Values that carry arguments (such as "code --wait") are split with
    shell-like quoting rules.

    Args:
        fn: Path of the file to open in the editor.
    """
    subprocess.run([*_editor_command(), fn])
def new_post():
    """Open an editor to create a new post.

    The post will be formatted, and all media will be processed accordingly.
    If sys.argv[2] is present it is parsed as an ISO 8601 date/time and used
    as the post time; otherwise the time at which the editor exits is used.
    An empty post (no content and no media) is discarded.

    Raises:
        ValueError: if sys.argv[2] is present but is not a valid ISO 8601
            string. This is raised before the editor is launched.
    """
    conf.require()
    # Parse the optional time override *before* opening the editor. Parsing it
    # afterwards would throw away the freshly written note (it lives in a
    # temporary directory) whenever the date argument is malformed.
    ts_override = None
    if len(sys.argv) >= 3:
        ts_override = int(datetime.fromisoformat(sys.argv[2]).timestamp())
    with tempfile.TemporaryDirectory() as tmpdir:
        fn = os.path.join(tmpdir, "note.txt")
        # Create an empty file for the editor to open.
        with open(fn, "w"):
            pass
        edit_file(fn)
        if ts_override is not None:
            ts = ts_override
        else:
            # Current time in whole seconds, taken after the editor exits.
            ts = time.time_ns() // 10 ** 9
        postpath = postutil.move_post(fn, ts)
        p = postutil.process_post(postpath, False)
        if len(p.content) == 0 and len(p.media) == 0:
            print("No post was made.")
            os.remove(postpath)
            return
        print(f"Post {os.path.basename(postpath)} made!")
        m = Monolith(os.path.join(conf.LOCAL_DATA_ROOT, "posts.monolith"))
        m.append(p)
        m.load_index()
        m.generate_page_index()
        if conf.ATOM_ENABLED:
            gen_atom()
def edit_post(ts):
    """Open an editor to edit an existing post.

    Post time cannot be modified and will always stay the same.
    Media that can be found in the media_dropoff folder will be updated.
    If a media entry is modified, the file it refers to must either be
    present in the media_dropoff folder, or already in the corresponding
    folder inside media_orig.

    Args:
        ts: Timestamp (seconds since the epoch) identifying the post.
    """
    conf.require()
    monolith = Monolith(os.path.join(conf.LOCAL_DATA_ROOT, "posts.monolith"))
    monolith.load_index()
    if monolith.get_post(ts) is None:
        print("No post was made at that time!")
        # TODO: allow the user to select a post made near this time
        return

    when = datetime.fromtimestamp(ts, tz=timezone.utc)
    # isoformat() of a UTC-aware datetime ends in "+00:00"; drop those six
    # characters so the file name can carry a "Z" suffix instead.
    stamp = when.isoformat()[:-6]
    rel_path = os.path.join("posts", str(when.year), f"{ts}-{stamp}Z.post")

    edit_file(os.path.join(conf.LOCAL_DATA_ROOT, rel_path))
    updated = postutil.process_post(rel_path, True)
    monolith.replace_post(ts, updated)
    monolith.generate_page_index()
    if conf.ATOM_ENABLED:
        gen_atom()
def regen_monolith():
    """Regenerate the ENTIRE monolith file. Horribly slow.

    After an interactive confirmation the monolith is cleared, then every
    "*.post" file found in the integer-named directories under
    <LOCAL_DATA_ROOT>/posts is processed and appended, ordered by the integer
    that precedes the first "-" in its file name.
    """
    conf.require()
    print("Do you want to regenerate the ENTIRE monolith file? [y/n]")
    if input() not in ("Y", "y"):
        return

    monolith = Monolith(os.path.join(conf.LOCAL_DATA_ROOT, "posts.monolith"))
    monolith.clear()

    def leading_number(path):
        # Sort key: the integer before the first "-" of the file name.
        name = os.path.basename(path)
        return int(name[:name.index("-")])

    posts_root = os.path.join(conf.LOCAL_DATA_ROOT, "posts")
    post_files = []
    for entry in os.scandir(posts_root):
        # Only directories whose name parses as an integer are scanned.
        try:
            int(entry.name)
        except ValueError:
            continue
        if not entry.is_dir():
            continue
        year_dir = os.path.join(posts_root, entry.name)
        post_files.extend(
            os.path.join(year_dir, name)
            for name in os.listdir(year_dir)
            if name.endswith(".post")
        )

    post_files.sort(key=leading_number)
    for post_file in post_files:
        monolith.append(postutil.process_post(post_file, True))

    monolith.load_index()
    monolith.generate_page_index()
    if conf.ATOM_ENABLED:
        gen_atom()
def sync_remote():
    """Sync all local data to the configured remote host for serving.

    Runs rsync from LOCAL_DATA_ROOT to SYNC_TARGET, leaving out the "posts"
    and "media_dropoff" entries.
    """
    conf.require()
    # The trailing slash on the source makes rsync copy the directory's
    # contents rather than the directory itself.
    source = conf.LOCAL_DATA_ROOT + "/"
    command = [
        "rsync",
        "-rLptgoDzv",
        "--exclude=posts",
        "--exclude=media_dropoff",
        source,
        conf.SYNC_TARGET,
    ]
    subprocess.run(command)
def init_instance():
    """Create a new notekins instance with all files and directories that it expects.

    The instance path is read from sys.argv[2]. The instance directory and its
    sub-directories are created, the repository's "template" directory is
    copied in, a "notekins.conf" file is written, and the monolith file is
    cleared.
    """
    # The repository root is two directory levels above this file.
    repo_root = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
    print(repo_root)
    if len(sys.argv) < 3:
        print("Missing path to the new instance.")
        return

    target = sys.argv[2].rstrip('/')
    os.mkdir(target, mode=0o755)
    for subdir in ("posts", "emotes", "media_dropoff", "media_orig", "media_thmb"):
        os.mkdir(os.path.join(target, subdir), mode=0o755)

    shutil.copytree(os.path.join(repo_root, "template"), os.path.join(target, "template"))

    conf_path = os.path.join(target, "notekins.conf")
    with open(conf_path, "w") as conf_file:
        conf_file.write(f"LOCAL_DATA_ROOT={target}")

    monolith = Monolith(os.path.join(target, "posts.monolith"))
    monolith.clear()
def _list_media_files(subdir):
    """Return the path, relative to LOCAL_DATA_ROOT, of every file under subdir."""
    root = conf.LOCAL_DATA_ROOT
    return [
        os.path.relpath(os.path.join(dirpath, name), root)
        for dirpath, _dirnames, filenames in os.walk(os.path.join(root, subdir))
        for name in filenames
    ]


def media_cleanup():
    """Clean up any media file that isn't used in the monolith file.

    Files under media_orig and media_thmb that no post in the monolith refers
    to are listed, and deleted after an interactive confirmation.
    """
    conf.require()
    candidates = _list_media_files("media_orig") + _list_media_files("media_thmb")

    referenced = set()
    m = Monolith(os.path.join(conf.LOCAL_DATA_ROOT, "posts.monolith"))
    m.load_index()
    for d in m.get_all_dates():
        for media in m.get_post(d).media:
            # NOTE(review): both paths are recorded for IMAGE media only;
            # confirm that no other media type owns files in media_orig.
            if media.type == MediaType.IMAGE:
                referenced.add(media.thumbnail)
                referenced.add(media.original)

    unused = [f for f in candidates if f not in referenced]
    if not unused:
        print("No unused media files found.")
        return

    for path in unused:
        print(path)
    print(f"Found {len(unused)} unused media file(s). Delete these files? [y/n]")
    if input() not in ['Y', 'y']:
        return
    for path in unused:
        os.remove(os.path.join(conf.LOCAL_DATA_ROOT, path))
def main():
if len(sys.argv) < 2:
print("Missing command. Available commands:")
print("new Create a new post.")
print("edit Edit an existing post. Requires a post timestamp.")
print("atom Generate atom feed.")
print("regen Regenerate the entire monolith file.")
print("sync Sync data to remote for hosting.")
print("init Initialize a new Notekins instance. Requires path to the instance.")
print("clean Clean up unused media files.")
print("dump Dump the content of the monolith file.")
return
match sys.argv[1]:
case "new":
new_post()
case "edit":
edit_post(int(sys.argv[2]))
case "atom":
gen_atom()
case "regen":
regen_monolith()
case "sync":
sync_remote()
case "init":
init_instance()
case "clean":
media_cleanup()
case "dump":
m = Monolith(os.path.join(conf.LOCAL_DATA_ROOT, "posts.monolith"))
m.load_index()
for d in m.get_all_dates():
m.get_post(d).dump()
# Run the command dispatcher only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()