Add write list
authorNick Downing <nick@ndcode.org>
Fri, 7 Jan 2022 05:02:08 +0000 (16:02 +1100)
committerNick Downing <nick@ndcode.org>
Fri, 7 Jan 2022 05:02:08 +0000 (16:02 +1100)
a.mjs
logjson.mjs

diff --git a/a.mjs b/a.mjs
index 3b27c31..89f8fc8 100755 (executable)
--- a/a.mjs
+++ b/a.mjs
@@ -3,7 +3,7 @@
 import logjson from './logjson.mjs'
 import fsPromises from 'fs/promises'
 
-let database = new logjson.Database(5)
+let database = new logjson.Database(5, 1)
 await database.open('a.logjson')
 
 let transaction = database.Transaction()
@@ -26,6 +26,7 @@ await fsPromises.writeFile(
     'utf-8'
   )
 )
+await transaction.rollback()
 
 await database.kick()
 await database.kick()
index 4a3d09d..70cb975 100644 (file)
@@ -3,7 +3,8 @@ import fsPromises from 'fs/promises'
 import Mutex from './Mutex.mjs'
 
 class CachedValue {
-  constructor(value, stale_count) {
+  constructor(refs, value, stale_count) {
+    this.refs = refs
     this.value = value
     this.stale_count = stale_count
   }
@@ -12,23 +13,30 @@ class CachedValue {
 let open_angle = Buffer.from('\n<', 'utf-8')
 let close_angle = Buffer.from('>\n', 'utf-8')
 class Database {
-  constructor(read_timeout) {
+  constructor(read_timeout, write_timeout) {
     this.read_timeout = read_timeout || 3600
+    this.write_timeout = write_timeout || 5
     this.mutex = new Mutex()
     this.log = null
-    this.eof = 0
+    this.eof = 0 // before write_list
+    this.eof1 = 0 // after write_list
     this.value = undefined
     this.read_cache = {}
+    this.write_list = []
+    this.write_count = 0
   }
 
   async open(path) {
     await this.mutex.acquire()
     assert(this.log === null)
 
-    this.eof = 0
+    this.eof = 0 // before write_list
+    this.eof1 = 0 // after write_list
     this.value = undefined
     this.read_cache = {}
-
+    this.write_list = []
+    this.write_count = 0
     try {
       this.log = await fsPromises.open(path, 'r+')
     }
@@ -43,7 +51,7 @@ class Database {
       return
     }
 
-    this.eof = (await this.log.stat()).size
+    this.eof = this.eof1 = (await this.log.stat()).size
 
     let eof_buffer
     try {
@@ -121,6 +129,7 @@ class Database {
       console.log('warning: garbage after root, truncating database file')
       await this.log.truncate(eof)
       this.eof = eof
+      this.eof1 = eof
     }
 
     this.mutex.release()
@@ -130,6 +139,7 @@ class Database {
     await this.mutex.acquire()
     assert(this.log !== null)
 
+    await this.flush()
     await this.log.close()
     this.log = null
 
@@ -137,6 +147,7 @@ class Database {
   }
 
   async read(ptr_len) {
+    // hold mutex whilst calling this function
     let [ptr, len] = ptr_len
 
     if (Object.prototype.hasOwnProperty.call(this.read_cache, ptr)) {
@@ -154,48 +165,67 @@ class Database {
 
     if (this.read_timeout) //{
       //console.log('add', ptr)
-      this.read_cache[ptr] = new CachedValue(value, this.read_timeout)
+      this.read_cache[ptr] = new CachedValue(0, value, this.read_timeout)
     //}
+
     return value
   }
 
-  async write(value) {
+  write(value) {
+    // hold mutex whilst calling this function
     let buffer = Buffer.from(
       JSON.stringify(value, null, 2) + '\n',
       'utf-8'
     )
 
-    let ptr = this.eof
+    let ptr = this.eof1
     let len = buffer.length
 
-    if (this.read_timeout) {
-      //console.log('addw', ptr)
-      assert(!Object.prototype.hasOwnProperty.call(this.read_cache, ptr))
-      this.read_cache[ptr] = new CachedValue(value, this.read_timeout)
-    }
+    //console.log('addw', ptr)
+    assert(!Object.prototype.hasOwnProperty.call(this.read_cache, ptr))
+    this.read_cache[ptr] = new CachedValue(1, value, this.read_timeout)
 
-    assert(
-      (await this.log.write(buffer, 0, len, ptr)).bytesWritten === len
-    )
-    this.eof += len
+    this.write_list.push(buffer)
+    this.eof1 += len
 
     return [ptr, len - 1]
   }
 
-  async write_root(value) {
+  write_root(value) {
+    // hold mutex whilst calling this function
     let buffer = Buffer.from(
       `<${JSON.stringify(value, null, 2)}>\n`,
       'utf-8'
     )
+    this.write_list.push(buffer)
+    this.eof1 += buffer.length
+  }
 
+  async flush() {
+    // hold mutex whilst calling this function
     let ptr = this.eof
-    let len = buffer.length
-    assert(
-      (await this.log.write(buffer, 0, len, ptr)).bytesWritten === len
-    )
-    this.eof += len
+    for (let i = 0; i < this.write_list.length; ++i) {
+      let buffer = this.write_list[i]
+      let len = buffer.length
+      assert(
+        (await this.log.write(buffer, 0, len, ptr)).bytesWritten === len
+      )
+
+      // each block in write list is also pinned in read cache, unpin
+      // (if not in read cache then it's special root entry with < >)
+      if (Object.prototype.hasOwnProperty.call(this.read_cache, ptr)) {
+        let cached_value = this.read_cache[ptr]
+        --cached_value.refs
+        if (cached_value.stale_count === 0)
+          delete this.read_cache[ptr]
+      }
 
-    return [ptr + 1, len - 3]
+      ptr += len
+    }
+    assert(ptr === this.eof1)
+    this.eof = ptr
+
+    this.write_list = []
   }
 
   Transaction() {
@@ -211,6 +241,12 @@ class Database {
         delete this.read_cache[i]
       //}
     }
+
+    if (this.write_count && --this.write_count === 0) {
+      await this.mutex.acquire()
+      await this.flush()
+      this.mutex.release()
+    }
   }
 }
 
@@ -264,10 +300,20 @@ class Transaction {
     }
 
     if (this.dirty) {
-      await this.database.write_root(this.value)
+      this.database.write_root(this.value)
       this.database.value = this.value
     }
 
+    if (
+      this.database.eof1 > this.database.eof &&
+      this.database.write_count === 0
+    ) {
+      if (this.database.write_timeout === 0)
+        await this.database.flush()
+      else
+        this.database.write_count = this.database.write_timeout
+    }
+
     this.database.mutex.release()
   }
 
@@ -398,7 +444,7 @@ class LazyArray extends Lazy {
     }
 
     if (this.ptr_len === null) {
-      this.ptr_len = await this.transaction.database.write(this.array)
+      this.ptr_len = this.transaction.database.write(this.array)
       return true
     }
     return false
@@ -465,7 +511,7 @@ class LazyObject extends Lazy {
     }
 
     if (this.ptr_len === null) {
-      this.ptr_len = await this.transaction.database.write(this.object)
+      this.ptr_len = this.transaction.database.write(this.object)
       return true
     }
     return false