9 Commits

Author SHA1 Message Date
2f84bb4408 Implement connection-checking 2021-12-09 17:15:31 +01:00
c09c6c29fb Move InfluxDB to if-block 2021-12-09 17:15:10 +01:00
1012001312 Implement env-var-checks 2021-12-09 17:14:11 +01:00
f596a99ee6 Implement converter-stream point -> lineProtocol 2021-12-09 17:13:34 +01:00
d1cf1d8f7d Add connection-state tracking as getter 2021-12-09 17:12:59 +01:00
4e2ffec656 Fix typo 2021-12-09 17:12:42 +01:00
7f5e168fda Added missing autoReconnectBackoffTime 2021-12-09 17:12:35 +01:00
024305db43 Fixed host and port wrong way around 2021-12-09 17:12:11 +01:00
6b93a02943 Implement basic LineProtocolWriter 2021-12-09 15:47:26 +01:00
4 changed files with 185 additions and 39 deletions

View File

@@ -27,11 +27,12 @@ Table of contents
- [3.3. Metric-Details](#33-metric-details) - [3.3. Metric-Details](#33-metric-details)
- [3.4. Tag-Overview](#34-tag-overview) - [3.4. Tag-Overview](#34-tag-overview)
- [3.5. Tag-Details](#35-tag-details) - [3.5. Tag-Details](#35-tag-details)
- [4. Potential Issues](#4-potential-issues) - [4. Screenshots](#4-screenshots)
- [4.1. Channel/Frequency](#41-channelfrequency) - [5. Potential Issues](#5-potential-issues)
- [4.2. Technology](#42-technology) - [5.1. Channel/Frequency](#51-channelfrequency)
- [4.3. Data protection](#43-data-protection) - [5.2. Technology](#52-technology)
- [4.4. Ethical](#44-ethical) - [5.3. Data protection](#53-data-protection)
- [5.4. Ethical](#54-ethical)
<!-- /TOC --> <!-- /TOC -->
<br> <br>
@@ -307,9 +308,13 @@ Unknown | - | Unknown packets not identified into above types
<br> <br>
# 4. Potential Issues # 4. Screenshots
## 4.1. Channel/Frequency <br>
# 5. Potential Issues
## 5.1. Channel/Frequency
The System can only monitor one channel at a time which might not be enough cover, The System can only monitor one channel at a time which might not be enough cover,
to combat this, more Interfaces and Systems can be deployed. to combat this, more Interfaces and Systems can be deployed.
@@ -318,14 +323,14 @@ This is not entirely unproblematic, as the system cannot currently prevent packa
<br> <br>
## 4.2. Technology ## 5.2. Technology
Mismatches between sender and receiver-technologies (e.g. MIMO or HT) can cause packets not being logged at all. Mismatches between sender and receiver-technologies (e.g. MIMO or HT) can cause packets not being logged at all.
Though this should only be a problem for data-packets. Though this should only be a problem for data-packets.
<br> <br>
## 4.3. Data protection ## 5.3. Data protection
Because the system collects any data, this can be problematic, specially in countries with strong data-protection laws. Because the system collects any data, this can be problematic, specially in countries with strong data-protection laws.
@@ -333,7 +338,7 @@ A wifi MAC address is likely to be considered as information of an identifiable
<br> <br>
## 4.4. Ethical ## 5.4. Ethical
The large-scale collection of data for behavioural or movement analysis, especially without consent of the data subject, is highly controversial. The large-scale collection of data for behavioural or movement analysis, especially without consent of the data subject, is highly controversial.

View File

@@ -14,6 +14,8 @@ const { RegexBlockStream } = require("./streamHandler/RegexBlockStream.js");
const { PacketStreamFactory } = require("./streamHandler/PacketStreamFactory.js"); const { PacketStreamFactory } = require("./streamHandler/PacketStreamFactory.js");
const { PacketInfluxPointFactory } = require("./streamHandler/PacketInfluxPointFactory.js"); const { PacketInfluxPointFactory } = require("./streamHandler/PacketInfluxPointFactory.js");
const { InfluxPointWriter } = require("./streamHandler/InfluxPointWriter.js"); const { InfluxPointWriter } = require("./streamHandler/InfluxPointWriter.js");
const { InfluxDbLineProtocolWriter } = require("./streamHandler/InfluxDbLineProtocolWriter.js");
const { InfluxPointToLineProtoStream } = require("./streamHandler/InfluxPointToLineProtoStream.js");
const userHelper = require("./helper/userHelper.js"); const userHelper = require("./helper/userHelper.js");
@@ -25,18 +27,25 @@ const env = process.env;
env.LOGLEVEL ??= "INFO"; env.LOGLEVEL ??= "INFO";
env.WIFI_INTERFACE ??= "wlan0"; env.WIFI_INTERFACE ??= "wlan0";
env.HOSTNAME ??= Os.hostname(); env.HOSTNAME ??= Os.hostname();
env.USE_INFLUXDB_LINEPROTOCOL ??= false;
} }
// Required vars // Required vars
let errorMsg = requireEnvVars([ let errorMsg = requireEnvVars(
env.USE_INFLUXDB_LINEPROTOCOL? [ // When lineprotocol is enabled, we need host and port
"INFLUXDB_LINEPROTOCOL_HOST", "INFLUXDB_LINEPROTOCOL_PORT",
] : [ // When its disabled, influxdb-data
"INFLUX_URL", "INFLUX_TOKEN", "INFLUX_URL", "INFLUX_TOKEN",
"INFLUX_ORG", "INFLUX_BUCKET" "INFLUX_ORG", "INFLUX_BUCKET"
]); ]);
if(errorMsg){ if(errorMsg){
logger.fatal(errorMsg); logger.fatal(errorMsg);
exit(1); exit(1);
} }
(async function() { (async function() {
let pointWriter;
if(!env.USE_INFLUXDB_LINEPROTOCOL){
logger.info("Setup Influx.."); logger.info("Setup Influx..");
const influxDb = new InfluxDB({url: env.INFLUX_URL, token: env.INFLUX_TOKEN}); const influxDb = new InfluxDB({url: env.INFLUX_URL, token: env.INFLUX_TOKEN});
@@ -61,7 +70,46 @@ if(errorMsg){
logger.debug("Get WriteApi & set default-hostname to", `'${env.HOSTNAME}'`); logger.debug("Get WriteApi & set default-hostname to", `'${env.HOSTNAME}'`);
const influxWriteApi = influxDb.getWriteApi(env.INFLUX_ORG, env.INFLUX_BUCKET, "us"); const influxWriteApi = influxDb.getWriteApi(env.INFLUX_ORG, env.INFLUX_BUCKET, "us");
//influxWriteApi.useDefaultTags({"hostname": env.HOSTNAME}); //influxWriteApi.useDefaultTags({"hostname": env.HOSTNAME});
pointWriter = new InfluxPointWriter(influxWriteApi);
logger.info("Influx ok"); logger.info("Influx ok");
}
else {
logger.info("Setup Influxdb-LineProtocol..");
let lineProtocolWriter = new InfluxDbLineProtocolWriter(env.INFLUXDB_LINEPROTOCOL_HOST, env.INFLUXDB_LINEPROTOCOL_PORT);
logger.debug("Create PointToLineProto and pipe to LineProtocolWriter");
pointWriter = new InfluxPointToLineProtoStream();
pointWriter
.setEncoding("utf8")
.pipe(lineProtocolWriter);
logger.debug("Waiting for connection..");
await new Promise((resolve, reject) => {
lineProtocolWriter.once("connect", () => {
resolve();
});
lineProtocolWriter.once("error", (err) => {
reject(err);
});
setTimeout(() => { // After timeout, reject promise
reject("Timeout whilst waiting to connect");
}, 6500);
})
.then(() => {
logger.info("Influxdb-LineProtocol ok");
})
.catch((err) => {
if(err) {
logger.error("Error whilst checking Influxdb-LineProtocol:");
logger.error(err);
}
logger.fatal("Setup Influxdb-LineProtocol failed!");
exit(1);
});
}
logger.info("Starting tcpdump.."); logger.info("Starting tcpdump..");
const TCPDUMP_BASECMD = "tcpdump -vvv -e -n -X -s0 -i"; const TCPDUMP_BASECMD = "tcpdump -vvv -e -n -X -s0 -i";
@@ -72,13 +120,12 @@ if(errorMsg){
let regexBlockStream = new RegexBlockStream(/^\d{2}:\d{2}:\d{2}.\d{6}.*(\n( {4,8}|\t\t?).*)+\n/gm); let regexBlockStream = new RegexBlockStream(/^\d{2}:\d{2}:\d{2}.\d{6}.*(\n( {4,8}|\t\t?).*)+\n/gm);
let packetStreamFactory = new PacketStreamFactory(); let packetStreamFactory = new PacketStreamFactory();
let packetInfluxPointFactory = new PacketInfluxPointFactory(); let packetInfluxPointFactory = new PacketInfluxPointFactory();
let influxPointWriter = new InfluxPointWriter(influxWriteApi);
proc.stdout proc.stdout
.setEncoding("utf8") .setEncoding("utf8")
.pipe(regexBlockStream) .pipe(regexBlockStream)
.pipe(packetStreamFactory) .pipe(packetStreamFactory)
.pipe(packetInfluxPointFactory) .pipe(packetInfluxPointFactory)
.pipe(influxPointWriter); .pipe(pointWriter);
logger.debug("Attaching error-logger.."); logger.debug("Attaching error-logger..");
const loggerTcpdump = logFactory("tcpdump"); const loggerTcpdump = logFactory("tcpdump");

View File

@@ -0,0 +1,72 @@
const logger = require.main.require("./helper/logger.js")("InfluxDbLineProtocolWriter");
const net = require("net");
/**
* Get points and write them into influx
*/
class InfluxDbLineProtocolWriter extends net.Socket{
/**
*
* @param {string} host Host of line-server
* @param {string} port Port of line-server
* @param {object} options Options for further configuration
*/
constructor(host, port, options = {}) {
super();
this._host = host;
this._port = port;
// options defaults
options.autoConnect ??= true;
options.timeout ??= 5000;
options.autoReconnect ??= true;
options.autoReconnectBackoffTime ??= 3000;
this._options = options;
this._isConnected = false;
super.setKeepAlive(true, 5000);
// Register auto-Reconnect if enabled
if(this._options.autoReconnect){
this.on("connect", () => {
logger.debug("Connection established!");
this._isConnected = true;
if(this._autoReconnectTimeout)
clearInterval(this._autoReconnectTimeout);
this._autoReconnectTimeout = 0;
});
this.on("error", (err) => {
logger.error(err.code, "TCP ERROR");
this._isConnected = false;
if(!this._autoReconnectTimeout)
this._autoReconnectTimeout = setInterval(() => {
this.connect();
},
this._options.autoReconnectBackoffTime);
});
}
// Autoconnect if requested
if(this._options.autoConnect) this.connect();
}
get host(){ return this._host; }
get port(){ return this._port; }
get isConnected(){ return this._isConnected; }
connect(){
logger.debug("Connecting..");
super.connect(this._port, this._host);
}
}
// Specify exports
module.exports = {
InfluxDbLineProtocolWriter
};

View File

@@ -0,0 +1,22 @@
const logger = require.main.require("./helper/logger.js")("InfluxPointToLineProtoStream");
const { Transform } = require("stream");
/**
* Get points and converts them to Line-protocol
*/
class InfluxPointToLineProtoStream extends Transform{
constructor(){
super({
writableObjectMode: true
});
}
_transform(point, encoding, next){
next(null, point.toLineProtocol() +"\n");
}
}
// Specify exports
module.exports = {
InfluxPointToLineProtoStream
};