Add immediate flush when Database.write_list_len exceeds Database.block_size, fix...
authorNick Downing <nick@ndcode.org>
Sat, 8 Jan 2022 01:39:42 +0000 (12:39 +1100)
committerNick Downing <nick@ndcode.org>
Sat, 8 Jan 2022 01:45:21 +0000 (12:45 +1100)
Database.mjs
LazyArray.mjs
LazyObject.mjs
Transaction.mjs

index 732401c..315428d 100644 (file)
@@ -165,27 +165,27 @@ class Database {
     return value
   }
 
-  write(value) {
+  async write(value) {
     // hold mutex whilst calling this function
-    let buffer = Buffer.from(
-      JSON.stringify(value, null, 2) + '\n',
-      'utf-8'
-    )
-
     let ptr = this.eof + this.write_list_len
-    let len = buffer.length
-
     //console.log('addw', ptr)
     assert(!Object.prototype.hasOwnProperty.call(this.read_cache, ptr))
     this.read_cache[ptr] = new CachedValue(1, value, this.read_timeout)
 
+    let buffer = Buffer.from(
+      JSON.stringify(value, null, 2) + '\n',
+      'utf-8'
+    )
     this.write_list.push(buffer)
-    this.write_list_len += len
+    this.write_list_len += buffer.length
 
-    return [ptr, len - 1]
+    if (this.write_list_len >= this.block_size)
+      await this.flush()
+
+    return [ptr, buffer.length - 1]
   }
 
-  write_root(value) {
+  async write_root(value) {
     // hold mutex whilst calling this function
     let buffer = Buffer.from(
       `<${JSON.stringify(value, null, 2)}>\n`,
@@ -193,39 +193,29 @@ class Database {
     )
     this.write_list.push(buffer)
     this.write_list_len += buffer.length
+
+    if (this.write_list_len >= this.block_size)
+      await this.flush()
   }
 
   async flush() {
     // hold mutex whilst calling this function
-    let ptr = this.eof, len = 0, i = 0, j = 0
-    for (; j < this.write_list.length; ++j) {
-      if (len >= this.block_size) {
-        let buffer = Buffer.concat(this.write_list.slice(i, j))
-        assert(buffer.length === len)
-        assert(
-          (await this.log.write(buffer, 0, len, ptr)).bytesWritten === len
-        )
-
-        ptr += len
-        len = 0
-        i = j
-      }
-      len += this.write_list[j].length
-    }
-    let buffer = Buffer.concat(this.write_list.slice(i, j))
-    assert(buffer.length === len)
+    if (this.write_list_len === 0)
+      return
+
+    let buffer = Buffer.concat(this.write_list)
+    let len = buffer.length
+    assert(len === this.write_list_len)
     assert(
-      (await this.log.write(buffer, 0, len, ptr)).bytesWritten === len
+      (await this.log.write(buffer, 0, len, this.eof)).bytesWritten === len
     )
-    ptr += len
-    assert(ptr === this.eof + this.write_list_len)
 
     // each block in write list is also pinned in read cache, unpin
     // note: do this afterwards as needs to be on disk when unpinned
     // note: special root entry is not in read cache and is ignored
-    /*let*/ ptr = this.eof
-    for (/*let*/ i = 0; i < this.write_list.length; ++i) {
-      if (Object.prototype.hasOwnProperty.call(this.read_cache, ptr)) {
+    let ptr = this.eof
+    for (let i = 0; i < this.write_list.length; ++i) {
+      if (this.write_list[i][0] !== 0x3c) {
         let cached_value = this.read_cache[ptr]
         --cached_value.refs
         if (cached_value.stale_count === 0)
@@ -234,8 +224,8 @@ class Database {
       ptr += this.write_list[i].length
     }
     assert(ptr === this.eof + this.write_list_len)
-
     this.eof = ptr
+
     this.write_list = []
     this.write_list_len = 0
   }
@@ -246,19 +236,20 @@ class Database {
   }
 
   async kick() {
+    await this.mutex.acquire()
+
     for (let i in this.read_cache) {
       let cached_value = this.read_cache[i]
-      if (--cached_value.stale_count === 0) //{
+      if (cached_value.refs === 0 && --cached_value.stale_count === 0) //{
         //console.log('stale', i)
         delete this.read_cache[i]
       //}
     }
 
-    if (this.write_count && --this.write_count === 0) {
-      await this.mutex.acquire()
+    if (this.write_count && --this.write_count === 0)
       await this.flush()
-      this.mutex.release()
-    }
+
+    this.mutex.release()
   }
 }
 
index 1beba9e..f53988b 100644 (file)
@@ -68,7 +68,7 @@ class LazyArray extends LazyValue {
     }
 
     if (this.ptr_len === null) {
-      this.ptr_len = this.transaction.database.write(this.array)
+      this.ptr_len = await this.transaction.database.write(this.array)
       return true
     }
     return false
index 080abe8..51a3e4a 100644 (file)
@@ -61,7 +61,7 @@ class LazyObject extends LazyValue {
     }
 
     if (this.ptr_len === null) {
-      this.ptr_len = this.transaction.database.write(this.object)
+      this.ptr_len = await this.transaction.database.write(this.object)
       return true
     }
     return false
index e23110e..de6e48f 100644 (file)
@@ -55,7 +55,7 @@ class Transaction {
     }
 
     if (this.dirty) {
-      this.database.write_root(this.value)
+      await this.database.write_root(this.value)
       this.database.value = this.value
     }