diff --git a/Dockerfile b/Dockerfile index ff467e9..d7e4be0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM node:16 +FROM node:16-alpine # Create app directory WORKDIR /usr/src/app @@ -7,9 +7,8 @@ WORKDIR /usr/src/app COPY package*.json ./ RUN npm install -RUN apt-get update -RUN apt-get -y install \ - tcpdump +RUN apk update +RUN apk add tcpdump # Bundle app source COPY ./src/ . diff --git a/src/helper/env.js b/src/helper/env.js index 08bff89..8d0cabe 100644 --- a/src/helper/env.js +++ b/src/helper/env.js @@ -1,9 +1,9 @@ function requireEnvVars(requiredEnv){ // Ensure required ENV vars are set - let unsetEnv = requiredEnv.filter((env) => !(typeof process.env[env] !== 'undefined')); + let unsetEnv = requiredEnv.filter((env) => (typeof process.env[env] === "undefined")); if (unsetEnv.length > 0) { - return "Required ENV variables are not set: [" + unsetEnv.join(', ') + "]"; + return "Required ENV variables are not set: [" + unsetEnv.join(", ") + "]"; } } diff --git a/src/helper/exec.js b/src/helper/exec.js index ed7f628..8195f7b 100644 --- a/src/helper/exec.js +++ b/src/helper/exec.js @@ -1,7 +1,7 @@ const logger = require("./logger.js")("exec"); const { spawn } = require("child_process"); -const { parseArgsStringToArgv } = require('string-argv'); +const { parseArgsStringToArgv } = require("string-argv"); function exec(cmd, options){ diff --git a/src/helper/hexConverter.js b/src/helper/hexConverter.js index 00c2e8c..c2c5da2 100644 --- a/src/helper/hexConverter.js +++ b/src/helper/hexConverter.js @@ -21,4 +21,4 @@ function bytesToHex(bytes) { module.exports = { hexToBytes, bytesToHex -} \ No newline at end of file +}; \ No newline at end of file diff --git a/src/helper/influx-checks.js b/src/helper/influx-checks.js index 2c77ede..6eb7a61 100644 --- a/src/helper/influx-checks.js +++ b/src/helper/influx-checks.js @@ -1,8 +1,8 @@ const logger = require.main.require("./helper/logger.js")("influx-checks"); const Os = require("os"); -const { InfluxDB, Point } = require('@influxdata/influxdb-client') -const Influx = require('@influxdata/influxdb-client-apis'); +const { InfluxDB, Point } = require("@influxdata/influxdb-client") +const Influx = require("@influxdata/influxdb-client-apis"); function checkHealth(influxDb){ @@ -39,7 +39,7 @@ function checkBucket(influxDb, options){ function checkWriteApi(influxDb, options){ return new Promise((resolve, reject) => { const writeApi = influxDb.getWriteApi(options.org, options.bucket); // Get WriteAPI - writeApi.writePoint(new Point("worker_connectionTest").tag("hostname", Os.hostname())) // Write point + writeApi.writePoint(new Point("worker_connectionTest").tag("hostname", Os.hostname())); // Write point writeApi.close() .catch((err) => { logger.error("Could not get writeApi:"); diff --git a/src/helper/userHelper.js b/src/helper/userHelper.js index 50a1624..2be6302 100644 --- a/src/helper/userHelper.js +++ b/src/helper/userHelper.js @@ -5,15 +5,15 @@ function detectStreamData(stream, timeout = 5000){ let timeoutHandler; if(timeout){ timeoutHandler = setTimeout(() => { - reject('timeout'); + reject("timeout"); remListeners(); }, timeout); } function remListeners(){ - stream.removeListener('error', errorHandler); - stream.removeListener('data', dataHandler); + stream.removeListener("error", errorHandler); + stream.removeListener("data", dataHandler); if(timeoutHandler) clearTimeout(timeoutHandler); } @@ -25,8 +25,8 @@ function detectStreamData(stream, timeout = 5000){ remListeners(); } - stream.on('error', errorHandler); - stream.on('data', dataHandler); + stream.on("error", errorHandler); + stream.on("data", dataHandler); }); } @@ -34,7 +34,7 @@ function detectStreamsData(streams, timeout = 5000){ let promises = []; streams.forEach((stream) => { promises.push(detectStreamData(stream, timeout)); - }) + }); return promises; } diff --git a/src/helper/wifiStateAnalyzer.js b/src/helper/wifiStateAnalyzer.js index e2622de..46ec2f0 100644 --- a/src/helper/wifiStateAnalyzer.js +++ b/src/helper/wifiStateAnalyzer.js @@ -1,4 +1,4 @@ -const { HandshakeStage } = require.main.require('./dto/Packet.js'); +const { HandshakeStage } = require.main.require("./dto/Packet.js"); function keyInfoFromRaw(keyInfoRaw) { return { @@ -27,7 +27,7 @@ function handshakeStageFromKeyInfo(keyInfo){ // Extract compare-keys let keyData = ""; - for (const key of HANDSHAKE_STAGE_KEYINFO['keys']) { + for (const key of HANDSHAKE_STAGE_KEYINFO["keys"]) { keyData += keyInfo[key].toString(); } diff --git a/src/main.js b/src/main.js index d471c8f..dcf991d 100644 --- a/src/main.js +++ b/src/main.js @@ -6,8 +6,8 @@ const { requireEnvVars } = require("./helper/env.js"); const { exit } = require("process"); const { exec } = require("./helper/exec.js"); -const { InfluxDB } = require('@influxdata/influxdb-client'); -const InfluxChecks = require('./helper/influx-checks.js'); +const { InfluxDB } = require("@influxdata/influxdb-client"); +const InfluxChecks = require("./helper/influx-checks.js"); const { RegexBlockStream } = require("./streamHandler/RegexBlockStream.js"); const { PacketStreamFactory } = require("./streamHandler/PacketStreamFactory.js"); @@ -21,111 +21,111 @@ const userHelper = require("./helper/userHelper.js"); const env = process.env; // Defaults { - env.LOGLEVEL ??= "INFO"; - env.WIFI_INTERFACE ??= "wlan0"; + env.LOGLEVEL ??= "INFO"; + env.WIFI_INTERFACE ??= "wlan0"; } // Required vars let errorMsg = requireEnvVars([ - "INFLUX_URL", "INFLUX_TOKEN", - "INFLUX_ORG", "INFLUX_BUCKET" + "INFLUX_URL", "INFLUX_TOKEN", + "INFLUX_ORG", "INFLUX_BUCKET" ]); if(errorMsg){ - logger.fatal(errorMsg); - exit(1); + logger.fatal(errorMsg); + exit(1); } (async function() { - logger.info("Setup Influx.."); - const influxDb = new InfluxDB({url: env.INFLUX_URL, token: env.INFLUX_TOKEN}); - - await InfluxChecks.checkHealth(influxDb) - .then((res) => {return InfluxChecks.checkBucket(influxDb, { - org: env.INFLUX_ORG, - name: env.INFLUX_BUCKET - })}) - .then((res) => {return InfluxChecks.checkWriteApi(influxDb, { - org: env.INFLUX_ORG, - bucket: env.INFLUX_BUCKET - })}) - .catch((err) => { - if(err) { - logger.error("Error whilst checking influx:"); - logger.error(err); - } - logger.fatal("Setup influx failed!"); - exit(1); + logger.info("Setup Influx.."); + const influxDb = new InfluxDB({url: env.INFLUX_URL, token: env.INFLUX_TOKEN}); + + await InfluxChecks.checkHealth(influxDb) + .then((res) => {return InfluxChecks.checkBucket(influxDb, { + org: env.INFLUX_ORG, + name: env.INFLUX_BUCKET + });}) + .then((res) => {return InfluxChecks.checkWriteApi(influxDb, { + org: env.INFLUX_ORG, + bucket: env.INFLUX_BUCKET + });}) + .catch((err) => { + if(err) { + logger.error("Error whilst checking influx:"); + logger.error(err); + } + logger.fatal("Setup influx failed!"); + exit(1); + }); + + logger.info("Influx ok"); + + logger.info("Starting tcpdump.."); + const TCPDUMP_BASECMD = "tcpdump -vvv -e -n -X -s0 -i"; + let cmd = `${TCPDUMP_BASECMD} ${env.WIFI_INTERFACE}`; + + let proc = exec(cmd); + logger.debug("Creating & Attaching streams.."); + let regexBlockStream = new RegexBlockStream(/^\d{2}:\d{2}:\d{2}.\d{6}.*(\n( {4,8}|\t\t?).*)+\n/gm); + let packetStreamFactory = new PacketStreamFactory(); + let packetInfluxPointFactory = new PacketInfluxPointFactory(); + let influxPointWriter = new InfluxPointWriter(influxDb, env.INFLUX_ORG, env.INFLUX_BUCKET); + proc.stdout + .setEncoding("utf8") + .pipe(regexBlockStream) + .pipe(packetStreamFactory) + .pipe(packetInfluxPointFactory) + .pipe(influxPointWriter); + + logger.debug("Attaching error-logger.."); + const loggerTcpdump = logFactory("tcpdump"); + proc.stderr.setEncoding("utf8").on("data", (data) => { + if(!data.match(/^(tcpdump: )?listening on /i) || !data.match(/^\d+ packets captured/i)) { // Catch start-error + loggerTcpdump.debug(data); + } + else loggerTcpdump.error(data); }); - logger.info("Influx ok"); - - logger.info("Starting tcpdump.."); - const TCPDUMP_BASECMD = "tcpdump -vvv -e -n -X -s0 -i" - let cmd = `${TCPDUMP_BASECMD} ${env.WIFI_INTERFACE}`; - - let proc = exec(cmd); - logger.debug("Creating & Attaching streams.."); - let regexBlockStream = new RegexBlockStream(/^\d{2}:\d{2}:\d{2}.\d{6}.*(\n( {4,8}|\t\t?).*)+\n/gm); - let packetStreamFactory = new PacketStreamFactory(); - let packetInfluxPointFactory = new PacketInfluxPointFactory(); - let influxPointWriter = new InfluxPointWriter(influxDb, env.INFLUX_ORG, env.INFLUX_BUCKET); - proc.stdout - .setEncoding("utf8") - .pipe(regexBlockStream) - .pipe(packetStreamFactory) - .pipe(packetInfluxPointFactory) - .pipe(influxPointWriter); - - logger.debug("Attaching error-logger.."); - const loggerTcpdump = logFactory("tcpdump"); - proc.stderr.setEncoding("utf8").on("data", (data) => { - if(!data.match(/^(tcpdump: )?listening on /i) || !data.match(/^\d+ packets captured/i)) { // Catch start-error - loggerTcpdump.debug(data); - } - else loggerTcpdump.error(data); - }); - - // FIXME: This is a hacky workaround to not let errors from subprocess bubble up and terminate our process - regexBlockStream.on('error', (err) => {}); - - proc.on("error", (err) => { - loggerTcpdump.error(err); - }); - - const loggerPacketStream = logFactory("PacketStreamFactory"); - userHelper.detectStreamData(proc.stdout, 10000) // Expect tcpdump-logs to have data after max. 10s - .then(() => { - loggerTcpdump.debug("Got first data"); - userHelper.detectStreamData(packetStreamFactory, 10000) // Expect then to have packets after further 10s - .then(() => { - loggerPacketStream.debug("Got first packet"); - }) - .catch((err) => { - if(err == 'timeout') loggerPacketStream.warn("No packets"); - }); - }) - .catch((err) => { - if(err == 'timeout') loggerTcpdump.warn("No data after 10s! Wrong configuration?"); + // FIXME: This is a hacky workaround to not let errors from subprocess bubble up and terminate our process + regexBlockStream.on("error", (err) => {}); + + proc.on("error", (err) => { + loggerTcpdump.error(err); }); - logger.debug("Attaching exit-handler.."); - proc.on("exit", (code) => { - loggerTcpdump.debug(`tcpdump exited code: ${code}`); - if (code) { - loggerTcpdump.fatal(`tcpdump exited with non-zero code: ${code}`); - exit(1); + const loggerPacketStream = logFactory("PacketStreamFactory"); + userHelper.detectStreamData(proc.stdout, 10000) // Expect tcpdump-logs to have data after max. 10s + .then(() => { + loggerTcpdump.debug("Got first data"); + userHelper.detectStreamData(packetStreamFactory, 10000) // Expect then to have packets after further 10s + .then(() => { + loggerPacketStream.debug("Got first packet"); + }) + .catch((err) => { + if(err == "timeout") loggerPacketStream.warn("No packets"); + }); + }) + .catch((err) => { + if(err == "timeout") loggerTcpdump.warn("No data after 10s! Wrong configuration?"); + }); + + logger.debug("Attaching exit-handler.."); + proc.on("exit", (code) => { + loggerTcpdump.debug(`tcpdump exited code: ${code}`); + if (code) { + loggerTcpdump.fatal(`tcpdump exited with non-zero code: ${code}`); + exit(1); + } + logger.info("Shutdown"); + exit(0); + }); + + // Handle stop-signals for graceful shutdown + function shutdownReq() { + logger.info("Shutdown request received.."); + logger.debug("Stopping subprocess tcpdump, then exiting myself.."); + proc.kill(); // Kill process (send SIGTERM), then upper event-handler will stop self } - logger.info("Shutdown"); - exit(0); - }); - - // Handle stop-signals for graceful shutdown - function shutdownReq() { - logger.info("Shutdown request received.."); - logger.debug("Stopping subprocess tcpdump, then exiting myself.."); - proc.kill(); // Kill process (send SIGTERM), then upper event-handler will stop self - } - process.on('SIGTERM', shutdownReq); - process.on('SIGINT', shutdownReq); - - logger.info("Startup complete"); + process.on("SIGTERM", shutdownReq); + process.on("SIGINT", shutdownReq); + + logger.info("Startup complete"); })(); diff --git a/src/streamHandler/InfluxPointWriter.js b/src/streamHandler/InfluxPointWriter.js index 9bd96ad..ea7f76e 100644 --- a/src/streamHandler/InfluxPointWriter.js +++ b/src/streamHandler/InfluxPointWriter.js @@ -1,6 +1,6 @@ const logger = require.main.require("./helper/logger.js")("InfluxPointWriter"); -const { Writable } = require('stream'); -const {InfluxDB, Point, HttpError} = require('@influxdata/influxdb-client') +const { Writable } = require("stream"); +const {InfluxDB, Point, HttpError} = require("@influxdata/influxdb-client"); /** * Get points and write them into influx @@ -17,7 +17,7 @@ class InfluxPointWriter extends Writable{ super({ objectMode: true }); - this._api = influxDb.getWriteApi(org, bucket, 'us', options); + this._api = influxDb.getWriteApi(org, bucket, "us", options); } _write(point, encoding, next){ diff --git a/src/streamHandler/PacketInfluxPointFactory.js b/src/streamHandler/PacketInfluxPointFactory.js index e21b7b0..8684b3a 100644 --- a/src/streamHandler/PacketInfluxPointFactory.js +++ b/src/streamHandler/PacketInfluxPointFactory.js @@ -1,6 +1,6 @@ const logger = require.main.require("./helper/logger.js")("PacketStreamFactory"); -const { Transform } = require('stream'); -const {Point} = require('@influxdata/influxdb-client') +const { Transform } = require("stream"); +const {Point} = require("@influxdata/influxdb-client"); /** Keys to always use as tags */ const TAG_LIST = [ @@ -21,6 +21,7 @@ const MEASUREMENT_MAP = new Map([ ["AuthenticationType", "authenticationType"], ["AssociationSuccess", "associationIsSuccessful"], ["DisassociationReason", "disassociationReason"], + ["HandshakeStage", "handshakeStage"], ]); @@ -44,12 +45,12 @@ class PacketInfluxPointFactory extends Transform{ // Set tags TAG_LIST.filter(tag => Object.keys(packet).includes(tag)) // Filter tags available on object - .filter(tag => packet[tag] != null) // Filter tags not falsy on object - .forEach(tag => { - tagObjectRecursively(point, tag, packet[tag]); - }); + .filter(tag => packet[tag] != null) // Filter tags not falsy on object + .forEach(tag => { + tagObjectRecursively(point, tag, packet[tag]); + }); - point.setField('value', packet[objKey]); // Set field + point.setField("value", packet[objKey]); // Set field this.push(point); // Push point into stream }); @@ -70,14 +71,14 @@ function tagObjectRecursively(point, tag, field, suffix = ""){ /** Mapping for type -> field-method */ const POINT_FIELD_TYPE = new Map([ - ['boolean', function(key, value){ return this.booleanField(key, value); }], - ['number', function(key, value){ return this.intField(key, value); }], - ['string', function(key, value){ return this.stringField(key, value); }], + ["boolean", function(key, value){ return this.booleanField(key, value); }], + ["number", function(key, value){ return this.intField(key, value); }], + ["string", function(key, value){ return this.stringField(key, value); }], ]); Point.prototype.setField = function(key, value){ let setField = POINT_FIELD_TYPE.get(typeof value); return setField.apply(this, [key, value]); -} +}; // Specify exports module.exports = { diff --git a/src/streamHandler/PacketStreamFactory.js b/src/streamHandler/PacketStreamFactory.js index 9d7c866..febfb93 100644 --- a/src/streamHandler/PacketStreamFactory.js +++ b/src/streamHandler/PacketStreamFactory.js @@ -1,7 +1,7 @@ const logger = require.main.require("./helper/logger.js")("PacketStreamFactory"); -const { Transform } = require('stream'); +const { Transform } = require("stream"); const { DateTime } = require("luxon"); -const { PacketType, FlagType, Packet, PacketWithSSID, BeaconPacket, ProbeRequestPacket, ProbeResponsePacket, AuthenticationPacket, AuthenticationType, AssociationResponsePacket, DisassociationPacket, HandshakePacket, HandshakeStage } = require.main.require('./dto/Packet.js'); +const { PacketType, FlagType, Packet, PacketWithSSID, BeaconPacket, ProbeRequestPacket, ProbeResponsePacket, AuthenticationPacket, AuthenticationType, AssociationResponsePacket, DisassociationPacket, HandshakePacket, HandshakeStage } = require.main.require("./dto/Packet.js"); const hexConv = require.main.require("./helper/hexConverter.js"); const wifiStateAnalyser = require.main.require("./helper/wifiStateAnalyzer.js"); @@ -20,20 +20,20 @@ const PACKET_TYPE_MAP = { "Disassociation:": PacketType.Disassociation, "EAPOL": PacketType.Handshake, }; -const PACKET_TYPES_REGEX = Object.keys(PACKET_TYPE_MAP).join('|'); +const PACKET_TYPES_REGEX = Object.keys(PACKET_TYPE_MAP).join("|"); const AUTHENTICATION_TYPE_MAP = { "(Open System)-1": AuthenticationType.OpenSystem_1, "(Open System)-2": AuthenticationType.OpenSystem_2, -} +}; const FLAG_TYPE_MAP = { "Retry": FlagType.Retry, "Pwr Mgmt": FlagType.PwrMgt, "More Data": FlagType.MoreData, "Protected": FlagType.Protected, -} -const FLAG_TYPE_MAPS_REGEX = Object.keys(FLAG_TYPE_MAP).join('|'); +}; +const FLAG_TYPE_MAPS_REGEX = Object.keys(FLAG_TYPE_MAP).join("|"); /** * Read data from text-blocks and convert them to Packet @@ -49,8 +49,8 @@ class PacketStreamFactory extends Transform{ _transform(chunk, encoding, next){ let packet = new Packet(); - const lines = chunk.split('\n'); - const header = lines.splice(0, 1)[0]; // Grab first line, 'lines' is now the payload + const lines = chunk.split("\n"); + const header = lines.splice(0, 1)[0]; // Grab first line, "lines" is now the payload packet = this._handleHeader(packet, header); packet = this._handlePayload(packet, lines); @@ -62,7 +62,7 @@ class PacketStreamFactory extends Transform{ packet.timestampMicros = DateTime.fromISO(data.slice(0, 12)).toSeconds() + data.slice(12, 15)/1000000; // Find flags - data.match(data.match(new RegExp('(?<=^|\\s)('+ FLAG_TYPE_MAPS_REGEX +')(?=$|\\s)', 'ig')) + data.match(data.match(new RegExp("(?<=^|\\s)("+ FLAG_TYPE_MAPS_REGEX +")(?=$|\\s)", "ig")) ?.forEach(match => packet.flags[FLAG_TYPE_MAP[match]] = true) // Set them to true in flags ); @@ -73,11 +73,11 @@ class PacketStreamFactory extends Transform{ packet.signal = Number(data.match(/(?<=^|\s)-\d{2,3}(?=dBm\sSignal($|\s))/i)?.[0]) || null; - let packetTypeStr = data.match(new RegExp('(?<=^|\\s)('+ PACKET_TYPES_REGEX +')(?=$|\\s)', 'i'))?.[0]; + let packetTypeStr = data.match(new RegExp("(?<=^|\\s)("+ PACKET_TYPES_REGEX +")(?=$|\\s)", "i"))?.[0]; if(packetTypeStr) packet.packetType = PACKET_TYPE_MAP[packetTypeStr]; else if(data.match(/(SA|TA|DA|RA|BSSID):.{17}\s*$/i)){ - packet.packetType = PacketType.NoData + packet.packetType = PacketType.NoData; } else { packet.packetType = PacketType.Unknown; @@ -97,12 +97,12 @@ class PacketStreamFactory extends Transform{ case PacketType.ProbeResponse: case PacketType.AssociationRequest: newPacket = new PacketWithSSID(); - newPacket.ssid = data.match(new RegExp('(?<=(^|\\s)'+ packetTypeStr +'\\s\\().{0,32}(?=\\)($|\\s))', 'i'))?.[0] ?? null; + newPacket.ssid = data.match(new RegExp("(?<=(^|\\s)"+ packetTypeStr +"\\s\\().{0,32}(?=\\)($|\\s))", "i"))?.[0] ?? null; break; case PacketType.Authentication: newPacket = new AuthenticationPacket(); - newPacket.authenticationType = AUTHENTICATION_TYPE_MAP[data.match(/(?<=(^|\s)Authentication\s).{3,}(?=\:(\s|$))/i)[0]] ?? AuthenticationType.Unknown; + newPacket.authenticationType = AUTHENTICATION_TYPE_MAP[data.match(/(?<=(^|\s)Authentication\s).{3,}(?=:(\s|$))/i)[0]] ?? AuthenticationType.Unknown; break; case PacketType.AssociationResponse: @@ -121,16 +121,16 @@ class PacketStreamFactory extends Transform{ } _handlePayload(packet, data){ - data = data.join(''); + data = data.join(""); // Get payload-Hex-Data. If there is no data: empty - packet.payloadData = hexConv.hexToBytes(data.match(/(?<=\s)([A-F0-9]{1,4}(?=\s))/igm)?.join('') ?? ''); + packet.payloadData = hexConv.hexToBytes(data.match(/(?<=\s)([A-F0-9]{1,4}(?=\s))/igm)?.join("") ?? ""); packet.payloadData.splice(packet.payloadData.length-4, 4); // Remove FrameCheck sequence // Cover special cases with more data let newPacket; switch(packet.packetType){ - case PacketType.Handshake: + case PacketType.Handshake: { newPacket = new HandshakePacket(); // Read key-information @@ -139,6 +139,7 @@ class PacketStreamFactory extends Transform{ newPacket.handshakeStage = wifiStateAnalyser.handshakeStageFromKeyInfo(keyInfo); // Get stage break; + } } if(newPacket) packet = Object.assign(newPacket, packet); diff --git a/src/streamHandler/RegexBlockStream.js b/src/streamHandler/RegexBlockStream.js index 30d7646..9ef48b9 100644 --- a/src/streamHandler/RegexBlockStream.js +++ b/src/streamHandler/RegexBlockStream.js @@ -1,5 +1,5 @@ const logger = require.main.require("./helper/logger.js")("RegexBlockStream"); -const { Transform } = require('stream') +const { Transform } = require("stream"); /** * Matches whole blocks as regex and passes them on @@ -27,7 +27,7 @@ class RegexBlockStream extends Transform{ } _transform(chunk, encoding, next){ - chunk = this.readableBuffer.length? this.readableBuffer.join('') + chunk: chunk; // Add previous buffer to current chunk + chunk = this.readableBuffer.length? this.readableBuffer.join("") + chunk: chunk; // Add previous buffer to current chunk this.readableBuffer.length && this.readableBuffer.clear(); // Clear buffer once we read it let matches = chunk.match(this.matcher); // Match @@ -44,17 +44,17 @@ class RegexBlockStream extends Transform{ if(matches){ matches.forEach((match) => { this.push(match); // Write match to stream - if(chunk) chunk = chunk.replace(match, ''); // Remove match from chunks + if(chunk) chunk = chunk.replace(match, ""); // Remove match from chunks }); } if(chunk) return chunk; } _flush(next){ - if(matchAllOnFlush){ // When requested, we'll match one last time over the remaining buffer - let chunk = this.readableBuffer.join(''); + if(this.matchAllOnFlush){ // When requested, we'll match one last time over the remaining buffer + let chunk = this.readableBuffer.join(""); let matches = chunk.match(this.matcher); // Match remaining buffer - _writeMatches(matches); // Write matches including last element + this._writeMatches(matches); // Write matches including last element } next(); // Tell system we are done