Compare commits
6 Commits
f_influx-c
...
ca3c37be0f
| Author | SHA1 | Date | |
|---|---|---|---|
| ca3c37be0f | |||
| 96b52e63a0 | |||
| 3c5e941cba | |||
| a468d7a57b | |||
| 4ad5eba7e0 | |||
| 2646c9787e |
38
src/main.js
38
src/main.js
@@ -1,10 +1,18 @@
|
|||||||
|
"use strict";
|
||||||
const logger = require("./helper/logger.js")("main");
|
const logger = require("./helper/logger.js")("main");
|
||||||
|
|
||||||
const { requireEnvVars } = require("./helper/env.js");
|
const { requireEnvVars } = require("./helper/env.js");
|
||||||
const { exit } = require("process");
|
const { exit } = require("process");
|
||||||
|
const { exec } = require("./helper/exec.js");
|
||||||
|
|
||||||
const { InfluxDB } = require('@influxdata/influxdb-client');
|
const { InfluxDB } = require('@influxdata/influxdb-client');
|
||||||
const InfluxChecks = require('./helper/influx-checks.js');
|
const InfluxChecks = require('./helper/influx-checks.js');
|
||||||
|
|
||||||
|
const { RegexBlockStream } = require("./streamHandler/RegexBlockStream.js");
|
||||||
|
const { PacketStreamFactory } = require("./streamHandler/PacketStreamFactory.js");
|
||||||
|
const { PacketInfluxPointFactory } = require("./streamHandler/PacketInfluxPointFactory.js");
|
||||||
|
const { InfluxPointWriter } = require("./streamHandler/InfluxPointWriter.js");
|
||||||
|
|
||||||
/// Setup ENVs
|
/// Setup ENVs
|
||||||
const env = process.env;
|
const env = process.env;
|
||||||
// Defaults
|
// Defaults
|
||||||
@@ -46,4 +54,34 @@ if(errorMsg){
|
|||||||
|
|
||||||
logger.info("Influx ok");
|
logger.info("Influx ok");
|
||||||
|
|
||||||
|
logger.info("Starting tcpdump..");
|
||||||
|
const TCPDUMP_BASECMD = "tcpdump -vvv -e -n -X -s0 -i"
|
||||||
|
let cmd = `sudo ${TCPDUMP_BASECMD} ${env.WIFI_INTERFACE}`;
|
||||||
|
|
||||||
|
let proc = exec(cmd);
|
||||||
|
logger.debug("Creating & Attaching streams..");
|
||||||
|
proc.stdout
|
||||||
|
.setEncoding("utf8")
|
||||||
|
.pipe(new RegexBlockStream(/^[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{6}.*(\n( {4,8}|\t\t?).*){1,}\n/gm))
|
||||||
|
.pipe(new PacketStreamFactory())
|
||||||
|
.pipe(new PacketInfluxPointFactory())
|
||||||
|
.pipe(new InfluxPointWriter(influxDb, env.INFLUX_ORG, env.INFLUX_BUCKET));
|
||||||
|
|
||||||
|
logger.debug("Attaching error-logger..");
|
||||||
|
proc.stderr.setEncoding("utf8").on("data", (data) => {
|
||||||
|
logger.error(data);
|
||||||
|
});
|
||||||
|
|
||||||
|
logger.debug("Attaching exit-handler..");
|
||||||
|
proc.on("exit", (code) => {
|
||||||
|
logger.info(`tcpdump exited code: ${code}`);
|
||||||
|
if (code) {
|
||||||
|
logger.fatal(`tcpdump exited with non-zero code: ${code}`);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
logger.info("Shutdown");
|
||||||
|
exit(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
logger.info("Startup complete");
|
||||||
})();
|
})();
|
||||||
|
|||||||
@@ -37,13 +37,16 @@ class PacketInfluxPointFactory extends Transform{
|
|||||||
_transform(packet, encoding, next){
|
_transform(packet, encoding, next){
|
||||||
// Create measurements
|
// Create measurements
|
||||||
MEASUREMENT_MAP.forEach((objKey, measurement) => {
|
MEASUREMENT_MAP.forEach((objKey, measurement) => {
|
||||||
if(!Object.keys(packet).includes(objKey)) return;
|
if(packet[objKey] == null) return;
|
||||||
|
|
||||||
let point = new Point(measurement); // Create point
|
let point = new Point(measurement); // Create point
|
||||||
|
|
||||||
// Set tags
|
// Set tags
|
||||||
TAG_LIST.filter(tag => Object.keys(packet).includes(tag))
|
TAG_LIST.filter(tag => Object.keys(packet).includes(tag)) // Filter tags available on object
|
||||||
.forEach(tag => point.tag(tag, packet[tag]));
|
.filter(tag => packet[tag] != null) // Filter tags not falsy on object
|
||||||
|
.forEach(tag => {
|
||||||
|
tagObjectRecursively(point, tag, packet[tag]);
|
||||||
|
});
|
||||||
|
|
||||||
point.setField('value', packet[objKey]); // Set field
|
point.setField('value', packet[objKey]); // Set field
|
||||||
|
|
||||||
@@ -54,6 +57,15 @@ class PacketInfluxPointFactory extends Transform{
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function tagObjectRecursively(point, tag, field, suffix = ""){
|
||||||
|
if(typeof(field) == "object"){
|
||||||
|
// TODO: Convert boolean-arrays like "packet.flags" to key: value
|
||||||
|
Object.entries(field).map(([key, value]) => {
|
||||||
|
tagObjectRecursively(point, tag, value, `_${key}${suffix}`);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
else point.tag(tag+suffix, field);
|
||||||
|
}
|
||||||
|
|
||||||
/** Mapping for type -> field-method */
|
/** Mapping for type -> field-method */
|
||||||
const POINT_FIELD_TYPE = new Map([
|
const POINT_FIELD_TYPE = new Map([
|
||||||
|
|||||||
Reference in New Issue
Block a user