Compare commits
26 Commits
cf1b300f6a
...
f_influx-c
| Author | SHA1 | Date | |
|---|---|---|---|
| 873f00b21b | |||
| 271554719e | |||
| e18de63d7c | |||
| 2356040572 | |||
| b0bbf0c71a | |||
| 37b78e7373 | |||
| c5e1bb4c64 | |||
| 3c29ed2000 | |||
| ddf39b9433 | |||
| 3a927688d0 | |||
| c51cfc1b14 | |||
| e1b2a7e016 | |||
| fc5900b0ba | |||
| 354ca32a61 | |||
| d10e9bb2c6 | |||
| 44cd3288cf | |||
| 3af4bb7cc6 | |||
| 2a662e0bd1 | |||
| d7a9530b68 | |||
| 7de2250983 | |||
| bb3d843895 | |||
| 9472ed9198 | |||
| dcd0ce8111 | |||
| 1a9ced0bb8 | |||
| d77e3f8844 | |||
| e715cc1cac |
31
package-lock.json
generated
31
package-lock.json
generated
@@ -10,8 +10,10 @@
|
||||
"license": "AGPL-3.0",
|
||||
"dependencies": {
|
||||
"@influxdata/influxdb-client": "^1.20.0",
|
||||
"@influxdata/influxdb-client-apis": "^1.20.0",
|
||||
"log4js": "^6.3.0",
|
||||
"luxon": "^2.1.1"
|
||||
"luxon": "^2.1.1",
|
||||
"string-argv": "^0.3.1"
|
||||
}
|
||||
},
|
||||
"node_modules/@influxdata/influxdb-client": {
|
||||
@@ -19,6 +21,14 @@
|
||||
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client/-/influxdb-client-1.20.0.tgz",
|
||||
"integrity": "sha512-jaKSI63hmQ5VSkJrFJkYIXaKlhoF+mGd4HmOf7v/X7pmEi69ReHp922Wyx6/OeCrpndRMbsadk+XmGNdd43cFw=="
|
||||
},
|
||||
"node_modules/@influxdata/influxdb-client-apis": {
|
||||
"version": "1.20.0",
|
||||
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client-apis/-/influxdb-client-apis-1.20.0.tgz",
|
||||
"integrity": "sha512-KMTmXH4rbpS+NWGpqDjxcKTyan2rbiT2IM5AdRElKhH2sHbH96xwLgziaxeC+OCJLeNAdehJgae3I8WiZjbwdg==",
|
||||
"peerDependencies": {
|
||||
"@influxdata/influxdb-client": "*"
|
||||
}
|
||||
},
|
||||
"node_modules/date-format": {
|
||||
"version": "3.0.0",
|
||||
"resolved": "https://registry.npmjs.org/date-format/-/date-format-3.0.0.tgz",
|
||||
@@ -128,6 +138,14 @@
|
||||
"node": ">=4.0"
|
||||
}
|
||||
},
|
||||
"node_modules/string-argv": {
|
||||
"version": "0.3.1",
|
||||
"resolved": "https://registry.npmjs.org/string-argv/-/string-argv-0.3.1.tgz",
|
||||
"integrity": "sha512-a1uQGz7IyVy9YwhqjZIZu1c8JO8dNIe20xBmSS6qu9kv++k3JGzCVmprbNN5Kn+BgzD5E7YYwg1CcjuJMRNsvg==",
|
||||
"engines": {
|
||||
"node": ">=0.6.19"
|
||||
}
|
||||
},
|
||||
"node_modules/universalify": {
|
||||
"version": "0.1.2",
|
||||
"resolved": "https://registry.npmjs.org/universalify/-/universalify-0.1.2.tgz",
|
||||
@@ -143,6 +161,12 @@
|
||||
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client/-/influxdb-client-1.20.0.tgz",
|
||||
"integrity": "sha512-jaKSI63hmQ5VSkJrFJkYIXaKlhoF+mGd4HmOf7v/X7pmEi69ReHp922Wyx6/OeCrpndRMbsadk+XmGNdd43cFw=="
|
||||
},
|
||||
"@influxdata/influxdb-client-apis": {
|
||||
"version": "1.20.0",
|
||||
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client-apis/-/influxdb-client-apis-1.20.0.tgz",
|
||||
"integrity": "sha512-KMTmXH4rbpS+NWGpqDjxcKTyan2rbiT2IM5AdRElKhH2sHbH96xwLgziaxeC+OCJLeNAdehJgae3I8WiZjbwdg==",
|
||||
"requires": {}
|
||||
},
|
||||
"date-format": {
|
||||
"version": "3.0.0",
|
||||
"resolved": "https://registry.npmjs.org/date-format/-/date-format-3.0.0.tgz",
|
||||
@@ -228,6 +252,11 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"string-argv": {
|
||||
"version": "0.3.1",
|
||||
"resolved": "https://registry.npmjs.org/string-argv/-/string-argv-0.3.1.tgz",
|
||||
"integrity": "sha512-a1uQGz7IyVy9YwhqjZIZu1c8JO8dNIe20xBmSS6qu9kv++k3JGzCVmprbNN5Kn+BgzD5E7YYwg1CcjuJMRNsvg=="
|
||||
},
|
||||
"universalify": {
|
||||
"version": "0.1.2",
|
||||
"resolved": "https://registry.npmjs.org/universalify/-/universalify-0.1.2.tgz",
|
||||
|
||||
@@ -15,7 +15,9 @@
|
||||
"license": "AGPL-3.0",
|
||||
"dependencies": {
|
||||
"@influxdata/influxdb-client": "^1.20.0",
|
||||
"@influxdata/influxdb-client-apis": "^1.20.0",
|
||||
"log4js": "^6.3.0",
|
||||
"luxon": "^2.1.1"
|
||||
"luxon": "^2.1.1",
|
||||
"string-argv": "^0.3.1"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,20 +1,15 @@
|
||||
const logger = require("./logger.js")("exec");
|
||||
|
||||
const { spawn } = require("child_process");
|
||||
const { parseArgsStringToArgv } = require('string-argv');
|
||||
|
||||
|
||||
function exec(cmd, stdout, stderr, exit_handler){
|
||||
const [bin, ...args] = cmd.split(' ')
|
||||
function exec(cmd, options){
|
||||
const [bin, ...args] = parseArgsStringToArgv(cmd);
|
||||
|
||||
logger.addContext("binary", "bin");
|
||||
logger.debug(`Spawn process '${cmd}'`);
|
||||
let proc = spawn(bin, args);
|
||||
|
||||
return {
|
||||
"process": proc,
|
||||
"stdout": proc.stdout,
|
||||
"stderr": proc.stderr
|
||||
}
|
||||
return spawn(bin, args, options);
|
||||
}
|
||||
|
||||
// Specify exports
|
||||
|
||||
61
src/helper/influx-checks.js
Normal file
61
src/helper/influx-checks.js
Normal file
@@ -0,0 +1,61 @@
|
||||
const logger = require.main.require("./helper/logger.js")("influx-checks");
|
||||
|
||||
const Os = require("os");
|
||||
const { InfluxDB, Point } = require('@influxdata/influxdb-client')
|
||||
const Influx = require('@influxdata/influxdb-client-apis');
|
||||
|
||||
|
||||
function checkHealth(influxDb){
|
||||
return new Promise((resolve, reject) => {
|
||||
new Influx.HealthAPI(influxDb) // Check influx health
|
||||
.getHealth()
|
||||
.catch((err) => {
|
||||
logger.error("Could not communicate with Influx:");
|
||||
logger.error(`Error [${err.code}]:`, err.message);
|
||||
reject();
|
||||
})
|
||||
.then((res) => {
|
||||
logger.debug("Server healthy.", "Version: ", res.version);
|
||||
resolve(res);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function checkBucket(influxDb, options){
|
||||
return new Promise((resolve, reject) => {
|
||||
new Influx.BucketsAPI(influxDb).getBuckets(options)
|
||||
.catch((err) => { // Weirdly the influx-Api returns 404 for searches of non-existing buckets
|
||||
logger.error("Could not get bucket:");
|
||||
logger.error(`Error [${err.code}]:`, err.message);
|
||||
reject();
|
||||
}).then((res) => { // But an empty list when the bucket exists, but token does not have permission to get details
|
||||
logger.debug("Bucket found");
|
||||
resolve(res);
|
||||
// Now we know the bucket exists and we have some kind of permission.. but we still dont know if we are able to write to it..
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function checkWriteApi(influxDb, options){
|
||||
return new Promise((resolve, reject) => {
|
||||
const writeApi = influxDb.getWriteApi(options.org, options.bucket); // Get WriteAPI
|
||||
writeApi.writePoint(new Point("worker_connectionTest").tag("hostname", Os.hostname())) // Write point
|
||||
writeApi.close()
|
||||
.catch((err) => {
|
||||
logger.error("Could not get writeApi:");
|
||||
logger.error(`Error [${err.code}]:`, err.message);
|
||||
reject();
|
||||
}).then((res) => {
|
||||
logger.debug("Writing ok");
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
// Specify exports
|
||||
module.exports = {
|
||||
checkHealth,
|
||||
checkBucket,
|
||||
checkWriteApi,
|
||||
};
|
||||
29
src/main.js
29
src/main.js
@@ -2,6 +2,8 @@ const logger = require("./helper/logger.js")("main");
|
||||
|
||||
const { requireEnvVars } = require("./helper/env.js");
|
||||
const { exit } = require("process");
|
||||
const { InfluxDB } = require('@influxdata/influxdb-client');
|
||||
const InfluxChecks = require('./helper/influx-checks.js');
|
||||
|
||||
/// Setup ENVs
|
||||
const env = process.env;
|
||||
@@ -9,8 +11,6 @@ const env = process.env;
|
||||
{
|
||||
env.LOGLEVEL ??= "INFO";
|
||||
env.WIFI_INTERFACE ??= "wlan0";
|
||||
env.WIFI_CHANNEL ??= [1,6,11];
|
||||
env.WIFI_CHANNEL_TIME ??= 1;
|
||||
}
|
||||
// Required vars
|
||||
let errorMsg = requireEnvVars([
|
||||
@@ -22,3 +22,28 @@ if(errorMsg){
|
||||
exit(1);
|
||||
}
|
||||
|
||||
(async function() {
|
||||
logger.info("Setup Influx..");
|
||||
const influxDb = new InfluxDB({url: env.INFLUX_URL, token: env.INFLUX_TOKEN});
|
||||
|
||||
await InfluxChecks.checkHealth(influxDb)
|
||||
.then((res) => {return InfluxChecks.checkBucket(influxDb, {
|
||||
org: env.INFLUX_ORG,
|
||||
name: env.INFLUX_BUCKET
|
||||
})})
|
||||
.then((res) => {return InfluxChecks.checkWriteApi(influxDb, {
|
||||
org: env.INFLUX_ORG,
|
||||
bucket: env.INFLUX_BUCKET
|
||||
})})
|
||||
.catch((err) => {
|
||||
if(err) {
|
||||
logger.error("Error whilst checking influx:");
|
||||
logger.error(err);
|
||||
}
|
||||
logger.fatal("Setup influx failed!");
|
||||
exit(1);
|
||||
});
|
||||
|
||||
logger.info("Influx ok");
|
||||
|
||||
})();
|
||||
|
||||
@@ -1,21 +1,23 @@
|
||||
const logger = require.main.require("./helper/logger.js")("InfluxPointWriter");
|
||||
const { Writeable } = require('stream');
|
||||
const { Writable } = require('stream');
|
||||
const {InfluxDB, Point, HttpError} = require('@influxdata/influxdb-client')
|
||||
|
||||
/**
|
||||
* Get points and write them into influx
|
||||
*/
|
||||
class InfluxPointWriter extends Writeable{
|
||||
class InfluxPointWriter extends Writable{
|
||||
/**
|
||||
*
|
||||
* @param {string} url Influx-Url
|
||||
* @param {string} token Auth-token
|
||||
* @param {InfluxDB} influxDb InfluxDb
|
||||
* @param {string} org Organization to use
|
||||
* @param {string} bucket Bucket to use
|
||||
* @param {string} precision Precision to use
|
||||
* @param {Partial<WriteOptions>} options Options for WriteApi
|
||||
*/
|
||||
constructor(url, token, org, bucket, precision = 'us'){
|
||||
this._api = new InfluxDB({url, token}).getWriteApi(org, bucket, precision);
|
||||
constructor(influxDb, org, bucket, options){
|
||||
super({
|
||||
objectMode: true
|
||||
});
|
||||
this._api = influxDb.getWriteApi(org, bucket, 'us', options);
|
||||
}
|
||||
|
||||
_write(point, encoding, next){
|
||||
@@ -25,10 +27,8 @@ class InfluxPointWriter extends Writeable{
|
||||
|
||||
_flush(next){
|
||||
this._api.flush(true)
|
||||
.then(
|
||||
next,
|
||||
(err) => { next(new Error(`WriteApi rejected promise for flush: ${err}`)); }
|
||||
);
|
||||
.catch((err) => { next(new Error(`WriteApi rejected promise for flush: ${err}`)); })
|
||||
.then(next);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -47,7 +47,7 @@ class PacketInfluxPointFactory extends Transform{
|
||||
|
||||
point.setField('value', packet[objKey]); // Set field
|
||||
|
||||
this.push(null, point); // Push point into stream
|
||||
this.push(point); // Push point into stream
|
||||
});
|
||||
|
||||
next(); // Get next packet
|
||||
|
||||
@@ -27,7 +27,7 @@ class RegexBlockStream extends Transform{
|
||||
}
|
||||
|
||||
_transform(chunk, encoding, next){
|
||||
chunk = this.readableBuffer.length? this.readableBuffer.join() + chunk: chunk; // Add previous buffer to current chunk
|
||||
chunk = this.readableBuffer.length? this.readableBuffer.join('') + chunk: chunk; // Add previous buffer to current chunk
|
||||
this.readableBuffer.length && this.readableBuffer.clear(); // Clear buffer once we read it
|
||||
|
||||
let matches = chunk.match(this.matcher); // Match
|
||||
@@ -52,7 +52,7 @@ class RegexBlockStream extends Transform{
|
||||
|
||||
_flush(next){
|
||||
if(matchAllOnFlush){ // When requested, we'll match one last time over the remaining buffer
|
||||
let chunk = this.readableBuffer.toString();
|
||||
let chunk = this.readableBuffer.join('');
|
||||
let matches = chunk.match(this.matcher); // Match remaining buffer
|
||||
_writeMatches(matches); // Write matches including last element
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user