Compare commits
29 Commits
38985ea9e2
...
f_influx-c
| Author | SHA1 | Date | |
|---|---|---|---|
| 873f00b21b | |||
| 271554719e | |||
| e18de63d7c | |||
| 2356040572 | |||
| b0bbf0c71a | |||
| 37b78e7373 | |||
| c5e1bb4c64 | |||
| 3c29ed2000 | |||
| ddf39b9433 | |||
| 3a927688d0 | |||
| c51cfc1b14 | |||
| e1b2a7e016 | |||
| fc5900b0ba | |||
| 354ca32a61 | |||
| d10e9bb2c6 | |||
| 44cd3288cf | |||
| 3af4bb7cc6 | |||
| 2a662e0bd1 | |||
| d7a9530b68 | |||
| 7de2250983 | |||
| bb3d843895 | |||
| 9472ed9198 | |||
| dcd0ce8111 | |||
| 1a9ced0bb8 | |||
| d77e3f8844 | |||
| e715cc1cac | |||
| cf1b300f6a | |||
| 450f162cda | |||
| 7ebcf573b9 |
31
package-lock.json
generated
31
package-lock.json
generated
@@ -10,8 +10,10 @@
|
|||||||
"license": "AGPL-3.0",
|
"license": "AGPL-3.0",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@influxdata/influxdb-client": "^1.20.0",
|
"@influxdata/influxdb-client": "^1.20.0",
|
||||||
|
"@influxdata/influxdb-client-apis": "^1.20.0",
|
||||||
"log4js": "^6.3.0",
|
"log4js": "^6.3.0",
|
||||||
"luxon": "^2.1.1"
|
"luxon": "^2.1.1",
|
||||||
|
"string-argv": "^0.3.1"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"node_modules/@influxdata/influxdb-client": {
|
"node_modules/@influxdata/influxdb-client": {
|
||||||
@@ -19,6 +21,14 @@
|
|||||||
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client/-/influxdb-client-1.20.0.tgz",
|
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client/-/influxdb-client-1.20.0.tgz",
|
||||||
"integrity": "sha512-jaKSI63hmQ5VSkJrFJkYIXaKlhoF+mGd4HmOf7v/X7pmEi69ReHp922Wyx6/OeCrpndRMbsadk+XmGNdd43cFw=="
|
"integrity": "sha512-jaKSI63hmQ5VSkJrFJkYIXaKlhoF+mGd4HmOf7v/X7pmEi69ReHp922Wyx6/OeCrpndRMbsadk+XmGNdd43cFw=="
|
||||||
},
|
},
|
||||||
|
"node_modules/@influxdata/influxdb-client-apis": {
|
||||||
|
"version": "1.20.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client-apis/-/influxdb-client-apis-1.20.0.tgz",
|
||||||
|
"integrity": "sha512-KMTmXH4rbpS+NWGpqDjxcKTyan2rbiT2IM5AdRElKhH2sHbH96xwLgziaxeC+OCJLeNAdehJgae3I8WiZjbwdg==",
|
||||||
|
"peerDependencies": {
|
||||||
|
"@influxdata/influxdb-client": "*"
|
||||||
|
}
|
||||||
|
},
|
||||||
"node_modules/date-format": {
|
"node_modules/date-format": {
|
||||||
"version": "3.0.0",
|
"version": "3.0.0",
|
||||||
"resolved": "https://registry.npmjs.org/date-format/-/date-format-3.0.0.tgz",
|
"resolved": "https://registry.npmjs.org/date-format/-/date-format-3.0.0.tgz",
|
||||||
@@ -128,6 +138,14 @@
|
|||||||
"node": ">=4.0"
|
"node": ">=4.0"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"node_modules/string-argv": {
|
||||||
|
"version": "0.3.1",
|
||||||
|
"resolved": "https://registry.npmjs.org/string-argv/-/string-argv-0.3.1.tgz",
|
||||||
|
"integrity": "sha512-a1uQGz7IyVy9YwhqjZIZu1c8JO8dNIe20xBmSS6qu9kv++k3JGzCVmprbNN5Kn+BgzD5E7YYwg1CcjuJMRNsvg==",
|
||||||
|
"engines": {
|
||||||
|
"node": ">=0.6.19"
|
||||||
|
}
|
||||||
|
},
|
||||||
"node_modules/universalify": {
|
"node_modules/universalify": {
|
||||||
"version": "0.1.2",
|
"version": "0.1.2",
|
||||||
"resolved": "https://registry.npmjs.org/universalify/-/universalify-0.1.2.tgz",
|
"resolved": "https://registry.npmjs.org/universalify/-/universalify-0.1.2.tgz",
|
||||||
@@ -143,6 +161,12 @@
|
|||||||
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client/-/influxdb-client-1.20.0.tgz",
|
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client/-/influxdb-client-1.20.0.tgz",
|
||||||
"integrity": "sha512-jaKSI63hmQ5VSkJrFJkYIXaKlhoF+mGd4HmOf7v/X7pmEi69ReHp922Wyx6/OeCrpndRMbsadk+XmGNdd43cFw=="
|
"integrity": "sha512-jaKSI63hmQ5VSkJrFJkYIXaKlhoF+mGd4HmOf7v/X7pmEi69ReHp922Wyx6/OeCrpndRMbsadk+XmGNdd43cFw=="
|
||||||
},
|
},
|
||||||
|
"@influxdata/influxdb-client-apis": {
|
||||||
|
"version": "1.20.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client-apis/-/influxdb-client-apis-1.20.0.tgz",
|
||||||
|
"integrity": "sha512-KMTmXH4rbpS+NWGpqDjxcKTyan2rbiT2IM5AdRElKhH2sHbH96xwLgziaxeC+OCJLeNAdehJgae3I8WiZjbwdg==",
|
||||||
|
"requires": {}
|
||||||
|
},
|
||||||
"date-format": {
|
"date-format": {
|
||||||
"version": "3.0.0",
|
"version": "3.0.0",
|
||||||
"resolved": "https://registry.npmjs.org/date-format/-/date-format-3.0.0.tgz",
|
"resolved": "https://registry.npmjs.org/date-format/-/date-format-3.0.0.tgz",
|
||||||
@@ -228,6 +252,11 @@
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"string-argv": {
|
||||||
|
"version": "0.3.1",
|
||||||
|
"resolved": "https://registry.npmjs.org/string-argv/-/string-argv-0.3.1.tgz",
|
||||||
|
"integrity": "sha512-a1uQGz7IyVy9YwhqjZIZu1c8JO8dNIe20xBmSS6qu9kv++k3JGzCVmprbNN5Kn+BgzD5E7YYwg1CcjuJMRNsvg=="
|
||||||
|
},
|
||||||
"universalify": {
|
"universalify": {
|
||||||
"version": "0.1.2",
|
"version": "0.1.2",
|
||||||
"resolved": "https://registry.npmjs.org/universalify/-/universalify-0.1.2.tgz",
|
"resolved": "https://registry.npmjs.org/universalify/-/universalify-0.1.2.tgz",
|
||||||
|
|||||||
@@ -15,7 +15,9 @@
|
|||||||
"license": "AGPL-3.0",
|
"license": "AGPL-3.0",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@influxdata/influxdb-client": "^1.20.0",
|
"@influxdata/influxdb-client": "^1.20.0",
|
||||||
|
"@influxdata/influxdb-client-apis": "^1.20.0",
|
||||||
"log4js": "^6.3.0",
|
"log4js": "^6.3.0",
|
||||||
"luxon": "^2.1.1"
|
"luxon": "^2.1.1",
|
||||||
|
"string-argv": "^0.3.1"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,20 +1,15 @@
|
|||||||
const logger = require("./logger.js")("exec");
|
const logger = require("./logger.js")("exec");
|
||||||
|
|
||||||
const { spawn } = require("child_process");
|
const { spawn } = require("child_process");
|
||||||
|
const { parseArgsStringToArgv } = require('string-argv');
|
||||||
|
|
||||||
|
|
||||||
function exec(cmd, stdout, stderr, exit_handler){
|
function exec(cmd, options){
|
||||||
const [bin, ...args] = cmd.split(' ')
|
const [bin, ...args] = parseArgsStringToArgv(cmd);
|
||||||
|
|
||||||
logger.addContext("binary", "bin");
|
logger.addContext("binary", "bin");
|
||||||
logger.debug(`Spawn process '${cmd}'`);
|
logger.debug(`Spawn process '${cmd}'`);
|
||||||
let proc = spawn(bin, args);
|
return spawn(bin, args, options);
|
||||||
|
|
||||||
return {
|
|
||||||
"process": proc,
|
|
||||||
"stdout": proc.stdout,
|
|
||||||
"stderr": proc.stderr
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Specify exports
|
// Specify exports
|
||||||
|
|||||||
61
src/helper/influx-checks.js
Normal file
61
src/helper/influx-checks.js
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
const logger = require.main.require("./helper/logger.js")("influx-checks");
|
||||||
|
|
||||||
|
const Os = require("os");
|
||||||
|
const { InfluxDB, Point } = require('@influxdata/influxdb-client')
|
||||||
|
const Influx = require('@influxdata/influxdb-client-apis');
|
||||||
|
|
||||||
|
|
||||||
|
function checkHealth(influxDb){
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
new Influx.HealthAPI(influxDb) // Check influx health
|
||||||
|
.getHealth()
|
||||||
|
.catch((err) => {
|
||||||
|
logger.error("Could not communicate with Influx:");
|
||||||
|
logger.error(`Error [${err.code}]:`, err.message);
|
||||||
|
reject();
|
||||||
|
})
|
||||||
|
.then((res) => {
|
||||||
|
logger.debug("Server healthy.", "Version: ", res.version);
|
||||||
|
resolve(res);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function checkBucket(influxDb, options){
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
new Influx.BucketsAPI(influxDb).getBuckets(options)
|
||||||
|
.catch((err) => { // Weirdly the influx-Api returns 404 for searches of non-existing buckets
|
||||||
|
logger.error("Could not get bucket:");
|
||||||
|
logger.error(`Error [${err.code}]:`, err.message);
|
||||||
|
reject();
|
||||||
|
}).then((res) => { // But an empty list when the bucket exists, but token does not have permission to get details
|
||||||
|
logger.debug("Bucket found");
|
||||||
|
resolve(res);
|
||||||
|
// Now we know the bucket exists and we have some kind of permission.. but we still dont know if we are able to write to it..
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function checkWriteApi(influxDb, options){
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
const writeApi = influxDb.getWriteApi(options.org, options.bucket); // Get WriteAPI
|
||||||
|
writeApi.writePoint(new Point("worker_connectionTest").tag("hostname", Os.hostname())) // Write point
|
||||||
|
writeApi.close()
|
||||||
|
.catch((err) => {
|
||||||
|
logger.error("Could not get writeApi:");
|
||||||
|
logger.error(`Error [${err.code}]:`, err.message);
|
||||||
|
reject();
|
||||||
|
}).then((res) => {
|
||||||
|
logger.debug("Writing ok");
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Specify exports
|
||||||
|
module.exports = {
|
||||||
|
checkHealth,
|
||||||
|
checkBucket,
|
||||||
|
checkWriteApi,
|
||||||
|
};
|
||||||
29
src/main.js
29
src/main.js
@@ -2,6 +2,8 @@ const logger = require("./helper/logger.js")("main");
|
|||||||
|
|
||||||
const { requireEnvVars } = require("./helper/env.js");
|
const { requireEnvVars } = require("./helper/env.js");
|
||||||
const { exit } = require("process");
|
const { exit } = require("process");
|
||||||
|
const { InfluxDB } = require('@influxdata/influxdb-client');
|
||||||
|
const InfluxChecks = require('./helper/influx-checks.js');
|
||||||
|
|
||||||
/// Setup ENVs
|
/// Setup ENVs
|
||||||
const env = process.env;
|
const env = process.env;
|
||||||
@@ -9,8 +11,6 @@ const env = process.env;
|
|||||||
{
|
{
|
||||||
env.LOGLEVEL ??= "INFO";
|
env.LOGLEVEL ??= "INFO";
|
||||||
env.WIFI_INTERFACE ??= "wlan0";
|
env.WIFI_INTERFACE ??= "wlan0";
|
||||||
env.WIFI_CHANNEL ??= [1,6,11];
|
|
||||||
env.WIFI_CHANNEL_TIME ??= 1;
|
|
||||||
}
|
}
|
||||||
// Required vars
|
// Required vars
|
||||||
let errorMsg = requireEnvVars([
|
let errorMsg = requireEnvVars([
|
||||||
@@ -22,3 +22,28 @@ if(errorMsg){
|
|||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
(async function() {
|
||||||
|
logger.info("Setup Influx..");
|
||||||
|
const influxDb = new InfluxDB({url: env.INFLUX_URL, token: env.INFLUX_TOKEN});
|
||||||
|
|
||||||
|
await InfluxChecks.checkHealth(influxDb)
|
||||||
|
.then((res) => {return InfluxChecks.checkBucket(influxDb, {
|
||||||
|
org: env.INFLUX_ORG,
|
||||||
|
name: env.INFLUX_BUCKET
|
||||||
|
})})
|
||||||
|
.then((res) => {return InfluxChecks.checkWriteApi(influxDb, {
|
||||||
|
org: env.INFLUX_ORG,
|
||||||
|
bucket: env.INFLUX_BUCKET
|
||||||
|
})})
|
||||||
|
.catch((err) => {
|
||||||
|
if(err) {
|
||||||
|
logger.error("Error whilst checking influx:");
|
||||||
|
logger.error(err);
|
||||||
|
}
|
||||||
|
logger.fatal("Setup influx failed!");
|
||||||
|
exit(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
logger.info("Influx ok");
|
||||||
|
|
||||||
|
})();
|
||||||
|
|||||||
38
src/streamHandler/InfluxPointWriter.js
Normal file
38
src/streamHandler/InfluxPointWriter.js
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
const logger = require.main.require("./helper/logger.js")("InfluxPointWriter");
|
||||||
|
const { Writable } = require('stream');
|
||||||
|
const {InfluxDB, Point, HttpError} = require('@influxdata/influxdb-client')
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get points and write them into influx
|
||||||
|
*/
|
||||||
|
class InfluxPointWriter extends Writable{
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param {InfluxDB} influxDb InfluxDb
|
||||||
|
* @param {string} org Organization to use
|
||||||
|
* @param {string} bucket Bucket to use
|
||||||
|
* @param {Partial<WriteOptions>} options Options for WriteApi
|
||||||
|
*/
|
||||||
|
constructor(influxDb, org, bucket, options){
|
||||||
|
super({
|
||||||
|
objectMode: true
|
||||||
|
});
|
||||||
|
this._api = influxDb.getWriteApi(org, bucket, 'us', options);
|
||||||
|
}
|
||||||
|
|
||||||
|
_write(point, encoding, next){
|
||||||
|
this._api.writePoint(point);
|
||||||
|
next();
|
||||||
|
}
|
||||||
|
|
||||||
|
_flush(next){
|
||||||
|
this._api.flush(true)
|
||||||
|
.catch((err) => { next(new Error(`WriteApi rejected promise for flush: ${err}`)); })
|
||||||
|
.then(next);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Specify exports
|
||||||
|
module.exports = {
|
||||||
|
InfluxPointWriter
|
||||||
|
};
|
||||||
72
src/streamHandler/PacketInfluxPointFactory.js
Normal file
72
src/streamHandler/PacketInfluxPointFactory.js
Normal file
@@ -0,0 +1,72 @@
|
|||||||
|
const logger = require.main.require("./helper/logger.js")("PacketStreamFactory");
|
||||||
|
const { Transform } = require('stream');
|
||||||
|
const {Point} = require('@influxdata/influxdb-client')
|
||||||
|
|
||||||
|
/** Keys to always use as tags */
|
||||||
|
const TAG_LIST = [
|
||||||
|
"srcMac",
|
||||||
|
"dstMac",
|
||||||
|
"bssid",
|
||||||
|
"frequency",
|
||||||
|
"flags",
|
||||||
|
];
|
||||||
|
|
||||||
|
/** Measurement-name and corresponding field-key */
|
||||||
|
const MEASUREMENT_MAP = new Map([
|
||||||
|
["Signal", "signal"],
|
||||||
|
["PayloadSize", "payloadSize"],
|
||||||
|
["DataRate", "dataRate"],
|
||||||
|
["SSID", "ssid"],
|
||||||
|
["AuthenticationType", "authenticationType"],
|
||||||
|
["AssociationSuccess", "associationIsSuccessful"],
|
||||||
|
["DisassociationReason", "disassociationReason"],
|
||||||
|
]);
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get packets and convert them into influx-points
|
||||||
|
*/
|
||||||
|
class PacketInfluxPointFactory extends Transform{
|
||||||
|
constructor(){
|
||||||
|
super({
|
||||||
|
readableObjectMode: true,
|
||||||
|
writableObjectMode: true
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
_transform(packet, encoding, next){
|
||||||
|
// Create measurements
|
||||||
|
MEASUREMENT_MAP.forEach((objKey, measurement) => {
|
||||||
|
if(!Object.keys(packet).includes(objKey)) return;
|
||||||
|
|
||||||
|
let point = new Point(measurement); // Create point
|
||||||
|
|
||||||
|
// Set tags
|
||||||
|
TAG_LIST.filter(tag => Object.keys(packet).includes(tag))
|
||||||
|
.forEach(tag => point.tag(tag, packet[tag]));
|
||||||
|
|
||||||
|
point.setField('value', packet[objKey]); // Set field
|
||||||
|
|
||||||
|
this.push(point); // Push point into stream
|
||||||
|
});
|
||||||
|
|
||||||
|
next(); // Get next packet
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/** Mapping for type -> field-method */
|
||||||
|
const POINT_FIELD_TYPE = new Map([
|
||||||
|
['boolean', function(key, value){ return this.booleanField(key, value); }],
|
||||||
|
['number', function(key, value){ return this.intField(key, value); }],
|
||||||
|
['string', function(key, value){ return this.stringField(key, value); }],
|
||||||
|
]);
|
||||||
|
Point.prototype.setField = function(key, value){
|
||||||
|
let setField = POINT_FIELD_TYPE.get(typeof value);
|
||||||
|
return setField.apply(this, [key, value]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Specify exports
|
||||||
|
module.exports = {
|
||||||
|
PacketInfluxPointFactory
|
||||||
|
};
|
||||||
@@ -10,9 +10,10 @@ class RegexBlockStream extends Transform{
|
|||||||
matchAllOnFlush;
|
matchAllOnFlush;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {RegExp} matcher Block-match - WARNING: It should match a clean-block (including e.g. newline)! Otherwise buffer will get dirty and use more and more ressources.
|
* @param {RegExp} matcher Block-match
|
||||||
* @param {boolean} withholdLastBlock When true, the last matches block will not be submitted to prevent submitting incomplete blocks.
|
* @param {boolean} withholdLastBlock When true, the last matches block will not be submitted to prevent submitting incomplete blocks.
|
||||||
* @param {boolean} matchAllOnFlush (Only in combination with withholdLastBlock) When enabled, the buffer will be matched on last time on _flush (stream deconstruction) and write any, also incomplete, blocks
|
* @param {boolean} matchAllOnFlush (Only in combination with withholdLastBlock) When enabled, the buffer will be matched on last time on _flush (stream deconstruction) and write any, also incomplete, blocks
|
||||||
|
* @remarks WARNING: It should match a clean-block (including e.g. newline)! Otherwise buffer will get dirty and use more and more ressources.
|
||||||
*/
|
*/
|
||||||
constructor(matcher, withholdLastBlock = true, matchAllOnFlush = false){
|
constructor(matcher, withholdLastBlock = true, matchAllOnFlush = false){
|
||||||
super({
|
super({
|
||||||
@@ -26,7 +27,7 @@ class RegexBlockStream extends Transform{
|
|||||||
}
|
}
|
||||||
|
|
||||||
_transform(chunk, encoding, next){
|
_transform(chunk, encoding, next){
|
||||||
chunk = this.readableBuffer.length? this.readableBuffer.join() + chunk: chunk; // Add previous buffer to current chunk
|
chunk = this.readableBuffer.length? this.readableBuffer.join('') + chunk: chunk; // Add previous buffer to current chunk
|
||||||
this.readableBuffer.length && this.readableBuffer.clear(); // Clear buffer once we read it
|
this.readableBuffer.length && this.readableBuffer.clear(); // Clear buffer once we read it
|
||||||
|
|
||||||
let matches = chunk.match(this.matcher); // Match
|
let matches = chunk.match(this.matcher); // Match
|
||||||
@@ -51,7 +52,7 @@ class RegexBlockStream extends Transform{
|
|||||||
|
|
||||||
_flush(next){
|
_flush(next){
|
||||||
if(matchAllOnFlush){ // When requested, we'll match one last time over the remaining buffer
|
if(matchAllOnFlush){ // When requested, we'll match one last time over the remaining buffer
|
||||||
let chunk = this.readableBuffer.toString();
|
let chunk = this.readableBuffer.join('');
|
||||||
let matches = chunk.match(this.matcher); // Match remaining buffer
|
let matches = chunk.match(this.matcher); // Match remaining buffer
|
||||||
_writeMatches(matches); // Write matches including last element
|
_writeMatches(matches); // Write matches including last element
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user