Compare commits
No commits in common. 'master' and 'code-smell_handling' have entirely different histories.
master
...
code-smell
@ -1,29 +1,13 @@
|
|||||||
# ---- Base ----
|
FROM node:16
|
||||||
FROM alpine:3 AS base
|
|
||||||
|
|
||||||
# Create app directory
|
# Create app directory
|
||||||
WORKDIR /usr/src/app
|
WORKDIR /usr/src/app
|
||||||
|
|
||||||
# Copy project file
|
|
||||||
COPY package.json .
|
|
||||||
|
|
||||||
# Install required apk-packages
|
|
||||||
RUN apk add --no-cache nodejs npm tcpdump
|
|
||||||
|
|
||||||
|
|
||||||
# ---- Dependencies ----
|
|
||||||
FROM base AS dependencies
|
|
||||||
|
|
||||||
# Install app dependencies
|
# Install app dependencies
|
||||||
RUN npm install --only=production
|
COPY package*.json ./
|
||||||
|
RUN npm install
|
||||||
|
|
||||||
# ---- Release ----
|
|
||||||
FROM base AS release
|
|
||||||
|
|
||||||
# copy from build image
|
|
||||||
COPY --from=dependencies /usr/src/app/ ./
|
|
||||||
# Bundle app source
|
# Bundle app source
|
||||||
COPY ./src/ .
|
COPY ./src/ .
|
||||||
|
|
||||||
CMD ["npm", "run", "start"]
|
CMD ["npm", "run"]
|
@ -1,9 +0,0 @@
|
|||||||
TAG="ruakij/rfmon-to-influx"
|
|
||||||
PLATFORM="linux/amd64,linux/arm64/v8,linux/arm/v7"
|
|
||||||
EXTRA_ARGS="$@"
|
|
||||||
|
|
||||||
docker buildx build \
|
|
||||||
--platform $PLATFORM \
|
|
||||||
--tag $TAG \
|
|
||||||
$EXTRA_ARGS \
|
|
||||||
.
|
|
@ -1,7 +0,0 @@
|
|||||||
TAG="ruakij/rfmon-to-influx"
|
|
||||||
EXTRA_ARGS="$@"
|
|
||||||
|
|
||||||
docker build \
|
|
||||||
--tag $TAG \
|
|
||||||
$EXTRA_ARGS \
|
|
||||||
.
|
|
Binary file not shown.
Before Width: | Height: | Size: 147 KiB |
Binary file not shown.
Before Width: | Height: | Size: 14 KiB |
@ -1,46 +0,0 @@
|
|||||||
// This file specifies functions to help a user with e.g. configuration-errors
|
|
||||||
|
|
||||||
function detectStreamData(stream, timeout = 5000){
|
|
||||||
return new Promise((resolve, reject) => {
|
|
||||||
let timeoutHandler;
|
|
||||||
if(timeout){
|
|
||||||
timeoutHandler = setTimeout(() => {
|
|
||||||
reject("timeout");
|
|
||||||
remListeners();
|
|
||||||
},
|
|
||||||
timeout);
|
|
||||||
}
|
|
||||||
|
|
||||||
function remListeners(){
|
|
||||||
stream.removeListener("error", errorHandler);
|
|
||||||
stream.removeListener("data", dataHandler);
|
|
||||||
if(timeoutHandler) clearTimeout(timeoutHandler);
|
|
||||||
}
|
|
||||||
|
|
||||||
function errorHandler(err) {
|
|
||||||
remListeners();
|
|
||||||
}
|
|
||||||
function dataHandler(data) {
|
|
||||||
resolve(data);
|
|
||||||
remListeners();
|
|
||||||
}
|
|
||||||
|
|
||||||
stream.on("error", errorHandler);
|
|
||||||
stream.on("data", dataHandler);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
function detectStreamsData(streams, timeout = 5000){
|
|
||||||
let promises = [];
|
|
||||||
streams.forEach((stream) => {
|
|
||||||
promises.push(detectStreamData(stream, timeout));
|
|
||||||
});
|
|
||||||
return promises;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// Specify exports
|
|
||||||
module.exports = {
|
|
||||||
detectStreamData,
|
|
||||||
detectStreamsData,
|
|
||||||
};
|
|
@ -1,201 +1,87 @@
|
|||||||
"use strict";
|
"use strict";
|
||||||
const logFactory = require("./helper/logger.js");
|
const logger = require("./helper/logger.js")("main");
|
||||||
const logger = logFactory("main");
|
|
||||||
|
|
||||||
const { requireEnvVars } = require("./helper/env.js");
|
const { requireEnvVars } = require("./helper/env.js");
|
||||||
const { exit } = require("process");
|
const { exit } = require("process");
|
||||||
const { exec } = require("./helper/exec.js");
|
const { exec } = require("./helper/exec.js");
|
||||||
const Os = require("os");
|
|
||||||
|
|
||||||
const { InfluxDB } = require("@influxdata/influxdb-client");
|
const { InfluxDB } = require('@influxdata/influxdb-client');
|
||||||
const InfluxChecks = require("./helper/influx-checks.js");
|
const InfluxChecks = require('./helper/influx-checks.js');
|
||||||
|
|
||||||
const { RegexBlockStream } = require("./streamHandler/RegexBlockStream.js");
|
const { RegexBlockStream } = require("./streamHandler/RegexBlockStream.js");
|
||||||
const { PacketStreamFactory } = require("./streamHandler/PacketStreamFactory.js");
|
const { PacketStreamFactory } = require("./streamHandler/PacketStreamFactory.js");
|
||||||
const { PacketInfluxPointFactory } = require("./streamHandler/PacketInfluxPointFactory.js");
|
const { PacketInfluxPointFactory } = require("./streamHandler/PacketInfluxPointFactory.js");
|
||||||
const { InfluxPointWriter } = require("./streamHandler/InfluxPointWriter.js");
|
const { InfluxPointWriter } = require("./streamHandler/InfluxPointWriter.js");
|
||||||
const { InfluxDbLineProtocolWriter } = require("./streamHandler/InfluxDbLineProtocolWriter.js");
|
|
||||||
const { InfluxPointToLineProtoStream } = require("./streamHandler/InfluxPointToLineProtoStream.js");
|
|
||||||
|
|
||||||
const userHelper = require("./helper/userHelper.js");
|
|
||||||
|
|
||||||
|
|
||||||
/// Setup ENVs
|
/// Setup ENVs
|
||||||
const env = process.env;
|
const env = process.env;
|
||||||
// Defaults
|
// Defaults
|
||||||
{
|
{
|
||||||
env.LOGLEVEL ??= "INFO";
|
env.LOGLEVEL ??= "INFO";
|
||||||
env.WIFI_INTERFACE ??= "wlan0";
|
env.WIFI_INTERFACE ??= "wlan0";
|
||||||
env.HOSTNAME ??= Os.hostname();
|
|
||||||
|
|
||||||
env.USE_INFLUXDB_LINEPROTOCOL ??= false;
|
|
||||||
}
|
}
|
||||||
// Required vars
|
// Required vars
|
||||||
let errorMsg = requireEnvVars(
|
let errorMsg = requireEnvVars([
|
||||||
env.USE_INFLUXDB_LINEPROTOCOL? [ // When lineprotocol is enabled, we need host and port
|
"INFLUX_URL", "INFLUX_TOKEN",
|
||||||
"INFLUXDB_LINEPROTOCOL_HOST", "INFLUXDB_LINEPROTOCOL_PORT",
|
"INFLUX_ORG", "INFLUX_BUCKET"
|
||||||
] : [ // When its disabled, influxdb-data
|
]);
|
||||||
"INFLUX_URL", "INFLUX_TOKEN",
|
|
||||||
"INFLUX_ORG", "INFLUX_BUCKET"
|
|
||||||
]);
|
|
||||||
if(errorMsg){
|
if(errorMsg){
|
||||||
logger.fatal(errorMsg);
|
logger.fatal(errorMsg);
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
(async function() {
|
(async function() {
|
||||||
let pointWriter;
|
logger.info("Setup Influx..");
|
||||||
if(!env.USE_INFLUXDB_LINEPROTOCOL){
|
const influxDb = new InfluxDB({url: env.INFLUX_URL, token: env.INFLUX_TOKEN});
|
||||||
logger.info("Setup Influx..");
|
|
||||||
const influxDb = new InfluxDB({url: env.INFLUX_URL, token: env.INFLUX_TOKEN});
|
await InfluxChecks.checkHealth(influxDb)
|
||||||
|
.then((res) => {return InfluxChecks.checkBucket(influxDb, {
|
||||||
await InfluxChecks.checkHealth(influxDb)
|
org: env.INFLUX_ORG,
|
||||||
.then((res) => {return InfluxChecks.checkBucket(influxDb, {
|
name: env.INFLUX_BUCKET
|
||||||
org: env.INFLUX_ORG,
|
})})
|
||||||
name: env.INFLUX_BUCKET
|
.then((res) => {return InfluxChecks.checkWriteApi(influxDb, {
|
||||||
});})
|
org: env.INFLUX_ORG,
|
||||||
.then((res) => {return InfluxChecks.checkWriteApi(influxDb, {
|
bucket: env.INFLUX_BUCKET
|
||||||
org: env.INFLUX_ORG,
|
})})
|
||||||
bucket: env.INFLUX_BUCKET
|
.catch((err) => {
|
||||||
});})
|
if(err) {
|
||||||
.catch((err) => {
|
logger.error("Error whilst checking influx:");
|
||||||
if(err) {
|
logger.error(err);
|
||||||
logger.error("Error whilst checking influx:");
|
}
|
||||||
logger.error(err);
|
logger.fatal("Setup influx failed!");
|
||||||
}
|
exit(1);
|
||||||
logger.fatal("Setup influx failed!");
|
|
||||||
exit(1);
|
|
||||||
});
|
|
||||||
|
|
||||||
logger.debug("Get WriteApi & set default-hostname to", `'${env.HOSTNAME}'`);
|
|
||||||
const influxWriteApi = influxDb.getWriteApi(env.INFLUX_ORG, env.INFLUX_BUCKET, "us");
|
|
||||||
//influxWriteApi.useDefaultTags({"hostname": env.HOSTNAME});
|
|
||||||
|
|
||||||
pointWriter = new InfluxPointWriter(influxWriteApi);
|
|
||||||
|
|
||||||
logger.info("Influx ok");
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
logger.info("Setup Influxdb-LineProtocol..");
|
|
||||||
|
|
||||||
let lineProtocolWriter = new InfluxDbLineProtocolWriter(env.INFLUXDB_LINEPROTOCOL_HOST, env.INFLUXDB_LINEPROTOCOL_PORT);
|
|
||||||
|
|
||||||
logger.debug("Create PointToLineProto and pipe to LineProtocolWriter");
|
|
||||||
pointWriter = new InfluxPointToLineProtoStream();
|
|
||||||
pointWriter
|
|
||||||
.setEncoding("utf8")
|
|
||||||
.pipe(lineProtocolWriter);
|
|
||||||
|
|
||||||
logger.debug("Waiting for connection..");
|
|
||||||
await new Promise((resolve, reject) => {
|
|
||||||
lineProtocolWriter.once("connect", () => {
|
|
||||||
resolve();
|
|
||||||
});
|
|
||||||
lineProtocolWriter.once("error", (err) => {
|
|
||||||
reject(err);
|
|
||||||
});
|
|
||||||
setTimeout(() => { // After timeout, reject promise
|
|
||||||
reject("Timeout whilst waiting to connect");
|
|
||||||
}, 6500);
|
|
||||||
})
|
|
||||||
.then(() => {
|
|
||||||
logger.info("Influxdb-LineProtocol ok");
|
|
||||||
})
|
|
||||||
.catch((err) => {
|
|
||||||
if(err) {
|
|
||||||
logger.error("Error whilst checking Influxdb-LineProtocol:");
|
|
||||||
logger.error(err);
|
|
||||||
}
|
|
||||||
logger.fatal("Setup Influxdb-LineProtocol failed!");
|
|
||||||
exit(1);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info("Starting tcpdump..");
|
|
||||||
const TCPDUMP_BASECMD = "tcpdump -vvv -e -n -X -s0 -i";
|
|
||||||
let cmd = `${TCPDUMP_BASECMD} ${env.WIFI_INTERFACE}`;
|
|
||||||
|
|
||||||
let proc = exec(cmd);
|
|
||||||
logger.debug("Creating & Attaching streams..");
|
|
||||||
let regexBlockStream = new RegexBlockStream(/^\d{2}:\d{2}:\d{2}.\d{6}.*(\n( {4,8}|\t\t?).*)+\n/gm);
|
|
||||||
let packetStreamFactory = new PacketStreamFactory();
|
|
||||||
let packetInfluxPointFactory = new PacketInfluxPointFactory();
|
|
||||||
proc.stdout
|
|
||||||
.setEncoding("utf8")
|
|
||||||
.pipe(regexBlockStream)
|
|
||||||
.pipe(packetStreamFactory)
|
|
||||||
.pipe(packetInfluxPointFactory)
|
|
||||||
.pipe(pointWriter);
|
|
||||||
|
|
||||||
logger.debug("Attaching error-logger..");
|
|
||||||
const loggerTcpdump = logFactory("tcpdump");
|
|
||||||
let linkTypeId;
|
|
||||||
proc.stderr.setEncoding("utf8").on("data", (data) => {
|
|
||||||
if(data.match(/^(tcpdump: )?listening on /i) || data.match(/^\d+ packets captured/i)) { // Catch start-error
|
|
||||||
loggerTcpdump.debug(data);
|
|
||||||
|
|
||||||
if(!linkTypeId && data.match(/^(tcpdump: )?listening on/i)){ // Grab first data containing listen-info if proper header was found
|
|
||||||
const linkType = data.match(/((?<=link-type ))([a-z].*?) \(.*?\)(?=,)/i)[0];
|
|
||||||
const linkTypeData = linkType.match(/(\S*) (.*)/i);
|
|
||||||
linkTypeId = linkTypeData[1];
|
|
||||||
const linkTypeDetail = linkTypeData[2];
|
|
||||||
|
|
||||||
if(linkTypeId !== "IEEE802_11_RADIO"){
|
|
||||||
logger.error(`Interface not in Monitor-mode! (Expected 'IEEE802_11_RADIO', but got '${linkTypeId}')`);
|
|
||||||
shutdown(1, "SIGKILL");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else loggerTcpdump.error(data);
|
|
||||||
});
|
|
||||||
|
|
||||||
// FIXME: This is a hacky workaround to not let errors from subprocess bubble up and terminate our process
|
|
||||||
regexBlockStream.on("error", (err) => {});
|
|
||||||
|
|
||||||
proc.on("error", (err) => {
|
|
||||||
loggerTcpdump.error(err);
|
|
||||||
});
|
|
||||||
|
|
||||||
const loggerPacketStream = logFactory("PacketStreamFactory");
|
|
||||||
userHelper.detectStreamData(proc.stdout, 10000) // Expect tcpdump-logs to have data after max. 10s
|
|
||||||
.then(() => {
|
|
||||||
loggerTcpdump.debug("Got first data");
|
|
||||||
userHelper.detectStreamData(packetStreamFactory, 10000) // Expect then to have packets after further 10s
|
|
||||||
.then(() => {
|
|
||||||
loggerPacketStream.debug("Got first packet");
|
|
||||||
})
|
|
||||||
.catch((err) => {
|
|
||||||
if(err == "timeout") loggerPacketStream.warn("No packets");
|
|
||||||
});
|
|
||||||
})
|
|
||||||
.catch((err) => {
|
|
||||||
if(err == "timeout") loggerTcpdump.warn("No data after 10s! Wrong configuration?");
|
|
||||||
});
|
|
||||||
|
|
||||||
logger.debug("Attaching exit-handler..");
|
|
||||||
proc.on("exit", (code) => {
|
|
||||||
loggerTcpdump.debug(`tcpdump exited code: ${code}`);
|
|
||||||
if (code) {
|
|
||||||
loggerTcpdump.fatal(`tcpdump exited with non-zero code: ${code}`);
|
|
||||||
if(!exitCode) exitCode = 1; // When exitCode is 0, set to 1
|
|
||||||
}
|
|
||||||
logger.info("Shutdown");
|
|
||||||
exit(exitCode);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
// Handle stop-signals for graceful shutdown
|
logger.info("Influx ok");
|
||||||
var exitCode = 0;
|
|
||||||
function shutdownReq() {
|
logger.info("Starting tcpdump..");
|
||||||
logger.info("Shutdown request received..");
|
const TCPDUMP_BASECMD = "tcpdump -vvv -e -n -X -s0 -i"
|
||||||
shutdown();
|
let cmd = `sudo ${TCPDUMP_BASECMD} ${env.WIFI_INTERFACE}`;
|
||||||
}
|
|
||||||
function shutdown(code, signal = "SIGTERM"){
|
let proc = exec(cmd);
|
||||||
if(code) exitCode = code;
|
logger.debug("Creating & Attaching streams..");
|
||||||
logger.debug("Stopping subprocess tcpdump, then exiting myself..");
|
proc.stdout
|
||||||
proc.kill(signal); // Kill process, then upper event-handler will stop self
|
.setEncoding("utf8")
|
||||||
|
.pipe(new RegexBlockStream(/^\d{2}:\d{2}:\d{2}.\d{6}.*(\n( {4,8}|\t\t?).*)+\n/gm))
|
||||||
|
.pipe(new PacketStreamFactory())
|
||||||
|
.pipe(new PacketInfluxPointFactory())
|
||||||
|
.pipe(new InfluxPointWriter(influxDb, env.INFLUX_ORG, env.INFLUX_BUCKET));
|
||||||
|
|
||||||
|
logger.debug("Attaching error-logger..");
|
||||||
|
proc.stderr.setEncoding("utf8").on("data", (data) => {
|
||||||
|
logger.error(data);
|
||||||
|
});
|
||||||
|
|
||||||
|
logger.debug("Attaching exit-handler..");
|
||||||
|
proc.on("exit", (code) => {
|
||||||
|
logger.info(`tcpdump exited code: ${code}`);
|
||||||
|
if (code) {
|
||||||
|
logger.fatal(`tcpdump exited with non-zero code: ${code}`);
|
||||||
|
exit(1);
|
||||||
}
|
}
|
||||||
process.on("SIGTERM", shutdownReq);
|
logger.info("Shutdown");
|
||||||
process.on("SIGINT", shutdownReq);
|
exit(0);
|
||||||
|
});
|
||||||
|
|
||||||
logger.info("Startup complete");
|
logger.info("Startup complete");
|
||||||
})();
|
})();
|
||||||
|
@ -1,72 +0,0 @@
|
|||||||
const logger = require.main.require("./helper/logger.js")("InfluxDbLineProtocolWriter");
|
|
||||||
const net = require("net");
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get points and write them into influx
|
|
||||||
*/
|
|
||||||
class InfluxDbLineProtocolWriter extends net.Socket{
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
* @param {string} host Host of line-server
|
|
||||||
* @param {string} port Port of line-server
|
|
||||||
* @param {object} options Options for further configuration
|
|
||||||
*/
|
|
||||||
constructor(host, port, options = {}) {
|
|
||||||
super();
|
|
||||||
|
|
||||||
this._host = host;
|
|
||||||
this._port = port;
|
|
||||||
|
|
||||||
// options defaults
|
|
||||||
options.autoConnect ??= true;
|
|
||||||
options.timeout ??= 5000;
|
|
||||||
options.autoReconnect ??= true;
|
|
||||||
options.autoReconnectBackoffTime ??= 3000;
|
|
||||||
this._options = options;
|
|
||||||
|
|
||||||
this._isConnected = false;
|
|
||||||
|
|
||||||
super.setKeepAlive(true, 5000);
|
|
||||||
|
|
||||||
// Register auto-Reconnect if enabled
|
|
||||||
if(this._options.autoReconnect){
|
|
||||||
this.on("connect", () => {
|
|
||||||
logger.debug("Connection established!");
|
|
||||||
this._isConnected = true;
|
|
||||||
|
|
||||||
if(this._autoReconnectTimeout)
|
|
||||||
clearInterval(this._autoReconnectTimeout);
|
|
||||||
this._autoReconnectTimeout = 0;
|
|
||||||
});
|
|
||||||
|
|
||||||
this.on("error", (err) => {
|
|
||||||
logger.error(err.code, "TCP ERROR");
|
|
||||||
this._isConnected = false;
|
|
||||||
|
|
||||||
if(!this._autoReconnectTimeout)
|
|
||||||
this._autoReconnectTimeout = setInterval(() => {
|
|
||||||
this.connect();
|
|
||||||
},
|
|
||||||
this._options.autoReconnectBackoffTime);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// Autoconnect if requested
|
|
||||||
if(this._options.autoConnect) this.connect();
|
|
||||||
}
|
|
||||||
|
|
||||||
get host(){ return this._host; }
|
|
||||||
get port(){ return this._port; }
|
|
||||||
|
|
||||||
get isConnected(){ return this._isConnected; }
|
|
||||||
|
|
||||||
connect(){
|
|
||||||
logger.debug("Connecting..");
|
|
||||||
super.connect(this._port, this._host);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Specify exports
|
|
||||||
module.exports = {
|
|
||||||
InfluxDbLineProtocolWriter
|
|
||||||
};
|
|
@ -1,22 +0,0 @@
|
|||||||
const logger = require.main.require("./helper/logger.js")("InfluxPointToLineProtoStream");
|
|
||||||
const { Transform } = require("stream");
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get points and converts them to Line-protocol
|
|
||||||
*/
|
|
||||||
class InfluxPointToLineProtoStream extends Transform{
|
|
||||||
constructor(){
|
|
||||||
super({
|
|
||||||
writableObjectMode: true
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
_transform(point, encoding, next){
|
|
||||||
next(null, point.toLineProtocol() +"\n");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Specify exports
|
|
||||||
module.exports = {
|
|
||||||
InfluxPointToLineProtoStream
|
|
||||||
};
|
|
Loading…
Reference in New Issue