22 Commits

Author SHA1 Message Date
873f00b21b Remove deleting point and rename point to something useable 2021-11-26 19:17:43 +01:00
271554719e Remove wrong hint 2021-11-26 19:17:24 +01:00
e18de63d7c Revert "Move catch into then-onRejected to fix resolving then anyways."
This reverts commit 2356040572.
2021-11-26 19:14:01 +01:00
2356040572 Move catch into then-onRejected to fix resolving then anyways. 2021-11-26 19:07:27 +01:00
b0bbf0c71a Fixed wrong option-subname 2021-11-26 19:06:55 +01:00
37b78e7373 Fixed wrong logging-mode used 2021-11-26 18:54:09 +01:00
c5e1bb4c64 Added info-log "ok" 2021-11-26 18:37:55 +01:00
3c29ed2000 Implemented CheckWriteApi by writing and then deleting a TestPoint 2021-11-26 18:37:34 +01:00
ddf39b9433 Fixed require 2021-11-26 18:36:50 +01:00
3a927688d0 Changed fatal to error and added error handling to promise catch in main 2021-11-26 18:36:35 +01:00
c51cfc1b14 Fixed missing require 2021-11-26 18:35:51 +01:00
e1b2a7e016 Added influx checkBucket 2021-11-26 18:03:14 +01:00
fc5900b0ba Added influx checkHealth 2021-11-26 17:55:51 +01:00
354ca32a61 Merge branch 'dev' into f_influx-checks 2021-11-26 17:47:59 +01:00
d10e9bb2c6 Create influx-client 2021-11-26 17:43:42 +01:00
44cd3288cf Fixed stram not being in object-mode 2021-11-26 17:33:19 +01:00
3af4bb7cc6 Fixed wrong push 2021-11-26 17:33:07 +01:00
2a662e0bd1 Changed constructor to take influxDb 2021-11-26 17:32:31 +01:00
d7a9530b68 Cleaned up Promise 2021-11-26 17:31:31 +01:00
7de2250983 Added client-apis 2021-11-26 17:30:46 +01:00
bb3d843895 Fixed wrong joining 2021-11-25 18:42:45 +01:00
9472ed9198 Fix wrong usage of join()
Default splitter is ',' but we want nothing
2021-11-25 18:42:26 +01:00
7 changed files with 116 additions and 12 deletions

15
package-lock.json generated
View File

@@ -10,6 +10,7 @@
"license": "AGPL-3.0",
"dependencies": {
"@influxdata/influxdb-client": "^1.20.0",
"@influxdata/influxdb-client-apis": "^1.20.0",
"log4js": "^6.3.0",
"luxon": "^2.1.1",
"string-argv": "^0.3.1"
@@ -20,6 +21,14 @@
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client/-/influxdb-client-1.20.0.tgz",
"integrity": "sha512-jaKSI63hmQ5VSkJrFJkYIXaKlhoF+mGd4HmOf7v/X7pmEi69ReHp922Wyx6/OeCrpndRMbsadk+XmGNdd43cFw=="
},
"node_modules/@influxdata/influxdb-client-apis": {
"version": "1.20.0",
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client-apis/-/influxdb-client-apis-1.20.0.tgz",
"integrity": "sha512-KMTmXH4rbpS+NWGpqDjxcKTyan2rbiT2IM5AdRElKhH2sHbH96xwLgziaxeC+OCJLeNAdehJgae3I8WiZjbwdg==",
"peerDependencies": {
"@influxdata/influxdb-client": "*"
}
},
"node_modules/date-format": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/date-format/-/date-format-3.0.0.tgz",
@@ -152,6 +161,12 @@
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client/-/influxdb-client-1.20.0.tgz",
"integrity": "sha512-jaKSI63hmQ5VSkJrFJkYIXaKlhoF+mGd4HmOf7v/X7pmEi69ReHp922Wyx6/OeCrpndRMbsadk+XmGNdd43cFw=="
},
"@influxdata/influxdb-client-apis": {
"version": "1.20.0",
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client-apis/-/influxdb-client-apis-1.20.0.tgz",
"integrity": "sha512-KMTmXH4rbpS+NWGpqDjxcKTyan2rbiT2IM5AdRElKhH2sHbH96xwLgziaxeC+OCJLeNAdehJgae3I8WiZjbwdg==",
"requires": {}
},
"date-format": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/date-format/-/date-format-3.0.0.tgz",

View File

