From 6b93a02943beacef832ef15c6322ca10c21ddb3a Mon Sep 17 00:00:00 2001 From: Ruakij Date: Thu, 9 Dec 2021 15:47:26 +0100 Subject: [PATCH 01/10] Implement basic LineProtocolWriter --- .../InfluxDbLineProtocolWriter.js | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 src/streamHandler/InfluxDbLineProtocolWriter.js diff --git a/src/streamHandler/InfluxDbLineProtocolWriter.js b/src/streamHandler/InfluxDbLineProtocolWriter.js new file mode 100644 index 0000000..a47a843 --- /dev/null +++ b/src/streamHandler/InfluxDbLineProtocolWriter.js @@ -0,0 +1,67 @@ +const logger = require.main.require("./helper/logger.js")("InfluxDbLineProtocolWriter"); +const net = require("net"); + +/** + * Get points and write them into influx + */ +class InfluxDbLineProtocolWriter extends net.Socket{ + /** + * + * @param {string} host Host of line-server + * @param {string} port Port of line-server + * @param {object} options Options for further configuration + */ + constructor(host, port, options = {}) { + super(); + + this._host = host; + this._port = port; + + // options defaults + options.autoConnect ??= true; + options.timeout ??= 5000; + options.autoReconnect ??= true; + this._options = options; + + super.setKeepalive(true, 5000); + + // Register auto-Reconnect if enabled + if(this._options.autoReconnect){ + this.on("connect", () => { + logger.debug("Connection established!"); + + if(this._autoReconnectTimeout) + clearInterval(this._autoReconnectTimeout); + this._autoReconnectTimeout = 0; + }); + + this.on("error", (err) => { + logger.error(err.code, "TCP ERROR"); + if(!this._autoReconnectTimeout) + this._autoReconnectTimeout = setInterval(() => { + this.connect(); + }); + }); + } + + // Autoconnect if requested + if(this._options.autoConnect) this.connect(); + } + + get host(){ return this._host; } + get port(){ return this._port; } + + connect(){ + logger.debug("Connecting.."); + super.connect(this._host, this._port); + } + + write(buffer, errorCb){ + return super.write(buffer, errorCb); + } +} + +// Specify exports +module.exports = { + InfluxDbLineProtocolWriter +}; \ No newline at end of file From 024305db434640e2687ec7e2cbbe78b16313dd4b Mon Sep 17 00:00:00 2001 From: Ruakij Date: Thu, 9 Dec 2021 17:12:11 +0100 Subject: [PATCH 02/10] Fixed host and port wrong way around --- src/streamHandler/InfluxDbLineProtocolWriter.js | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/streamHandler/InfluxDbLineProtocolWriter.js b/src/streamHandler/InfluxDbLineProtocolWriter.js index a47a843..1aa4449 100644 --- a/src/streamHandler/InfluxDbLineProtocolWriter.js +++ b/src/streamHandler/InfluxDbLineProtocolWriter.js @@ -53,11 +53,7 @@ class InfluxDbLineProtocolWriter extends net.Socket{ connect(){ logger.debug("Connecting.."); - super.connect(this._host, this._port); - } - - write(buffer, errorCb){ - return super.write(buffer, errorCb); + super.connect(this._port, this._host); } } From 7f5e168fda89a711ee6314df467feb4e4cec8289 Mon Sep 17 00:00:00 2001 From: Ruakij Date: Thu, 9 Dec 2021 17:12:35 +0100 Subject: [PATCH 03/10] Added missing autoReconnectBackoffTime --- src/streamHandler/InfluxDbLineProtocolWriter.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/streamHandler/InfluxDbLineProtocolWriter.js b/src/streamHandler/InfluxDbLineProtocolWriter.js index 1aa4449..f0c9a1f 100644 --- a/src/streamHandler/InfluxDbLineProtocolWriter.js +++ b/src/streamHandler/InfluxDbLineProtocolWriter.js @@ -21,6 +21,7 @@ class InfluxDbLineProtocolWriter extends net.Socket{ options.autoConnect ??= true; options.timeout ??= 5000; options.autoReconnect ??= true; + options.autoReconnectBackoffTime ??= 3000; this._options = options; super.setKeepalive(true, 5000); @@ -40,7 +41,8 @@ class InfluxDbLineProtocolWriter extends net.Socket{ if(!this._autoReconnectTimeout) this._autoReconnectTimeout = setInterval(() => { this.connect(); - }); + }, + this._options.autoReconnectBackoffTime); }); } From 4e2ffec656383d2c5c88df9393467c2b55a395bb Mon Sep 17 00:00:00 2001 From: Ruakij Date: Thu, 9 Dec 2021 17:12:42 +0100 Subject: [PATCH 04/10] Fix typo --- src/streamHandler/InfluxDbLineProtocolWriter.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/streamHandler/InfluxDbLineProtocolWriter.js b/src/streamHandler/InfluxDbLineProtocolWriter.js index f0c9a1f..8eef2e5 100644 --- a/src/streamHandler/InfluxDbLineProtocolWriter.js +++ b/src/streamHandler/InfluxDbLineProtocolWriter.js @@ -24,7 +24,7 @@ class InfluxDbLineProtocolWriter extends net.Socket{ options.autoReconnectBackoffTime ??= 3000; this._options = options; - super.setKeepalive(true, 5000); + super.setKeepAlive(true, 5000); // Register auto-Reconnect if enabled if(this._options.autoReconnect){ From d1cf1d8f7dc599f653dedbe8eb03b5604d0ea0a3 Mon Sep 17 00:00:00 2001 From: Ruakij Date: Thu, 9 Dec 2021 17:12:59 +0100 Subject: [PATCH 05/10] Add connection-state tracking as getter --- src/streamHandler/InfluxDbLineProtocolWriter.js | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/streamHandler/InfluxDbLineProtocolWriter.js b/src/streamHandler/InfluxDbLineProtocolWriter.js index 8eef2e5..514c2de 100644 --- a/src/streamHandler/InfluxDbLineProtocolWriter.js +++ b/src/streamHandler/InfluxDbLineProtocolWriter.js @@ -24,12 +24,15 @@ class InfluxDbLineProtocolWriter extends net.Socket{ options.autoReconnectBackoffTime ??= 3000; this._options = options; + this._isConnected = false; + super.setKeepAlive(true, 5000); // Register auto-Reconnect if enabled if(this._options.autoReconnect){ this.on("connect", () => { logger.debug("Connection established!"); + this._isConnected = true; if(this._autoReconnectTimeout) clearInterval(this._autoReconnectTimeout); @@ -38,6 +41,8 @@ class InfluxDbLineProtocolWriter extends net.Socket{ this.on("error", (err) => { logger.error(err.code, "TCP ERROR"); + this._isConnected = false; + if(!this._autoReconnectTimeout) this._autoReconnectTimeout = setInterval(() => { this.connect(); @@ -53,6 +58,8 @@ class InfluxDbLineProtocolWriter extends net.Socket{ get host(){ return this._host; } get port(){ return this._port; } + get isConnected(){ return this._isConnected; } + connect(){ logger.debug("Connecting.."); super.connect(this._port, this._host); From f596a99ee6468ed631e8765f49d8683094284209 Mon Sep 17 00:00:00 2001 From: Ruakij Date: Thu, 9 Dec 2021 17:13:34 +0100 Subject: [PATCH 06/10] Implement converter-stream point -> lineProtocol --- .../InfluxPointToLineProtoStream.js | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 src/streamHandler/InfluxPointToLineProtoStream.js diff --git a/src/streamHandler/InfluxPointToLineProtoStream.js b/src/streamHandler/InfluxPointToLineProtoStream.js new file mode 100644 index 0000000..a8fc40b --- /dev/null +++ b/src/streamHandler/InfluxPointToLineProtoStream.js @@ -0,0 +1,22 @@ +const logger = require.main.require("./helper/logger.js")("InfluxPointToLineProtoStream"); +const { Transform } = require("stream"); + +/** + * Get points and converts them to Line-protocol + */ +class InfluxPointToLineProtoStream extends Transform{ + constructor(){ + super({ + writableObjectMode: true + }); + } + + _transform(point, encoding, next){ + next(null, point.toLineProtocol() +"\n"); + } +} + +// Specify exports +module.exports = { + InfluxPointToLineProtoStream +}; \ No newline at end of file From 101200131278ed2225231aee32bdd2d8ad97faee Mon Sep 17 00:00:00 2001 From: Ruakij Date: Thu, 9 Dec 2021 17:14:11 +0100 Subject: [PATCH 07/10] Implement env-var-checks --- src/main.js | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/main.js b/src/main.js index bd9398c..610922e 100644 --- a/src/main.js +++ b/src/main.js @@ -25,12 +25,17 @@ const env = process.env; env.LOGLEVEL ??= "INFO"; env.WIFI_INTERFACE ??= "wlan0"; env.HOSTNAME ??= Os.hostname(); + + env.USE_INFLUXDB_LINEPROTOCOL ??= false; } // Required vars -let errorMsg = requireEnvVars([ - "INFLUX_URL", "INFLUX_TOKEN", - "INFLUX_ORG", "INFLUX_BUCKET" -]); +let errorMsg = requireEnvVars( + env.USE_INFLUXDB_LINEPROTOCOL? [ // When lineprotocol is enabled, we need host and port + "INFLUXDB_LINEPROTOCOL_HOST", "INFLUXDB_LINEPROTOCOL_PORT", + ] : [ // When its disabled, influxdb-data + "INFLUX_URL", "INFLUX_TOKEN", + "INFLUX_ORG", "INFLUX_BUCKET" + ]); if(errorMsg){ logger.fatal(errorMsg); exit(1); From c09c6c29fbe4fd0d75785f4f41755a8326f331db Mon Sep 17 00:00:00 2001 From: Ruakij Date: Thu, 9 Dec 2021 17:15:10 +0100 Subject: [PATCH 08/10] Move InfluxDB to if-block --- src/main.js | 59 +++++++++++++++++++++++++++++++---------------------- 1 file changed, 35 insertions(+), 24 deletions(-) diff --git a/src/main.js b/src/main.js index 610922e..fd2199e 100644 --- a/src/main.js +++ b/src/main.js @@ -14,6 +14,7 @@ const { RegexBlockStream } = require("./streamHandler/RegexBlockStream.js"); const { PacketStreamFactory } = require("./streamHandler/PacketStreamFactory.js"); const { PacketInfluxPointFactory } = require("./streamHandler/PacketInfluxPointFactory.js"); const { InfluxPointWriter } = require("./streamHandler/InfluxPointWriter.js"); +const { InfluxDbLineProtocolWriter } = require("./streamHandler/InfluxDbLineProtocolWriter.js"); const userHelper = require("./helper/userHelper.js"); @@ -42,31 +43,41 @@ if(errorMsg){ } (async function() { - logger.info("Setup Influx.."); - const influxDb = new InfluxDB({url: env.INFLUX_URL, token: env.INFLUX_TOKEN}); - - await InfluxChecks.checkHealth(influxDb) - .then((res) => {return InfluxChecks.checkBucket(influxDb, { - org: env.INFLUX_ORG, - name: env.INFLUX_BUCKET - });}) - .then((res) => {return InfluxChecks.checkWriteApi(influxDb, { - org: env.INFLUX_ORG, - bucket: env.INFLUX_BUCKET - });}) - .catch((err) => { - if(err) { - logger.error("Error whilst checking influx:"); - logger.error(err); - } - logger.fatal("Setup influx failed!"); - exit(1); - }); + let pointWriter; + if(!env.USE_INFLUXDB_LINEPROTOCOL){ + logger.info("Setup Influx.."); + const influxDb = new InfluxDB({url: env.INFLUX_URL, token: env.INFLUX_TOKEN}); + + await InfluxChecks.checkHealth(influxDb) + .then((res) => {return InfluxChecks.checkBucket(influxDb, { + org: env.INFLUX_ORG, + name: env.INFLUX_BUCKET + });}) + .then((res) => {return InfluxChecks.checkWriteApi(influxDb, { + org: env.INFLUX_ORG, + bucket: env.INFLUX_BUCKET + });}) + .catch((err) => { + if(err) { + logger.error("Error whilst checking influx:"); + logger.error(err); + } + logger.fatal("Setup influx failed!"); + exit(1); + }); + + logger.debug("Get WriteApi & set default-hostname to", `'${env.HOSTNAME}'`); + const influxWriteApi = influxDb.getWriteApi(env.INFLUX_ORG, env.INFLUX_BUCKET, "us"); + //influxWriteApi.useDefaultTags({"hostname": env.HOSTNAME}); - logger.debug("Get WriteApi & set default-hostname to", `'${env.HOSTNAME}'`); - const influxWriteApi = influxDb.getWriteApi(env.INFLUX_ORG, env.INFLUX_BUCKET, "us"); - //influxWriteApi.useDefaultTags({"hostname": env.HOSTNAME}); - logger.info("Influx ok"); + pointWriter = new InfluxPointWriter(influxWriteApi); + + logger.info("Influx ok"); + } + else { + logger.info("Setup Influxdb-LineProtocol.."); + + } logger.info("Starting tcpdump.."); const TCPDUMP_BASECMD = "tcpdump -vvv -e -n -X -s0 -i"; From 2f84bb4408008db9d2d40b48b10181d94a754737 Mon Sep 17 00:00:00 2001 From: Ruakij Date: Thu, 9 Dec 2021 17:15:31 +0100 Subject: [PATCH 09/10] Implement connection-checking --- src/main.js | 35 +++++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/src/main.js b/src/main.js index fd2199e..3382e03 100644 --- a/src/main.js +++ b/src/main.js @@ -15,6 +15,7 @@ const { PacketStreamFactory } = require("./streamHandler/PacketStreamFactory.js" const { PacketInfluxPointFactory } = require("./streamHandler/PacketInfluxPointFactory.js"); const { InfluxPointWriter } = require("./streamHandler/InfluxPointWriter.js"); const { InfluxDbLineProtocolWriter } = require("./streamHandler/InfluxDbLineProtocolWriter.js"); +const { InfluxPointToLineProtoStream } = require("./streamHandler/InfluxPointToLineProtoStream.js"); const userHelper = require("./helper/userHelper.js"); @@ -77,6 +78,37 @@ if(errorMsg){ else { logger.info("Setup Influxdb-LineProtocol.."); + let lineProtocolWriter = new InfluxDbLineProtocolWriter(env.INFLUXDB_LINEPROTOCOL_HOST, env.INFLUXDB_LINEPROTOCOL_PORT); + + logger.debug("Create PointToLineProto and pipe to LineProtocolWriter"); + pointWriter = new InfluxPointToLineProtoStream(); + pointWriter + .setEncoding("utf8") + .pipe(lineProtocolWriter); + + logger.debug("Waiting for connection.."); + await new Promise((resolve, reject) => { + lineProtocolWriter.once("connect", () => { + resolve(); + }); + lineProtocolWriter.once("error", (err) => { + reject(err); + }); + setTimeout(() => { // After timeout, reject promise + reject("Timeout whilst waiting to connect"); + }, 6500); + }) + .then(() => { + logger.info("Influxdb-LineProtocol ok"); + }) + .catch((err) => { + if(err) { + logger.error("Error whilst checking Influxdb-LineProtocol:"); + logger.error(err); + } + logger.fatal("Setup Influxdb-LineProtocol failed!"); + exit(1); + }); } logger.info("Starting tcpdump.."); @@ -88,13 +120,12 @@ if(errorMsg){ let regexBlockStream = new RegexBlockStream(/^\d{2}:\d{2}:\d{2}.\d{6}.*(\n( {4,8}|\t\t?).*)+\n/gm); let packetStreamFactory = new PacketStreamFactory(); let packetInfluxPointFactory = new PacketInfluxPointFactory(); - let influxPointWriter = new InfluxPointWriter(influxWriteApi); proc.stdout .setEncoding("utf8") .pipe(regexBlockStream) .pipe(packetStreamFactory) .pipe(packetInfluxPointFactory) - .pipe(influxPointWriter); + .pipe(pointWriter); logger.debug("Attaching error-logger.."); const loggerTcpdump = logFactory("tcpdump"); From 69e910428e1d8ab9f3c46a9b6c4579fd8369040e Mon Sep 17 00:00:00 2001 From: Ruakij Date: Thu, 9 Dec 2021 17:57:15 +0100 Subject: [PATCH 10/10] Added documentation for LineProtocol export --- README.md | 88 ++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 78 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index e8423cd..33acff7 100644 --- a/README.md +++ b/README.md @@ -19,8 +19,9 @@ Table of contents - [1.3. Tools used](#13-tools-used) - [2. Usage/Installation](#2-usageinstallation) - [2.1. Prerequisites](#21-prerequisites) - - [2.2. Running with Docker](#22-running-with-docker) - - [2.3. Environment-Variables](#23-environment-variables) + - [2.2. Choosing an Export-Method](#22-choosing-an-export-method) + - [2.3. Running with Docker](#23-running-with-docker) + - [2.4. Environment-Variables](#24-environment-variables) - [3. Data collected](#3-data-collected) - [3.1. Data-Types](#31-data-types) - [3.2. Metric-Overview](#32-metric-overview) @@ -113,18 +114,34 @@ iw dev set channel
-## 2.2. Running with Docker +## 2.2. Choosing an Export-Method -### 2.2.1. Permissions +The system allows exporting directly into [InfluxDB](https://docs.influxdata.com/influxdb) version >= 2.0 or into any system using the [InfluxDb-Line-Protocol](https://docs.influxdata.com/influxdb/v2.1/reference/syntax/line-protocol/) e.g. [QuestDB](https://questdb.io/) over TCP. + +As of writing (using InfluxDB v2.1 and using the *flux*-language), the data written by this system was a bit too much for InfluxDB and it struggled very quickly on a fairly beefy machine. + +Thats why the additional LineProtocol-Export-Method was added. Freedom of choice of the Time-Database. + +
+ +If you want to use the InfluxDB-Line-Protocol, simply set the environment variable `USE_INFLUXDB_LINEPROTOCOL` to `true` along with the-other necessary Host and Port-variables. + +
+ +## 2.3. Running with Docker + +### 2.3.1. Permissions The container must run as **root**, to have permission to listen on the wifi-interface.
-### 2.2.2. docker run +### 2.3.2. docker run Either run with docker directly. +
for InfluxDB + ```sh docker run -d @@ -134,18 +151,35 @@ docker run -e INFLUX_URL="http://influxdb:8086/" -e INFLUX_TOKEN="" -e INFLUX_ORG="" - -e INFLUX_BUCKET="" ruakij/rfmon-to-influx:2 ``` +
+ +
for InfluxDB-Line-Protocol + +```sh +docker run + -d + --restart unless-stopped + --network host + -e WIFI_INTERFACE="" + -e USE_INFLUXDB_LINEPROTOCOL="true" + -e INFLUXDB_LINEPROTOCOL_HOST="" + -e INFLUXDB_LINEPROTOCOL_PORT="" + ruakij/rfmon-to-influx:2 +``` +

-### 2.2.3. docker-compose +### 2.3.3. docker-compose Or use the more preferred way with docker-compose. `docker-compose.yml` +
for InfluxDB + ```yaml version: '3' @@ -162,6 +196,28 @@ services: - INFLUX_ORG="" - INFLUX_BUCKET="" ``` +
+ +
for InfluxDB-Line-Protocol + +```yaml +version: '3' + +services: + rfmon: + container_name: rfmon + image: ruakij/rfmon-to-influx:2 + restart: unless-stopped + network_mode: "host" + environment: + - WIFI_INTERFACE="" + - USE_INFLUXDB_LINEPROTOCOL="true" + - INFLUXDB_LINEPROTOCOL_HOST="" + - INFLUXDB_LINEPROTOCOL_PORT="" +``` +
+ +
And then pull&start the container: ```sh @@ -170,9 +226,11 @@ docker-compose up -d
-## 2.3. Environment-Variables +## 2.4. Environment-Variables -### 2.3.1. Necessary +### 2.4.1. Necessary + +
for InfluxDB Variable|Description ---|--- @@ -180,10 +238,20 @@ Variable|Description `INFLUX_TOKEN` | Token with write-access `INFLUX_ORG` | Organisation and.. `INFLUX_BUCKET` | Bucket to write into +
+ +
for InfluxDB-Line-Protocol + +Variable|Description +---|--- +`USE_INFLUXDB_LINEPROTOCOL` | Enable LineProtocol +`INFLUXDB_LINEPROTOCOL_HOST` | Host and.. +`INFLUXDB_LINEPROTOCOL_PORT` | Port of your server +

-### 2.3.2. Optional +### 2.4.2. Optional Variable|Default|Description ---|---|---