From d7a9530b68e924a26a7f55bf9bed54ea4260a2e7 Mon Sep 17 00:00:00 2001 From: Ruakij Date: Fri, 26 Nov 2021 17:31:31 +0100 Subject: [PATCH 1/5] Cleaned up Promise --- src/streamHandler/InfluxPointWriter.js | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/streamHandler/InfluxPointWriter.js b/src/streamHandler/InfluxPointWriter.js index 02b0a27..9a57136 100644 --- a/src/streamHandler/InfluxPointWriter.js +++ b/src/streamHandler/InfluxPointWriter.js @@ -25,10 +25,8 @@ class InfluxPointWriter extends Writable{ _flush(next){ this._api.flush(true) - .then( - next, - (err) => { next(new Error(`WriteApi rejected promise for flush: ${err}`)); } - ); + .catch((err) => { next(new Error(`WriteApi rejected promise for flush: ${err}`)); }) + .then(next); } } From 2a662e0bd1f7c594d031bbb8e5906f69d12e880f Mon Sep 17 00:00:00 2001 From: Ruakij Date: Fri, 26 Nov 2021 17:32:31 +0100 Subject: [PATCH 2/5] Changed constructor to take influxDb --- src/streamHandler/InfluxPointWriter.js | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/streamHandler/InfluxPointWriter.js b/src/streamHandler/InfluxPointWriter.js index 9a57136..fbf64cc 100644 --- a/src/streamHandler/InfluxPointWriter.js +++ b/src/streamHandler/InfluxPointWriter.js @@ -8,14 +8,12 @@ const {InfluxDB, Point, HttpError} = require('@influxdata/influxdb-client') class InfluxPointWriter extends Writable{ /** * - * @param {string} url Influx-Url - * @param {string} token Auth-token + * @param {InfluxDB} influxDb InfluxDb * @param {string} org Organization to use * @param {string} bucket Bucket to use - * @param {string} precision Precision to use + * @param {Partial} options Options for WriteApi */ - constructor(url, token, org, bucket, precision = 'us'){ - this._api = new InfluxDB({url, token}).getWriteApi(org, bucket, precision); + this._api = influxDb.getWriteApi(org, bucket, 'us', options); } _write(point, encoding, next){ From 3af4bb7cc689ea20685736f04f1bcbfe7f2bcaf5 Mon Sep 17 00:00:00 2001 From: Ruakij Date: Fri, 26 Nov 2021 17:33:07 +0100 Subject: [PATCH 3/5] Fixed wrong push --- src/streamHandler/PacketInfluxPointFactory.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/streamHandler/PacketInfluxPointFactory.js b/src/streamHandler/PacketInfluxPointFactory.js index 11e502f..8d76878 100644 --- a/src/streamHandler/PacketInfluxPointFactory.js +++ b/src/streamHandler/PacketInfluxPointFactory.js @@ -47,7 +47,7 @@ class PacketInfluxPointFactory extends Transform{ point.setField('value', packet[objKey]); // Set field - this.push(null, point); // Push point into stream + this.push(point); // Push point into stream }); next(); // Get next packet From 44cd3288cf95ef862de8c22f2c909e0b8b3ca721 Mon Sep 17 00:00:00 2001 From: Ruakij Date: Fri, 26 Nov 2021 17:33:19 +0100 Subject: [PATCH 4/5] Fixed stram not being in object-mode --- src/streamHandler/InfluxPointWriter.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/streamHandler/InfluxPointWriter.js b/src/streamHandler/InfluxPointWriter.js index fbf64cc..9bd96ad 100644 --- a/src/streamHandler/InfluxPointWriter.js +++ b/src/streamHandler/InfluxPointWriter.js @@ -13,6 +13,10 @@ class InfluxPointWriter extends Writable{ * @param {string} bucket Bucket to use * @param {Partial} options Options for WriteApi */ + constructor(influxDb, org, bucket, options){ + super({ + objectMode: true + }); this._api = influxDb.getWriteApi(org, bucket, 'us', options); } From d10e9bb2c67dd4b3c1b0c55185722fc79c2ae8fb Mon Sep 17 00:00:00 2001 From: Ruakij Date: Fri, 26 Nov 2021 17:43:42 +0100 Subject: [PATCH 5/5] Create influx-client --- src/main.js | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/main.js b/src/main.js index aa95a1c..95e1037 100644 --- a/src/main.js +++ b/src/main.js @@ -2,6 +2,7 @@ const logger = require("./helper/logger.js")("main"); const { requireEnvVars } = require("./helper/env.js"); const { exit } = require("process"); +const { InfluxDB } = require('@influxdata/influxdb-client'); /// Setup ENVs const env = process.env; @@ -20,3 +21,8 @@ if(errorMsg){ exit(1); } +(async function() { + logger.info("Setup Influx.."); + const influxDb = new InfluxDB({url: env.INFLUX_URL, token: env.INFLUX_TOKEN}); + +})();