Implement LazyArray.splice(), implement unshift/shift/push/pop in terms of splice...
[logjson.git] / Database.mjs
1 import assert from 'assert'
2 import fsPromises from 'fs/promises'
3 import CachedValue from './CachedValue.mjs'
4 import CopyCollector from './CopyCollector.mjs'
5 import Mutex from './Mutex.mjs'
6 import Transaction from './Transaction.mjs'
7
8 let open_angle = Buffer.from('\n<', 'utf-8')
9 let close_angle = Buffer.from('>\n', 'utf-8')
10 class Database {
11   constructor(read_timeout, write_timeout, block_size) {
12     this.read_timeout = read_timeout || 3600
13     this.write_timeout = write_timeout || 5
14     this.block_size = block_size || 0x1000
15     this.mutex = new Mutex()
16     this.log_path = null
17     this.log = null
18     this.eof = 0
19     this.value = undefined
20     this.read_cache = {}
21     this.write_list = []
22     this.write_list_len = 0
23     this.write_count = 0
24     //this.serial = 0
25   }
26
27   async open(path) {
28     await this.mutex.acquire()
29     assert(this.log === null)
30
31     this.log_path = path
32     this.eof = 0
33     this.value = undefined
34     this.read_cache = {}
35     this.write_list = []
36     this.write_list_len = 0
37     this.write_count = 0
38  
39     try {
40       this.log = await fsPromises.open(path, 'r+')
41     }
42     catch (error) {
43       if (error.code !== 'ENOENT')
44         throw error
45
46       try {
47         await fsPromises.rename(path + '.new', path)
48         this.log = await fsPromises.open(path, 'r+')
49       }
50       catch (error) {
51         console.log('warning: can\'t find database file, creating')
52         this.log = await fsPromises.open(path, 'w+')
53
54         this.mutex.release()
55         return
56       }
57
58       console.log('warning: completed incomplete log file rotation')
59     }
60
61     this.eof = (await this.log.stat()).size
62
63     let eof_buffer
64     try {
65       eof_buffer = await (
66         async () => {
67           let ptr = this.eof & ~(this.block_size - 1)
68           let count = this.eof & (this.block_size - 1)
69           let buffer = Buffer.alloc(this.block_size + 1)
70           assert(
71             (await this.log.read(buffer, 0, count, ptr)).bytesRead === count
72           )
73           while (true) {
74             let i
75             if (
76               count >= 2 &&
77                 (i = buffer.lastIndexOf(close_angle, count - 2)) !== -1
78             ) {
79               let eof = ptr + i + 2
80               let blocks = [buffer.slice(0, i)]
81               count = i
82               while (true) {
83                 if (
84                   count >= 2 &&
85                     (i = buffer.lastIndexOf(open_angle, count - 2)) !== -1
86                 ) {
87                   blocks[blocks.length - 1] = blocks[blocks.length - 1].slice(i)
88                   return {eof, buffer: Buffer.concat(blocks.reverse()).slice(2)}
89                 }
90
91                 if (ptr === 0)
92                   break
93                 ptr -= this.block_size
94                 let new_buffer = Buffer.alloc(this.block_size + 1)
95                 if (count) {
96                   new_buffer[this.block_size] = buffer[0]
97                   count = this.block_size + 1
98                 }
99                 else
100                   count = this.block_size
101                 buffer = new_buffer
102                 assert(
103                   (await this.log.read(buffer, 0, this.block_size, ptr)).bytesRead === this.block_size
104                 )
105                 blocks.push(buffer.slice(0, this.block_size))
106               }
107
108               // special case for < at start of database
109               if (count && buffer[0] === 0x3c)
110                 return {eof, buffer: Buffer.concat(blocks.reverse()).slice(1)}
111
112               throw new Error('can\'t find logjson start marker')
113             }
114
115             if (ptr === 0)
116               break
117             ptr -= this.block_size
118             if (count) {
119               buffer[this.block_size] = buffer[0]
120               count = this.block_size + 1
121             }
122             else
123               count = this.block_size
124             assert(
125               (await this.log.read(buffer, 0, this.block_size, ptr)).bytesRead === this.block_size
126             )
127           }
128
129           throw new Error('can\'t find logjson end marker')
130         }
131       )()
132     }
133     catch (error) {
134       console.log('warning: can\'t find root, truncating database file')
135       await this.log.truncate(0)
136
137       this.mutex.release()
138       return
139     }
140     let {eof, buffer} = eof_buffer
141     this.value = JSON.parse(buffer.toString('utf-8'))
142
143     // optional: trim any garbage off the end of the database file
144     if (eof < this.eof) {
145       console.log('warning: garbage after root, truncating database file')
146       await this.log.truncate(eof)
147       this.eof = eof
148     }
149
150     this.mutex.release()
151   }
152
153   async close() {
154     await this.mutex.acquire()
155     assert(this.log !== null)
156
157     await this.flush()
158     await this.log.close()
159     this.log = null
160
161     this.mutex.release()
162   }
163
164   async read(ptr_len) {
165     // hold mutex whilst calling this function
166     let [ptr, len] = ptr_len
167
168     let value
169     if (Object.prototype.hasOwnProperty.call(this.read_cache, ptr)) {
170       //console.log('hit', ptr)
171       let cached_value = this.read_cache[ptr]
172       cached_value.stale_count = this.read_timeout
173       value = cached_value.value
174     }
175     else {
176       let buffer = Buffer.alloc(len)
177       assert(
178         (await this.log.read(buffer, 0, len, ptr)).bytesRead === len
179       )
180       value = JSON.parse(buffer.toString('utf-8'))
181
182       if (!this.read_timeout)
183         return value
184       //console.log('add', ptr)
185       this.read_cache[ptr] = new CachedValue(0, value, this.read_timeout)
186     }
187
188     // value is from cache, copy it
189     assert(typeof value === 'object' && value !== null)
190     let new_value
191     if (value instanceof Array) {
192       new_value = []
193       for (let i = 0; i < value.length; ++i)
194         new_value.push(value[i])
195     }
196     else {
197       new_value = {}
198       for (let i in value)
199         new_value[i] = value[i]
200     }
201     return new_value
202   }
203
204   async write(value) {
205     // hold mutex whilst calling this function
206     let ptr = this.eof + this.write_list_len
207     //console.log('addw', ptr)
208     assert(!Object.prototype.hasOwnProperty.call(this.read_cache, ptr))
209     this.read_cache[ptr] = new CachedValue(1, value, this.read_timeout)
210
211     let buffer = Buffer.from(
212       JSON.stringify(value, null, 2) + '\n',
213       'utf-8'
214     )
215     this.write_list.push(buffer)
216     this.write_list_len += buffer.length
217
218     if (this.write_list_len >= this.block_size)
219       await this.flush()
220
221     return [ptr, buffer.length - 1]
222   }
223
224   async write_root(value) {
225     // hold mutex whilst calling this function
226     let buffer = Buffer.from(
227       `<${JSON.stringify(value, null, 2)}>\n`,
228       'utf-8'
229     )
230     this.write_list.push(buffer)
231     this.write_list_len += buffer.length
232
233     if (this.write_list_len >= this.block_size)
234       await this.flush()
235   }
236
237   async flush() {
238     // hold mutex whilst calling this function
239     if (this.write_list_len === 0)
240       return
241
242     let buffer = Buffer.concat(this.write_list)
243     let len = buffer.length
244     assert(len === this.write_list_len)
245     assert(
246       (await this.log.write(buffer, 0, len, this.eof)).bytesWritten === len
247     )
248
249     // each block in write list is also pinned in read cache, unpin
250     // note: do this afterwards as needs to be on disk when unpinned
251     // note: special root entry is not in read cache and is ignored
252     let ptr = this.eof
253     for (let i = 0; i < this.write_list.length; ++i) {
254       if (this.write_list[i][0] !== 0x3c) {
255         let cached_value = this.read_cache[ptr]
256         --cached_value.refs
257         if (cached_value.stale_count === 0)
258           delete this.read_cache[ptr]
259       }
260       ptr += this.write_list[i].length
261     }
262     assert(ptr === this.eof + this.write_list_len)
263     this.eof = ptr
264
265     this.write_list = []
266     this.write_list_len = 0
267   }
268
269   async Transaction() {
270     await this.mutex.acquire()
271     //let transaction = new Transaction(this, this.value)
272     //transaction.serial = this.serial
273     //this.serial += 1
274     //console.log('create transaction', transaction.serial)
275     //return transaction
276     return new Transaction(this, this.value)
277   }
278
279   async kick() {
280     await this.mutex.acquire()
281
282     for (let i in this.read_cache) {
283       let cached_value = this.read_cache[i]
284       if (cached_value.refs === 0 && --cached_value.stale_count === 0) //{
285         //console.log('stale', i)
286         delete this.read_cache[i]
287       //}
288     }
289
290     if (this.write_count && --this.write_count === 0)
291       await this.flush()
292
293     this.mutex.release()
294   }
295
296   async rotate(archive_path) {
297     // note: this function is not re-entrant
298     // it should only be called from a garbage collection thread
299     let copy_collector = new CopyCollector()
300     let new_path = this.log_path + '.new'
301     await copy_collector.create(new_path)
302
303     await this.mutex.acquire()
304     await this.flush()
305
306     let value, new_value
307     do {
308       value = this.value
309       this.mutex.release()
310
311       new_value = await copy_collector.copy_pass(this.log, value)
312
313       await this.mutex.acquire()
314       await this.flush()
315     } while (value !== this.value)
316
317     // since root has not changed since last flush, can't be anything dirty
318     assert(this.write_list_len === 0)
319
320     // rotate the log files
321     await this.log.close()
322     await fsPromises.rename(this.log_path, archive_path)
323     await fsPromises.rename(new_path, this.log_path)
324     this.log = copy_collector.log
325     this.eof = copy_collector.eof
326     this.value = new_value
327
328     // translate read cache via the copy collector's translation table,
329     // done so that website performance is less affected by the rotation
330     let old_read_cache = this.read_cache
331     this.read_cache = {}
332     for (let i in old_read_cache) {
333       let cached_value = old_read_cache[i]
334       assert(cached_value.refs === 0) // can't be anything dirty
335
336       if (
337         Object.prototype.hasOwnProperty.call(
338           copy_collector.translate,
339           i
340         )
341       ) {
342         // cached object is reachable, translate (child objects reachable too)
343         let value = cached_value.value
344
345         assert(typeof(value) === 'object' && value !== null)
346         if (value instanceof Array)
347           for (let i = 0; i < value.length; ++i) {
348             let child_value = value[i]
349             if (child_value instanceof Array) {
350               let [ptr, len] = child_value
351               assert(
352                 Object.prototype.hasOwnProperty.call(
353                   copy_collector.translate,
354                   ptr
355                 )
356               )
357               value[i] = copy_collector.translate[ptr]
358             }
359           }
360         else
361           for (let i in value) {
362             let child_value = value[i]
363             if (child_value instanceof Array) {
364               let [ptr, len] = child_value
365               assert(
366                 Object.prototype.hasOwnProperty.call(
367                   copy_collector.translate,
368                   ptr
369                 )
370               )
371               value[i] = copy_collector.translate[ptr]
372             }
373           }
374
375         let [ptr, len] = copy_collector.translate[i]
376         this.read_cache[ptr] = cached_value
377       }
378     }
379
380     this.mutex.release()
381   }
382 }
383
384 export default Database