return value
}
- write(value) {
+ async write(value) {
// hold mutex whilst calling this function
- let buffer = Buffer.from(
- JSON.stringify(value, null, 2) + '\n',
- 'utf-8'
- )
-
let ptr = this.eof + this.write_list_len
- let len = buffer.length
-
//console.log('addw', ptr)
assert(!Object.prototype.hasOwnProperty.call(this.read_cache, ptr))
this.read_cache[ptr] = new CachedValue(1, value, this.read_timeout)
+ let buffer = Buffer.from(
+ JSON.stringify(value, null, 2) + '\n',
+ 'utf-8'
+ )
this.write_list.push(buffer)
- this.write_list_len += len
+ this.write_list_len += buffer.length
- return [ptr, len - 1]
+ if (this.write_list_len >= this.block_size)
+ await this.flush()
+
+ return [ptr, buffer.length - 1]
}
- write_root(value) {
+ async write_root(value) {
// hold mutex whilst calling this function
let buffer = Buffer.from(
`<${JSON.stringify(value, null, 2)}>\n`,
)
this.write_list.push(buffer)
this.write_list_len += buffer.length
+
+ if (this.write_list_len >= this.block_size)
+ await this.flush()
}
async flush() {
// hold mutex whilst calling this function
- let ptr = this.eof, len = 0, i = 0, j = 0
- for (; j < this.write_list.length; ++j) {
- if (len >= this.block_size) {
- let buffer = Buffer.concat(this.write_list.slice(i, j))
- assert(buffer.length === len)
- assert(
- (await this.log.write(buffer, 0, len, ptr)).bytesWritten === len
- )
-
- ptr += len
- len = 0
- i = j
- }
- len += this.write_list[j].length
- }
- let buffer = Buffer.concat(this.write_list.slice(i, j))
- assert(buffer.length === len)
+ if (this.write_list_len === 0)
+ return
+
+ let buffer = Buffer.concat(this.write_list)
+ let len = buffer.length
+ assert(len === this.write_list_len)
assert(
- (await this.log.write(buffer, 0, len, ptr)).bytesWritten === len
+ (await this.log.write(buffer, 0, len, this.eof)).bytesWritten === len
)
- ptr += len
- assert(ptr === this.eof + this.write_list_len)
// each block in write list is also pinned in read cache, unpin
// note: do this afterwards as needs to be on disk when unpinned
// note: special root entry is not in read cache and is ignored
- /*let*/ ptr = this.eof
- for (/*let*/ i = 0; i < this.write_list.length; ++i) {
- if (Object.prototype.hasOwnProperty.call(this.read_cache, ptr)) {
+ let ptr = this.eof
+ for (let i = 0; i < this.write_list.length; ++i) {
+ if (this.write_list[i][0] !== 0x3c) {
let cached_value = this.read_cache[ptr]
--cached_value.refs
if (cached_value.stale_count === 0)
ptr += this.write_list[i].length
}
assert(ptr === this.eof + this.write_list_len)
-
this.eof = ptr
+
this.write_list = []
this.write_list_len = 0
}
}
async kick() {
+ await this.mutex.acquire()
+
for (let i in this.read_cache) {
let cached_value = this.read_cache[i]
- if (--cached_value.stale_count === 0) //{
+ if (cached_value.refs === 0 && --cached_value.stale_count === 0) //{
//console.log('stale', i)
delete this.read_cache[i]
//}
}
- if (this.write_count && --this.write_count === 0) {
- await this.mutex.acquire()
+ if (this.write_count && --this.write_count === 0)
await this.flush()
- this.mutex.release()
- }
+
+ this.mutex.release()
}
}