Compare commits
22 Commits
dcd0ce8111
...
f_influx-c
| Author | SHA1 | Date | |
|---|---|---|---|
| 873f00b21b | |||
| 271554719e | |||
| e18de63d7c | |||
| 2356040572 | |||
| b0bbf0c71a | |||
| 37b78e7373 | |||
| c5e1bb4c64 | |||
| 3c29ed2000 | |||
| ddf39b9433 | |||
| 3a927688d0 | |||
| c51cfc1b14 | |||
| e1b2a7e016 | |||
| fc5900b0ba | |||
| 354ca32a61 | |||
| d10e9bb2c6 | |||
| 44cd3288cf | |||
| 3af4bb7cc6 | |||
| 2a662e0bd1 | |||
| d7a9530b68 | |||
| 7de2250983 | |||
| bb3d843895 | |||
| 9472ed9198 |
15
package-lock.json
generated
15
package-lock.json
generated
@@ -10,6 +10,7 @@
|
|||||||
"license": "AGPL-3.0",
|
"license": "AGPL-3.0",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@influxdata/influxdb-client": "^1.20.0",
|
"@influxdata/influxdb-client": "^1.20.0",
|
||||||
|
"@influxdata/influxdb-client-apis": "^1.20.0",
|
||||||
"log4js": "^6.3.0",
|
"log4js": "^6.3.0",
|
||||||
"luxon": "^2.1.1",
|
"luxon": "^2.1.1",
|
||||||
"string-argv": "^0.3.1"
|
"string-argv": "^0.3.1"
|
||||||
@@ -20,6 +21,14 @@
|
|||||||
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client/-/influxdb-client-1.20.0.tgz",
|
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client/-/influxdb-client-1.20.0.tgz",
|
||||||
"integrity": "sha512-jaKSI63hmQ5VSkJrFJkYIXaKlhoF+mGd4HmOf7v/X7pmEi69ReHp922Wyx6/OeCrpndRMbsadk+XmGNdd43cFw=="
|
"integrity": "sha512-jaKSI63hmQ5VSkJrFJkYIXaKlhoF+mGd4HmOf7v/X7pmEi69ReHp922Wyx6/OeCrpndRMbsadk+XmGNdd43cFw=="
|
||||||
},
|
},
|
||||||
|
"node_modules/@influxdata/influxdb-client-apis": {
|
||||||
|
"version": "1.20.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client-apis/-/influxdb-client-apis-1.20.0.tgz",
|
||||||
|
"integrity": "sha512-KMTmXH4rbpS+NWGpqDjxcKTyan2rbiT2IM5AdRElKhH2sHbH96xwLgziaxeC+OCJLeNAdehJgae3I8WiZjbwdg==",
|
||||||
|
"peerDependencies": {
|
||||||
|
"@influxdata/influxdb-client": "*"
|
||||||
|
}
|
||||||
|
},
|
||||||
"node_modules/date-format": {
|
"node_modules/date-format": {
|
||||||
"version": "3.0.0",
|
"version": "3.0.0",
|
||||||
"resolved": "https://registry.npmjs.org/date-format/-/date-format-3.0.0.tgz",
|
"resolved": "https://registry.npmjs.org/date-format/-/date-format-3.0.0.tgz",
|
||||||
@@ -152,6 +161,12 @@
|
|||||||
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client/-/influxdb-client-1.20.0.tgz",
|
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client/-/influxdb-client-1.20.0.tgz",
|
||||||
"integrity": "sha512-jaKSI63hmQ5VSkJrFJkYIXaKlhoF+mGd4HmOf7v/X7pmEi69ReHp922Wyx6/OeCrpndRMbsadk+XmGNdd43cFw=="
|
"integrity": "sha512-jaKSI63hmQ5VSkJrFJkYIXaKlhoF+mGd4HmOf7v/X7pmEi69ReHp922Wyx6/OeCrpndRMbsadk+XmGNdd43cFw=="
|
||||||
},
|
},
|
||||||
|
"@influxdata/influxdb-client-apis": {
|
||||||
|
"version": "1.20.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client-apis/-/influxdb-client-apis-1.20.0.tgz",
|
||||||
|
"integrity": "sha512-KMTmXH4rbpS+NWGpqDjxcKTyan2rbiT2IM5AdRElKhH2sHbH96xwLgziaxeC+OCJLeNAdehJgae3I8WiZjbwdg==",
|
||||||
|
"requires": {}
|
||||||
|
},
|
||||||
"date-format": {
|
"date-format": {
|
||||||
"version": "3.0.0",
|
"version": "3.0.0",
|
||||||
"resolved": "https://registry.npmjs.org/date-format/-/date-format-3.0.0.tgz",
|
"resolved": "https://registry.npmjs.org/date-format/-/date-format-3.0.0.tgz",
|
||||||
|
|||||||
@@ -15,6 +15,7 @@
|
|||||||
"license": "AGPL-3.0",
|
"license": "AGPL-3.0",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@influxdata/influxdb-client": "^1.20.0",
|
"@influxdata/influxdb-client": "^1.20.0",
|
||||||
|
"@influxdata/influxdb-client-apis": "^1.20.0",
|
||||||
"log4js": "^6.3.0",
|
"log4js": "^6.3.0",
|
||||||
"luxon": "^2.1.1",
|
"luxon": "^2.1.1",
|
||||||
"string-argv": "^0.3.1"
|
"string-argv": "^0.3.1"
|
||||||
|
|||||||
61
src/helper/influx-checks.js
Normal file
61
src/helper/influx-checks.js
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
const logger = require.main.require("./helper/logger.js")("influx-checks");
|
||||||
|
|
||||||
|
const Os = require("os");
|
||||||
|
const { InfluxDB, Point } = require('@influxdata/influxdb-client')
|
||||||
|
const Influx = require('@influxdata/influxdb-client-apis');
|
||||||
|
|
||||||
|
|
||||||
|
function checkHealth(influxDb){
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
new Influx.HealthAPI(influxDb) // Check influx health
|
||||||
|
.getHealth()
|
||||||
|
.catch((err) => {
|
||||||
|
logger.error("Could not communicate with Influx:");
|
||||||
|
logger.error(`Error [${err.code}]:`, err.message);
|
||||||
|
reject();
|
||||||
|
})
|
||||||
|
.then((res) => {
|
||||||
|
logger.debug("Server healthy.", "Version: ", res.version);
|
||||||
|
resolve(res);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function checkBucket(influxDb, options){
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
new Influx.BucketsAPI(influxDb).getBuckets(options)
|
||||||
|
.catch((err) => { // Weirdly the influx-Api returns 404 for searches of non-existing buckets
|
||||||
|
logger.error("Could not get bucket:");
|
||||||
|
logger.error(`Error [${err.code}]:`, err.message);
|
||||||
|
reject();
|
||||||
|
}).then((res) => { // But an empty list when the bucket exists, but token does not have permission to get details
|
||||||
|
logger.debug("Bucket found");
|
||||||
|
resolve(res);
|
||||||
|
// Now we know the bucket exists and we have some kind of permission.. but we still dont know if we are able to write to it..
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function checkWriteApi(influxDb, options){
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
const writeApi = influxDb.getWriteApi(options.org, options.bucket); // Get WriteAPI
|
||||||
|
writeApi.writePoint(new Point("worker_connectionTest").tag("hostname", Os.hostname())) // Write point
|
||||||
|
writeApi.close()
|
||||||
|
.catch((err) => {
|
||||||
|
logger.error("Could not get writeApi:");
|
||||||
|
logger.error(`Error [${err.code}]:`, err.message);
|
||||||
|
reject();
|
||||||
|
}).then((res) => {
|
||||||
|
logger.debug("Writing ok");
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Specify exports
|
||||||
|
module.exports = {
|
||||||
|
checkHealth,
|
||||||
|
checkBucket,
|
||||||
|
checkWriteApi,
|
||||||
|
};
|
||||||
27
src/main.js
27
src/main.js
@@ -2,6 +2,8 @@ const logger = require("./helper/logger.js")("main");
|
|||||||
|
|
||||||
const { requireEnvVars } = require("./helper/env.js");
|
const { requireEnvVars } = require("./helper/env.js");
|
||||||
const { exit } = require("process");
|
const { exit } = require("process");
|
||||||
|
const { InfluxDB } = require('@influxdata/influxdb-client');
|
||||||
|
const InfluxChecks = require('./helper/influx-checks.js');
|
||||||
|
|
||||||
/// Setup ENVs
|
/// Setup ENVs
|
||||||
const env = process.env;
|
const env = process.env;
|
||||||
@@ -20,3 +22,28 @@ if(errorMsg){
|
|||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
(async function() {
|
||||||
|
logger.info("Setup Influx..");
|
||||||
|
const influxDb = new InfluxDB({url: env.INFLUX_URL, token: env.INFLUX_TOKEN});
|
||||||
|
|
||||||
|
await InfluxChecks.checkHealth(influxDb)
|
||||||
|
.then((res) => {return InfluxChecks.checkBucket(influxDb, {
|
||||||
|
org: env.INFLUX_ORG,
|
||||||
|
name: env.INFLUX_BUCKET
|
||||||
|
})})
|
||||||
|
.then((res) => {return InfluxChecks.checkWriteApi(influxDb, {
|
||||||
|
org: env.INFLUX_ORG,
|
||||||
|
bucket: env.INFLUX_BUCKET
|
||||||
|
})})
|
||||||
|
.catch((err) => {
|
||||||
|
if(err) {
|
||||||
|
logger.error("Error whilst checking influx:");
|
||||||
|
logger.error(err);
|
||||||
|
}
|
||||||
|
logger.fatal("Setup influx failed!");
|
||||||
|
exit(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
logger.info("Influx ok");
|
||||||
|
|
||||||
|
})();
|
||||||
|
|||||||
@@ -8,14 +8,16 @@ const {InfluxDB, Point, HttpError} = require('@influxdata/influxdb-client')
|
|||||||
class InfluxPointWriter extends Writable{
|
class InfluxPointWriter extends Writable{
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* @param {string} url Influx-Url
|
* @param {InfluxDB} influxDb InfluxDb
|
||||||
* @param {string} token Auth-token
|
|
||||||
* @param {string} org Organization to use
|
* @param {string} org Organization to use
|
||||||
* @param {string} bucket Bucket to use
|
* @param {string} bucket Bucket to use
|
||||||
* @param {string} precision Precision to use
|
* @param {Partial<WriteOptions>} options Options for WriteApi
|
||||||
*/
|
*/
|
||||||
constructor(url, token, org, bucket, precision = 'us'){
|
constructor(influxDb, org, bucket, options){
|
||||||
this._api = new InfluxDB({url, token}).getWriteApi(org, bucket, precision);
|
super({
|
||||||
|
objectMode: true
|
||||||
|
});
|
||||||
|
this._api = influxDb.getWriteApi(org, bucket, 'us', options);
|
||||||
}
|
}
|
||||||
|
|
||||||
_write(point, encoding, next){
|
_write(point, encoding, next){
|
||||||
@@ -25,10 +27,8 @@ class InfluxPointWriter extends Writable{
|
|||||||
|
|
||||||
_flush(next){
|
_flush(next){
|
||||||
this._api.flush(true)
|
this._api.flush(true)
|
||||||
.then(
|
.catch((err) => { next(new Error(`WriteApi rejected promise for flush: ${err}`)); })
|
||||||
next,
|
.then(next);
|
||||||
(err) => { next(new Error(`WriteApi rejected promise for flush: ${err}`)); }
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -47,7 +47,7 @@ class PacketInfluxPointFactory extends Transform{
|
|||||||
|
|
||||||
point.setField('value', packet[objKey]); // Set field
|
point.setField('value', packet[objKey]); // Set field
|
||||||
|
|
||||||
this.push(null, point); // Push point into stream
|
this.push(point); // Push point into stream
|
||||||
});
|
});
|
||||||
|
|
||||||
next(); // Get next packet
|
next(); // Get next packet
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ class RegexBlockStream extends Transform{
|
|||||||
}
|
}
|
||||||
|
|
||||||
_transform(chunk, encoding, next){
|
_transform(chunk, encoding, next){
|
||||||
chunk = this.readableBuffer.length? this.readableBuffer.join() + chunk: chunk; // Add previous buffer to current chunk
|
chunk = this.readableBuffer.length? this.readableBuffer.join('') + chunk: chunk; // Add previous buffer to current chunk
|
||||||
this.readableBuffer.length && this.readableBuffer.clear(); // Clear buffer once we read it
|
this.readableBuffer.length && this.readableBuffer.clear(); // Clear buffer once we read it
|
||||||
|
|
||||||
let matches = chunk.match(this.matcher); // Match
|
let matches = chunk.match(this.matcher); // Match
|
||||||
@@ -52,7 +52,7 @@ class RegexBlockStream extends Transform{
|
|||||||
|
|
||||||
_flush(next){
|
_flush(next){
|
||||||
if(matchAllOnFlush){ // When requested, we'll match one last time over the remaining buffer
|
if(matchAllOnFlush){ // When requested, we'll match one last time over the remaining buffer
|
||||||
let chunk = this.readableBuffer.toString();
|
let chunk = this.readableBuffer.join('');
|
||||||
let matches = chunk.match(this.matcher); // Match remaining buffer
|
let matches = chunk.match(this.matcher); // Match remaining buffer
|
||||||
_writeMatches(matches); // Write matches including last element
|
_writeMatches(matches); // Write matches including last element
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user