Compare commits
No commits in common. "d10e9bb2c67dd4b3c1b0c55185722fc79c2ae8fb" and "dcd0ce8111613025aa3ae9152a62e7c6cff5d1c6" have entirely different histories.
d10e9bb2c6
...
dcd0ce8111
@ -2,7 +2,6 @@ const logger = require("./helper/logger.js")("main");
|
||||
|
||||
const { requireEnvVars } = require("./helper/env.js");
|
||||
const { exit } = require("process");
|
||||
const { InfluxDB } = require('@influxdata/influxdb-client');
|
||||
|
||||
/// Setup ENVs
|
||||
const env = process.env;
|
||||
@ -21,8 +20,3 @@ if(errorMsg){
|
||||
exit(1);
|
||||
}
|
||||
|
||||
(async function() {
|
||||
logger.info("Setup Influx..");
|
||||
const influxDb = new InfluxDB({url: env.INFLUX_URL, token: env.INFLUX_TOKEN});
|
||||
|
||||
})();
|
||||
|
@ -8,16 +8,14 @@ const {InfluxDB, Point, HttpError} = require('@influxdata/influxdb-client')
|
||||
class InfluxPointWriter extends Writable{
|
||||
/**
|
||||
*
|
||||
* @param {InfluxDB} influxDb InfluxDb
|
||||
* @param {string} url Influx-Url
|
||||
* @param {string} token Auth-token
|
||||
* @param {string} org Organization to use
|
||||
* @param {string} bucket Bucket to use
|
||||
* @param {Partial<WriteOptions>} options Options for WriteApi
|
||||
* @param {string} precision Precision to use
|
||||
*/
|
||||
constructor(influxDb, org, bucket, options){
|
||||
super({
|
||||
objectMode: true
|
||||
});
|
||||
this._api = influxDb.getWriteApi(org, bucket, 'us', options);
|
||||
constructor(url, token, org, bucket, precision = 'us'){
|
||||
this._api = new InfluxDB({url, token}).getWriteApi(org, bucket, precision);
|
||||
}
|
||||
|
||||
_write(point, encoding, next){
|
||||
@ -27,8 +25,10 @@ class InfluxPointWriter extends Writable{
|
||||
|
||||
_flush(next){
|
||||
this._api.flush(true)
|
||||
.catch((err) => { next(new Error(`WriteApi rejected promise for flush: ${err}`)); })
|
||||
.then(next);
|
||||
.then(
|
||||
next,
|
||||
(err) => { next(new Error(`WriteApi rejected promise for flush: ${err}`)); }
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -47,7 +47,7 @@ class PacketInfluxPointFactory extends Transform{
|
||||
|
||||
point.setField('value', packet[objKey]); // Set field
|
||||
|
||||
this.push(point); // Push point into stream
|
||||
this.push(null, point); // Push point into stream
|
||||
});
|
||||
|
||||
next(); // Get next packet
|
||||
|
@ -27,7 +27,7 @@ class RegexBlockStream extends Transform{
|
||||
}
|
||||
|
||||
_transform(chunk, encoding, next){
|
||||
chunk = this.readableBuffer.length? this.readableBuffer.join('') + chunk: chunk; // Add previous buffer to current chunk
|
||||
chunk = this.readableBuffer.length? this.readableBuffer.join() + chunk: chunk; // Add previous buffer to current chunk
|
||||
this.readableBuffer.length && this.readableBuffer.clear(); // Clear buffer once we read it
|
||||
|
||||
let matches = chunk.match(this.matcher); // Match
|
||||
@ -52,7 +52,7 @@ class RegexBlockStream extends Transform{
|
||||
|
||||
_flush(next){
|
||||
if(matchAllOnFlush){ // When requested, we'll match one last time over the remaining buffer
|
||||
let chunk = this.readableBuffer.join('');
|
||||
let chunk = this.readableBuffer.toString();
|
||||
let matches = chunk.match(this.matcher); // Match remaining buffer
|
||||
_writeMatches(matches); // Write matches including last element
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user