@@ -15,6 +15,7 @@
"license": "AGPL-3.0",
"dependencies": {
"@influxdata/influxdb-client": "^1.20.0",
"@influxdata/influxdb-client-apis": "^1.20.0",
"log4js": "^6.3.0",
"luxon": "^2.1.1",
"string-argv": "^0.3.1"

View File

@@ -0,0 +1,61 @@
const logger = require.main.require("./helper/logger.js")("influx-checks");
const Os = require("os");
const { InfluxDB, Point } = require('@influxdata/influxdb-client')
const Influx = require('@influxdata/influxdb-client-apis');
function checkHealth(influxDb){
return new Promise((resolve, reject) => {
new Influx.HealthAPI(influxDb) // Check influx health
.getHealth()
.catch((err) => {
logger.error("Could not communicate with Influx:");
logger.error(`Error [${err.code}]:`, err.message);
reject();
})
.then((res) => {
logger.debug("Server healthy.", "Version: ", res.version);
resolve(res);
});
});
}
function checkBucket(influxDb, options){
return new Promise((resolve, reject) => {
new Influx.BucketsAPI(influxDb).getBuckets(options)
.catch((err) => { // Weirdly the influx-Api returns 404 for searches of non-existing buckets
logger.error("Could not get bucket:");
logger.error(`Error [${err.code}]:`, err.message);
reject();
}).then((res) => { // But an empty list when the bucket exists, but token does not have permission to get details
logger.debug("Bucket found");
resolve(res);
// Now we know the bucket exists and we have some kind of permission.. but we still dont know if we are able to write to it..
});
});
}
function checkWriteApi(influxDb, options){
return new Promise((resolve, reject) => {
const writeApi = influxDb.getWriteApi(options.org, options.bucket); // Get WriteAPI
writeApi.writePoint(new Point("worker_connectionTest").tag("hostname", Os.hostname())) // Write point
writeApi.close()
.catch((err) => {
logger.error("Could not get writeApi:");
logger.error(`Error [${err.code}]:`, err.message);
reject();
}).then((res) => {
logger.debug("Writing ok");
resolve();
});
});
}
// Specify exports
module.exports = {
checkHealth,
checkBucket,
checkWriteApi,
};

View File

@@ -2,6 +2,8 @@ const logger = require("./helper/logger.js")("main");
const { requireEnvVars } = require("./helper/env.js");
const { exit } = require("process");
const { InfluxDB } = require('@influxdata/influxdb-client');
const InfluxChecks = require('./helper/influx-checks.js');
/// Setup ENVs
const env = process.env;
@@ -20,3 +22,28 @@ if(errorMsg){
exit(1);
}
(async function() {
logger.info("Setup Influx..");
const influxDb = new InfluxDB({url: env.INFLUX_URL, token: env.INFLUX_TOKEN});
await InfluxChecks.checkHealth(influxDb)
.then((res) => {return InfluxChecks.checkBucket(influxDb, {
org: env.INFLUX_ORG,
name: env.INFLUX_BUCKET
})})
.then((res) => {return InfluxChecks.checkWriteApi(influxDb, {
org: env.INFLUX_ORG,
bucket: env.INFLUX_BUCKET
})})
.catch((err) => {
if(err) {
logger.error("Error whilst checking influx:");
logger.error(err);
}
logger.fatal("Setup influx failed!");
exit(1);
});
logger.info("Influx ok");
})();

View File

@@ -8,14 +8,16 @@ const {InfluxDB, Point, HttpError} = require('@influxdata/influxdb-client')
class InfluxPointWriter extends Writable{
/**
*
* @param {string} url Influx-Url
* @param {string} token Auth-token
* @param {InfluxDB} influxDb InfluxDb
* @param {string} org Organization to use
* @param {string} bucket Bucket to use
* @param {string} precision Precision to use
* @param {Partial<WriteOptions>} options Options for WriteApi
*/
constructor(url, token, org, bucket, precision = 'us'){
this._api = new InfluxDB({url, token}).getWriteApi(org, bucket, precision);
constructor(influxDb, org, bucket, options){
super({
objectMode: true
});
this._api = influxDb.getWriteApi(org, bucket, 'us', options);
}
_write(point, encoding, next){
@@ -25,10 +27,8 @@ class InfluxPointWriter extends Writable{
_flush(next){
this._api.flush(true)
.then(
next,
(err) => { next(new Error(`WriteApi rejected promise for flush: ${err}`)); }
);
.catch((err) => { next(new Error(`WriteApi rejected promise for flush: ${err}`)); })
.then(next);
}
}

View File

@@ -47,7 +47,7 @@ class PacketInfluxPointFactory extends Transform{
point.setField('value', packet[objKey]); // Set field
this.push(null, point); // Push point into stream
this.push(point); // Push point into stream
});
next(); // Get next packet

View File

@@ -27,7 +27,7 @@ class RegexBlockStream extends Transform{
}
_transform(chunk, encoding, next){
chunk = this.readableBuffer.length? this.readableBuffer.join() + chunk: chunk; // Add previous buffer to current chunk
chunk = this.readableBuffer.length? this.readableBuffer.join('') + chunk: chunk; // Add previous buffer to current chunk
this.readableBuffer.length && this.readableBuffer.clear(); // Clear buffer once we read it
let matches = chunk.match(this.matcher); // Match
@@ -52,7 +52,7 @@ class RegexBlockStream extends Transform{
_flush(next){
if(matchAllOnFlush){ // When requested, we'll match one last time over the remaining buffer
let chunk = this.readableBuffer.toString();
let chunk = this.readableBuffer.join('');
let matches = chunk.match(this.matcher); // Match remaining buffer
_writeMatches(matches); // Write matches including last element
}