#!/usr/bin/python3
"""Minimal DNS NOTIFY listener.

Binds a UDP socket, checks that every incoming DNS message is a NOTIFY
carrying exactly one SOA question and answers it with the AA flag set.
Messages failing a check are answered with an error rcode.
"""
import atexit
import datetime
import getopt
import logging as log
import os
import re
import socket
import subprocess
import sys
from typing import cast

import dns.exception
import dns.flags
import dns.message
import dns.name
import dns.opcode
import dns.rcode
import dns.rdataclass
import dns.rdatatype


def main(args):
    """Configure logging, bind the listening socket and serve forever.

    Args:
        args: Command-line arguments (currently unused).
    """
    setupLogging(True)
    log.debug("Logging started")

    # The context manager guarantees the socket is closed when the listen
    # loop is left (e.g. through KeyboardInterrupt or an unexpected error).
    with setupSocket('172.17.0.1', 53) as s:
        startListen(s)


def setupSocket(address, port):
    """Create a UDP/IPv4 socket bound to ``(address, port)``.

    Args:
        address: Local IPv4 address to bind to.
        port: Local UDP port to bind to.

    Returns:
        The bound socket; the caller is responsible for closing it.

    Raises:
        OSError: If binding fails. The socket is closed before re-raising.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.bind((address, port))
    except OSError:
        # Do not leak the file descriptor when the bind fails
        s.close()
        raise
    return s


def startListen(s):
    """Receive and handle DNS messages on ``s`` in an endless loop.

    DNS-level errors for a single datagram are logged and the datagram is
    dropped, so one bad packet cannot terminate the listener.

    Args:
        s: Bound UDP socket to read from and reply on.
    """
    log.debug('Now listening')

    while True:
        try:
            (address, dmsg) = receiveFromWire(s)
        except dns.exception.DNSException as err:
            # Anything can arrive on the socket; unparsable data is dropped
            log.error(f'Dropping unparsable DNS message: {err!r}')
            continue

        try:
            handleQuery(s, address, dmsg)
        except dns.exception.DNSException as err:
            log.error(f'{address[0]} |\tCould not answer message: {err!r}')


def receiveFromWire(s):
    """Read one datagram from ``s`` and parse it as a DNS message.

    Args:
        s: Bound UDP socket.

    Returns:
        A tuple ``(address, dmsg)`` with the sender address as returned by
        ``recvfrom`` and the parsed ``dns.message.Message``.

    Raises:
        dns.exception.DNSException: If the datagram is not a valid DNS
            message.
    """
    # At most 512 bytes are read per datagram
    (wire, address) = s.recvfrom(512)
    dmsg = dns.message.from_wire(wire)
    return (address, dmsg)


def handleQuery(s, address, dmsg):
    """Validate a received message as a NOTIFY and answer it.

    The message must be a query with opcode NOTIFY, rcode NOERROR and exactly
    one question of type SOA. A failing check is logged and answered with an
    error rcode (REFUSED for a wrong opcode, FORMERR otherwise).

    Args:
        s: Socket used to send the reply.
        address: Sender address as returned by ``recvfrom``.
        dmsg: Parsed ``dns.message.Message``.

    Returns:
        True if the NOTIFY was accepted and answered, False otherwise.
    """
    log.info(f'{address[0]} |\tGot query')

    # A message with QR set is itself a response: dns.message.make_response
    # raises FormError for it, and responses must not be answered anyway.
    if dmsg.flags & dns.flags.QR:
        log.error(f'{address[0]} |\tExpected a query, but QR flag was set')
        return False

    opcode = dmsg.opcode()
    if opcode != dns.opcode.NOTIFY:
        log.error(f"{address[0]} |\tExpected opcode=NOTIFY, but was {dns.opcode.to_text(opcode)}")
        makeResponseWithRCode(s, address, dmsg, dns.rcode.REFUSED)
        return False

    rcode = dmsg.rcode()
    if rcode != dns.rcode.NOERROR:
        log.error(f"{address[0]} |\tExpected rcode=NOERROR, but was {dns.rcode.to_text(rcode)}")
        makeResponseWithRCode(s, address, dmsg, dns.rcode.FORMERR)
        return False

    # NOTE: the check of the AA flag on the incoming message is disabled:
    # flags = dmsg.flags
    # if flags != dns.flags.AA:
    #     print('Expected flags=AA, but was', dns.flags.to_text(flags))
    #     continue

    if len(dmsg.question) != 1:
        log.error(f'{address[0]} |\tExpected question-len=1, but was {len(dmsg.question)}')
        makeResponseWithRCode(s, address, dmsg, dns.rcode.FORMERR)
        return False

    # Check record in question
    record = dmsg.question[0]
    r_datatype = record.rdtype
    if r_datatype != dns.rdatatype.SOA:
        # Log the type mnemonic, matching the opcode/rcode messages above
        log.error(f'{address[0]} |\tExpected record to be SOA, but was {dns.rdatatype.to_text(r_datatype)}')
        makeResponseWithRCode(s, address, dmsg, dns.rcode.FORMERR)
        return False

    log.info(f'{address[0]} |\tNOTIFY for {record.name}')

    response = dns.message.make_response(dmsg)  # type: dns.message.Message
    response.flags |= dns.flags.AA
    sendResponse(s, address, response)
    log.debug(f'{address[0]} |\tSent response')
    return True


def makeResponseWithRCode(socket, address, dmsg, rcode):
    """Answer ``dmsg`` with a response carrying the given rcode.

    Args:
        socket: Socket used to send the reply (shadows the ``socket`` module
            inside this function; the name is kept for compatibility).
        address: Destination address.
        dmsg: Message being answered.
        rcode: DNS response code to set on the reply.
    """
    response = dns.message.make_response(dmsg)  # type: dns.message.Message
    response.set_rcode(rcode)
    sendResponse(socket, address, response)


def sendResponse(socket, address, response):
    """Serialize ``response`` to wire format and send it to ``address``.

    Args:
        socket: Socket used to send the reply (shadows the ``socket`` module
            inside this function; the name is kept for compatibility).
        address: Destination address.
        response: ``dns.message.Message`` to send.
    """
    # No origin is passed: to_wire() expects a dns.name.Name (or None) there,
    # not the message itself.
    wire = response.to_wire()
    socket.sendto(wire, address)


def setupLogging(verbose: bool):
    """Configure the root logger to write to stdout.

    Args:
        verbose: Log at DEBUG level when true, at INFO level otherwise.
    """
    level = log.DEBUG if verbose else log.INFO
    log_format = '%(asctime)s %(levelname)s:\t%(message)s'
    log.basicConfig(stream=sys.stdout, format=log_format, level=level)


if __name__ == "__main__":
    sys.exit(main(sys.argv))