Compare commits

...

7 Commits

Author SHA1 Message Date
d10e9bb2c6 Create influx-client 2021-11-26 17:43:42 +01:00
44cd3288cf Fixed stram not being in object-mode 2021-11-26 17:33:19 +01:00
3af4bb7cc6 Fixed wrong push 2021-11-26 17:33:07 +01:00
2a662e0bd1 Changed constructor to take influxDb 2021-11-26 17:32:31 +01:00
d7a9530b68 Cleaned up Promise 2021-11-26 17:31:31 +01:00
bb3d843895 Fixed wrong joining 2021-11-25 18:42:45 +01:00
9472ed9198 Fix wrong usage of join()
Default splitter is ',' but we want nothing
2021-11-25 18:42:26 +01:00
4 changed files with 18 additions and 12 deletions

View File

@ -2,6 +2,7 @@ const logger = require("./helper/logger.js")("main");
const { requireEnvVars } = require("./helper/env.js"); const { requireEnvVars } = require("./helper/env.js");
const { exit } = require("process"); const { exit } = require("process");
const { InfluxDB } = require('@influxdata/influxdb-client');
/// Setup ENVs /// Setup ENVs
const env = process.env; const env = process.env;
@ -20,3 +21,8 @@ if(errorMsg){
exit(1); exit(1);
} }
(async function() {
logger.info("Setup Influx..");
const influxDb = new InfluxDB({url: env.INFLUX_URL, token: env.INFLUX_TOKEN});
})();

View File

@ -8,14 +8,16 @@ const {InfluxDB, Point, HttpError} = require('@influxdata/influxdb-client')
class InfluxPointWriter extends Writable{ class InfluxPointWriter extends Writable{
/** /**
* *
* @param {string} url Influx-Url * @param {InfluxDB} influxDb InfluxDb
* @param {string} token Auth-token
* @param {string} org Organization to use * @param {string} org Organization to use
* @param {string} bucket Bucket to use * @param {string} bucket Bucket to use
* @param {string} precision Precision to use * @param {Partial<WriteOptions>} options Options for WriteApi
*/ */
constructor(url, token, org, bucket, precision = 'us'){ constructor(influxDb, org, bucket, options){
this._api = new InfluxDB({url, token}).getWriteApi(org, bucket, precision); super({
objectMode: true
});
this._api = influxDb.getWriteApi(org, bucket, 'us', options);
} }
_write(point, encoding, next){ _write(point, encoding, next){
@ -25,10 +27,8 @@ class InfluxPointWriter extends Writable{
_flush(next){ _flush(next){
this._api.flush(true) this._api.flush(true)
.then( .catch((err) => { next(new Error(`WriteApi rejected promise for flush: ${err}`)); })
next, .then(next);
(err) => { next(new Error(`WriteApi rejected promise for flush: ${err}`)); }
);
} }
} }

View File

@ -47,7 +47,7 @@ class PacketInfluxPointFactory extends Transform{
point.setField('value', packet[objKey]); // Set field point.setField('value', packet[objKey]); // Set field
this.push(null, point); // Push point into stream this.push(point); // Push point into stream
}); });
next(); // Get next packet next(); // Get next packet

View File

@ -27,7 +27,7 @@ class RegexBlockStream extends Transform{
} }
_transform(chunk, encoding, next){ _transform(chunk, encoding, next){
chunk = this.readableBuffer.length? this.readableBuffer.join() + chunk: chunk; // Add previous buffer to current chunk chunk = this.readableBuffer.length? this.readableBuffer.join('') + chunk: chunk; // Add previous buffer to current chunk
this.readableBuffer.length && this.readableBuffer.clear(); // Clear buffer once we read it this.readableBuffer.length && this.readableBuffer.clear(); // Clear buffer once we read it
let matches = chunk.match(this.matcher); // Match let matches = chunk.match(this.matcher); // Match
@ -52,7 +52,7 @@ class RegexBlockStream extends Transform{
_flush(next){ _flush(next){
if(matchAllOnFlush){ // When requested, we'll match one last time over the remaining buffer if(matchAllOnFlush){ // When requested, we'll match one last time over the remaining buffer
let chunk = this.readableBuffer.toString(); let chunk = this.readableBuffer.join('');
let matches = chunk.match(this.matcher); // Match remaining buffer let matches = chunk.match(this.matcher); // Match remaining buffer
_writeMatches(matches); // Write matches including last element _writeMatches(matches); // Write matches including last element
} }