Compare commits
6 Commits
f_influx-c
...
ca3c37be0f
| Author | SHA1 | Date | |
|---|---|---|---|
| ca3c37be0f | |||
| 96b52e63a0 | |||
| 3c5e941cba | |||
| a468d7a57b | |||
| 4ad5eba7e0 | |||
| 2646c9787e |
38
src/main.js
38
src/main.js
@@ -1,10 +1,18 @@
|
||||
"use strict";
|
||||
const logger = require("./helper/logger.js")("main");
|
||||
|
||||
const { requireEnvVars } = require("./helper/env.js");
|
||||
const { exit } = require("process");
|
||||
const { exec } = require("./helper/exec.js");
|
||||
|
||||
const { InfluxDB } = require('@influxdata/influxdb-client');
|
||||
const InfluxChecks = require('./helper/influx-checks.js');
|
||||
|
||||
const { RegexBlockStream } = require("./streamHandler/RegexBlockStream.js");
|
||||
const { PacketStreamFactory } = require("./streamHandler/PacketStreamFactory.js");
|
||||
const { PacketInfluxPointFactory } = require("./streamHandler/PacketInfluxPointFactory.js");
|
||||
const { InfluxPointWriter } = require("./streamHandler/InfluxPointWriter.js");
|
||||
|
||||
/// Setup ENVs
|
||||
const env = process.env;
|
||||
// Defaults
|
||||
@@ -46,4 +54,34 @@ if(errorMsg){
|
||||
|
||||
logger.info("Influx ok");
|
||||
|
||||
logger.info("Starting tcpdump..");
|
||||
const TCPDUMP_BASECMD = "tcpdump -vvv -e -n -X -s0 -i"
|
||||
let cmd = `sudo ${TCPDUMP_BASECMD} ${env.WIFI_INTERFACE}`;
|
||||
|
||||
let proc = exec(cmd);
|
||||
logger.debug("Creating & Attaching streams..");
|
||||
proc.stdout
|
||||
.setEncoding("utf8")
|
||||
.pipe(new RegexBlockStream(/^[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{6}.*(\n( {4,8}|\t\t?).*){1,}\n/gm))
|
||||
.pipe(new PacketStreamFactory())
|
||||
.pipe(new PacketInfluxPointFactory())
|
||||
.pipe(new InfluxPointWriter(influxDb, env.INFLUX_ORG, env.INFLUX_BUCKET));
|
||||
|
||||
logger.debug("Attaching error-logger..");
|
||||
proc.stderr.setEncoding("utf8").on("data", (data) => {
|
||||
logger.error(data);
|
||||
});
|
||||
|
||||
logger.debug("Attaching exit-handler..");
|
||||
proc.on("exit", (code) => {
|
||||
logger.info(`tcpdump exited code: ${code}`);
|
||||
if (code) {
|
||||
logger.fatal(`tcpdump exited with non-zero code: ${code}`);
|
||||
exit(1);
|
||||
}
|
||||
logger.info("Shutdown");
|
||||
exit(0);
|
||||
});
|
||||
|
||||
logger.info("Startup complete");
|
||||
})();
|
||||
|
||||
@@ -37,13 +37,16 @@ class PacketInfluxPointFactory extends Transform{
|
||||
_transform(packet, encoding, next){
|
||||
// Create measurements
|
||||
MEASUREMENT_MAP.forEach((objKey, measurement) => {
|
||||
if(!Object.keys(packet).includes(objKey)) return;
|
||||
if(packet[objKey] == null) return;
|
||||
|
||||
let point = new Point(measurement); // Create point
|
||||
|
||||
// Set tags
|
||||
TAG_LIST.filter(tag => Object.keys(packet).includes(tag))
|
||||
.forEach(tag => point.tag(tag, packet[tag]));
|
||||
TAG_LIST.filter(tag => Object.keys(packet).includes(tag)) // Filter tags available on object
|
||||
.filter(tag => packet[tag] != null) // Filter tags not falsy on object
|
||||
.forEach(tag => {
|
||||
tagObjectRecursively(point, tag, packet[tag]);
|
||||
});
|
||||
|
||||
point.setField('value', packet[objKey]); // Set field
|
||||
|
||||
@@ -54,6 +57,15 @@ class PacketInfluxPointFactory extends Transform{
|
||||
}
|
||||
}
|
||||
|
||||
function tagObjectRecursively(point, tag, field, suffix = ""){
|
||||
if(typeof(field) == "object"){
|
||||
// TODO: Convert boolean-arrays like "packet.flags" to key: value
|
||||
Object.entries(field).map(([key, value]) => {
|
||||
tagObjectRecursively(point, tag, value, `_${key}${suffix}`);
|
||||
});
|
||||
}
|
||||
else point.tag(tag+suffix, field);
|
||||
}
|
||||
|
||||
/** Mapping for type -> field-method */
|
||||
const POINT_FIELD_TYPE = new Map([
|
||||
|
||||
Reference in New Issue
Block a user