diff --git a/package-lock.json b/package-lock.json index 937e51fc..3097bb43 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5,7 +5,6 @@ "requires": true, "packages": { "": { - "name": "audiobookshelf", "version": "1.7.2", "license": "GPL-3.0", "dependencies": { @@ -23,7 +22,6 @@ "image-type": "^4.1.0", "jsonwebtoken": "^8.5.1", "libgen": "^2.1.0", - "njodb": "^0.4.29", "node-cron": "^3.0.0", "node-ffprobe": "^3.0.0", "node-stream-zip": "^1.15.0", @@ -1357,14 +1355,6 @@ "node": ">= 0.6" } }, - "node_modules/njodb": { - "version": "0.4.32", - "resolved": "https://registry.npmjs.org/njodb/-/njodb-0.4.32.tgz", - "integrity": "sha512-aChloK6swcwQ1dy7/gxPABCb6qGQkwc3dMR5p+Rn/c37Zifl9NrfVwlWdWtFJgOZsC2+bwnBPGeHoY64avVyZQ==", - "dependencies": { - "proper-lockfile": "^4.1.2" - } - }, "node_modules/node-cron": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/node-cron/-/node-cron-3.0.0.tgz", @@ -1516,16 +1506,6 @@ "resolved": "https://registry.npmjs.org/promise-concurrency-limiter/-/promise-concurrency-limiter-1.0.0.tgz", "integrity": "sha512-OI96yL5DUck9KCLee5H6DnRfVsHIstQspXk8xsYrWr9ur9IlFuzKvoU70HwQb99MqHg2mpdkuGa92NuoXue3cw==" }, - "node_modules/proper-lockfile": { - "version": "4.1.2", - "resolved": "https://registry.npmjs.org/proper-lockfile/-/proper-lockfile-4.1.2.tgz", - "integrity": "sha512-TjNPblN4BwAWMXU8s9AEz4JmQxnD1NNL7bNOY/AKUzyamc379FWASUhc/K1pL2noVb+XmZKLL68cjzLsiOAMaA==", - "dependencies": { - "graceful-fs": "^4.2.4", - "retry": "^0.12.0", - "signal-exit": "^3.0.2" - } - }, "node_modules/proxy-addr": { "version": "2.0.7", "resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.7.tgz", @@ -1688,14 +1668,6 @@ "lowercase-keys": "^2.0.0" } }, - "node_modules/retry": { - "version": "0.12.0", - "resolved": "https://registry.npmjs.org/retry/-/retry-0.12.0.tgz", - "integrity": "sha1-G0KmJmoh8HQh0bC1S33BZ7AcATs=", - "engines": { - "node": ">= 4" - } - }, "node_modules/ripstat": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/ripstat/-/ripstat-1.1.1.tgz", @@ -1788,11 +1760,6 @@ "resolved": "https://registry.npmjs.org/setprototypeof/-/setprototypeof-1.2.0.tgz", "integrity": "sha512-E5LDX7Wrp85Kil5bhZv46j8jOeboKq5JMmYM3gVGdGH8xFpPWXUMsNrlODCrkoxMEeNi/XZIwuRvY4XNwYMJpw==" }, - "node_modules/signal-exit": { - "version": "3.0.7", - "resolved": "https://registry.npmjs.org/signal-exit/-/signal-exit-3.0.7.tgz", - "integrity": "sha512-wnD2ZE+l+SPC/uoS0vXeE9L1+0wuaMqKlfz9AMUo38JsyLSBWSFcHR1Rri62LZc12vLr1gb3jl7iwQhgwpAbGQ==" - }, "node_modules/socket.io": { "version": "4.4.1", "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.4.1.tgz", @@ -3172,14 +3139,6 @@ "resolved": "https://registry.npmjs.org/negotiator/-/negotiator-0.6.3.tgz", "integrity": "sha512-+EUsqGPLsM+j/zdChZjsnX51g4XrHFOIXwfnCVPGlQk/k5giakcKsuxCObBRu6DSm9opw/O6slWbJdghQM4bBg==" }, - "njodb": { - "version": "0.4.32", - "resolved": "https://registry.npmjs.org/njodb/-/njodb-0.4.32.tgz", - "integrity": "sha512-aChloK6swcwQ1dy7/gxPABCb6qGQkwc3dMR5p+Rn/c37Zifl9NrfVwlWdWtFJgOZsC2+bwnBPGeHoY64avVyZQ==", - "requires": { - "proper-lockfile": "^4.1.2" - } - }, "node-cron": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/node-cron/-/node-cron-3.0.0.tgz", @@ -3279,16 +3238,6 @@ "resolved": "https://registry.npmjs.org/promise-concurrency-limiter/-/promise-concurrency-limiter-1.0.0.tgz", "integrity": "sha512-OI96yL5DUck9KCLee5H6DnRfVsHIstQspXk8xsYrWr9ur9IlFuzKvoU70HwQb99MqHg2mpdkuGa92NuoXue3cw==" }, - "proper-lockfile": { - "version": "4.1.2", - "resolved": "https://registry.npmjs.org/proper-lockfile/-/proper-lockfile-4.1.2.tgz", - "integrity": "sha512-TjNPblN4BwAWMXU8s9AEz4JmQxnD1NNL7bNOY/AKUzyamc379FWASUhc/K1pL2noVb+XmZKLL68cjzLsiOAMaA==", - "requires": { - "graceful-fs": "^4.2.4", - "retry": "^0.12.0", - "signal-exit": "^3.0.2" - } - }, "proxy-addr": { "version": "2.0.7", "resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.7.tgz", @@ -3421,11 +3370,6 @@ "lowercase-keys": "^2.0.0" } }, - "retry": { - "version": "0.12.0", - "resolved": "https://registry.npmjs.org/retry/-/retry-0.12.0.tgz", - "integrity": "sha1-G0KmJmoh8HQh0bC1S33BZ7AcATs=" - }, "ripstat": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/ripstat/-/ripstat-1.1.1.tgz", @@ -3497,11 +3441,6 @@ "resolved": "https://registry.npmjs.org/setprototypeof/-/setprototypeof-1.2.0.tgz", "integrity": "sha512-E5LDX7Wrp85Kil5bhZv46j8jOeboKq5JMmYM3gVGdGH8xFpPWXUMsNrlODCrkoxMEeNi/XZIwuRvY4XNwYMJpw==" }, - "signal-exit": { - "version": "3.0.7", - "resolved": "https://registry.npmjs.org/signal-exit/-/signal-exit-3.0.7.tgz", - "integrity": "sha512-wnD2ZE+l+SPC/uoS0vXeE9L1+0wuaMqKlfz9AMUo38JsyLSBWSFcHR1Rri62LZc12vLr1gb3jl7iwQhgwpAbGQ==" - }, "socket.io": { "version": "4.4.1", "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.4.1.tgz", diff --git a/package.json b/package.json index 82d0db3f..46ce3696 100644 --- a/package.json +++ b/package.json @@ -41,7 +41,6 @@ "image-type": "^4.1.0", "jsonwebtoken": "^8.5.1", "libgen": "^2.1.0", - "njodb": "^0.4.29", "node-cron": "^3.0.0", "node-ffprobe": "^3.0.0", "node-stream-zip": "^1.15.0", diff --git a/server/Db.js b/server/Db.js index 2ff44582..dc5b3214 100644 --- a/server/Db.js +++ b/server/Db.js @@ -1,5 +1,6 @@ const Path = require('path') -const njodb = require("njodb") +// const njodb = require("njodb") +const njodb = require('./njodb') const fs = require('fs-extra') const jwt = require('jsonwebtoken') const Logger = require('./Logger') @@ -322,6 +323,16 @@ class Db { return entityDb.update((record) => record.id === entity.id, () => jsonEntity).then((results) => { if (process.env.NODE_ENV !== 'production') { Logger.debug(`[DB] Updated ${entityName}: ${results.updated} | Selected: ${results.selected}`) + + if (!results.selected) { + entityDb.select(match => match.id == jsonEntity.id).then((results) => { + if (results.data.length) { + console.log('Said selected 0 but found it right here...', results.data[0].id) + } else { + console.log('Said selected 0 and no results for json entity id', jsonEntity.id) + } + }) + } } else { Logger.debug(`[DB] Updated ${entityName}: ${results.updated}`) } diff --git a/server/njodb/index.js b/server/njodb/index.js new file mode 100644 index 00000000..b2aaf87b --- /dev/null +++ b/server/njodb/index.js @@ -0,0 +1,492 @@ +"use strict"; + +const { + existsSync, + mkdirSync, + readFileSync, + writeFileSync +} = require("graceful-fs"); + +const { + join, + resolve +} = require("path"); + +const { + aggregateStoreData, + aggregateStoreDataSync, + distributeStoreData, + distributeStoreDataSync, + deleteStoreData, + deleteStoreDataSync, + dropEverything, + dropEverythingSync, + getStoreNames, + getStoreNamesSync, + insertStoreData, + insertStoreDataSync, + insertFileData, + selectStoreData, + selectStoreDataSync, + statsStoreData, + statsStoreDataSync, + updateStoreData, + updateStoreDataSync +} = require("./njodb"); + +const { + Randomizer, + Reducer, + Result +} = require("./objects"); + +const { + validateArray, + validateFunction, + validateName, + validateObject, + validatePath, + validateSize +} = require("./validators"); + +const defaults = { + "datadir": "data", + "dataname": "data", + "datastores": 5, + "tempdir": "tmp", + "lockoptions": { + "stale": 5000, + "update": 1000, + "retries": { + "retries": 5000, + "minTimeout": 250, + "maxTimeout": 5000, + "factor": 0.15, + "randomize": false + } + } +}; + +const mergeProperties = (defaults, userProperties) => { + var target = Object.assign({}, defaults); + + for (let key of Object.keys(userProperties)) { + if (Object.prototype.hasOwnProperty.call(target, key)) { + if (typeof userProperties[key] !== 'object' && !Array.isArray(userProperties[key])) { + Object.assign(target, { [key]: userProperties[key] }); + } else { + target[key] = mergeProperties(target[key], userProperties[key]); + } + } + } + + return target; +} + +const saveProperties = (root, properties) => { + properties = { + "datadir": properties.datadir, + "dataname": properties.dataname, + "datastores": properties.datastores, + "tempdir": properties.tempdir, + "lockoptions": properties.lockoptions + }; + const propertiesFile = join(root, "njodb.properties"); + writeFileSync(propertiesFile, JSON.stringify(properties, null, 4)); + return properties; +} + +process.on("uncaughtException", error => { + if (error.code === "ECOMPROMISED") { + console.error(Object.assign(new Error("Stale lock or attempt to update it after release"), { code: error.code })); + } else { + throw error; + } +}); + +class Database { + + constructor(root, properties = {}) { + validateObject(properties); + + this.properties = {}; + + if (root !== undefined && root !== null) { + validateName(root); + this.properties.root = root; + } else { + this.properties.root = process.cwd(); + } + + if (!existsSync(this.properties.root)) mkdirSync(this.properties.root); + else { + console.log('Db already exists', root) + } + + const propertiesFile = join(this.properties.root, "njodb.properties"); + + if (existsSync(propertiesFile)) { + this.setProperties(JSON.parse(readFileSync(propertiesFile))); + } else { + this.setProperties(mergeProperties(defaults, properties)); + } + + if (!existsSync(this.properties.datapath)) mkdirSync(this.properties.datapath); + if (!existsSync(this.properties.temppath)) mkdirSync(this.properties.temppath); + + this.properties.storenames = getStoreNamesSync(this.properties.datapath, this.properties.dataname); + + return this; + } + + // Database management methods + + getProperties() { + return this.properties; + } + + setProperties(properties) { + validateObject(properties); + + this.properties.datadir = (validateName(properties.datadir)) ? properties.datadir : defaults.datadir; + this.properties.dataname = (validateName(properties.dataname)) ? properties.dataname : defaults.dataname; + this.properties.datastores = (validateSize(properties.datastores)) ? properties.datastores : defaults.datastores; + this.properties.tempdir = (validateName(properties.tempdir)) ? properties.tempdir : defaults.tempdir; + this.properties.lockoptions = (validateObject(properties.lockoptions)) ? properties.lockoptions : defaults.lockoptions; + this.properties.datapath = join(this.properties.root, this.properties.datadir); + this.properties.temppath = join(this.properties.root, this.properties.tempdir); + + saveProperties(this.properties.root, this.properties); + + return this.properties; + } + + async stats() { + var stats = { + root: resolve(this.properties.root), + data: resolve(this.properties.datapath), + temp: resolve(this.properties.temppath) + }; + + var promises = []; + + for (const storename of this.properties.storenames) { + const storepath = join(this.properties.datapath, storename); + promises.push(statsStoreData(storepath, this.properties.lockoptions)); + } + + const results = await Promise.all(promises); + + return Object.assign(stats, Reducer("stats", results)); + } + + statsSync() { + var stats = { + root: resolve(this.properties.root), + data: resolve(this.properties.datapath), + temp: resolve(this.properties.temppath) + }; + + var results = []; + + for (const storename of this.properties.storenames) { + const storepath = join(this.properties.datapath, storename); + results.push(statsStoreDataSync(storepath)); + } + + return Object.assign(stats, Reducer("stats", results)); + } + + async grow() { + this.properties.datastores++; + const results = await distributeStoreData(this.properties); + this.properties.storenames = await getStoreNames(this.properties.datapath, this.properties.dataname); + saveProperties(this.properties.root, this.properties); + return results; + } + + growSync() { + this.properties.datastores++; + const results = distributeStoreDataSync(this.properties); + this.properties.storenames = getStoreNamesSync(this.properties.datapath, this.properties.dataname); + saveProperties(this.properties.root, this.properties); + return results; + } + + async shrink() { + if (this.properties.datastores > 1) { + this.properties.datastores--; + const results = await distributeStoreData(this.properties); + this.properties.storenames = await getStoreNames(this.properties.datapath, this.properties.dataname); + saveProperties(this.properties.root, this.properties); + return results; + } else { + throw new Error("Database cannot shrink any further"); + } + } + + shrinkSync() { + if (this.properties.datastores > 1) { + this.properties.datastores--; + const results = distributeStoreDataSync(this.properties); + this.properties.storenames = getStoreNamesSync(this.properties.datapath, this.properties.dataname); + saveProperties(this.properties.root, this.properties); + return results; + } else { + throw new Error("Database cannot shrink any further"); + } + } + + async resize(size) { + validateSize(size); + this.properties.datastores = size; + const results = await distributeStoreData(this.properties); + this.properties.storenames = await getStoreNames(this.properties.datapath, this.properties.dataname); + saveProperties(this.properties.root, this.properties); + return results; + } + + resizeSync(size) { + validateSize(size); + this.properties.datastores = size; + const results = distributeStoreDataSync(this.properties); + this.properties.storenames = getStoreNamesSync(this.properties.datapath, this.properties.dataname); + saveProperties(this.properties.root, this.properties); + return results; + } + + async drop() { + const results = await dropEverything(this.properties); + return Reducer("drop", results); + } + + dropSync() { + const results = dropEverythingSync(this.properties); + return Reducer("drop", results); + } + + // Data manipulation methods + + async insert(data) { + validateArray(data); + + var promises = []; + var records = []; + + for (let i = 0; i < this.properties.datastores; i++) { + records[i] = ""; + } + + for (let i = 0; i < data.length; i++) { + records[i % this.properties.datastores] += JSON.stringify(data[i]) + "\n"; + } + + const randomizer = Randomizer(Array.from(Array(this.properties.datastores).keys()), false); + + for (var j = 0; j < records.length; j++) { + if (records[j] !== "") { + const storenumber = randomizer.next(); + const storename = [this.properties.dataname, storenumber, "json"].join("."); + const storepath = join(this.properties.datapath, storename) + promises.push(insertStoreData(storepath, records[j], this.properties.lockoptions)); + } + } + + const results = await Promise.all(promises); + + this.properties.storenames = await getStoreNames(this.properties.datapath, this.properties.dataname); + + return Reducer("insert", results); + } + + insertSync(data) { + validateArray(data); + + var results = []; + var records = []; + + for (let i = 0; i < this.properties.datastores; i++) { + records[i] = ""; + } + + for (let i = 0; i < data.length; i++) { + records[i % this.properties.datastores] += JSON.stringify(data[i]) + "\n"; + } + + const randomizer = Randomizer(Array.from(Array(this.properties.datastores).keys()), false); + + for (var j = 0; j < records.length; j++) { + if (records[j] !== "") { + const storenumber = randomizer.next(); + const storename = [this.properties.dataname, storenumber, "json"].join("."); + const storepath = join(this.properties.datapath, storename) + results.push(insertStoreDataSync(storepath, records[j], this.properties.lockoptions)); + } + } + + this.properties.storenames = getStoreNamesSync(this.properties.datapath, this.properties.dataname); + + return Reducer("insert", results); + } + + async insertFile(file) { + validatePath(file); + + const results = await insertFileData(file, this.properties.datapath, this.properties.storenames, this.properties.lockoptions); + + return results; + } + + insertFileSync(file) { + validatePath(file); + + const data = readFileSync(file, "utf8").split("\n"); + var records = []; + + var results = Result("insertFile"); + + for (var record of data) { + record = record.trim() + + results.lines++; + + if (record.length > 0) { + try { + records.push(JSON.parse(record)); + } catch (error) { + results.errors.push({ error: error.message, line: results.lines, data: record }); + } + } else { + results.blanks++; + } + } + + return Object.assign(results, this.insertSync(records)); + } + + async select(match, project) { + validateFunction(match); + if (project) validateFunction(project); + + var promises = []; + + for (const storename of this.properties.storenames) { + const storepath = join(this.properties.datapath, storename); + promises.push(selectStoreData(storepath, match, project, this.properties.lockoptions)); + } + + const results = await Promise.all(promises); + return Reducer("select", results); + } + + selectSync(match, project) { + validateFunction(match); + if (project) validateFunction(project); + + var results = []; + + for (const storename of this.properties.storenames) { + const storepath = join(this.properties.datapath, storename); + results.push(selectStoreDataSync(storepath, match, project)); + } + + return Reducer("select", results); + } + + async update(match, update) { + validateFunction(match); + validateFunction(update); + + var promises = []; + + for (const storename of this.properties.storenames) { + const storepath = join(this.properties.datapath, storename); + const tempstorename = [storename, Date.now(), "tmp"].join("."); + const tempstorepath = join(this.properties.temppath, tempstorename); + promises.push(updateStoreData(storepath, match, update, tempstorepath, this.properties.lockoptions)); + } + + const results = await Promise.all(promises); + return Reducer("update", results); + } + + updateSync(match, update) { + validateFunction(match); + validateFunction(update); + + var results = []; + + for (const storename of this.properties.storenames) { + const storepath = join(this.properties.datapath, storename); + const tempstorename = [storename, Date.now(), "tmp"].join("."); + const tempstorepath = join(this.properties.temppath, tempstorename); + results.push(updateStoreDataSync(storepath, match, update, tempstorepath)); + } + + return Reducer("update", results); + } + + async delete(match) { + validateFunction(match); + + var promises = []; + + for (const storename of this.properties.storenames) { + const storepath = join(this.properties.datapath, storename); + const tempstorename = [storename, Date.now(), "tmp"].join("."); + const tempstorepath = join(this.properties.temppath, tempstorename); + promises.push(deleteStoreData(storepath, match, tempstorepath, this.properties.lockoptions)); + } + + const results = await Promise.all(promises); + return Reducer("delete", results); + } + + deleteSync(match) { + validateFunction(match); + + var results = []; + + for (const storename of this.properties.storenames) { + const storepath = join(this.properties.datapath, storename); + const tempstorename = [storename, Date.now(), "tmp"].join("."); + const tempstorepath = join(this.properties.temppath, tempstorename); + results.push(deleteStoreDataSync(storepath, match, tempstorepath)); + } + + return Reducer("delete", results); + } + + async aggregate(match, index, project) { + validateFunction(match); + validateFunction(index); + if (project) validateFunction(project); + + var promises = []; + + for (const storename of this.properties.storenames) { + const storepath = join(this.properties.datapath, storename); + promises.push(aggregateStoreData(storepath, match, index, project, this.properties.lockoptions)); + } + + const results = await Promise.all(promises); + return Reducer("aggregate", results); + } + + aggregateSync(match, index, project) { + validateFunction(match); + validateFunction(index); + if (project) validateFunction(project); + + var results = []; + + for (const storename of this.properties.storenames) { + const storepath = join(this.properties.datapath, storename); + results.push(aggregateStoreDataSync(storepath, match, index, project)); + } + + return Reducer("aggregate", results); + } +} + +exports.Database = Database; diff --git a/server/njodb/njodb.js b/server/njodb/njodb.js new file mode 100644 index 00000000..92b89edf --- /dev/null +++ b/server/njodb/njodb.js @@ -0,0 +1,731 @@ +"use strict"; + +const { + appendFile, + appendFileSync, + createReadStream, + createWriteStream, + readFileSync, + readdir, + readdirSync, + stat, + statSync, + writeFile +} = require("graceful-fs"); + +const { + join, + resolve +} = require("path"); + +const { createInterface } = require("readline"); + +const { promisify } = require("util"); + +const { + check, + checkSync, + lock, + lockSync +} = require("proper-lockfile"); + +const { + deleteFile, + deleteFileSync, + deleteDirectory, + deleteDirectorySync, + fileExists, + fileExistsSync, + moveFile, + moveFileSync, + releaseLock, + releaseLockSync, + replaceFile, + replaceFileSync +} = require("./utils"); + +const { + Handler, + Randomizer, + Result +} = require("./objects"); + +const filterStoreNames = (files, dataname) => { + var storenames = []; + const re = new RegExp("^" + [dataname, "\\d+", "json"].join(".") + "$"); + for (const file of files) { + if (re.test(file)) storenames.push(file); + } + return storenames; +}; + +const getStoreNames = async (datapath, dataname) => { + const files = await promisify(readdir)(datapath); + return filterStoreNames(files, dataname); +} + +const getStoreNamesSync = (datapath, dataname) => { + const files = readdirSync(datapath); + return filterStoreNames(files, dataname); +}; + +// Database management + +const statsStoreData = async (store, lockoptions) => { + var release, stats, results; + + release = await lock(store, lockoptions); + + const handlerResults = await new Promise((resolve, reject) => { + const reader = createInterface({ input: createReadStream(store), crlfDelay: Infinity }); + const handler = Handler("stats"); + + reader.on("line", record => handler.next(record)); + reader.on("close", () => resolve(handler.return())); + reader.on("error", error => reject(error)); + }); + + if (await check(store, lockoptions)) await releaseLock(store, release); + + results = Object.assign({ store: resolve(store) }, handlerResults) + + stats = await promisify(stat)(store); + results.size = stats.size; + results.created = stats.birthtime; + results.modified = stats.mtime; + + results.end = Date.now() + + return results; +}; + +const statsStoreDataSync = (store) => { + var file, release, results; + + release = lockSync(store); + file = readFileSync(store, "utf8"); + + if (checkSync(store)) releaseLockSync(store, release); + + const data = file.split("\n"); + const handler = Handler("stats"); + + for (var record of data) { + handler.next(record) + } + + results = Object.assign({ store: resolve(store) }, handler.return()); + + const stats = statSync(store); + results.size = stats.size; + results.created = stats.birthtime; + results.modified = stats.mtime; + + results.end = Date.now(); + + return results; +}; + +const distributeStoreData = async (properties) => { + var results = Result("distribute"); + + var storepaths = []; + var tempstorepaths = []; + + var locks = []; + + for (let storename of properties.storenames) { + const storepath = join(properties.datapath, storename); + storepaths.push(storepath); + locks.push(lock(storepath, properties.lockoptions)); + } + + const releases = await Promise.all(locks); + + var writes = []; + var writers = []; + + for (let i = 0; i < properties.datastores; i++) { + const tempstorepath = join(properties.temppath, [properties.dataname, i, results.start, "json"].join(".")); + tempstorepaths.push(tempstorepath); + await promisify(writeFile)(tempstorepath, ""); + writers.push(createWriteStream(tempstorepath, { flags: "r+" })); + } + + for (let storename of properties.storenames) { + writes.push(new Promise((resolve, reject) => { + var line = 0; + const store = join(properties.datapath, storename); + const randomizer = Randomizer(Array.from(Array(properties.datastores).keys()), false); + const reader = createInterface({ input: createReadStream(store), crlfDelay: Infinity }); + + reader.on("line", record => { + const storenumber = randomizer.next(); + + line++; + try { + record = JSON.stringify(JSON.parse(record)); + results.records++; + } catch { + results.errors.push({ line: line, data: record }); + } finally { + writers[storenumber].write(record + "\n"); + } + }); + + reader.on("close", () => { + resolve(true); + }); + + reader.on("error", error => { + reject(error); + }); + })); + } + + await Promise.all(writes); + + for (let writer of writers) { + writer.end(); + } + + var deletes = []; + + for (let storepath of storepaths) { + deletes.push(deleteFile(storepath)); + } + + await Promise.all(deletes); + + for (const release of releases) { + release(); + } + + var moves = []; + + for (let i = 0; i < tempstorepaths.length; i++) { + moves.push(moveFile(tempstorepaths[i], join(properties.datapath, [properties.dataname, i, "json"].join(".")))) + } + + await Promise.all(moves); + + results.stores = tempstorepaths.length, + results.end = Date.now(); + results.elapsed = results.end - results.start; + + return results; + +}; + +const distributeStoreDataSync = (properties) => { + var results = Result("distribute"); + + var storepaths = []; + var tempstorepaths = []; + + var releases = []; + var data = []; + + for (let storename of properties.storenames) { + const storepath = join(properties.datapath, storename); + storepaths.push(storepath); + releases.push(lockSync(storepath)); + const file = readFileSync(storepath, "utf8").trimEnd(); + if (file.length > 0) data = data.concat(file.split("\n")); + } + + var records = []; + + for (var i = 0; i < data.length; i++) { + try { + data[i] = JSON.stringify(JSON.parse(data[i])); + results.records++; + } catch (error) { + results.errors.push({ line: i, data: data[i] }); + } finally { + if (i === i % properties.datastores) records[i] = []; + records[i % properties.datastores] += data[i] + "\n"; + } + + } + + const randomizer = Randomizer(Array.from(Array(properties.datastores).keys()), false); + + for (var j = 0; j < records.length; j++) { + const storenumber = randomizer.next(); + const tempstorepath = join(properties.temppath, [properties.dataname, storenumber, results.start, "json"].join(".")); + tempstorepaths.push(tempstorepath); + appendFileSync(tempstorepath, records[j]); + } + + for (let storepath of storepaths) { + deleteFileSync(storepath); + } + + for (const release of releases) { + release(); + } + + for (let i = 0; i < tempstorepaths.length; i++) { + moveFileSync(tempstorepaths[i], join(properties.datapath, [properties.dataname, i, "json"].join("."))); + } + + results.stores = tempstorepaths.length, + results.end = Date.now(); + results.elapsed = results.end - results.start; + + return results; + +}; + +const dropEverything = async (properties) => { + var locks = []; + + for (let storename of properties.storenames) { + locks.push(lock(join(properties.datapath, storename), properties.lockoptions)); + } + + const releases = await Promise.all(locks); + + var deletes = []; + + for (let storename of properties.storenames) { + deletes.push(deleteFile(join(properties.datapath, storename))); + } + + var results = await Promise.all(deletes); + + for (const release of releases) { + release(); + } + + deletes = [ + deleteDirectory(properties.temppath), + deleteDirectory(properties.datapath), + deleteFile(join(properties.root, "njodb.properties")) + ]; + + results = results.concat(await Promise.all(deletes)); + + return results; +} + +const dropEverythingSync = (properties) => { + var results = []; + var releases = []; + + for (let storename of properties.storenames) { + releases.push(lockSync(join(properties.datapath, storename))); + } + + for (let storename of properties.storenames) { + results.push(deleteFileSync(join(properties.datapath, storename))); + } + + for (const release of releases) { + release(); + } + + results.push(deleteDirectorySync(properties.temppath)); + results.push(deleteDirectorySync(properties.datapath)); + results.push(deleteFileSync(join(properties.root, "njodb.properties"))); + + return results; +} + +// Data manipulation + +const insertStoreData = async (store, data, lockoptions) => { + let release, results; + + results = Object.assign({ store: resolve(store) }, Result("insert")); + + if (await fileExists(store)) release = await lock(store, lockoptions); + + await promisify(appendFile)(store, data, "utf8"); + + if (await check(store, lockoptions)) await releaseLock(store, release); + + results.inserted = (data.length > 0) ? data.split("\n").length - 1 : 0; + results.end = Date.now(); + + return results; +}; + +const insertStoreDataSync = (store, data) => { + let release, results; + + results = Object.assign({ store: resolve(store) }, Result("insert")); + + if (fileExistsSync(store)) release = lockSync(store); + + appendFileSync(store, data, "utf8"); + + if (checkSync(store)) releaseLockSync(store, release); + + results.inserted = (data.length > 0) ? data.split("\n").length - 1 : 0; + results.end = Date.now(); + + return results; +}; + +const insertFileData = async (file, datapath, storenames, lockoptions) => { + let datastores, locks, releases, writers, results; + + results = Result("insertFile"); + + datastores = storenames.length; + locks = []; + writers = []; + + for (let storename of storenames) { + const storepath = join(datapath, storename); + locks.push(lock(storepath, lockoptions)); + writers.push(createWriteStream(storepath, { flags: "r+" })); + } + + releases = await Promise.all(locks); + + await new Promise((resolve, reject) => { + const randomizer = Randomizer(Array.from(Array(datastores).keys()), false); + const reader = createInterface({ input: createReadStream(file), crlfDelay: Infinity }); + + reader.on("line", record => { + record = record.trim(); + + const storenumber = randomizer.next(); + results.lines++; + + if (record.length > 0) { + try { + record = JSON.parse(record); + results.inserted++; + } catch (error) { + results.errors.push({ error: error.message, line: results.lines, data: record }); + } finally { + writers[storenumber].write(JSON.stringify(record) + "\n"); + } + } else { + results.blanks++; + } + }); + + reader.on("close", () => { + resolve(true); + }); + + reader.on("error", error => { + reject(error); + }); + }); + + for (const writer of writers) { + writer.end(); + } + + for (const release of releases) { + release(); + } + + results.end = Date.now(); + results.elapsed = results.end - results.start; + + return results; +} + +const selectStoreData = async (store, match, project, lockoptions) => { + let release, results; + + release = await lock(store, lockoptions); + + const handlerResults = await new Promise((resolve, reject) => { + const reader = createInterface({ input: createReadStream(store), crlfDelay: Infinity }); + const handler = Handler("select", match, project); + + reader.on("line", record => handler.next(record)); + reader.on("close", () => resolve(handler.return())); + reader.on("error", error => reject(error)); + }); + + if (await check(store, lockoptions)) await releaseLock(store, release); + + results = Object.assign({ store: store }, handlerResults); + + return results; +}; + +const selectStoreDataSync = (store, match, project) => { + let file, release, results; + + release = lockSync(store); + + file = readFileSync(store, "utf8"); + + if (checkSync(store)) releaseLockSync(store, release); + + const records = file.split("\n"); + const handler = Handler("select", match, project); + + for (var record of records) { + handler.next(record); + } + + results = Object.assign({ store: store }, handler.return()); + + return results; +}; + +const updateStoreData = async (store, match, update, tempstore, lockoptions) => { + let release, results; + + release = await lock(store, lockoptions); + + // console.log('Start updateStoreData for tempstore', tempstore, 'real store', store) + const handlerResults = await new Promise((resolve, reject) => { + + const writer = createWriteStream(tempstore); + const handler = Handler("update", match, update); + + writer.on("open", () => { + // Reader was opening and closing before writer ever opened + const reader = createInterface({ input: createReadStream(store), crlfDelay: Infinity }); + + // console.log('Writer opened for tempstore', tempstore) + reader.on("line", record => { + handler.next(record, writer) + }); + + + reader.on("close", () => { + // console.log('Closing reader for store', store) + writer.end(); + resolve(handler.return()); + }); + + reader.on("error", error => reject(error)); + }); + + // writer.on('close', () => { + // console.log('Writer closed for tempstore', tempstore) + // }) + + writer.on("error", error => reject(error)); + + }); + + results = Object.assign({ store: store, tempstore: tempstore }, handlerResults); + + if (results.updated > 0) { + if (!await replaceFile(store, tempstore)) { + results.errors = [...results.records]; + results.updated = 0; + } + } else { + await deleteFile(tempstore); + } + + if (await check(store, lockoptions)) await releaseLock(store, release); + + results.end = Date.now(); + delete results.data; + delete results.records; + + return results; +}; + +const updateStoreDataSync = (store, match, update, tempstore) => { + let file, release, results; + + release = lockSync(store); + file = readFileSync(store, "utf8").trimEnd(); + + if (checkSync(store)) releaseLockSync(store, release); + + + const records = file.split("\n"); + const handler = Handler("update", match, update); + + for (var record of records) { + handler.next(record); + } + + results = Object.assign({ store: store, tempstore: tempstore }, handler.return()); + + if (results.updated > 0) { + let append, replace; + + try { + appendFileSync(tempstore, results.data.join("\n") + "\n", "utf8"); + append = true; + } catch { + append = false; + } + + if (append) replace = replaceFileSync(store, tempstore); + + if (!(append || replace)) { + results.errors = [...results.records]; + results.updated = 0; + } + } + + results.end = Date.now(); + delete results.data; + delete results.records; + + return results; + +}; + +const deleteStoreData = async (store, match, tempstore, lockoptions) => { + let release, results; + + release = await lock(store, lockoptions); + + const handlerResults = await new Promise((resolve, reject) => { + const reader = createInterface({ input: createReadStream(store), crlfDelay: Infinity }); + const writer = createWriteStream(tempstore); + const handler = Handler("delete", match); + + writer.on("open", () => { + reader.on("line", record => handler.next(record, writer)); + }); + + writer.on("error", error => reject(error)); + + reader.on("close", () => { + writer.end(); + resolve(handler.return()); + }); + + reader.on("error", error => reject(error)); + }); + + results = Object.assign({ store: store, tempstore: tempstore }, handlerResults); + + if (results.deleted > 0) { + if (!await replaceFile(store, tempstore)) { + results.errors = [...results.records]; + results.deleted = 0; + } + } else { + await deleteFile(tempstore); + } + + if (await check(store, lockoptions)) await releaseLock(store, release); + + results.end = Date.now(); + delete results.data; + delete results.records; + + return results; + +}; + +const deleteStoreDataSync = (store, match, tempstore) => { + let file, release, results; + + release = lockSync(store); + file = readFileSync(store, "utf8"); + + if (checkSync(store)) releaseLockSync(store, release); + + const records = file.split("\n"); + const handler = Handler("delete", match); + + for (var record of records) { + handler.next(record) + } + + results = Object.assign({ store: store, tempstore: tempstore }, handler.return()); + + if (results.deleted > 0) { + let append, replace; + + try { + appendFileSync(tempstore, results.data.join("\n") + "\n", "utf8"); + append = true; + } catch { + append = false; + } + + if (append) replace = replaceFileSync(store, tempstore); + + if (!(append || replace)) { + results.errors = [...results.records]; + results.updated = 0; + } + } + + results.end = Date.now(); + delete results.data; + delete results.records; + + return results; +}; + +const aggregateStoreData = async (store, match, index, project, lockoptions) => { + let release, results; + + release = await lock(store, lockoptions); + + const handlerResults = await new Promise((resolve, reject) => { + const reader = createInterface({ input: createReadStream(store), crlfDelay: Infinity }); + const handler = Handler("aggregate", match, index, project); + + reader.on("line", record => handler.next(record)); + reader.on("close", () => resolve(handler.return())); + reader.on("error", error => reject(error)); + }); + + if (await check(store, lockoptions)) releaseLock(store, release); + + results = Object.assign({ store: store }, handlerResults); + + return results; +} + +const aggregateStoreDataSync = (store, match, index, project) => { + let file, release, results; + + release = lockSync(store); + file = readFileSync(store, "utf8"); + + if (checkSync(store)) releaseLockSync(store, release); + + const records = file.split("\n"); + const handler = Handler("aggregate", match, index, project); + + for (var record of records) { + handler.next(record); + } + + results = Object.assign({ store: store }, handler.return()); + + return results; +} + +exports.getStoreNames = getStoreNames; +exports.getStoreNamesSync = getStoreNamesSync; + +// Database management +exports.statsStoreData = statsStoreData; +exports.statsStoreDataSync = statsStoreDataSync; +exports.distributeStoreData = distributeStoreData; +exports.distributeStoreDataSync = distributeStoreDataSync; +exports.dropEverything = dropEverything; +exports.dropEverythingSync = dropEverythingSync; + +// Data manipulation +exports.insertStoreData = insertStoreData; +exports.insertStoreDataSync = insertStoreDataSync; +exports.insertFileData = insertFileData; +exports.selectStoreData = selectStoreData; +exports.selectStoreDataSync = selectStoreDataSync; +exports.updateStoreData = updateStoreData; +exports.updateStoreDataSync = updateStoreDataSync; +exports.deleteStoreData = deleteStoreData; +exports.deleteStoreDataSync = deleteStoreDataSync; +exports.aggregateStoreData = aggregateStoreData; +exports.aggregateStoreDataSync = aggregateStoreDataSync; + diff --git a/server/njodb/objects.js b/server/njodb/objects.js new file mode 100644 index 00000000..6545ec83 --- /dev/null +++ b/server/njodb/objects.js @@ -0,0 +1,608 @@ +"use strict"; + +const { + convertSize, + max, + min +} = require("./utils"); + +const Randomizer = (data, replacement) => { + var mutable = [...data]; + if (replacement === undefined || typeof replacement !== "boolean") replacement = true; + + function _next() { + var selection; + const index = Math.floor(Math.random() * mutable.length); + + if (replacement) { + selection = mutable.slice(index, index + 1)[0]; + } else { + selection = mutable.splice(index, 1)[0]; + if (mutable.length === 0) mutable = [...data]; + } + + return selection; + } + + return { + next: _next + }; +}; + +const Result = (type) => { + var _result; + + switch (type) { + case "stats": + _result = { + size: 0, + lines: 0, + records: 0, + errors: [], + blanks: 0, + created: undefined, + modified: undefined, + start: Date.now(), + end: undefined, + elapsed: 0 + }; + break; + case "distribute": + _result = { + stores: undefined, + records: 0, + errors: [], + start: Date.now(), + end: undefined, + elapsed: undefined + }; + break; + case "insert": + _result = { + inserted: 0, + start: Date.now(), + end: undefined, + elapsed: 0 + }; + break; + case "insertFile": + _result = { + lines: 0, + inserted: 0, + errors: [], + blanks: 0, + start: Date.now(), + end: undefined + }; + break; + case "select": + _result = { + lines: 0, + selected: 0, + ignored: 0, + errors: [], + blanks: 0, + start: Date.now(), + end: undefined, + elapsed: 0, + data: [], + }; + break; + case "update": + _result = { + lines: 0, + selected: 0, + updated: 0, + unchanged: 0, + errors: [], + blanks: 0, + start: Date.now(), + end: undefined, + elapsed: 0, + data: [], + records: [] + }; + break; + case "delete": + _result = { + lines: 0, + deleted: 0, + retained: 0, + errors: [], + blanks: 0, + start: Date.now(), + end: undefined, + elapsed: 0, + data: [], + records: [] + }; + break; + case "aggregate": + _result = { + lines: 0, + aggregates: {}, + indexed: 0, + unindexed: 0, + errors: [], + blanks: 0, + start: Date.now(), + end: undefined, + elapsed: 0 + }; + break; + } + + return _result; +} + +const Reduce = (type) => { + var _reduce; + + switch (type) { + case "stats": + _reduce = Object.assign(Result("stats"), { + stores: 0, + min: undefined, + max: undefined, + mean: undefined, + var: undefined, + std: undefined, + m2: 0 + }); + break; + case "drop": + _reduce = { + dropped: false, + start: Date.now(), + end: 0, + elapsed: 0 + }; + break; + case "aggregate": + _reduce = Object.assign(Result("aggregate"), { + data: [] + }); + break; + default: + _reduce = Result(type); + break; + } + + _reduce.details = undefined; + + return _reduce; +}; + +const Handler = (type, ...functions) => { + var _results = Result(type); + + const _next = (record, writer) => { + record = new Record(record); + _results.lines++; + + if (record.length === 0) { + _results.blanks++; + } else { + if (record.data) { + switch (type) { + case "stats": + statsHandler(record, _results); + break; + case "select": + selectHandler(record, functions[0], functions[1], _results); + break; + case "update": + updateHandler(record, functions[0], functions[1], writer, _results); + break; + case "delete": + deleteHandler(record, functions[0], writer, _results); + break; + case "aggregate": + aggregateHandler(record, functions[0], functions[1], functions[2], _results); + break; + } + } else { + _results.errors.push({ error: record.error, line: _results.lines, data: record.source }); + + if (type === "update" || type === "delete") { + if (writer) { + writer.write(record.source + "\n"); + } else { + _results.data.push(record.source); + } + } + } + } + }; + + const _return = () => { + _results.end = Date.now(); + _results.elapsed = _results.end - _results.start; + return _results; + } + + return { + next: _next, + return: _return + }; +}; + +const statsHandler = (record, results) => { + results.records++; + return results; +}; + +const selectHandler = (record, selecter, projecter, results) => { + if (record.select(selecter)) { + if (projecter) { + results.data.push(record.project(projecter)); + } else { + results.data.push(record.data); + } + results.selected++; + } else { + results.ignored++; + } +}; + +const updateHandler = (record, selecter, updater, writer, results) => { + if (record.select(selecter)) { + results.selected++; + if (record.update(updater)) { + results.updated++; + results.records.push(record.data); + } else { + results.unchanged++; + } + } else { + results.unchanged++; + } + + if (writer) { + writer.write(JSON.stringify(record.data) + "\n"); + } else { + results.data.push(JSON.stringify(record.data)); + } +}; + +const deleteHandler = (record, selecter, writer, results) => { + if (record.select(selecter)) { + results.deleted++; + results.records.push(record.data); + } else { + results.retained++; + + if (writer) { + writer.write(JSON.stringify(record.data) + "\n"); + } else { + results.data.push(JSON.stringify(record.data)); + } + } +}; + +const aggregateHandler = (record, selecter, indexer, projecter, results) => { + if (record.select(selecter)) { + const index = record.index(indexer); + + if (!index) { + results.unindexed++; + } else { + var projection; + var fields; + + if (results.aggregates[index]) { + results.aggregates[index].count++; + } else { + results.aggregates[index] = { + count: 1, + aggregates: {} + }; + } + + if (projecter) { + projection = record.project(projecter); + fields = Object.keys(projection); + } else { + projection = record.data; + fields = Object.keys(record.data); + } + + for (const field of fields) { + if (projection[field] !== undefined) { + if (results.aggregates[index].aggregates[field]) { + accumulateAggregate(results.aggregates[index].aggregates[field], projection[field]); + } else { + results.aggregates[index].aggregates[field] = { + min: projection[field], + max: projection[field], + count: 1 + }; + if (typeof projection[field] === "number") { + results.aggregates[index].aggregates[field]["sum"] = projection[field]; + results.aggregates[index].aggregates[field]["mean"] = projection[field]; + results.aggregates[index].aggregates[field]["m2"] = 0; + } + } + } + } + + results.indexed++; + } + } +} + +const accumulateAggregate = (index, projection) => { + index["min"] = min(index["min"], projection); + index["max"] = max(index["max"], projection); + index["count"]++; + + // Welford's algorithm + if (typeof projection === "number") { + const delta1 = projection - index["mean"]; + index["sum"] += projection; + index["mean"] += delta1 / index["count"]; + const delta2 = projection - index["mean"]; + index["m2"] += delta1 * delta2; + } + + return index; +}; + +class Record { + constructor(record) { + this.source = record.trim(); + this.length = this.source.length + this.data = {}; + this.error = ""; + + try { + this.data = JSON.parse(this.source) + } catch (e) { + this.data = undefined; + this.error = e.message; + } + } +} + +Record.prototype.select = function (selecter) { + var result; + + try { + result = selecter(this.data); + } catch { + return false; + } + + if (typeof result !== "boolean") { + throw new TypeError("Selecter must return a boolean"); + } else { + return result; + } +}; + +Record.prototype.update = function (updater) { + var result; + + try { + result = updater(this.data); + } catch { + return false; + } + + if (typeof result !== "object") { + throw new TypeError("Updater must return an object"); + } else { + this.data = result; + return true; + } +} + +Record.prototype.project = function (projecter) { + var result; + + try { + result = projecter(this.data); + } catch { + return undefined; + } + + if (Array.isArray(result) || typeof result !== "object") { + throw new TypeError("Projecter must return an object"); + } else { + return result; + } +}; + +Record.prototype.index = function (indexer) { + try { + return indexer(this.data); + } catch { + return undefined; + } +}; + +const Reducer = (type, results) => { + var _reduce = Reduce(type); + + var i = 0; + var aggregates = {}; + + for (const result of results) { + switch (type) { + case "stats": + statsReducer(_reduce, result, i); + break; + case "insert": + insertReducer(_reduce, result); + break; + case "select": + selectReducer(_reduce, result); + break; + case "update": + updateReducer(_reduce, result); + break; + case "delete": + deleteReducer(_reduce, result); + break; + case "aggregate": + aggregateReducer(_reduce, result, aggregates); + break + } + + if (type === "stats") { + _reduce.stores++; + i++; + } + + if (type === "drop") { + _reduce.dropped = true; + } else if (type !== "insert") { + _reduce.lines += result.lines; + _reduce.errors = _reduce.errors.concat(result.errors); + _reduce.blanks += result.blanks; + } + + _reduce.start = min(_reduce.start, result.start); + _reduce.end = max(_reduce.end, result.end); + } + + if (type === "stats") { + _reduce.size = convertSize(_reduce.size); + _reduce.var = _reduce.m2 / (results.length); + _reduce.std = Math.sqrt(_reduce.m2 / (results.length)); + delete _reduce.m2; + } else if (type === "aggregate") { + for (const index of Object.keys(aggregates)) { + var aggregate = { + index: index, + count: aggregates[index].count, + aggregates: [] + }; + for (const field of Object.keys(aggregates[index].aggregates)) { + delete aggregates[index].aggregates[field].m2; + aggregate.aggregates.push({ field: field, data: aggregates[index].aggregates[field] }); + } + _reduce.data.push(aggregate); + } + delete _reduce.aggregates; + } + + _reduce.elapsed = _reduce.end - _reduce.start; + _reduce.details = results; + + return _reduce; +}; + +const statsReducer = (reduce, result, i) => { + reduce.size += result.size; + reduce.records += result.records; + reduce.min = min(reduce.min, result.records); + reduce.max = max(reduce.max, result.records); + if (reduce.mean === undefined) reduce.mean = result.records; + const delta1 = result.records - reduce.mean; + reduce.mean += delta1 / (i + 2); + const delta2 = result.records - reduce.mean; + reduce.m2 += delta1 * delta2; + reduce.created = min(reduce.created, result.created); + reduce.modified = max(reduce.modified, result.modified); +}; + +const insertReducer = (reduce, result) => { + reduce.inserted += result.inserted; +}; + +const selectReducer = (reduce, result) => { + reduce.selected += result.selected; + reduce.ignored += result.ignored; + reduce.data = reduce.data.concat(result.data); + delete result.data; +}; + +const updateReducer = (reduce, result) => { + reduce.selected += result.selected; + reduce.updated += result.updated; + reduce.unchanged += result.unchanged; +}; + +const deleteReducer = (reduce, result) => { + reduce.deleted += result.deleted; + reduce.retained += result.retained; +}; + +const aggregateReducer = (reduce, result, aggregates) => { + reduce.indexed += result.indexed; + reduce.unindexed += result.unindexed; + + const indexes = Object.keys(result.aggregates); + + for (const index of indexes) { + if (aggregates[index]) { + aggregates[index].count += result.aggregates[index].count; + } else { + aggregates[index] = { + count: result.aggregates[index].count, + aggregates: {} + }; + } + + const fields = Object.keys(result.aggregates[index].aggregates); + + for (const field of fields) { + const aggregateObject = aggregates[index].aggregates[field]; + const resultObject = result.aggregates[index].aggregates[field]; + + if (aggregateObject) { + reduceAggregate(aggregateObject, resultObject); + } else { + aggregates[index].aggregates[field] = { + min: resultObject["min"], + max: resultObject["max"], + count: resultObject["count"] + }; + + if (resultObject["m2"] !== undefined) { + aggregates[index].aggregates[field]["sum"] = resultObject["sum"]; + aggregates[index].aggregates[field]["mean"] = resultObject["mean"]; + aggregates[index].aggregates[field]["varp"] = resultObject["m2"] / resultObject["count"]; + aggregates[index].aggregates[field]["vars"] = resultObject["m2"] / (resultObject["count"] - 1); + aggregates[index].aggregates[field]["stdp"] = Math.sqrt(resultObject["m2"] / resultObject["count"]); + aggregates[index].aggregates[field]["stds"] = Math.sqrt(resultObject["m2"] / (resultObject["count"] - 1)); + aggregates[index].aggregates[field]["m2"] = resultObject["m2"]; + } + } + } + } + + delete result.aggregates; +}; + +const reduceAggregate = (aggregate, result) => { + const n = aggregate["count"] + result["count"]; + + aggregate["min"] = min(aggregate["min"], result["min"]); + aggregate["max"] = max(aggregate["max"], result["max"]); + + // Parallel version of Welford's algorithm + if (result["m2"] !== undefined) { + const delta = result["mean"] - aggregate["mean"]; + const m2 = aggregate["m2"] + result["m2"] + (Math.pow(delta, 2) * ((aggregate["count"] * result["count"]) / n)); + aggregate["m2"] = m2; + aggregate["varp"] = m2 / n; + aggregate["vars"] = m2 / (n - 1); + aggregate["stdp"] = Math.sqrt(m2 / n); + aggregate["stds"] = Math.sqrt(m2 / (n - 1)); + } + + if (result["sum"] !== undefined) { + aggregate["mean"] = (aggregate["sum"] + result["sum"]) / n; + aggregate["sum"] += result["sum"]; + } + + aggregate["count"] = n; +}; + +exports.Randomizer = Randomizer; +exports.Result = Result; +exports.Reduce = Reduce; +exports.Handler = Handler; +exports.Reducer = Reducer; diff --git a/server/njodb/utils.js b/server/njodb/utils.js new file mode 100644 index 00000000..f025f495 --- /dev/null +++ b/server/njodb/utils.js @@ -0,0 +1,178 @@ +"use strict"; + +const { + access, + constants, + existsSync, + rename, + renameSync, + rmdir, + rmdirSync, + unlink, + unlinkSync +} = require("graceful-fs"); + +const { promisify } = require("util"); + +const min = (a, b) => { + if (b === undefined || a <= b) return a; + return b; +}; + +const max = (a, b) => { + if (b === undefined || a > b) return a; + return b; +}; + +const convertSize = (size) => { + const sizes = ["bytes", "KB", "MB", "GB"]; + + var index = Math.floor(Math.log2(size)/10); + if (index > 3) index = 3; + + return Math.round(((size / Math.pow(1024, index)) + Number.EPSILON) * 100) / 100 + " " + sizes[index]; +}; + +const fileExists = async (a) => { + try { + await promisify(access)(a, constants.F_OK); + return true; + } catch (error) { + console.error(error); + return false; + } +} + +const fileExistsSync = (a) => { + try { + return existsSync(a); + } catch (error) { + console.error(error); + return false; + } +} + +const moveFile = async (a, b) => { + try { + await promisify(rename)(a, b); + return true; + } catch (error) { + console.error(error); + return false; + } +}; + +const moveFileSync = (a, b) => { + try { + renameSync(a, b); + return true; + } catch (error) { + console.error(error); + return false; + } +}; + +const deleteFile = async (filepath) => { + try { + await promisify(unlink)(filepath); + return true; + } catch (error) { + console.error(error); + return false; + } +}; + +const deleteFileSync = (filepath) => { + try { + unlinkSync(filepath); + return true; + } catch (error) { + console.error(error); + return false; + } +} + +const replaceFile = async (a, b) => { + if (!await moveFile(a, a + ".old")) return false; + + if (!await moveFile(b, a)) { + await moveFile(a + ".old", a); + return false; + } + + await deleteFile(a + ".old"); + + return true; +}; + +const replaceFileSync = (a, b) => { + if (!moveFileSync(a, a + ".old")) return false; + + if (!moveFileSync(b, a)) { + moveFile(a + ".old", a); + return false; + } + + deleteFileSync(a + ".old"); + + return true; +}; + +const deleteDirectory = async (dirpath) => { + try { + await promisify(rmdir)(dirpath); + return true; + } catch { + return false; + } +}; + +const deleteDirectorySync = (dirpath) => { + try { + rmdirSync(dirpath); + return true; + } catch { + return false; + } +}; + +const releaseLock = async (store, release) => { + try { + await release(); + } catch (error) { + if (!["ERELEASED", "ENOTACQUIRED"].includes(error.code)) { + error.store = store; + throw error; + } + } +} + +const releaseLockSync = (store, release) => { + try { + release(); + } catch (error) { + if (!["ERELEASED", "ENOTACQUIRED"].includes(error.code)) { + error.store = store; + throw error; + } + } +} + +exports.min = min; +exports.max = max; + +exports.convertSize = convertSize; + +exports.fileExists = fileExists; +exports.fileExistsSync = fileExistsSync; +exports.moveFile = moveFile; +exports.moveFileSync = moveFileSync; +exports.replaceFile = replaceFile; +exports.replaceFileSync = replaceFileSync; +exports.deleteFile = deleteFile; +exports.deleteFileSync = deleteFileSync; +exports.deleteDirectory = deleteDirectory; +exports.deleteDirectorySync = deleteDirectorySync; + +exports.releaseLock = releaseLock; +exports.releaseLockSync = releaseLockSync; \ No newline at end of file diff --git a/server/njodb/validators.js b/server/njodb/validators.js new file mode 100644 index 00000000..7b8a3224 --- /dev/null +++ b/server/njodb/validators.js @@ -0,0 +1,69 @@ +"use strict"; + +const { existsSync } = require("graceful-fs"); + +const validateSize = (s) => { + if (typeof s !== "number") { + throw new TypeError("Size must be a number"); + } else if (s <= 0) { + throw new RangeError("Size must be greater than zero"); + } + + return s; +}; + +const validateName = (n) => { + if (typeof n !== "string") { + throw new TypeError("Name must be a string"); + } else if (n.trim().length <= 0) { + throw new Error("Name must be a non-blank string") + } + + return n; +}; + +const validatePath = (p) => { + if (typeof p !== "string") { + throw new TypeError("Path must be a string"); + } else if (p.trim().length <= 0) { + throw new Error("Path must be a non-blank string"); + } else if (!existsSync(p)) { + throw new Error("Path does not exist"); + } + + return p; +}; + +const validateArray = (a) => { + if (!Array.isArray(a)) { + throw new TypeError("Not an array"); + } + + return a; +}; + +const validateObject = (o) => { + if (typeof o !== "object") { + throw new TypeError("Not an object"); + } + + return o; +}; + +const validateFunction = (f) => { + if (typeof f !== "function") { + throw new TypeError("Not a function") + } else { + const fString = f.toString(); + if (/\s*function/.test(fString) && !/\W+return\W+/.test(fString)) throw new Error("Function must return a value"); + } + + return f; +} + +exports.validateSize = validateSize; +exports.validateName = validateName; +exports.validatePath = validatePath; +exports.validateArray = validateArray; +exports.validateObject = validateObject; +exports.validateFunction = validateFunction; \ No newline at end of file