# Chris Xiong 2024
# License: Expat (MIT)
import os
import sys
import time
import shutil
import tempfile
import subprocess
from datetime import datetime, timezone
import postutil
from atomgen import gen_atom
from monolith import Monolith
from config import conf
'''
Launches an editor (set by $EDITOR) to edit the given file
'''
def edit_file(fn):
    # Raises KeyError if $EDITOR is not set in the environment.
    editor = os.environ["EDITOR"]
    subprocess.run([editor, fn])
'''
Opens an editor to create a new post.
The post will be formatted, and all media will be processed accordingly.
'''
def new_post():
    conf.require()
    with tempfile.TemporaryDirectory() as tmpdir:
        fn = os.path.join(tmpdir, "note.txt")
        with open(fn, "w"): pass
        edit_file(fn)
        ts = time.time_ns() // 10 ** 9
        if len(sys.argv) >= 3:
            dt = datetime.fromisoformat(sys.argv[2])
            ts = int(dt.timestamp())
        postpath = postutil.move_post(fn, ts)
        p = postutil.process_post(postpath, False)
        if len(p.content) == 0 and len(p.media) == 0:
            print("No post was made.")
            os.remove(postpath)
            return
        print(f"Post {os.path.basename(postpath)} made!")
        m = Monolith(os.path.join(conf.LOCAL_DATA_ROOT, "posts.monolith"))
        m.append(p)
        m.load_index()
        m.generate_page_index()
        if conf.ATOM_ENABLED:
            gen_atom()
'''
Opens an editor to edit an existing post.
The post's timestamp cannot be modified.
Media found in the media_dropoff folder will be updated.
If a media entry is modified, the file it refers to must either be
present in the media_dropoff folder, or already in the corresponding
folder inside media_orig.
'''
def edit_post(ts):
    conf.require()
    m = Monolith(os.path.join(conf.LOCAL_DATA_ROOT, "posts.monolith"))
    m.load_index()
    if m.get_post(ts) is None:
        print("No post was made at that time!")
        # TODO: allow the user to select a post made near this time
        return
    d = datetime.fromtimestamp(ts, tz=timezone.utc)
    pfn = f"{ts}-{d.isoformat()[:-6]}Z.post"
    rp = os.path.join("posts", str(d.year), pfn)
    edit_file(os.path.join(conf.LOCAL_DATA_ROOT, rp))
    p = postutil.process_post(rp, True)
    m.replace_post(ts, p)
    m.generate_page_index()
    if conf.ATOM_ENABLED:
        gen_atom()
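# Illustrative example (not from the original source) of the filename built in
# edit_post above: a post made at Unix time 1700000000 is stored as
#   posts/2023/1700000000-2023-11-14T22:13:20Z.post
# i.e. "<timestamp>-<UTC time in ISO format>Z.post" under its year's directory.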
'''
Regenerate the ENTIRE monolith file.
Horribly slow.
'''
def regen_monolith():
    conf.require()
    print("Do you want to regenerate the ENTIRE monolith file? [y/n]")
    if input() not in ['Y', 'y']:
        return
    m = Monolith(os.path.join(conf.LOCAL_DATA_ROOT, "posts.monolith"))
    m.clear()
    postlist = []
    postsp = os.path.join(conf.LOCAL_DATA_ROOT, "posts")
    for ye in os.scandir(postsp):
        inty = None
        try:
            inty = int(ye.name)
        except ValueError:
            pass
        if inty is None or not ye.is_dir():
            continue
        yearp = os.path.join(postsp, ye.name)
        postlist += [os.path.join(yearp, p) for p in os.listdir(yearp) if p.endswith(".post")]
    def keyf(x):
        b = os.path.basename(x)
        return int(b[:b.index('-')])
    postlist = sorted(postlist, key=keyf)
    for pfn in postlist:
        p = postutil.process_post(pfn, True)
        m.append(p)
    m.load_index()
    m.generate_page_index()
    if conf.ATOM_ENABLED:
        gen_atom()
'''
Sync all local data to the configured remote host for serving.
'''
def sync_remote():
    conf.require()
    # rsync flags: -r recursive, -L resolve symlinks, -ptgoD preserve permissions,
    # times, group, owner and device/special files, -z compress, -v verbose.
    # Raw post sources and the media_dropoff folder are not needed for serving,
    # so they are excluded.
    subprocess.run(["rsync", "-rLptgoDzv", "--exclude=posts", "--exclude=media_dropoff", conf.LOCAL_DATA_ROOT + "/", conf.SYNC_TARGET])
'''
Create a new notekins instance with all files and directories that it expects.
'''
def init_instance():
    repop = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
    print(repop)
    if len(sys.argv) < 3:
        print("Missing path to the new instance.")
        return
    targetp = sys.argv[2].rstrip('/')
    os.mkdir(targetp, mode=0o755)
    os.mkdir(os.path.join(targetp, "posts"), mode=0o755)
    os.mkdir(os.path.join(targetp, "emotes"), mode=0o755)
    os.mkdir(os.path.join(targetp, "media_dropoff"), mode=0o755)
    os.mkdir(os.path.join(targetp, "media_orig"), mode=0o755)
    os.mkdir(os.path.join(targetp, "media_thmb"), mode=0o755)
    shutil.copytree(os.path.join(repop, "template"), os.path.join(targetp, "template"))
    with open(os.path.join(targetp, "notekins.conf"), "w") as f:
        f.write(f"LOCAL_DATA_ROOT={targetp}")
    m = Monolith(os.path.join(targetp, "posts.monolith"))
    m.clear()
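# For reference, the instance layout created above (descriptions are informal,
# inferred from how the rest of this script uses each folder):
#   <instance>/
#     notekins.conf    minimal config pointing LOCAL_DATA_ROOT at the instance
#     posts/           post sources, stored per year (see edit_post/regen_monolith)
#     emotes/
#     media_dropoff/   freshly added media waiting to be processed
#     media_orig/      processed original media
#     media_thmb/      media thumbnails
#     template/        copied from the repository
#     posts.monolith   empty monolith created by Monolith.clear()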
'''
Clean up any media file that isn't used in the monolith file.
TODO.
'''
def media_cleanup():
    conf.require()
    pass
def main():
    if len(sys.argv) < 2:
        print("Missing command. Available commands:")
        print("new    Create a new post.")
        print("edit   Edit an existing post. Requires a post timestamp.")
        print("atom   Generate atom feed.")
        print("regen  Regenerate the entire monolith file.")
        print("sync   Sync data to remote for hosting.")
        print("init   Initialize a new Notekins instance. Requires path to the instance.")
        print("dump   Dump the content of the monolith file.")
        return
    match sys.argv[1]:
        case "new":
            new_post()
        case "edit":
            edit_post(int(sys.argv[2]))
        case "atom":
            gen_atom()
        case "regen":
            regen_monolith()
        case "sync":
            sync_remote()
        case "init":
            init_instance()
        case "dump":
            m = Monolith(os.path.join(conf.LOCAL_DATA_ROOT, "posts.monolith"))
            m.load_index()
            for d in m.get_all_dates():
                m.get_post(d).dump()

if __name__ == "__main__":
    main()
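# Example invocations corresponding to the commands handled in main() above.
# "main.py" is a placeholder; substitute this file's actual name:
#   python main.py init ~/notekins-data        # set up a fresh instance
#   python main.py new                         # write and publish a new post
#   python main.py new 2024-01-31T12:00:00     # new post with an explicit timestamp
#   python main.py edit 1700000000             # edit the post made at that Unix timestamp
#   python main.py sync                        # push generated data to the remote host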