/tests/a.json
/tests/b.json
/tests/c.json
+/tests/d.json
/tests/a.logjson
+/tests/b.logjson
--- /dev/null
+import assert from 'assert'
+import fsPromises from 'fs/promises'
+
+class CopyCollector {
+ constructor(block_size) {
+ this.block_size = block_size || 0x1000
+ this.log = null
+ this.eof = 0
+ this.translate = {}
+ this.write_list = []
+ this.write_list_len = 0
+ }
+
+ async create(path) {
+ assert(this.log === null)
+
+ this.log = await fsPromises.open(path, 'w+')
+ this.eof = 0
+ this.translate = {}
+ this.write_list = []
+ this.write_list_len = 0
+ }
+
+ async close() {
+ assert(this.log !== null)
+
+ //await this.flush()
+ await this.log.close()
+ this.log = null
+ }
+
+ async write(value) {
+ let ptr = this.eof + this.write_list_len
+
+ let buffer = Buffer.from(
+ JSON.stringify(value, null, 2) + '\n',
+ 'utf-8'
+ )
+ this.write_list.push(buffer)
+ this.write_list_len += buffer.length
+
+ if (this.write_list_len >= this.block_size)
+ await this.flush()
+
+ return [ptr, buffer.length - 1]
+ }
+
+ async write_root(value) {
+ let buffer = Buffer.from(
+ `<${JSON.stringify(value, null, 2)}>\n`,
+ 'utf-8'
+ )
+ this.write_list.push(buffer)
+ this.write_list_len += buffer.length
+
+ if (this.write_list_len >= this.block_size)
+ await this.flush()
+ }
+
+ async flush() {
+ if (this.write_list_len === 0)
+ return
+
+ let buffer = Buffer.concat(this.write_list)
+ let len = buffer.length
+ assert(len === this.write_list_len)
+ assert(
+ (await this.log.write(buffer, 0, len, this.eof)).bytesWritten === len
+ )
+ this.eof += len
+
+ this.write_list = []
+ this.write_list_len = 0
+ }
+
+ async copy_pass(log, value) {
+ let copy = async ptr_len => {
+ let [ptr, len] = ptr_len
+
+ if (Object.prototype.hasOwnProperty.call(this.translate, ptr))
+ return this.translate[ptr]
+
+ let buffer = Buffer.alloc(len)
+ assert(
+ (await log.read(buffer, 0, len, ptr)).bytesRead === len
+ )
+ let value = JSON.parse(buffer.toString('utf-8'))
+
+ assert(typeof value === 'object' && value !== null)
+ if (value instanceof Array)
+ for (let i = 0; i < value.length; ++i) {
+ let child_value = value[i]
+ if (child_value instanceof Array)
+ value[i] = await copy(child_value)
+ }
+ else
+ for (let i in value) {
+ let child_value = value[i]
+ if (child_value instanceof Array)
+ value[i] = await copy(child_value)
+ }
+
+ let result = await this.write(value)
+ this.translate[ptr] = result
+ return result
+ }
+
+ if (value instanceof Array)
+ value = await copy(value)
+ await this.write_root(value)
+ await this.flush()
+ return value
+ }
+}
+
+export default CopyCollector
import assert from 'assert'
import fsPromises from 'fs/promises'
import CachedValue from './CachedValue.mjs'
+import CopyCollector from './CopyCollector.mjs'
import Mutex from './Mutex.mjs'
import Transaction from './Transaction.mjs'
this.write_timeout = write_timeout || 5
this.block_size = block_size || 0x1000
this.mutex = new Mutex()
+ this.log_path = null
this.log = null
this.eof = 0
this.value = undefined
await this.mutex.acquire()
assert(this.log === null)
- this.eof = 0 // before write_list
+ this.log_path = path
+ this.eof = 0
this.value = undefined
this.read_cache = {}
this.write_list = []
if (error.code !== 'ENOENT')
throw error
- console.log('warning: can\'t find database file, creating')
- this.log = await fsPromises.open(path, 'w+')
+ try {
+ await fsPromises.rename(path + '.new', path)
+ this.log = await fsPromises.open(path, 'r+')
+ }
+ catch (error) {
+ console.log('warning: can\'t find database file, creating')
+ this.log = await fsPromises.open(path, 'w+')
- this.mutex.release()
- return
+ this.mutex.release()
+ return
+ }
+
+ console.log('warning: completed incomplete log file rotation')
}
this.eof = (await this.log.stat()).size
this.mutex.release()
}
+
+ async rotate(archive_path) {
+ // note: this function is not re-entrant
+ // it should only be called from a garbage collection thread
+ let copy_collector = new CopyCollector()
+ let new_path = this.log_path + '.new'
+ await copy_collector.create(new_path)
+
+ await this.mutex.acquire()
+ await this.flush()
+
+ let value, new_value
+ do {
+ value = this.value
+ this.mutex.release()
+
+ new_value = await copy_collector.copy_pass(this.log, value)
+
+ await this.mutex.acquire()
+ await this.flush()
+ } while (value !== this.value)
+
+ // since root has not changed since last flush, can't be anything dirty
+ assert(this.write_list_len === 0)
+
+ // rotate the log files
+ await this.log.close()
+ await fsPromises.rename(this.log_path, archive_path)
+ await fsPromises.rename(new_path, this.log_path)
+ this.log = copy_collector.log
+ this.eof = copy_collector.eof
+ this.value = new_value
+
+ // translate read cache via the copy collector's translation table,
+ // done so that website performance is less affected by the rotation
+ let old_read_cache = this.read_cache
+ this.read_cache = {}
+ for (let i in old_read_cache) {
+ let cached_value = old_read_cache[i]
+ assert(cached_value.refs === 0) // can't be anything dirty
+
+ if (
+ Object.prototype.hasOwnProperty.call(
+ copy_collector.translate,
+ i
+ )
+ ) {
+ // cached object is reachable, translate (child objects reachable too)
+ let value = cached_value.value
+
+ assert(typeof(value) === 'object' && value !== null)
+ if (value instanceof Array)
+ for (let i = 0; i < value.length; ++i) {
+ let child_value = value[i]
+ if (child_value instanceof Array) {
+ let [ptr, len] = child_value
+ assert(
+ Object.prototype.hasOwnProperty.call(
+ copy_collector.translate,
+ ptr
+ )
+ )
+ value[i] = copy_collector.translate[ptr]
+ }
+ }
+ else
+ for (let i in value) {
+ let child_value = value[i]
+ if (child_value instanceof Array) {
+ let [ptr, len] = child_value
+ assert(
+ Object.prototype.hasOwnProperty.call(
+ copy_collector.translate,
+ ptr
+ )
+ )
+ value[i] = copy_collector.translate[ptr]
+ }
+ }
+
+ let [ptr, len] = copy_collector.translate[i]
+ this.read_cache[ptr] = cached_value
+ }
+ }
+
+ this.mutex.release()
+ }
}
export default Database
async read(ptr_len) {
let value = await this.database.read(ptr_len)
- if (typeof value === 'object' && value !== null)
- value =
- value instanceof Array ?
- new LazyArray(this, ptr_len, value) :
- new LazyObject(this, ptr_len, value)
- return value
+ assert(typeof value === 'object' && value !== null)
+ return (
+ value instanceof Array ?
+ new LazyArray(this, ptr_len, value) :
+ new LazyObject(this, ptr_len, value)
+ )
}
async get(default_value) {
)
transaction.rollback()
+await database.rotate('b.logjson')
+
+/*let*/ transaction = database.Transaction()
+await fsPromises.writeFile(
+ 'c.json',
+ Buffer.from(
+ JSON.stringify(
+ await logjson.logjson_to_json(await transaction.get()),
+ null,
+ 2
+ ) + '\n',
+ 'utf-8'
+ )
+)
+transaction.rollback()
+
await database.close()
await database.open('a.logjson')
let transaction = database.Transaction()
await fsPromises.writeFile(
- 'c.json',
+ 'd.json',
Buffer.from(
JSON.stringify(
await logjson.logjson_to_json(await transaction.get()),