#!/usr/bin/env python3

# Add, remove or query IP addresses in DNSBL zone.
#
# Requires Debian packages: python3-dnspython 2.0.0, python-tz

import argparse
import datetime
import ipaddress
import socket
import sys
import time

import dns.exception
import dns.query
import dns.rcode
import dns.resolver
import dns.tsigkeyring
import dns.update
import dns.zone
import pytz

__author__ = "Niccolo Rigacci"
__copyright__ = "Copyright 2020 Niccolo Rigacci "
__license__ = "GPLv3-or-later"
__email__ = "niccolo@rigacci.org"
__version__ = "0.4.0"

DNSBL_ZONE = 'bl.rigacci.org'
DNS_HOSTNAME = 'ns1.rigacci.org'
# NOTE: resolved once, at import time; importing this module therefore
# performs a DNS lookup and fails if DNS_HOSTNAME cannot be resolved.
DNS_IPADDR = socket.gethostbyname(DNS_HOSTNAME)
# NOTE(review): the TSIG secret is hard-coded in the source; consider loading
# it from a file readable only by the operator.
RNDC_KEY = {'bl-rigacci-org_rndc-key.': 'rg2aizg+T6XkKkmpI42K7g=='}
# Algorithm used to sign dynamic updates with RNDC_KEY.
TSIG_ALGORITHM = 'hmac-md5.sig-alg.reg.int'

#---------------------------------------------------------------
#---------------------------------------------------------------
def reverse_address(address):
    """Return the reversed-label form of an IP address for DNSBL lookups.

    IPv4 "1.2.3.4" becomes "4.3.2.1"; IPv6 addresses become the reversed
    nibble sequence (the ip6.arpa pointer name without its suffix).

    Raises:
        ValueError: if address is not a valid IPv4 or IPv6 address.
    """
    # Validate first, so that a typo never ends up as a record name.
    ip = ipaddress.ip_address(address)
    if ip.version == 6:
        suffix = '.ip6.arpa'
        return ip.reverse_pointer[:-len(suffix)]
    return '.'.join(reversed(str(ip).split('.')))

#---------------------------------------------------------------
#---------------------------------------------------------------
def main():
    """Parse the command line, run the requested action and exit.

    The process exit status is the value returned by the action function;
    an invalid ADDRESS is reported through argparse (exit status 2).
    """
    parser = argparse.ArgumentParser(description='Add, remove or query IP addresses in DNSBL zone.')
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('-a', '--add', metavar='ADDRESS', help='add address to the zone')
    group.add_argument('-r', '--remove', metavar='ADDRESS', help='remove address from the zone')
    group.add_argument('-q', '--query', metavar='ADDRESS', help='query for address in the zone')
    group.add_argument('-x', '--xfr', action='store_true', help='do an AXFR zone transfer')
    args = parser.parse_args()

    if args.xfr:
        sys.exit(XfrDNS())

    # The group is required and mutually exclusive: exactly one is set here.
    address = next(a for a in (args.query, args.add, args.remove) if a is not None)
    try:
        rev_address = reverse_address(address)
    except ValueError as err:
        parser.error(str(err))

    if args.query is not None:
        sys.exit(QueryDNS(address, rev_address))
    if args.add is not None:
        sys.exit(AddDNS(address, rev_address))
    sys.exit(DeleteDNS(rev_address))

#---------------------------------------------------------------
#---------------------------------------------------------------
def _signed_update():
    """Return an empty dynamic update for DNSBL_ZONE, signed with RNDC_KEY."""
    keyring = dns.tsigkeyring.from_text(RNDC_KEY)
    return dns.update.Update(DNSBL_ZONE, keyring=keyring, keyalgorithm=TSIG_ALGORITHM)


def _send_update(update):
    """Send update to DNS_IPADDR over TCP, print the outcome, return a status.

    Returns 0 on NOERROR, 1 on REFUSED and 2 for any other response
    (in which case the whole response is printed).
    """
    response = dns.query.tcp(update, DNS_IPADDR)
    rcode = response.rcode()
    if rcode == dns.rcode.NOERROR:
        print('NOERROR')
        return 0
    if rcode == dns.rcode.REFUSED:
        print('REFUSED')
        return 1
    print('Response: %s' % (response,))
    return 2


def AddDNS(address, rev_address, value='127.0.0.1'):
    """Create or replace the A and TXT records of an address in the zone.

    Args:
        address: the address as given by the user, recorded in the TXT text.
        rev_address: reversed-label name of the address, relative to the zone.
        value: content of the A record.

    Returns:
        0 on NOERROR, 1 on REFUSED, 2 for any other response.
    """
    # Keep the cursor on this line so the result is printed next to it.
    print('Adding type A record "%s" for %s.%s :' % (value, rev_address, DNSBL_ZONE), end=' ', flush=True)
    timestamp = int(time.time())
    date = datetime.datetime.fromtimestamp(timestamp, tz=pytz.utc).strftime('%Y-%m-%dZ%H:%M:%S')
    update = _signed_update()
    update.replace(rev_address, 8600, 'A', value)
    update.replace(rev_address, 8600, 'TXT', '"Address %s added at %d (%s)"' % (address, timestamp, date))
    return _send_update(update)


def DeleteDNS(rev_address):
    """Delete the A and TXT records of rev_address from the zone.

    Returns:
        0 on NOERROR, 1 on REFUSED, 2 for any other response.
    """
    # Keep the cursor on this line so the result is printed next to it.
    print('Removing type A record %s.%s :' % (rev_address, DNSBL_ZONE), end=' ', flush=True)
    update = _signed_update()
    update.delete(rev_address, 'A')
    update.delete(rev_address, 'TXT')
    return _send_update(update)


def _first_record(resolver, name, rdtype):
    """Return the first rdtype record for name, or None if the lookup fails."""
    try:
        answers = resolver.resolve(name, rdtype)
    except dns.exception.DNSException:
        # NXDOMAIN, empty answer, timeout, ...: all treated as "no record".
        return None
    return answers[0]


def QueryDNS(address, rev_address):
    """Look up an address in the zone and print what is found.

    The TXT record, when present, is printed even if there is no A record.

    Returns:
        1 if an A record exists (address is listed), 0 otherwise.
    """
    query = rev_address + "." + DNSBL_ZONE
    resolver = dns.resolver.Resolver()
    resolver.timeout = 8
    resolver.lifetime = 8
    a_record = _first_record(resolver, query, 'A')
    txt_record = _first_record(resolver, query, 'TXT')
    if txt_record is not None:
        print("Address %s: TXT record for %s.%s => %s" % (address, rev_address, DNSBL_ZONE, txt_record))
    if a_record is not None:
        print("Address %s: A record for %s.%s => %s" % (address, rev_address, DNSBL_ZONE, a_record))
        return 1
    print("Address %s is not listed." % (address,))
    return 0


def XfrDNS():
    """Transfer the zone from DNS_IPADDR and print the listed entries.

    A node is printed in full when its text contains an A record starting
    with 127.

    Returns:
        0 (same process exit status as the previous implicit None).
    """
    print('===== AXFR from zone %s =====' % (DNSBL_ZONE,))
    zone = dns.zone.from_xfr(dns.query.xfr(DNS_IPADDR, DNSBL_ZONE))
    for name, node in zone.nodes.items():
        record = node.to_text(name)
        if ' IN A 127' in record:
            print(record)
    return 0

#---------------------------------------------------------------
#---------------------------------------------------------------
if __name__ == '__main__':
    main()