import Mutex from './Mutex.mjs'
class CachedValue {
- constructor(value, stale_count) {
+ constructor(refs, value, stale_count) {
+ this.refs = refs
this.value = value
this.stale_count = stale_count
}
let open_angle = Buffer.from('\n<', 'utf-8')
let close_angle = Buffer.from('>\n', 'utf-8')
class Database {
- constructor(read_timeout) {
+ constructor(read_timeout, write_timeout) {
this.read_timeout = read_timeout || 3600
+ this.write_timeout = write_timeout || 5
this.mutex = new Mutex()
this.log = null
- this.eof = 0
+ this.eof = 0 // before write_list
+ this.eof1 = 0 // after write_list
this.value = undefined
this.read_cache = {}
+ this.write_list = []
+ this.write_count = 0
}
async open(path) {
await this.mutex.acquire()
assert(this.log === null)
- this.eof = 0
+ this.eof = 0 // before write_list
+ this.eof1 = 0 // after write_list
this.value = undefined
this.read_cache = {}
-
+ this.write_list = []
+ this.write_count = 0
+
try {
this.log = await fsPromises.open(path, 'r+')
}
return
}
- this.eof = (await this.log.stat()).size
+ this.eof = this.eof1 = (await this.log.stat()).size
let eof_buffer
try {
console.log('warning: garbage after root, truncating database file')
await this.log.truncate(eof)
this.eof = eof
+ this.eof1 = eof
}
this.mutex.release()
await this.mutex.acquire()
assert(this.log !== null)
+ await this.flush()
await this.log.close()
this.log = null
}
async read(ptr_len) {
+ // hold mutex whilst calling this function
let [ptr, len] = ptr_len
if (Object.prototype.hasOwnProperty.call(this.read_cache, ptr)) {
if (this.read_timeout) //{
//console.log('add', ptr)
- this.read_cache[ptr] = new CachedValue(value, this.read_timeout)
+ this.read_cache[ptr] = new CachedValue(0, value, this.read_timeout)
//}
+
return value
}
- async write(value) {
+ write(value) {
+ // hold mutex whilst calling this function
let buffer = Buffer.from(
JSON.stringify(value, null, 2) + '\n',
'utf-8'
)
- let ptr = this.eof
+ let ptr = this.eof1
let len = buffer.length
- if (this.read_timeout) {
- //console.log('addw', ptr)
- assert(!Object.prototype.hasOwnProperty.call(this.read_cache, ptr))
- this.read_cache[ptr] = new CachedValue(value, this.read_timeout)
- }
+ //console.log('addw', ptr)
+ assert(!Object.prototype.hasOwnProperty.call(this.read_cache, ptr))
+ this.read_cache[ptr] = new CachedValue(1, value, this.read_timeout)
- assert(
- (await this.log.write(buffer, 0, len, ptr)).bytesWritten === len
- )
- this.eof += len
+ this.write_list.push(buffer)
+ this.eof1 += len
return [ptr, len - 1]
}
- async write_root(value) {
+ write_root(value) {
+ // hold mutex whilst calling this function
let buffer = Buffer.from(
`<${JSON.stringify(value, null, 2)}>\n`,
'utf-8'
)
+ this.write_list.push(buffer)
+ this.eof1 += buffer.length
+ }
+ async flush() {
+ // hold mutex whilst calling this function
let ptr = this.eof
- let len = buffer.length
- assert(
- (await this.log.write(buffer, 0, len, ptr)).bytesWritten === len
- )
- this.eof += len
+ for (let i = 0; i < this.write_list.length; ++i) {
+ let buffer = this.write_list[i]
+ let len = buffer.length
+ assert(
+ (await this.log.write(buffer, 0, len, ptr)).bytesWritten === len
+ )
+
+ // each block in write list is also pinned in read cache, unpin
+ // (if not in read cache then it's special root entry with < >)
+ if (Object.prototype.hasOwnProperty.call(this.read_cache, ptr)) {
+ let cached_value = this.read_cache[ptr]
+ --cached_value.refs
+ if (cached_value.stale_count === 0)
+ delete this.read_cache[ptr]
+ }
- return [ptr + 1, len - 3]
+ ptr += len
+ }
+ assert(ptr === this.eof1)
+ this.eof = ptr
+
+ this.write_list = []
}
Transaction() {
delete this.read_cache[i]
//}
}
+
+ if (this.write_count && --this.write_count === 0) {
+ await this.mutex.acquire()
+ await this.flush()
+ this.mutex.release()
+ }
}
}
}
if (this.dirty) {
- await this.database.write_root(this.value)
+ this.database.write_root(this.value)
this.database.value = this.value
}
+ if (
+ this.database.eof1 > this.database.eof &&
+ this.database.write_count === 0
+ ) {
+ if (this.database.write_timeout === 0)
+ await this.database.flush()
+ else
+ this.database.write_count = this.database.write_timeout
+ }
+
this.database.mutex.release()
}
}
if (this.ptr_len === null) {
- this.ptr_len = await this.transaction.database.write(this.array)
+ this.ptr_len = this.transaction.database.write(this.array)
return true
}
return false
}
if (this.ptr_len === null) {
- this.ptr_len = await this.transaction.database.write(this.object)
+ this.ptr_len = this.transaction.database.write(this.object)
return true
}
return false