diff --git a/src/main.js b/src/main.js index aa95a1c..95e1037 100644 --- a/src/main.js +++ b/src/main.js @@ -2,6 +2,7 @@ const logger = require("./helper/logger.js")("main"); const { requireEnvVars } = require("./helper/env.js"); const { exit } = require("process"); +const { InfluxDB } = require('@influxdata/influxdb-client'); /// Setup ENVs const env = process.env; @@ -20,3 +21,8 @@ if(errorMsg){ exit(1); } +(async function() { + logger.info("Setup Influx.."); + const influxDb = new InfluxDB({url: env.INFLUX_URL, token: env.INFLUX_TOKEN}); + +})(); diff --git a/src/streamHandler/InfluxPointWriter.js b/src/streamHandler/InfluxPointWriter.js index 02b0a27..9bd96ad 100644 --- a/src/streamHandler/InfluxPointWriter.js +++ b/src/streamHandler/InfluxPointWriter.js @@ -8,14 +8,16 @@ const {InfluxDB, Point, HttpError} = require('@influxdata/influxdb-client') class InfluxPointWriter extends Writable{ /** * - * @param {string} url Influx-Url - * @param {string} token Auth-token + * @param {InfluxDB} influxDb InfluxDb * @param {string} org Organization to use * @param {string} bucket Bucket to use - * @param {string} precision Precision to use + * @param {Partial} options Options for WriteApi */ - constructor(url, token, org, bucket, precision = 'us'){ - this._api = new InfluxDB({url, token}).getWriteApi(org, bucket, precision); + constructor(influxDb, org, bucket, options){ + super({ + objectMode: true + }); + this._api = influxDb.getWriteApi(org, bucket, 'us', options); } _write(point, encoding, next){ @@ -25,10 +27,8 @@ class InfluxPointWriter extends Writable{ _flush(next){ this._api.flush(true) - .then( - next, - (err) => { next(new Error(`WriteApi rejected promise for flush: ${err}`)); } - ); + .catch((err) => { next(new Error(`WriteApi rejected promise for flush: ${err}`)); }) + .then(next); } } diff --git a/src/streamHandler/PacketInfluxPointFactory.js b/src/streamHandler/PacketInfluxPointFactory.js index 11e502f..8d76878 100644 --- a/src/streamHandler/PacketInfluxPointFactory.js +++ b/src/streamHandler/PacketInfluxPointFactory.js @@ -47,7 +47,7 @@ class PacketInfluxPointFactory extends Transform{ point.setField('value', packet[objKey]); // Set field - this.push(null, point); // Push point into stream + this.push(point); // Push point into stream }); next(); // Get next packet