From 13cda6ba8011d626dd49218493808301a01e5c87 Mon Sep 17 00:00:00 2001 From: Nick Downing Date: Fri, 7 Jan 2022 16:02:08 +1100 Subject: [PATCH] Add write list --- a.mjs | 3 +- logjson.mjs | 102 +++++++++++++++++++++++++++++++++++++--------------- 2 files changed, 76 insertions(+), 29 deletions(-) diff --git a/a.mjs b/a.mjs index 3b27c31..89f8fc8 100755 --- a/a.mjs +++ b/a.mjs @@ -3,7 +3,7 @@ import logjson from './logjson.mjs' import fsPromises from 'fs/promises' -let database = new logjson.Database(5) +let database = new logjson.Database(5, 1) await database.open('a.logjson') let transaction = database.Transaction() @@ -26,6 +26,7 @@ await fsPromises.writeFile( 'utf-8' ) ) +await transaction.rollback() await database.kick() await database.kick() diff --git a/logjson.mjs b/logjson.mjs index 4a3d09d..70cb975 100644 --- a/logjson.mjs +++ b/logjson.mjs @@ -3,7 +3,8 @@ import fsPromises from 'fs/promises' import Mutex from './Mutex.mjs' class CachedValue { - constructor(value, stale_count) { + constructor(refs, value, stale_count) { + this.refs = refs this.value = value this.stale_count = stale_count } @@ -12,23 +13,30 @@ class CachedValue { let open_angle = Buffer.from('\n<', 'utf-8') let close_angle = Buffer.from('>\n', 'utf-8') class Database { - constructor(read_timeout) { + constructor(read_timeout, write_timeout) { this.read_timeout = read_timeout || 3600 + this.write_timeout = write_timeout || 5 this.mutex = new Mutex() this.log = null - this.eof = 0 + this.eof = 0 // before write_list + this.eof1 = 0 // after write_list this.value = undefined this.read_cache = {} + this.write_list = [] + this.write_count = 0 } async open(path) { await this.mutex.acquire() assert(this.log === null) - this.eof = 0 + this.eof = 0 // before write_list + this.eof1 = 0 // after write_list this.value = undefined this.read_cache = {} - + this.write_list = [] + this.write_count = 0 + try { this.log = await fsPromises.open(path, 'r+') } @@ -43,7 +51,7 @@ class Database { return } - this.eof = (await this.log.stat()).size + this.eof = this.eof1 = (await this.log.stat()).size let eof_buffer try { @@ -121,6 +129,7 @@ class Database { console.log('warning: garbage after root, truncating database file') await this.log.truncate(eof) this.eof = eof + this.eof1 = eof } this.mutex.release() @@ -130,6 +139,7 @@ class Database { await this.mutex.acquire() assert(this.log !== null) + await this.flush() await this.log.close() this.log = null @@ -137,6 +147,7 @@ class Database { } async read(ptr_len) { + // hold mutex whilst calling this function let [ptr, len] = ptr_len if (Object.prototype.hasOwnProperty.call(this.read_cache, ptr)) { @@ -154,48 +165,67 @@ class Database { if (this.read_timeout) //{ //console.log('add', ptr) - this.read_cache[ptr] = new CachedValue(value, this.read_timeout) + this.read_cache[ptr] = new CachedValue(0, value, this.read_timeout) //} + return value } - async write(value) { + write(value) { + // hold mutex whilst calling this function let buffer = Buffer.from( JSON.stringify(value, null, 2) + '\n', 'utf-8' ) - let ptr = this.eof + let ptr = this.eof1 let len = buffer.length - if (this.read_timeout) { - //console.log('addw', ptr) - assert(!Object.prototype.hasOwnProperty.call(this.read_cache, ptr)) - this.read_cache[ptr] = new CachedValue(value, this.read_timeout) - } + //console.log('addw', ptr) + assert(!Object.prototype.hasOwnProperty.call(this.read_cache, ptr)) + this.read_cache[ptr] = new CachedValue(1, value, this.read_timeout) - assert( - (await this.log.write(buffer, 0, len, ptr)).bytesWritten === len - ) - this.eof += len + this.write_list.push(buffer) + this.eof1 += len return [ptr, len - 1] } - async write_root(value) { + write_root(value) { + // hold mutex whilst calling this function let buffer = Buffer.from( `<${JSON.stringify(value, null, 2)}>\n`, 'utf-8' ) + this.write_list.push(buffer) + this.eof1 += buffer.length + } + async flush() { + // hold mutex whilst calling this function let ptr = this.eof - let len = buffer.length - assert( - (await this.log.write(buffer, 0, len, ptr)).bytesWritten === len - ) - this.eof += len + for (let i = 0; i < this.write_list.length; ++i) { + let buffer = this.write_list[i] + let len = buffer.length + assert( + (await this.log.write(buffer, 0, len, ptr)).bytesWritten === len + ) + + // each block in write list is also pinned in read cache, unpin + // (if not in read cache then it's special root entry with < >) + if (Object.prototype.hasOwnProperty.call(this.read_cache, ptr)) { + let cached_value = this.read_cache[ptr] + --cached_value.refs + if (cached_value.stale_count === 0) + delete this.read_cache[ptr] + } - return [ptr + 1, len - 3] + ptr += len + } + assert(ptr === this.eof1) + this.eof = ptr + + this.write_list = [] } Transaction() { @@ -211,6 +241,12 @@ class Database { delete this.read_cache[i] //} } + + if (this.write_count && --this.write_count === 0) { + await this.mutex.acquire() + await this.flush() + this.mutex.release() + } } } @@ -264,10 +300,20 @@ class Transaction { } if (this.dirty) { - await this.database.write_root(this.value) + this.database.write_root(this.value) this.database.value = this.value } + if ( + this.database.eof1 > this.database.eof && + this.database.write_count === 0 + ) { + if (this.database.write_timeout === 0) + await this.database.flush() + else + this.database.write_count = this.database.write_timeout + } + this.database.mutex.release() } @@ -398,7 +444,7 @@ class LazyArray extends Lazy { } if (this.ptr_len === null) { - this.ptr_len = await this.transaction.database.write(this.array) + this.ptr_len = this.transaction.database.write(this.array) return true } return false @@ -465,7 +511,7 @@ class LazyObject extends Lazy { } if (this.ptr_len === null) { - this.ptr_len = await this.transaction.database.write(this.object) + this.ptr_len = this.transaction.database.write(this.object) return true } return false -- 2.34.1