diff --git a/src/helper/userHelper.js b/src/helper/userHelper.js new file mode 100644 index 0000000..50a1624 --- /dev/null +++ b/src/helper/userHelper.js @@ -0,0 +1,46 @@ +// This file specifies functions to help a user with e.g. configuration-errors + +function detectStreamData(stream, timeout = 5000){ + return new Promise((resolve, reject) => { + let timeoutHandler; + if(timeout){ + timeoutHandler = setTimeout(() => { + reject('timeout'); + remListeners(); + }, + timeout); + } + + function remListeners(){ + stream.removeListener('error', errorHandler); + stream.removeListener('data', dataHandler); + if(timeoutHandler) clearTimeout(timeoutHandler); + } + + function errorHandler(err) { + remListeners(); + } + function dataHandler(data) { + resolve(data); + remListeners(); + } + + stream.on('error', errorHandler); + stream.on('data', dataHandler); + }); +} + +function detectStreamsData(streams, timeout = 5000){ + let promises = []; + streams.forEach((stream) => { + promises.push(detectStreamData(stream, timeout)); + }) + return promises; +} + + +// Specify exports +module.exports = { + detectStreamData, + detectStreamsData, +}; \ No newline at end of file diff --git a/src/main.js b/src/main.js index 512b8a4..2a0a0a6 100644 --- a/src/main.js +++ b/src/main.js @@ -14,6 +14,9 @@ const { PacketStreamFactory } = require("./streamHandler/PacketStreamFactory.js" const { PacketInfluxPointFactory } = require("./streamHandler/PacketInfluxPointFactory.js"); const { InfluxPointWriter } = require("./streamHandler/InfluxPointWriter.js"); +const userHelper = require("./helper/userHelper.js"); + + /// Setup ENVs const env = process.env; // Defaults @@ -62,12 +65,15 @@ if(errorMsg){ let proc = exec(cmd); logger.debug("Creating & Attaching streams.."); let regexBlockStream = new RegexBlockStream(/^\d{2}:\d{2}:\d{2}.\d{6}.*(\n( {4,8}|\t\t?).*)+\n/gm); + let packetStreamFactory = new PacketStreamFactory(); + let packetInfluxPointFactory = new PacketInfluxPointFactory(); + let influxPointWriter = new InfluxPointWriter(influxDb, env.INFLUX_ORG, env.INFLUX_BUCKET); proc.stdout .setEncoding("utf8") .pipe(regexBlockStream) - .pipe(new PacketStreamFactory()) - .pipe(new PacketInfluxPointFactory()) - .pipe(new InfluxPointWriter(influxDb, env.INFLUX_ORG, env.INFLUX_BUCKET)); + .pipe(packetStreamFactory) + .pipe(packetInfluxPointFactory) + .pipe(influxPointWriter); logger.debug("Attaching error-logger.."); const loggerTcpdump = logFactory("tcpdump"); @@ -84,6 +90,22 @@ if(errorMsg){ loggerTcpdump.error(err); }); + const loggerPacketStream = logFactory("PacketStreamFactory"); + userHelper.detectStreamData(proc.stdout, 10000) // Expect tcpdump-logs to have data after max. 10s + .then(() => { + loggerTcpdump.debug("Got first data"); + userHelper.detectStreamData(packetStreamFactory, 10000) // Expect then to have packets after further 10s + .then(() => { + loggerPacketStream.debug("Got first packet"); + }) + .catch((err) => { + if(err == 'timeout') loggerPacketStream.warn("No packets"); + }); + }) + .catch((err) => { + if(err == 'timeout') loggerTcpdump.warn("No data after 10s! Wrong configuration?"); + }); + logger.debug("Attaching exit-handler.."); proc.on("exit", (code) => { loggerTcpdump.debug(`tcpdump exited code: ${code}`);