Merge branch 'f_influxdb-line-protocol' into dev

release-2.1
Ruakij 3 years ago
commit 10bd72907e

@ -19,8 +19,9 @@ Table of contents
- [1.3. Tools used](#13-tools-used) - [1.3. Tools used](#13-tools-used)
- [2. Usage/Installation](#2-usageinstallation) - [2. Usage/Installation](#2-usageinstallation)
- [2.1. Prerequisites](#21-prerequisites) - [2.1. Prerequisites](#21-prerequisites)
- [2.2. Running with Docker](#22-running-with-docker) - [2.2. Choosing an Export-Method](#22-choosing-an-export-method)
- [2.3. Environment-Variables](#23-environment-variables) - [2.3. Running with Docker](#23-running-with-docker)
- [2.4. Environment-Variables](#24-environment-variables)
- [3. Data collected](#3-data-collected) - [3. Data collected](#3-data-collected)
- [3.1. Data-Types](#31-data-types) - [3.1. Data-Types](#31-data-types)
- [3.2. Metric-Overview](#32-metric-overview) - [3.2. Metric-Overview](#32-metric-overview)
@ -113,18 +114,34 @@ iw dev <interface> set channel <channelNumber>
<br> <br>
## 2.2. Running with Docker ## 2.2. Choosing an Export-Method
### 2.2.1. Permissions The system allows exporting directly into [InfluxDB](https://docs.influxdata.com/influxdb) version >= 2.0 or into any system using the [InfluxDb-Line-Protocol](https://docs.influxdata.com/influxdb/v2.1/reference/syntax/line-protocol/) e.g. [QuestDB](https://questdb.io/) over TCP.
As of writing (using InfluxDB v2.1 and using the *flux*-language), the data written by this system was a bit too much for InfluxDB and it struggled very quickly on a fairly beefy machine.
Thats why the additional LineProtocol-Export-Method was added. Freedom of choice of the Time-Database.
<br>
If you want to use the InfluxDB-Line-Protocol, simply set the environment variable `USE_INFLUXDB_LINEPROTOCOL` to `true` along with the-other necessary Host and Port-variables.
<br>
## 2.3. Running with Docker
### 2.3.1. Permissions
The container must run as **root**, to have permission to listen on the wifi-interface. The container must run as **root**, to have permission to listen on the wifi-interface.
<br> <br>
### 2.2.2. docker run ### 2.3.2. docker run
Either run with docker directly. Either run with docker directly.
<details><summary>for InfluxDB</summary>
```sh ```sh
docker run docker run
-d -d
@ -134,18 +151,35 @@ docker run
-e INFLUX_URL="http://influxdb:8086/" -e INFLUX_URL="http://influxdb:8086/"
-e INFLUX_TOKEN="<yourToken>" -e INFLUX_TOKEN="<yourToken>"
-e INFLUX_ORG="<yourOrganisation>" -e INFLUX_ORG="<yourOrganisation>"
-e INFLUX_BUCKET="<yourBucket>"
ruakij/rfmon-to-influx:2 ruakij/rfmon-to-influx:2
``` ```
</details>
<details><summary>for InfluxDB-Line-Protocol</summary>
```sh
docker run
-d
--restart unless-stopped
--network host
-e WIFI_INTERFACE="<yourInterfaceName or leave empty for wlan0>"
-e USE_INFLUXDB_LINEPROTOCOL="true"
-e INFLUXDB_LINEPROTOCOL_HOST="<host>"
-e INFLUXDB_LINEPROTOCOL_PORT="<port>"
ruakij/rfmon-to-influx:2
```
</details>
<br> <br>
### 2.2.3. docker-compose ### 2.3.3. docker-compose
Or use the more preferred way with docker-compose. Or use the more preferred way with docker-compose.
`docker-compose.yml` `docker-compose.yml`
<details><summary>for InfluxDB</summary>
```yaml ```yaml
version: '3' version: '3'
@ -162,6 +196,28 @@ services:
- INFLUX_ORG="<yourOrganisation>" - INFLUX_ORG="<yourOrganisation>"
- INFLUX_BUCKET="<yourBucket>" - INFLUX_BUCKET="<yourBucket>"
``` ```
</details>
<details><summary>for InfluxDB-Line-Protocol</summary>
```yaml
version: '3'
services:
rfmon:
container_name: rfmon
image: ruakij/rfmon-to-influx:2
restart: unless-stopped
network_mode: "host"
environment:
- WIFI_INTERFACE="<yourInterfaceName or leave empty for wlan0>"
- USE_INFLUXDB_LINEPROTOCOL="true"
- INFLUXDB_LINEPROTOCOL_HOST="<host>"
- INFLUXDB_LINEPROTOCOL_PORT="<port>"
```
</details>
<br>
And then pull&start the container: And then pull&start the container:
```sh ```sh
@ -170,9 +226,11 @@ docker-compose up -d
<br> <br>
## 2.3. Environment-Variables ## 2.4. Environment-Variables
### 2.3.1. Necessary ### 2.4.1. Necessary
<details><summary>for InfluxDB</summary>
Variable|Description Variable|Description
---|--- ---|---
@ -180,10 +238,20 @@ Variable|Description
`INFLUX_TOKEN` | Token with write-access `INFLUX_TOKEN` | Token with write-access
`INFLUX_ORG` | Organisation and.. `INFLUX_ORG` | Organisation and..
`INFLUX_BUCKET` | Bucket to write into `INFLUX_BUCKET` | Bucket to write into
</details>
<details><summary>for InfluxDB-Line-Protocol</summary>
Variable|Description
---|---
`USE_INFLUXDB_LINEPROTOCOL` | Enable LineProtocol
`INFLUXDB_LINEPROTOCOL_HOST` | Host and..
`INFLUXDB_LINEPROTOCOL_PORT` | Port of your server
</details>
<br> <br>
### 2.3.2. Optional ### 2.4.2. Optional
Variable|Default|Description Variable|Default|Description
---|---|--- ---|---|---

@ -14,6 +14,8 @@ const { RegexBlockStream } = require("./streamHandler/RegexBlockStream.js");
const { PacketStreamFactory } = require("./streamHandler/PacketStreamFactory.js"); const { PacketStreamFactory } = require("./streamHandler/PacketStreamFactory.js");
const { PacketInfluxPointFactory } = require("./streamHandler/PacketInfluxPointFactory.js"); const { PacketInfluxPointFactory } = require("./streamHandler/PacketInfluxPointFactory.js");
const { InfluxPointWriter } = require("./streamHandler/InfluxPointWriter.js"); const { InfluxPointWriter } = require("./streamHandler/InfluxPointWriter.js");
const { InfluxDbLineProtocolWriter } = require("./streamHandler/InfluxDbLineProtocolWriter.js");
const { InfluxPointToLineProtoStream } = require("./streamHandler/InfluxPointToLineProtoStream.js");
const userHelper = require("./helper/userHelper.js"); const userHelper = require("./helper/userHelper.js");
@ -25,9 +27,14 @@ const env = process.env;
env.LOGLEVEL ??= "INFO"; env.LOGLEVEL ??= "INFO";
env.WIFI_INTERFACE ??= "wlan0"; env.WIFI_INTERFACE ??= "wlan0";
env.HOSTNAME ??= Os.hostname(); env.HOSTNAME ??= Os.hostname();
env.USE_INFLUXDB_LINEPROTOCOL ??= false;
} }
// Required vars // Required vars
let errorMsg = requireEnvVars([ let errorMsg = requireEnvVars(
env.USE_INFLUXDB_LINEPROTOCOL? [ // When lineprotocol is enabled, we need host and port
"INFLUXDB_LINEPROTOCOL_HOST", "INFLUXDB_LINEPROTOCOL_PORT",
] : [ // When its disabled, influxdb-data
"INFLUX_URL", "INFLUX_TOKEN", "INFLUX_URL", "INFLUX_TOKEN",
"INFLUX_ORG", "INFLUX_BUCKET" "INFLUX_ORG", "INFLUX_BUCKET"
]); ]);
@ -37,6 +44,8 @@ if(errorMsg){
} }
(async function() { (async function() {
let pointWriter;
if(!env.USE_INFLUXDB_LINEPROTOCOL){
logger.info("Setup Influx.."); logger.info("Setup Influx..");
const influxDb = new InfluxDB({url: env.INFLUX_URL, token: env.INFLUX_TOKEN}); const influxDb = new InfluxDB({url: env.INFLUX_URL, token: env.INFLUX_TOKEN});
@ -61,7 +70,46 @@ if(errorMsg){
logger.debug("Get WriteApi & set default-hostname to", `'${env.HOSTNAME}'`); logger.debug("Get WriteApi & set default-hostname to", `'${env.HOSTNAME}'`);
const influxWriteApi = influxDb.getWriteApi(env.INFLUX_ORG, env.INFLUX_BUCKET, "us"); const influxWriteApi = influxDb.getWriteApi(env.INFLUX_ORG, env.INFLUX_BUCKET, "us");
//influxWriteApi.useDefaultTags({"hostname": env.HOSTNAME}); //influxWriteApi.useDefaultTags({"hostname": env.HOSTNAME});
pointWriter = new InfluxPointWriter(influxWriteApi);
logger.info("Influx ok"); logger.info("Influx ok");
}
else {
logger.info("Setup Influxdb-LineProtocol..");
let lineProtocolWriter = new InfluxDbLineProtocolWriter(env.INFLUXDB_LINEPROTOCOL_HOST, env.INFLUXDB_LINEPROTOCOL_PORT);
logger.debug("Create PointToLineProto and pipe to LineProtocolWriter");
pointWriter = new InfluxPointToLineProtoStream();
pointWriter
.setEncoding("utf8")
.pipe(lineProtocolWriter);
logger.debug("Waiting for connection..");
await new Promise((resolve, reject) => {
lineProtocolWriter.once("connect", () => {
resolve();
});
lineProtocolWriter.once("error", (err) => {
reject(err);
});
setTimeout(() => { // After timeout, reject promise
reject("Timeout whilst waiting to connect");
}, 6500);
})
.then(() => {
logger.info("Influxdb-LineProtocol ok");
})
.catch((err) => {
if(err) {
logger.error("Error whilst checking Influxdb-LineProtocol:");
logger.error(err);
}
logger.fatal("Setup Influxdb-LineProtocol failed!");
exit(1);
});
}
logger.info("Starting tcpdump.."); logger.info("Starting tcpdump..");
const TCPDUMP_BASECMD = "tcpdump -vvv -e -n -X -s0 -i"; const TCPDUMP_BASECMD = "tcpdump -vvv -e -n -X -s0 -i";
@ -72,13 +120,12 @@ if(errorMsg){
let regexBlockStream = new RegexBlockStream(/^\d{2}:\d{2}:\d{2}.\d{6}.*(\n( {4,8}|\t\t?).*)+\n/gm); let regexBlockStream = new RegexBlockStream(/^\d{2}:\d{2}:\d{2}.\d{6}.*(\n( {4,8}|\t\t?).*)+\n/gm);
let packetStreamFactory = new PacketStreamFactory(); let packetStreamFactory = new PacketStreamFactory();
let packetInfluxPointFactory = new PacketInfluxPointFactory(); let packetInfluxPointFactory = new PacketInfluxPointFactory();
let influxPointWriter = new InfluxPointWriter(influxWriteApi);
proc.stdout proc.stdout
.setEncoding("utf8") .setEncoding("utf8")
.pipe(regexBlockStream) .pipe(regexBlockStream)
.pipe(packetStreamFactory) .pipe(packetStreamFactory)
.pipe(packetInfluxPointFactory) .pipe(packetInfluxPointFactory)
.pipe(influxPointWriter); .pipe(pointWriter);
logger.debug("Attaching error-logger.."); logger.debug("Attaching error-logger..");
const loggerTcpdump = logFactory("tcpdump"); const loggerTcpdump = logFactory("tcpdump");

@ -0,0 +1,72 @@
const logger = require.main.require("./helper/logger.js")("InfluxDbLineProtocolWriter");
const net = require("net");
/**
* Get points and write them into influx
*/
class InfluxDbLineProtocolWriter extends net.Socket{
/**
*
* @param {string} host Host of line-server
* @param {string} port Port of line-server
* @param {object} options Options for further configuration
*/
constructor(host, port, options = {}) {
super();
this._host = host;
this._port = port;
// options defaults
options.autoConnect ??= true;
options.timeout ??= 5000;
options.autoReconnect ??= true;
options.autoReconnectBackoffTime ??= 3000;
this._options = options;
this._isConnected = false;
super.setKeepAlive(true, 5000);
// Register auto-Reconnect if enabled
if(this._options.autoReconnect){
this.on("connect", () => {
logger.debug("Connection established!");
this._isConnected = true;
if(this._autoReconnectTimeout)
clearInterval(this._autoReconnectTimeout);
this._autoReconnectTimeout = 0;
});
this.on("error", (err) => {
logger.error(err.code, "TCP ERROR");
this._isConnected = false;
if(!this._autoReconnectTimeout)
this._autoReconnectTimeout = setInterval(() => {
this.connect();
},
this._options.autoReconnectBackoffTime);
});
}
// Autoconnect if requested
if(this._options.autoConnect) this.connect();
}
get host(){ return this._host; }
get port(){ return this._port; }
get isConnected(){ return this._isConnected; }
connect(){
logger.debug("Connecting..");
super.connect(this._port, this._host);
}
}
// Specify exports
module.exports = {
InfluxDbLineProtocolWriter
};

@ -0,0 +1,22 @@
const logger = require.main.require("./helper/logger.js")("InfluxPointToLineProtoStream");
const { Transform } = require("stream");
/**
* Get points and converts them to Line-protocol
*/
class InfluxPointToLineProtoStream extends Transform{
constructor(){
super({
writableObjectMode: true
});
}
_transform(point, encoding, next){
next(null, point.toLineProtocol() +"\n");
}
}
// Specify exports
module.exports = {
InfluxPointToLineProtoStream
};
Loading…
Cancel
Save