Compare commits
22 Commits
dcd0ce8111
...
f_influx-c
| Author | SHA1 | Date | |
|---|---|---|---|
| 873f00b21b | |||
| 271554719e | |||
| e18de63d7c | |||
| 2356040572 | |||
| b0bbf0c71a | |||
| 37b78e7373 | |||
| c5e1bb4c64 | |||
| 3c29ed2000 | |||
| ddf39b9433 | |||
| 3a927688d0 | |||
| c51cfc1b14 | |||
| e1b2a7e016 | |||
| fc5900b0ba | |||
| 354ca32a61 | |||
| d10e9bb2c6 | |||
| 44cd3288cf | |||
| 3af4bb7cc6 | |||
| 2a662e0bd1 | |||
| d7a9530b68 | |||
| 7de2250983 | |||
| bb3d843895 | |||
| 9472ed9198 |
15
package-lock.json
generated
15
package-lock.json
generated
@@ -10,6 +10,7 @@
|
||||
"license": "AGPL-3.0",
|
||||
"dependencies": {
|
||||
"@influxdata/influxdb-client": "^1.20.0",
|
||||
"@influxdata/influxdb-client-apis": "^1.20.0",
|
||||
"log4js": "^6.3.0",
|
||||
"luxon": "^2.1.1",
|
||||
"string-argv": "^0.3.1"
|
||||
@@ -20,6 +21,14 @@
|
||||
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client/-/influxdb-client-1.20.0.tgz",
|
||||
"integrity": "sha512-jaKSI63hmQ5VSkJrFJkYIXaKlhoF+mGd4HmOf7v/X7pmEi69ReHp922Wyx6/OeCrpndRMbsadk+XmGNdd43cFw=="
|
||||
},
|
||||
"node_modules/@influxdata/influxdb-client-apis": {
|
||||
"version": "1.20.0",
|
||||
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client-apis/-/influxdb-client-apis-1.20.0.tgz",
|
||||
"integrity": "sha512-KMTmXH4rbpS+NWGpqDjxcKTyan2rbiT2IM5AdRElKhH2sHbH96xwLgziaxeC+OCJLeNAdehJgae3I8WiZjbwdg==",
|
||||
"peerDependencies": {
|
||||
"@influxdata/influxdb-client": "*"
|
||||
}
|
||||
},
|
||||
"node_modules/date-format": {
|
||||
"version": "3.0.0",
|
||||
"resolved": "https://registry.npmjs.org/date-format/-/date-format-3.0.0.tgz",
|
||||
@@ -152,6 +161,12 @@
|
||||
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client/-/influxdb-client-1.20.0.tgz",
|
||||
"integrity": "sha512-jaKSI63hmQ5VSkJrFJkYIXaKlhoF+mGd4HmOf7v/X7pmEi69ReHp922Wyx6/OeCrpndRMbsadk+XmGNdd43cFw=="
|
||||
},
|
||||
"@influxdata/influxdb-client-apis": {
|
||||
"version": "1.20.0",
|
||||
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client-apis/-/influxdb-client-apis-1.20.0.tgz",
|
||||
"integrity": "sha512-KMTmXH4rbpS+NWGpqDjxcKTyan2rbiT2IM5AdRElKhH2sHbH96xwLgziaxeC+OCJLeNAdehJgae3I8WiZjbwdg==",
|
||||
"requires": {}
|
||||
},
|
||||
"date-format": {
|
||||
"version": "3.0.0",
|
||||
"resolved": "https://registry.npmjs.org/date-format/-/date-format-3.0.0.tgz",
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
"license": "AGPL-3.0",
|
||||
"dependencies": {
|
||||
"@influxdata/influxdb-client": "^1.20.0",
|
||||
"@influxdata/influxdb-client-apis": "^1.20.0",
|
||||
"log4js": "^6.3.0",
|
||||
"luxon": "^2.1.1",
|
||||
"string-argv": "^0.3.1"
|
||||
|
||||
61
src/helper/influx-checks.js
Normal file
61
src/helper/influx-checks.js
Normal file
@@ -0,0 +1,61 @@
|
||||
const logger = require.main.require("./helper/logger.js")("influx-checks");
|
||||
|
||||
const Os = require("os");
|
||||
const { InfluxDB, Point } = require('@influxdata/influxdb-client')
|
||||
const Influx = require('@influxdata/influxdb-client-apis');
|
||||
|
||||
|
||||
function checkHealth(influxDb){
|
||||
return new Promise((resolve, reject) => {
|
||||
new Influx.HealthAPI(influxDb) // Check influx health
|
||||
.getHealth()
|
||||
.catch((err) => {
|
||||
logger.error("Could not communicate with Influx:");
|
||||
logger.error(`Error [${err.code}]:`, err.message);
|
||||
reject();
|
||||
})
|
||||
.then((res) => {
|
||||
logger.debug("Server healthy.", "Version: ", res.version);
|
||||
resolve(res);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function checkBucket(influxDb, options){
|
||||
return new Promise((resolve, reject) => {
|
||||
new Influx.BucketsAPI(influxDb).getBuckets(options)
|
||||
.catch((err) => { // Weirdly the influx-Api returns 404 for searches of non-existing buckets
|
||||
logger.error("Could not get bucket:");
|
||||
logger.error(`Error [${err.code}]:`, err.message);
|
||||
reject();
|
||||
}).then((res) => { // But an empty list when the bucket exists, but token does not have permission to get details
|
||||
logger.debug("Bucket found");
|
||||
resolve(res);
|
||||
// Now we know the bucket exists and we have some kind of permission.. but we still dont know if we are able to write to it..
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function checkWriteApi(influxDb, options){
|
||||
return new Promise((resolve, reject) => {
|
||||
const writeApi = influxDb.getWriteApi(options.org, options.bucket); // Get WriteAPI
|
||||
writeApi.writePoint(new Point("worker_connectionTest").tag("hostname", Os.hostname())) // Write point
|
||||
writeApi.close()
|
||||
.catch((err) => {
|
||||
logger.error("Could not get writeApi:");
|
||||
logger.error(`Error [${err.code}]:`, err.message);
|
||||
reject();
|
||||
}).then((res) => {
|
||||
logger.debug("Writing ok");
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
// Specify exports
|
||||
module.exports = {
|
||||
checkHealth,
|
||||
checkBucket,
|
||||
checkWriteApi,
|
||||
};
|
||||
27
src/main.js
27
src/main.js
@@ -2,6 +2,8 @@ const logger = require("./helper/logger.js")("main");
|
||||
|
||||
const { requireEnvVars } = require("./helper/env.js");
|
||||
const { exit } = require("process");
|
||||
const { InfluxDB } = require('@influxdata/influxdb-client');
|
||||
const InfluxChecks = require('./helper/influx-checks.js');
|
||||
|
||||
/// Setup ENVs
|
||||
const env = process.env;
|
||||
@@ -20,3 +22,28 @@ if(errorMsg){
|
||||
exit(1);
|
||||
}
|
||||
|
||||
(async function() {
|
||||
logger.info("Setup Influx..");
|
||||
const influxDb = new InfluxDB({url: env.INFLUX_URL, token: env.INFLUX_TOKEN});
|
||||
|
||||
await InfluxChecks.checkHealth(influxDb)
|
||||
.then((res) => {return InfluxChecks.checkBucket(influxDb, {
|
||||
org: env.INFLUX_ORG,
|
||||
name: env.INFLUX_BUCKET
|
||||
})})
|
||||
.then((res) => {return InfluxChecks.checkWriteApi(influxDb, {
|
||||
org: env.INFLUX_ORG,
|
||||
bucket: env.INFLUX_BUCKET
|
||||
})})
|
||||
.catch((err) => {
|
||||
if(err) {
|
||||
logger.error("Error whilst checking influx:");
|
||||
logger.error(err);
|
||||
}
|
||||
logger.fatal("Setup influx failed!");
|
||||
exit(1);
|
||||
});
|
||||
|
||||
logger.info("Influx ok");
|
||||
|
||||
})();
|
||||
|
||||
@@ -8,14 +8,16 @@ const {InfluxDB, Point, HttpError} = require('@influxdata/influxdb-client')
|
||||
class InfluxPointWriter extends Writable{
|
||||
/**
|
||||
*
|
||||
* @param {string} url Influx-Url
|
||||
* @param {string} token Auth-token
|
||||
* @param {InfluxDB} influxDb InfluxDb
|
||||
* @param {string} org Organization to use
|
||||
* @param {string} bucket Bucket to use
|
||||
* @param {string} precision Precision to use
|
||||
* @param {Partial<WriteOptions>} options Options for WriteApi
|
||||
*/
|
||||
constructor(url, token, org, bucket, precision = 'us'){
|
||||
this._api = new InfluxDB({url, token}).getWriteApi(org, bucket, precision);
|
||||
constructor(influxDb, org, bucket, options){
|
||||
super({
|
||||
objectMode: true
|
||||
});
|
||||
this._api = influxDb.getWriteApi(org, bucket, 'us', options);
|
||||
}
|
||||
|
||||
_write(point, encoding, next){
|
||||
@@ -25,10 +27,8 @@ class InfluxPointWriter extends Writable{
|
||||
|
||||
_flush(next){
|
||||
this._api.flush(true)
|
||||
.then(
|
||||
next,
|
||||
(err) => { next(new Error(`WriteApi rejected promise for flush: ${err}`)); }
|
||||
);
|
||||
.catch((err) => { next(new Error(`WriteApi rejected promise for flush: ${err}`)); })
|
||||
.then(next);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -47,7 +47,7 @@ class PacketInfluxPointFactory extends Transform{
|
||||
|
||||
point.setField('value', packet[objKey]); // Set field
|
||||
|
||||
this.push(null, point); // Push point into stream
|
||||
this.push(point); // Push point into stream
|
||||
});
|
||||
|
||||
next(); // Get next packet
|
||||
|
||||
@@ -27,7 +27,7 @@ class RegexBlockStream extends Transform{
|
||||
}
|
||||
|
||||
_transform(chunk, encoding, next){
|
||||
chunk = this.readableBuffer.length? this.readableBuffer.join() + chunk: chunk; // Add previous buffer to current chunk
|
||||
chunk = this.readableBuffer.length? this.readableBuffer.join('') + chunk: chunk; // Add previous buffer to current chunk
|
||||
this.readableBuffer.length && this.readableBuffer.clear(); // Clear buffer once we read it
|
||||
|
||||
let matches = chunk.match(this.matcher); // Match
|
||||
@@ -52,7 +52,7 @@ class RegexBlockStream extends Transform{
|
||||
|
||||
_flush(next){
|
||||
if(matchAllOnFlush){ // When requested, we'll match one last time over the remaining buffer
|
||||
let chunk = this.readableBuffer.toString();
|
||||
let chunk = this.readableBuffer.join('');
|
||||
let matches = chunk.match(this.matcher); // Match remaining buffer
|
||||
_writeMatches(matches); // Write matches including last element
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user