diff --git a/src/main.js b/src/main.js index fd2199e..3382e03 100644 --- a/src/main.js +++ b/src/main.js @@ -15,6 +15,7 @@ const { PacketStreamFactory } = require("./streamHandler/PacketStreamFactory.js" const { PacketInfluxPointFactory } = require("./streamHandler/PacketInfluxPointFactory.js"); const { InfluxPointWriter } = require("./streamHandler/InfluxPointWriter.js"); const { InfluxDbLineProtocolWriter } = require("./streamHandler/InfluxDbLineProtocolWriter.js"); +const { InfluxPointToLineProtoStream } = require("./streamHandler/InfluxPointToLineProtoStream.js"); const userHelper = require("./helper/userHelper.js"); @@ -77,6 +78,37 @@ if(errorMsg){ else { logger.info("Setup Influxdb-LineProtocol.."); + let lineProtocolWriter = new InfluxDbLineProtocolWriter(env.INFLUXDB_LINEPROTOCOL_HOST, env.INFLUXDB_LINEPROTOCOL_PORT); + + logger.debug("Create PointToLineProto and pipe to LineProtocolWriter"); + pointWriter = new InfluxPointToLineProtoStream(); + pointWriter + .setEncoding("utf8") + .pipe(lineProtocolWriter); + + logger.debug("Waiting for connection.."); + await new Promise((resolve, reject) => { + lineProtocolWriter.once("connect", () => { + resolve(); + }); + lineProtocolWriter.once("error", (err) => { + reject(err); + }); + setTimeout(() => { // After timeout, reject promise + reject("Timeout whilst waiting to connect"); + }, 6500); + }) + .then(() => { + logger.info("Influxdb-LineProtocol ok"); + }) + .catch((err) => { + if(err) { + logger.error("Error whilst checking Influxdb-LineProtocol:"); + logger.error(err); + } + logger.fatal("Setup Influxdb-LineProtocol failed!"); + exit(1); + }); } logger.info("Starting tcpdump.."); @@ -88,13 +120,12 @@ if(errorMsg){ let regexBlockStream = new RegexBlockStream(/^\d{2}:\d{2}:\d{2}.\d{6}.*(\n( {4,8}|\t\t?).*)+\n/gm); let packetStreamFactory = new PacketStreamFactory(); let packetInfluxPointFactory = new PacketInfluxPointFactory(); - let influxPointWriter = new InfluxPointWriter(influxWriteApi); proc.stdout .setEncoding("utf8") .pipe(regexBlockStream) .pipe(packetStreamFactory) .pipe(packetInfluxPointFactory) - .pipe(influxPointWriter); + .pipe(pointWriter); logger.debug("Attaching error-logger.."); const loggerTcpdump = logFactory("tcpdump");