Implement connection-checking

f_influxdb-line-protocol
Ruakij 3 years ago
parent c09c6c29fb
commit 2f84bb4408

@ -15,6 +15,7 @@ const { PacketStreamFactory } = require("./streamHandler/PacketStreamFactory.js"
const { PacketInfluxPointFactory } = require("./streamHandler/PacketInfluxPointFactory.js"); const { PacketInfluxPointFactory } = require("./streamHandler/PacketInfluxPointFactory.js");
const { InfluxPointWriter } = require("./streamHandler/InfluxPointWriter.js"); const { InfluxPointWriter } = require("./streamHandler/InfluxPointWriter.js");
const { InfluxDbLineProtocolWriter } = require("./streamHandler/InfluxDbLineProtocolWriter.js"); const { InfluxDbLineProtocolWriter } = require("./streamHandler/InfluxDbLineProtocolWriter.js");
const { InfluxPointToLineProtoStream } = require("./streamHandler/InfluxPointToLineProtoStream.js");
const userHelper = require("./helper/userHelper.js"); const userHelper = require("./helper/userHelper.js");
@ -77,6 +78,37 @@ if(errorMsg){
else { else {
logger.info("Setup Influxdb-LineProtocol.."); logger.info("Setup Influxdb-LineProtocol..");
let lineProtocolWriter = new InfluxDbLineProtocolWriter(env.INFLUXDB_LINEPROTOCOL_HOST, env.INFLUXDB_LINEPROTOCOL_PORT);
logger.debug("Create PointToLineProto and pipe to LineProtocolWriter");
pointWriter = new InfluxPointToLineProtoStream();
pointWriter
.setEncoding("utf8")
.pipe(lineProtocolWriter);
logger.debug("Waiting for connection..");
await new Promise((resolve, reject) => {
lineProtocolWriter.once("connect", () => {
resolve();
});
lineProtocolWriter.once("error", (err) => {
reject(err);
});
setTimeout(() => { // After timeout, reject promise
reject("Timeout whilst waiting to connect");
}, 6500);
})
.then(() => {
logger.info("Influxdb-LineProtocol ok");
})
.catch((err) => {
if(err) {
logger.error("Error whilst checking Influxdb-LineProtocol:");
logger.error(err);
}
logger.fatal("Setup Influxdb-LineProtocol failed!");
exit(1);
});
} }
logger.info("Starting tcpdump.."); logger.info("Starting tcpdump..");
@ -88,13 +120,12 @@ if(errorMsg){
let regexBlockStream = new RegexBlockStream(/^\d{2}:\d{2}:\d{2}.\d{6}.*(\n( {4,8}|\t\t?).*)+\n/gm); let regexBlockStream = new RegexBlockStream(/^\d{2}:\d{2}:\d{2}.\d{6}.*(\n( {4,8}|\t\t?).*)+\n/gm);
let packetStreamFactory = new PacketStreamFactory(); let packetStreamFactory = new PacketStreamFactory();
let packetInfluxPointFactory = new PacketInfluxPointFactory(); let packetInfluxPointFactory = new PacketInfluxPointFactory();
let influxPointWriter = new InfluxPointWriter(influxWriteApi);
proc.stdout proc.stdout
.setEncoding("utf8") .setEncoding("utf8")
.pipe(regexBlockStream) .pipe(regexBlockStream)
.pipe(packetStreamFactory) .pipe(packetStreamFactory)
.pipe(packetInfluxPointFactory) .pipe(packetInfluxPointFactory)
.pipe(influxPointWriter); .pipe(pointWriter);
logger.debug("Attaching error-logger.."); logger.debug("Attaching error-logger..");
const loggerTcpdump = logFactory("tcpdump"); const loggerTcpdump = logFactory("tcpdump");

Loading…
Cancel
Save