1 import assert from 'assert'
2 import fsPromises from 'fs/promises'
3 import CachedValue from './CachedValue.mjs'
4 import CopyCollector from './CopyCollector.mjs'
5 import Mutex from './Mutex.mjs'
6 import Transaction from './Transaction.mjs'
// Byte sequences searched for when locating a root record in the log.
// write_root() stores a root as `<` + JSON + `>\n`, so `>\n` ends a root
// record and `\n<` (the newline that closes the preceding record, then `<`)
// begins one.
8 let open_angle = Buffer.from('\n<', 'utf-8')
9 let close_angle = Buffer.from('>\n', 'utf-8')
// Stores the tuning parameters and initialises the in-memory state.
//   read_timeout  - value given to a cached entry's stale_count; falsy -> 3600
//   write_timeout - falsy -> 5
//                   NOTE(review): where this is consumed is not visible in
//                   this listing -- confirm.
//   block_size    - falsy -> 0x1000; used as the read granularity when the
//                   log is scanned on open and as the threshold tested
//                   against the pending-write byte count
// Because `||` is used, an explicit 0 also selects the default.
// NOTE(review): some statements of this constructor are not present in this
// listing (the embedded numbering skips).
11 constructor(read_timeout, write_timeout, block_size) {
12 this.read_timeout = read_timeout || 3600
13 this.write_timeout = write_timeout || 5
14 this.block_size = block_size || 0x1000
// Guards the state of this object; acquired by the other methods.
15 this.mutex = new Mutex()
// Root value; undefined until a root has been parsed from the log.
19 this.value = undefined
// Total byte length of the buffers queued in write_list.
22 this.write_list_len = 0
// Body of the routine that opens the log file at `path` and recovers the most
// recent root value from it.
// NOTE(review): the enclosing declaration and a number of statements (loop
// headers, conditions, closing braces) are not present in this listing -- the
// embedded numbering skips -- so the notes below describe only the statements
// that are visible.
28 await this.mutex.acquire()
// Must not already be open.
29 assert(this.log === null)
// Reset the in-memory root and the pending-write byte count.
33 this.value = undefined
36 this.write_list_len = 0
// First attempt: open an existing file for reading and writing.
40 this.log = await fsPromises.open(path, 'r+')
// Only a missing file (ENOENT) is handled here; presumably any other error is
// rethrown by the statement that follows -- confirm.
43 if (error.code !== 'ENOENT')
// Fallback: promote `path + '.new'` to `path` and open that instead.
// rotate() renames the live log away before it renames the '.new' file into
// place, so an interrupted rotation can leave exactly this state behind.
47 await fsPromises.rename(path + '.new', path)
48 this.log = await fsPromises.open(path, 'r+')
// Presumably reached when the '.new' file is missing as well: start a new,
// empty database file.
51 console.log('warning: can\'t find database file, creating')
52 this.log = await fsPromises.open(path, 'w+')
// Presumably printed when the '.new' fallback above succeeded -- confirm.
58 console.log('warning: completed incomplete log file rotation')
// Current file size; the scan for the last root record works back from here.
61 this.eof = (await this.log.stat()).size
// Split eof into the offset of the block that contains it (ptr) and the
// number of bytes used in that final block (count). The masks only align
// correctly when block_size is a power of two.
// NOTE(review): JavaScript bitwise operators work on 32-bit integers, so this
// arithmetic would misbehave for a file of 2 GiB or more -- confirm whether
// that size is reachable.
67 let ptr = this.eof & ~(this.block_size - 1)
68 let count = this.eof & (this.block_size - 1)
// One byte larger than a block: the extra slot receives the first byte of the
// block that follows (see the carry-over assignments below), so a two-byte
// marker that straddles a block boundary can still be matched.
69 let buffer = Buffer.alloc(this.block_size + 1)
// The read must deliver exactly `count` bytes; presumably this comparison is
// the argument of an assert whose opening is not visible here.
71 (await this.log.read(buffer, 0, count, ptr)).bytesRead === count
// Search backwards for the `>\n` that ends the last root record.
77 (i = buffer.lastIndexOf(close_angle, count - 2)) !== -1
// `blocks` collects the record's bytes piece by piece, last piece first; the
// first piece is everything before the end marker.
80 let blocks = [buffer.slice(0, i)]
// Search backwards for the `\n<` that starts the record.
85 (i = buffer.lastIndexOf(open_angle, count - 2)) !== -1
// Trim the most recently added (earliest in the file) piece so it begins at
// the start marker, restore file order, and drop the two marker bytes.
// NOTE(review): the `eof` returned here is a local whose assignment is not
// visible in this listing.
87 blocks[blocks.length - 1] = blocks[blocks.length - 1].slice(i)
88 return {eof, buffer: Buffer.concat(blocks.reverse()).slice(2)}
// Step back one block to continue the search for the start marker.
93 ptr -= this.block_size
94 let new_buffer = Buffer.alloc(this.block_size + 1)
// Carry the first byte of the block just examined into the extra slot; count
// then includes that byte. The other assignment to count (without the extra
// byte) presumably belongs to an alternative branch whose condition is not
// visible.
96 new_buffer[this.block_size] = buffer[0]
97 count = this.block_size + 1
100 count = this.block_size
// A whole block is expected here; presumably the argument of an assert.
103 (await this.log.read(buffer, 0, this.block_size, ptr)).bytesRead === this.block_size
105 blocks.push(buffer.slice(0, this.block_size))
108 // special case for < at start of database
// 0x3c is '<'. Only one byte is dropped because no newline precedes it.
109 if (count && buffer[0] === 0x3c)
110 return {eof, buffer: Buffer.concat(blocks.reverse()).slice(1)}
112 throw new Error('can\'t find logjson start marker')
// Same backward stepping, used while the end marker has not yet been found.
117 ptr -= this.block_size
119 buffer[this.block_size] = buffer[0]
120 count = this.block_size + 1
123 count = this.block_size
125 (await this.log.read(buffer, 0, this.block_size, ptr)).bytesRead === this.block_size
129 throw new Error('can\'t find logjson end marker')
// Presumably reached when no root record could be located (the guarding
// condition is not visible): the file is emptied.
134 console.log('warning: can\'t find root, truncating database file')
135 await this.log.truncate(0)
// A root record was found: `buffer` holds its JSON text and `eof` the offset
// reported by the scan.
140 let {eof, buffer} = eof_buffer
141 this.value = JSON.parse(buffer.toString('utf-8'))
143 // optional: trim any garbage off the end of the database file
144 if (eof < this.eof) {
145 console.log('warning: garbage after root, truncating database file')
146 await this.log.truncate(eof)
// Body of the routine that closes the log file.
// NOTE(review): the enclosing declaration and some statements are not present
// in this listing.
154 await this.mutex.acquire()
// Must currently be open.
155 assert(this.log !== null)
158 await this.log.close()
// Fetches the JSON value stored at a [ptr, len] location in the log, going
// through the read cache (keyed by ptr).
//   ptr_len - two-element array: byte offset and byte length in the log file
// NOTE(review): the branch structure and the return statement are not present
// in this listing; the notes below cover only the visible statements.
164 async read(ptr_len) {
165 // hold mutex whilst calling this function
166 let [ptr, len] = ptr_len
// Cache hit: refresh the entry's stale counter and use the cached value.
169 if (Object.prototype.hasOwnProperty.call(this.read_cache, ptr)) {
170 //console.log('hit', ptr)
171 let cached_value = this.read_cache[ptr]
172 cached_value.stale_count = this.read_timeout
173 value = cached_value.value
// Cache miss: read exactly `len` bytes at `ptr` and parse them as JSON. The
// bytesRead comparison is presumably the argument of an assert whose opening
// is not visible.
176 let buffer = Buffer.alloc(len)
178 (await this.log.read(buffer, 0, len, ptr)).bytesRead === len
180 value = JSON.parse(buffer.toString('utf-8'))
// Presumably bypasses the cache when read_timeout is falsy. The constructor
// maps a falsy read_timeout to 3600, so this can only be true if the property
// is changed afterwards.
182 if (!this.read_timeout)
184 //console.log('add', ptr)
// First argument is 0 here and 1 in the write path; presumably it is the
// entry's `refs` count -- confirm against CachedValue.
185 this.read_cache[ptr] = new CachedValue(0, value, this.read_timeout)
188 // value is from cache, copy it
189 assert(typeof value === 'object' && value !== null)
// Shallow copy (array or plain object), so the result is a different
// container from the one held in the cache.
191 if (value instanceof Array) {
193 for (let i = 0; i < value.length; ++i)
194 new_value.push(value[i])
199 new_value[i] = value[i]
// Body of the routine that queues `value` for appending to the log and
// returns the [ptr, len] location it will occupy.
// NOTE(review): the enclosing declaration and some statements are not present
// in this listing.
205 // hold mutex whilst calling this function
// Offset the value will have on disk: the current end of file plus the bytes
// already queued ahead of it.
206 let ptr = this.eof + this.write_list_len
207 //console.log('addw', ptr)
// Register the value in the read cache under that offset. The first argument
// is 1 here (0 in read()); presumably it is the `refs` count that keeps the
// entry cached until the flush routine has put it on disk -- confirm.
208 assert(!Object.prototype.hasOwnProperty.call(this.read_cache, ptr))
209 this.read_cache[ptr] = new CachedValue(1, value, this.read_timeout)
// Serialised form: pretty-printed JSON followed by a newline.
211 let buffer = Buffer.from(
212 JSON.stringify(value, null, 2) + '\n',
215 this.write_list.push(buffer)
216 this.write_list_len += buffer.length
// Once a block's worth of bytes is queued, presumably a flush is triggered by
// the statement that follows (not visible here).
218 if (this.write_list_len >= this.block_size)
// The reported length excludes the trailing newline.
221 return [ptr, buffer.length - 1]
// Queues a root record for appending to the log.
//   value - the root value; stored as `<` + pretty-printed JSON + `>\n`
// Unlike ordinary values, nothing is added to the read cache here.
// NOTE(review): the statement executed when the threshold test succeeds is not
// present in this listing; presumably it flushes the write list -- confirm.
224 async write_root(value) {
225 // hold mutex whilst calling this function
226 let buffer = Buffer.from(
227 `<${JSON.stringify(value, null, 2)}>\n`,
230 this.write_list.push(buffer)
231 this.write_list_len += buffer.length
233 if (this.write_list_len >= this.block_size)
// Body of the routine that writes all queued buffers to the log in a single
// write at the current end of file and then releases their cache entries.
// NOTE(review): the enclosing declaration and some statements are not present
// in this listing.
238 // hold mutex whilst calling this function
// Nothing queued; presumably returns immediately (next statement not visible).
239 if (this.write_list_len === 0)
242 let buffer = Buffer.concat(this.write_list)
243 let len = buffer.length
244 assert(len === this.write_list_len)
// All bytes must be written; presumably the argument of an assert whose
// opening is not visible.
246 (await this.log.write(buffer, 0, len, this.eof)).bytesWritten === len
249 // each block in write list is also pinned in read cache, unpin
250 // note: do this afterwards as needs to be on disk when unpinned
251 // note: special root entry is not in read cache and is ignored
// `ptr` is declared on a line not visible here; given the assert after the
// loop it presumably starts at this.eof and walks the offset of each queued
// buffer.
253 for (let i = 0; i < this.write_list.length; ++i) {
// 0x3c is '<': buffers starting with it are root records and are skipped.
254 if (this.write_list[i][0] !== 0x3c) {
255 let cached_value = this.read_cache[ptr]
// An entry whose stale counter is already 0 is removed from the cache.
257 if (cached_value.stale_count === 0)
258 delete this.read_cache[ptr]
260 ptr += this.write_list[i].length
262 assert(ptr === this.eof + this.write_list_len)
266 this.write_list_len = 0
// Acquires the mutex and returns a new Transaction bound to this database and
// its current root value.
// NOTE(review): no release of the mutex is visible in this listing; presumably
// the Transaction object is responsible for releasing it -- confirm.
269 async Transaction() {
270 await this.mutex.acquire()
271 //let transaction = new Transaction(this, this.value)
272 //transaction.serial = this.serial
274 //console.log('create transaction', transaction.serial)
276 return new Transaction(this, this.value)
// Body of a routine that ages the read cache by one step and counts down
// write_count.
// NOTE(review): the enclosing declaration and some statements are not present
// in this listing.
280 await this.mutex.acquire()
282 for (let i in this.read_cache) {
283 let cached_value = this.read_cache[i]
// Entries with refs !== 0 are left untouched (the decrement is short-circuited);
// others lose one unit of stale_count and are evicted when it reaches 0.
284 if (cached_value.refs === 0 && --cached_value.stale_count === 0) //{
285 //console.log('stale', i)
286 delete this.read_cache[i]
// Count down write_count when it is non-zero; the action taken when it reaches
// 0 is not visible here (presumably a flush -- confirm).
290 if (this.write_count && --this.write_count === 0)
// Copies the data reachable from the root into a fresh log file through a
// CopyCollector, moves the old log to `archive_path`, makes the fresh file the
// live log, and re-keys the read cache to the new file offsets.
//   archive_path - destination path for the old log file
// NOTE(review): several statements (loop header, mutex releases, conditions,
// closing braces) are not present in this listing; the notes below cover only
// the visible statements.
296 async rotate(archive_path) {
297 // note: this function is not re-entrant
298 // it should only be called from a garbage collection thread
299 let copy_collector = new CopyCollector()
// The fresh log is built next to the live one with a '.new' suffix.
300 let new_path = this.log_path + '.new'
301 await copy_collector.create(new_path)
303 await this.mutex.acquire()
// Copy pass over the old log starting from `value`. The mutex is acquired
// again afterwards, so presumably it is released while the pass runs
// (release not visible here).
311 new_value = await copy_collector.copy_pass(this.log, value)
313 await this.mutex.acquire()
// Repeat until the root did not change during the pass.
315 } while (value !== this.value)
317 // since root has not changed since last flush, can't be anything dirty
318 assert(this.write_list_len === 0)
320 // rotate the log files
// Order matters for crash recovery: between the two renames the live path does
// not exist and only the '.new' file does.
321 await this.log.close()
322 await fsPromises.rename(this.log_path, archive_path)
323 await fsPromises.rename(new_path, this.log_path)
// Adopt the collector's file handle, end offset and translated root.
324 this.log = copy_collector.log
325 this.eof = copy_collector.eof
326 this.value = new_value
328 // translate read cache via the copy collector's translation table,
329 // done so that website performance is less affected by the rotation
// Presumably this.read_cache is replaced by an empty object on a line not
// visible here, so entries that are not re-inserted below are dropped.
330 let old_read_cache = this.read_cache
332 for (let i in old_read_cache) {
333 let cached_value = old_read_cache[i]
334 assert(cached_value.refs === 0) // can't be anything dirty
// Membership test against the translation table, keyed by old offset.
337 Object.prototype.hasOwnProperty.call(
338 copy_collector.translate,
342 // cached object is reachable, translate (child objects reachable too)
343 let value = cached_value.value
345 assert(typeof(value) === 'object' && value !== null)
// Any child that is an array is treated as a [ptr, len] reference and is
// replaced by its entry in the translation table; array and plain-object
// containers are handled by separate loops.
346 if (value instanceof Array)
347 for (let i = 0; i < value.length; ++i) {
348 let child_value = value[i]
349 if (child_value instanceof Array) {
350 let [ptr, len] = child_value
352 Object.prototype.hasOwnProperty.call(
353 copy_collector.translate,
357 value[i] = copy_collector.translate[ptr]
361 for (let i in value) {
362 let child_value = value[i]
363 if (child_value instanceof Array) {
364 let [ptr, len] = child_value
366 Object.prototype.hasOwnProperty.call(
367 copy_collector.translate,
371 value[i] = copy_collector.translate[ptr]
// Re-insert the entry under its offset in the new log.
375 let [ptr, len] = copy_collector.translate[i]
376 this.read_cache[ptr] = cached_value
384 export default Database