Implement log file rotation
authorNick Downing <nick@ndcode.org>
Sat, 8 Jan 2022 03:30:23 +0000 (14:30 +1100)
committerNick Downing <nick@ndcode.org>
Sun, 9 Jan 2022 00:29:36 +0000 (11:29 +1100)
.gitignore
CopyCollector.mjs [new file with mode: 0644]
Database.mjs
Transaction.mjs
tests/a.mjs
tests/b.mjs

index 446fa49..21b59fd 100644 (file)
@@ -4,4 +4,6 @@
 /tests/a.json
 /tests/b.json
 /tests/c.json
+/tests/d.json
 /tests/a.logjson
+/tests/b.logjson
diff --git a/CopyCollector.mjs b/CopyCollector.mjs
new file mode 100644 (file)
index 0000000..8eee7e3
--- /dev/null
@@ -0,0 +1,116 @@
+import assert from 'assert'
+import fsPromises from 'fs/promises'
+
+class CopyCollector {
+  constructor(block_size) {
+    this.block_size = block_size || 0x1000
+    this.log = null
+    this.eof = 0
+    this.translate = {}
+    this.write_list = []
+    this.write_list_len = 0
+  }
+
+  async create(path) {
+    assert(this.log === null)
+
+    this.log = await fsPromises.open(path, 'w+')
+    this.eof = 0
+    this.translate = {}
+    this.write_list = []
+    this.write_list_len = 0
+  }
+
+  async close() {
+    assert(this.log !== null)
+
+    //await this.flush()
+    await this.log.close()
+    this.log = null
+  }
+
+  async write(value) {
+    let ptr = this.eof + this.write_list_len
+
+    let buffer = Buffer.from(
+      JSON.stringify(value, null, 2) + '\n',
+      'utf-8'
+    )
+    this.write_list.push(buffer)
+    this.write_list_len += buffer.length
+
+    if (this.write_list_len >= this.block_size)
+      await this.flush()
+
+    return [ptr, buffer.length - 1]
+  }
+
+  async write_root(value) {
+    let buffer = Buffer.from(
+      `<${JSON.stringify(value, null, 2)}>\n`,
+      'utf-8'
+    )
+    this.write_list.push(buffer)
+    this.write_list_len += buffer.length
+
+    if (this.write_list_len >= this.block_size)
+      await this.flush()
+  }
+
+  async flush() {
+    if (this.write_list_len === 0)
+      return
+
+    let buffer = Buffer.concat(this.write_list)
+    let len = buffer.length
+    assert(len === this.write_list_len)
+    assert(
+      (await this.log.write(buffer, 0, len, this.eof)).bytesWritten === len
+    )
+    this.eof += len
+
+    this.write_list = []
+    this.write_list_len = 0
+  }
+
+  async copy_pass(log, value) {
+    let copy = async ptr_len => {
+      let [ptr, len] = ptr_len
+
+      if (Object.prototype.hasOwnProperty.call(this.translate, ptr))
+        return this.translate[ptr]
+
+      let buffer = Buffer.alloc(len)
+      assert(
+        (await log.read(buffer, 0, len, ptr)).bytesRead === len
+      )
+      let value = JSON.parse(buffer.toString('utf-8'))
+
+      assert(typeof value === 'object' && value !== null)
+      if (value instanceof Array)
+        for (let i = 0; i < value.length; ++i) {
+          let child_value = value[i]
+          if (child_value instanceof Array)
+            value[i] = await copy(child_value)
+        }
+      else
+        for (let i in value) {
+          let child_value = value[i]
+          if (child_value instanceof Array)
+            value[i] = await copy(child_value)
+        }
+
+      let result = await this.write(value)
+      this.translate[ptr] = result
+      return result
+    }
+
+    if (value instanceof Array)
+      value = await copy(value)
+    await this.write_root(value)
+    await this.flush()
+    return value
+  }
+}
+
+export default CopyCollector
index 315428d..f0d40a4 100644 (file)
@@ -1,6 +1,7 @@
 import assert from 'assert'
 import fsPromises from 'fs/promises'
 import CachedValue from './CachedValue.mjs'
+import CopyCollector from './CopyCollector.mjs'
 import Mutex from './Mutex.mjs'
 import Transaction from './Transaction.mjs'
 
