Compare commits
26 Commits
cf1b300f6a
...
f_influx-c
| Author | SHA1 | Date | |
|---|---|---|---|
| 873f00b21b | |||
| 271554719e | |||
| e18de63d7c | |||
| 2356040572 | |||
| b0bbf0c71a | |||
| 37b78e7373 | |||
| c5e1bb4c64 | |||
| 3c29ed2000 | |||
| ddf39b9433 | |||
| 3a927688d0 | |||
| c51cfc1b14 | |||
| e1b2a7e016 | |||
| fc5900b0ba | |||
| 354ca32a61 | |||
| d10e9bb2c6 | |||
| 44cd3288cf | |||
| 3af4bb7cc6 | |||
| 2a662e0bd1 | |||
| d7a9530b68 | |||
| 7de2250983 | |||
| bb3d843895 | |||
| 9472ed9198 | |||
| dcd0ce8111 | |||
| 1a9ced0bb8 | |||
| d77e3f8844 | |||
| e715cc1cac |
31
package-lock.json
generated
31
package-lock.json
generated
@@ -10,8 +10,10 @@
|
|||||||
"license": "AGPL-3.0",
|
"license": "AGPL-3.0",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@influxdata/influxdb-client": "^1.20.0",
|
"@influxdata/influxdb-client": "^1.20.0",
|
||||||
|
"@influxdata/influxdb-client-apis": "^1.20.0",
|
||||||
"log4js": "^6.3.0",
|
"log4js": "^6.3.0",
|
||||||
"luxon": "^2.1.1"
|
"luxon": "^2.1.1",
|
||||||
|
"string-argv": "^0.3.1"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"node_modules/@influxdata/influxdb-client": {
|
"node_modules/@influxdata/influxdb-client": {
|
||||||
@@ -19,6 +21,14 @@
|
|||||||
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client/-/influxdb-client-1.20.0.tgz",
|
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client/-/influxdb-client-1.20.0.tgz",
|
||||||
"integrity": "sha512-jaKSI63hmQ5VSkJrFJkYIXaKlhoF+mGd4HmOf7v/X7pmEi69ReHp922Wyx6/OeCrpndRMbsadk+XmGNdd43cFw=="
|
"integrity": "sha512-jaKSI63hmQ5VSkJrFJkYIXaKlhoF+mGd4HmOf7v/X7pmEi69ReHp922Wyx6/OeCrpndRMbsadk+XmGNdd43cFw=="
|
||||||
},
|
},
|
||||||
|
"node_modules/@influxdata/influxdb-client-apis": {
|
||||||
|
"version": "1.20.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client-apis/-/influxdb-client-apis-1.20.0.tgz",
|
||||||
|
"integrity": "sha512-KMTmXH4rbpS+NWGpqDjxcKTyan2rbiT2IM5AdRElKhH2sHbH96xwLgziaxeC+OCJLeNAdehJgae3I8WiZjbwdg==",
|
||||||
|
"peerDependencies": {
|
||||||
|
"@influxdata/influxdb-client": "*"
|
||||||
|
}
|
||||||
|
},
|
||||||
"node_modules/date-format": {
|
"node_modules/date-format": {
|
||||||
"version": "3.0.0",
|
"version": "3.0.0",
|
||||||
"resolved": "https://registry.npmjs.org/date-format/-/date-format-3.0.0.tgz",
|
"resolved": "https://registry.npmjs.org/date-format/-/date-format-3.0.0.tgz",
|
||||||
@@ -128,6 +138,14 @@
|
|||||||
"node": ">=4.0"
|
"node": ">=4.0"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"node_modules/string-argv": {
|
||||||
|
"version": "0.3.1",
|
||||||
|
"resolved": "https://registry.npmjs.org/string-argv/-/string-argv-0.3.1.tgz",
|
||||||
|
"integrity": "sha512-a1uQGz7IyVy9YwhqjZIZu1c8JO8dNIe20xBmSS6qu9kv++k3JGzCVmprbNN5Kn+BgzD5E7YYwg1CcjuJMRNsvg==",
|
||||||
|
"engines": {
|
||||||
|
"node": ">=0.6.19"
|
||||||
|
}
|
||||||
|
},
|
||||||
"node_modules/universalify": {
|
"node_modules/universalify": {
|
||||||
"version": "0.1.2",
|
"version": "0.1.2",
|
||||||
"resolved": "https://registry.npmjs.org/universalify/-/universalify-0.1.2.tgz",
|
"resolved": "https://registry.npmjs.org/universalify/-/universalify-0.1.2.tgz",
|
||||||
@@ -143,6 +161,12 @@
|
|||||||
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client/-/influxdb-client-1.20.0.tgz",
|
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client/-/influxdb-client-1.20.0.tgz",
|
||||||
"integrity": "sha512-jaKSI63hmQ5VSkJrFJkYIXaKlhoF+mGd4HmOf7v/X7pmEi69ReHp922Wyx6/OeCrpndRMbsadk+XmGNdd43cFw=="
|
"integrity": "sha512-jaKSI63hmQ5VSkJrFJkYIXaKlhoF+mGd4HmOf7v/X7pmEi69ReHp922Wyx6/OeCrpndRMbsadk+XmGNdd43cFw=="
|
||||||
},
|
},
|
||||||
|
"@influxdata/influxdb-client-apis": {
|
||||||
|
"version": "1.20.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client-apis/-/influxdb-client-apis-1.20.0.tgz",
|
||||||
|
"integrity": "sha512-KMTmXH4rbpS+NWGpqDjxcKTyan2rbiT2IM5AdRElKhH2sHbH96xwLgziaxeC+OCJLeNAdehJgae3I8WiZjbwdg==",
|
||||||
|
"requires": {}
|
||||||
|
},
|
||||||
"date-format": {
|
"date-format": {
|
||||||
"version": "3.0.0",
|
"version": "3.0.0",
|
||||||
"resolved": "https://registry.npmjs.org/date-format/-/date-format-3.0.0.tgz",
|
"resolved": "https://registry.npmjs.org/date-format/-/date-format-3.0.0.tgz",
|
||||||
@@ -228,6 +252,11 @@
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"string-argv": {
|
||||||
|
"version": "0.3.1",
|
||||||
|
"resolved": "https://registry.npmjs.org/string-argv/-/string-argv-0.3.1.tgz",
|
||||||
|
"integrity": "sha512-a1uQGz7IyVy9YwhqjZIZu1c8JO8dNIe20xBmSS6qu9kv++k3JGzCVmprbNN5Kn+BgzD5E7YYwg1CcjuJMRNsvg=="
|
||||||
|
},
|
||||||
"universalify": {
|
"universalify": {
|
||||||
"version": "0.1.2",
|
"version": "0.1.2",
|
||||||
"resolved": "https://registry.npmjs.org/universalify/-/universalify-0.1.2.tgz",
|
"resolved": "https://registry.npmjs.org/universalify/-/universalify-0.1.2.tgz",
|
||||||
|
|||||||
@@ -15,7 +15,9 @@
|
|||||||
"license": "AGPL-3.0",
|
"license": "AGPL-3.0",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@influxdata/influxdb-client": "^1.20.0",
|
"@influxdata/influxdb-client": "^1.20.0",
|
||||||
|
"@influxdata/influxdb-client-apis": "^1.20.0",
|
||||||
"log4js": "^6.3.0",
|
"log4js": "^6.3.0",
|
||||||
"luxon": "^2.1.1"
|
"luxon": "^2.1.1",
|
||||||
|
"string-argv": "^0.3.1"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,20 +1,15 @@
|
|||||||
const logger = require("./logger.js")("exec");
|
const logger = require("./logger.js")("exec");
|
||||||
|
|
||||||
const { spawn } = require("child_process");
|
const { spawn } = require("child_process");
|
||||||
|
const { parseArgsStringToArgv } = require('string-argv');
|
||||||
|
|
||||||
|
|
||||||
function exec(cmd, stdout, stderr, exit_handler){
|
function exec(cmd, options){
|
||||||
const [bin, ...args] = cmd.split(' ')
|
const [bin, ...args] = parseArgsStringToArgv(cmd);
|
||||||
|
|
||||||
logger.addContext("binary", "bin");
|
logger.addContext("binary", "bin");
|
||||||
logger.debug(`Spawn process '${cmd}'`);
|
logger.debug(`Spawn process '${cmd}'`);
|
||||||
let proc = spawn(bin, args);
|
return spawn(bin, args, options);
|
||||||
|
|
||||||
return {
|
|
||||||
"process": proc,
|
|
||||||
"stdout": proc.stdout,
|
|
||||||
"stderr": proc.stderr
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Specify exports
|
// Specify exports
|
||||||
|
|||||||
61
src/helper/influx-checks.js
Normal file
61
src/helper/influx-checks.js
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
const logger = require.main.require("./helper/logger.js")("influx-checks");
|
||||||
|
|
||||||
|
const Os = require("os");
|
||||||
|
const { InfluxDB, Point } = require('@influxdata/influxdb-client')
|
||||||
|
const Influx = require('@influxdata/influxdb-client-apis');
|
||||||
|
|
||||||
|
|
||||||
|
function checkHealth(influxDb){
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
new Influx.HealthAPI(influxDb) // Check influx health
|
||||||
|
.getHealth()
|
||||||
|
.catch((err) => {
|
||||||
|
logger.error("Could not communicate with Influx:");
|
||||||
|
logger.error(`Error [${err.code}]:`, err.message);
|
||||||
|
reject();
|
||||||
|
})
|
||||||
|
.then((res) => {
|
||||||
|
logger.debug("Server healthy.", "Version: ", res.version);
|
||||||
|
resolve(res);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function checkBucket(influxDb, options){
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
new Influx.BucketsAPI(influxDb).getBuckets(options)
|
||||||
|
.catch((err) => { // Weirdly the influx-Api returns 404 for searches of non-existing buckets
|
||||||
|
logger.error("Could not get bucket:");
|
||||||
|
logger.error(`Error [${err.code}]:`, err.message);
|
||||||
|
reject();
|
||||||
|
}).then((res) => { // But an empty list when the bucket exists, but token does not have permission to get details
|
||||||
|
logger.debug("Bucket found");
|
||||||
|
resolve(res);
|
||||||
|
// Now we know the bucket exists and we have some kind of permission.. but we still dont know if we are able to write to it..
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function checkWriteApi(influxDb, options){
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
const writeApi = influxDb.getWriteApi(options.org, options.bucket); // Get WriteAPI
|
||||||
|
writeApi.writePoint(new Point("worker_connectionTest").tag("hostname", Os.hostname())) // Write point
|
||||||
|
writeApi.close()
|
||||||
|
.catch((err) => {
|
||||||
|
logger.error("Could not get writeApi:");
|
||||||
|
logger.error(`Error [${err.code}]:`, err.message);
|
||||||
|
reject();
|
||||||
|
}).then((res) => {
|
||||||
|
logger.debug("Writing ok");
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Specify exports
|
||||||
|
module.exports = {
|
||||||
|
checkHealth,
|
||||||
|
checkBucket,
|
||||||
|
checkWriteApi,
|
||||||
|
};
|
||||||
29
src/main.js
29
src/main.js
@@ -2,6 +2,8 @@ const logger = require("./helper/logger.js")("main");
|
|||||||
|
|
||||||
const { requireEnvVars } = require("./helper/env.js");
|
const { requireEnvVars } = require("./helper/env.js");
|
||||||
const { exit } = require("process");
|
const { exit } = require("process");
|
||||||
|
const { InfluxDB } = require('@influxdata/influxdb-client');
|
||||||
|
const InfluxChecks = require('./helper/influx-checks.js');
|
||||||
|
|
||||||
/// Setup ENVs
|
/// Setup ENVs
|
||||||
const env = process.env;
|
const env = process.env;
|
||||||
@@ -9,8 +11,6 @@ const env = process.env;
|
|||||||
{
|
{
|
||||||
env.LOGLEVEL ??= "INFO";
|
env.LOGLEVEL ??= "INFO";
|
||||||
env.WIFI_INTERFACE ??= "wlan0";
|
env.WIFI_INTERFACE ??= "wlan0";
|
||||||
env.WIFI_CHANNEL ??= [1,6,11];
|
|
||||||
env.WIFI_CHANNEL_TIME ??= 1;
|
|
||||||
}
|
}
|
||||||
// Required vars
|
// Required vars
|
||||||
let errorMsg = requireEnvVars([
|
let errorMsg = requireEnvVars([
|
||||||
@@ -22,3 +22,28 @@ if(errorMsg){
|
|||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
(async function() {
|
||||||
|
logger.info("Setup Influx..");
|
||||||
|
const influxDb = new InfluxDB({url: env.INFLUX_URL, token: env.INFLUX_TOKEN});
|
||||||
|
|
||||||
|
await InfluxChecks.checkHealth(influxDb)
|
||||||
|
.then((res) => {return InfluxChecks.checkBucket(influxDb, {
|
||||||
|
org: env.INFLUX_ORG,
|
||||||
|
name: env.INFLUX_BUCKET
|
||||||
|
})})
|
||||||
|
.then((res) => {return InfluxChecks.checkWriteApi(influxDb, {
|
||||||
|
org: env.INFLUX_ORG,
|
||||||
|
bucket: env.INFLUX_BUCKET
|
||||||
|
})})
|
||||||
|
.catch((err) => {
|
||||||
|
if(err) {
|
||||||
|
logger.error("Error whilst checking influx:");
|
||||||
|
logger.error(err);
|
||||||
|
}
|
||||||
|
logger.fatal("Setup influx failed!");
|
||||||
|
exit(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
logger.info("Influx ok");
|
||||||
|
|
||||||
|
})();
|
||||||
|
|||||||
@@ -1,21 +1,23 @@
|
|||||||
const logger = require.main.require("./helper/logger.js")("InfluxPointWriter");
|
const logger = require.main.require("./helper/logger.js")("InfluxPointWriter");
|
||||||
const { Writeable } = require('stream');
|
const { Writable } = require('stream');
|
||||||
const {InfluxDB, Point, HttpError} = require('@influxdata/influxdb-client')
|
const {InfluxDB, Point, HttpError} = require('@influxdata/influxdb-client')
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get points and write them into influx
|
* Get points and write them into influx
|
||||||
*/
|
*/
|
||||||
class InfluxPointWriter extends Writeable{
|
class InfluxPointWriter extends Writable{
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* @param {string} url Influx-Url
|
* @param {InfluxDB} influxDb InfluxDb
|
||||||
* @param {string} token Auth-token
|
|
||||||
* @param {string} org Organization to use
|
* @param {string} org Organization to use
|
||||||
* @param {string} bucket Bucket to use
|
* @param {string} bucket Bucket to use
|
||||||
* @param {string} precision Precision to use
|
* @param {Partial<WriteOptions>} options Options for WriteApi
|
||||||
*/
|
*/
|
||||||
constructor(url, token, org, bucket, precision = 'us'){
|
constructor(influxDb, org, bucket, options){
|
||||||
this._api = new InfluxDB({url, token}).getWriteApi(org, bucket, precision);
|
super({
|
||||||
|
objectMode: true
|
||||||
|
});
|
||||||
|
this._api = influxDb.getWriteApi(org, bucket, 'us', options);
|
||||||
}
|
}
|
||||||
|
|
||||||
_write(point, encoding, next){
|
_write(point, encoding, next){
|
||||||
@@ -25,10 +27,8 @@ class InfluxPointWriter extends Writeable{
|
|||||||
|
|
||||||
_flush(next){
|
_flush(next){
|
||||||
this._api.flush(true)
|
this._api.flush(true)
|
||||||
.then(
|
.catch((err) => { next(new Error(`WriteApi rejected promise for flush: ${err}`)); })
|
||||||
next,
|
.then(next);
|
||||||
(err) => { next(new Error(`WriteApi rejected promise for flush: ${err}`)); }
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -47,7 +47,7 @@ class PacketInfluxPointFactory extends Transform{
|
|||||||
|
|
||||||
point.setField('value', packet[objKey]); // Set field
|
point.setField('value', packet[objKey]); // Set field
|
||||||
|
|
||||||
this.push(null, point); // Push point into stream
|
this.push(point); // Push point into stream
|
||||||
});
|
});
|
||||||
|
|
||||||
next(); // Get next packet
|
next(); // Get next packet
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ class RegexBlockStream extends Transform{
|
|||||||
}
|
}
|
||||||
|
|
||||||
_transform(chunk, encoding, next){
|
_transform(chunk, encoding, next){
|
||||||
chunk = this.readableBuffer.length? this.readableBuffer.join() + chunk: chunk; // Add previous buffer to current chunk
|
chunk = this.readableBuffer.length? this.readableBuffer.join('') + chunk: chunk; // Add previous buffer to current chunk
|
||||||
this.readableBuffer.length && this.readableBuffer.clear(); // Clear buffer once we read it
|
this.readableBuffer.length && this.readableBuffer.clear(); // Clear buffer once we read it
|
||||||
|
|
||||||
let matches = chunk.match(this.matcher); // Match
|
let matches = chunk.match(this.matcher); // Match
|
||||||
@@ -52,7 +52,7 @@ class RegexBlockStream extends Transform{
|
|||||||
|
|
||||||
_flush(next){
|
_flush(next){
|
||||||
if(matchAllOnFlush){ // When requested, we'll match one last time over the remaining buffer
|
if(matchAllOnFlush){ // When requested, we'll match one last time over the remaining buffer
|
||||||
let chunk = this.readableBuffer.toString();
|
let chunk = this.readableBuffer.join('');
|
||||||
let matches = chunk.match(this.matcher); // Match remaining buffer
|
let matches = chunk.match(this.matcher); // Match remaining buffer
|
||||||
_writeMatches(matches); // Write matches including last element
|
_writeMatches(matches); // Write matches including last element
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user