From 450f162cda0ebce6aa9e03d51400039ec1dd5b2e Mon Sep 17 00:00:00 2001 From: Ruakij Date: Thu, 25 Nov 2021 03:13:05 +0100 Subject: [PATCH] Implemented InfluxPointWriter for writing into influx with stream --- src/streamHandler/InfluxPointWriter.js | 38 ++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 src/streamHandler/InfluxPointWriter.js diff --git a/src/streamHandler/InfluxPointWriter.js b/src/streamHandler/InfluxPointWriter.js new file mode 100644 index 0000000..808e8d6 --- /dev/null +++ b/src/streamHandler/InfluxPointWriter.js @@ -0,0 +1,38 @@ +const logger = require.main.require("./helper/logger.js")("InfluxPointWriter"); +const { Writeable } = require('stream'); +const {InfluxDB, Point, HttpError} = require('@influxdata/influxdb-client') + +/** + * Get points and write them into influx + */ +class InfluxPointWriter extends Writeable{ + /** + * + * @param {string} url Influx-Url + * @param {string} token Auth-token + * @param {string} org Organization to use + * @param {string} bucket Bucket to use + * @param {string} precision Precision to use + */ + constructor(url, token, org, bucket, precision = 'us'){ + this._api = new InfluxDB({url, token}).getWriteApi(org, bucket, precision); + } + + _write(point, encoding, next){ + this._api.writePoint(point); + next(); + } + + _flush(next){ + this._api.flush(true) + .then( + next, + (err) => { next(new Error(`WriteApi rejected promise for flush: ${err}`)); } + ); + } +} + +// Specify exports +module.exports = { + InfluxPointWriter +}; \ No newline at end of file