@@ -12,6 +13,7 @@ class Database {
     this.write_timeout = write_timeout || 5
     this.block_size = block_size || 0x1000
     this.mutex = new Mutex()
+    this.log_path = null
     this.log = null
     this.eof = 0
     this.value = undefined
@@ -25,7 +27,8 @@ class Database {
     await this.mutex.acquire()
     assert(this.log === null)
 
-    this.eof = 0 // before write_list
+    this.log_path = path
+    this.eof = 0
     this.value = undefined
     this.read_cache = {}
     this.write_list = []
@@ -39,11 +42,19 @@ class Database {
       if (error.code !== 'ENOENT')
         throw error
 
-      console.log('warning: can\'t find database file, creating')
-      this.log = await fsPromises.open(path, 'w+')
+      try {
+        await fsPromises.rename(path + '.new', path)
+        this.log = await fsPromises.open(path, 'r+')
+      }
+      catch (error) {
+        console.log('warning: can\'t find database file, creating')
+        this.log = await fsPromises.open(path, 'w+')
 
-      this.mutex.release()
-      return
+        this.mutex.release()
+        return
+      }
+
+      console.log('warning: completed incomplete log file rotation')
     }
 
     this.eof = (await this.log.stat()).size
@@ -251,6 +262,93 @@ class Database {
 
     this.mutex.release()
   }
+
+  async rotate(archive_path) {
+    // note: this function is not re-entrant
+    // it should only be called from a garbage collection thread
+    let copy_collector = new CopyCollector()
+    let new_path = this.log_path + '.new'
+    await copy_collector.create(new_path)
+
+    await this.mutex.acquire()
+    await this.flush()
+
+    let value, new_value
+    do {
+      value = this.value
+      this.mutex.release()
+
+      new_value = await copy_collector.copy_pass(this.log, value)
+
+      await this.mutex.acquire()
+      await this.flush()
+    } while (value !== this.value)
+
+    // since root has not changed since last flush, can't be anything dirty
+    assert(this.write_list_len === 0)
+
+    // rotate the log files
+    await this.log.close()
+    await fsPromises.rename(this.log_path, archive_path)
+    await fsPromises.rename(new_path, this.log_path)
+    this.log = copy_collector.log
+    this.eof = copy_collector.eof
+    this.value = new_value
+
+    // translate read cache via the copy collector's translation table,
+    // done so that website performance is less affected by the rotation
+    let old_read_cache = this.read_cache
+    this.read_cache = {}
+    for (let i in old_read_cache) {
+      let cached_value = old_read_cache[i]
+      assert(cached_value.refs === 0) // can't be anything dirty
+
+      if (
+        Object.prototype.hasOwnProperty.call(
+          copy_collector.translate,
+          i
+        )
+      ) {
+        // cached object is reachable, translate (child objects reachable too)
+        let value = cached_value.value
+
+        assert(typeof(value) === 'object' && value !== null)
+        if (value instanceof Array)
+          for (let i = 0; i < value.length; ++i) {
+            let child_value = value[i]
+            if (child_value instanceof Array) {
+              let [ptr, len] = child_value
+              assert(
+                Object.prototype.hasOwnProperty.call(
+                  copy_collector.translate,
+                  ptr
+                )
+              )
+              value[i] = copy_collector.translate[ptr]
+            }
+          }
+        else
+          for (let i in value) {
+            let child_value = value[i]
+            if (child_value instanceof Array) {
+              let [ptr, len] = child_value
+              assert(
+                Object.prototype.hasOwnProperty.call(
+                  copy_collector.translate,
+                  ptr
+                )
+              )
+              value[i] = copy_collector.translate[ptr]
+            }
+          }
+
+        let [ptr, len] = copy_collector.translate[i]
+        this.read_cache[ptr] = cached_value
+      }
+    }
+
+    this.mutex.release()
+  }
 }
 
 export default Database
index de6e48f..ad5e497 100644 (file)
@@ -15,12 +15,12 @@ class Transaction {
 
   async read(ptr_len) {
     let value = await this.database.read(ptr_len)
-    if (typeof value === 'object' && value !== null)
-      value =
-        value instanceof Array ?
-          new LazyArray(this, ptr_len, value) :
-          new LazyObject(this, ptr_len, value)
-    return value
+    assert(typeof value === 'object' && value !== null)
+    return (
+      value instanceof Array ?
+        new LazyArray(this, ptr_len, value) :
+        new LazyObject(this, ptr_len, value)
+    )
   }
 
   async get(default_value) {
index 99be049..ccaacfb 100755 (executable)
@@ -48,4 +48,20 @@ await fsPromises.writeFile(
 )
 transaction.rollback()
 
+await database.rotate('b.logjson')
+
+/*let*/ transaction = database.Transaction()
+await fsPromises.writeFile(
+  'c.json',
+  Buffer.from(
+    JSON.stringify(
+      await logjson.logjson_to_json(await transaction.get()),
+      null,
+      2
+    ) + '\n',
+    'utf-8'
+  )
+)
+transaction.rollback()
+
 await database.close()
index 7c0e591..fba6893 100755 (executable)
@@ -7,7 +7,7 @@ let database = new logjson.Database()
 await database.open('a.logjson')
 let transaction = database.Transaction()
 await fsPromises.writeFile(
-  'c.json',
+  'd.json',
   Buffer.from(
     JSON.stringify(
       await logjson.logjson_to_json(await transaction.get()),