Implemented stream to transform packets to datapoints
This commit is contained in:
		
							parent
							
								
									38985ea9e2
								
							
						
					
					
						commit
						7ebcf573b9
					
				
							
								
								
									
										72
									
								
								src/streamHandler/PacketInfluxPointFactory.js
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										72
									
								
								src/streamHandler/PacketInfluxPointFactory.js
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,72 @@
 | 
				
			|||||||
 | 
					const logger = require.main.require("./helper/logger.js")("PacketStreamFactory");
 | 
				
			||||||
 | 
					const { Transform } = require('stream');
 | 
				
			||||||
 | 
					const {Point} = require('@influxdata/influxdb-client')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/** Keys to always use as tags */
 | 
				
			||||||
 | 
					const TAG_LIST = [
 | 
				
			||||||
 | 
					    "srcMac",
 | 
				
			||||||
 | 
					    "dstMac",
 | 
				
			||||||
 | 
					    "bssid",
 | 
				
			||||||
 | 
					    "frequency",
 | 
				
			||||||
 | 
					    "flags",
 | 
				
			||||||
 | 
					];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/** Measurement-name and corresponding field-key */
 | 
				
			||||||
 | 
					const MEASUREMENT_MAP = new Map([
 | 
				
			||||||
 | 
					    ["Signal", "signal"],
 | 
				
			||||||
 | 
					    ["PayloadSize", "payloadSize"],
 | 
				
			||||||
 | 
					    ["DataRate", "dataRate"],
 | 
				
			||||||
 | 
					    ["SSID", "ssid"],
 | 
				
			||||||
 | 
					    ["AuthenticationType", "authenticationType"],
 | 
				
			||||||
 | 
					    ["AssociationSuccess", "associationIsSuccessful"],
 | 
				
			||||||
 | 
					    ["DisassociationReason", "disassociationReason"],
 | 
				
			||||||
 | 
					]);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/**
 | 
				
			||||||
 | 
					 * Get packets and convert them into influx-points
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					class PacketInfluxPointFactory extends Transform{
 | 
				
			||||||
 | 
					    constructor(){
 | 
				
			||||||
 | 
					        super({
 | 
				
			||||||
 | 
					            readableObjectMode: true,
 | 
				
			||||||
 | 
					            writableObjectMode: true
 | 
				
			||||||
 | 
					        });
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    _transform(packet, encoding, next){
 | 
				
			||||||
 | 
					        // Create measurements
 | 
				
			||||||
 | 
					        MEASUREMENT_MAP.forEach((objKey, measurement) => {
 | 
				
			||||||
 | 
					            if(!Object.keys(packet).includes(objKey)) return;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            let point = new Point(measurement);     // Create point
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            // Set tags
 | 
				
			||||||
 | 
					            TAG_LIST.filter(tag => Object.keys(packet).includes(tag))
 | 
				
			||||||
 | 
					                    .forEach(tag => point.tag(tag, packet[tag]));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            point.setField('value', packet[objKey]);        // Set field
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            this.push(null, point);     // Push point into stream
 | 
				
			||||||
 | 
					        });
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        next();     // Get next packet
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/** Mapping for type -> field-method */
 | 
				
			||||||
 | 
					const POINT_FIELD_TYPE = new Map([
 | 
				
			||||||
 | 
					    ['boolean',   function(key, value){ return this.booleanField(key, value); }],
 | 
				
			||||||
 | 
					    ['number',    function(key, value){ return this.intField(key, value); }],
 | 
				
			||||||
 | 
					    ['string',    function(key, value){ return this.stringField(key, value); }],
 | 
				
			||||||
 | 
					]);
 | 
				
			||||||
 | 
					Point.prototype.setField = function(key, value){
 | 
				
			||||||
 | 
					    let setField = POINT_FIELD_TYPE.get(typeof value);
 | 
				
			||||||
 | 
					    return setField.apply(this, [key, value]);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Specify exports
 | 
				
			||||||
 | 
					module.exports = {
 | 
				
			||||||
 | 
					    PacketInfluxPointFactory
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user