"""Data classes for building DNS resource records and writing zone files."""

import ipaddress
import time
from dataclasses import dataclass, field
from enum import Enum  # noqa: F401  (unused in this module; import kept as-is)
from typing import Optional

try:
    import fqdn
except ImportError:
    # Third-party package. Only host-name validation needs it, so its absence
    # is reported when a host name is first validated (see _require_fqdn).
    fqdn = None


class InvalidDataException(ValueError):
    """Exception raised when invalid data is passed to a record.

    Derives from ``ValueError`` so that callers which caught the
    ``ValueError`` raised by ``ipaddress`` for malformed addresses keep
    working now that such errors are reported through this class.

    Attributes:
        message: Human-readable description of the problem.
    """

    def __init__(self, message):
        self.message = message
        # Pass only the message: passing ``self`` as well made ``str(exc)``
        # render a tuple containing the exception itself.
        super().__init__(message)


def _require_ip(host: str, version: int) -> str:
    """Return *host* unchanged if it is an IP address of the given version.

    Raises:
        InvalidDataException: If *host* cannot be parsed as an IP address or
            belongs to the other address family.
    """
    error_text = f'{host} is not a valid IPv{version} address.'
    try:
        address = ipaddress.ip_address(host)
    except ValueError as err:
        raise InvalidDataException(message=error_text) from err
    if address.version != version:
        raise InvalidDataException(message=error_text)
    return host


def _require_fqdn(value: str) -> str:
    """Return *value* unchanged if the ``fqdn`` package accepts it.

    Raises:
        ImportError: If the ``fqdn`` package is not installed.
        InvalidDataException: If ``fqdn.FQDN(value).is_valid`` is false.
    """
    if fqdn is None:
        raise ImportError(
            "The 'fqdn' package is required to validate host names.")
    if not fqdn.FQDN(value).is_valid:
        raise InvalidDataException(message=f'{value} is not a valid FQDN')
    return value


@dataclass
class Record:
    """Base class for DNS records."""

    rclass: str = 'IN'      # DNS class, usually 'IN' for internet
    rtype: str = 'A'        # Record type (A, AAAA, MX, etc.)
    name: str = '@'         # Owner name of the record
    data: str = '0.0.0.0'   # Record data (e.g. IP address or hostname)
    ttl: int = 3600         # Time to live (TTL) in seconds

    def __str__(self):
        """Return the record as ``name ttl class type data``."""
        return f"{self.name} {self.ttl} {self.rclass} {self.rtype} {self.data}"


# NOTE: the subclasses below define their own ``__init__``; ``@dataclass``
# leaves an explicitly defined ``__init__`` in place and only contributes
# ``__repr__`` / ``__eq__`` over the fields inherited from ``Record``.


@dataclass
class A(Record):
    """Represents an 'A' (IPv4 address) record."""

    def __init__(self, name: str = '@', ttl: int = 3600,
                 host: str = '0.0.0.0'):
        """Create an A record.

        Raises:
            InvalidDataException: If *host* is not a valid IPv4 address.
        """
        super().__init__(rtype='A', name=name, data=_require_ip(host, 4),
                         ttl=ttl)


@dataclass
class AAAA(Record):
    """Represents an 'AAAA' (IPv6 address) record."""

    # The default used to be the IPv4 literal '0.0.0.0', which made a bare
    # ``AAAA()`` always fail validation; '::' is the IPv6 unspecified address.
    def __init__(self, name: str = '@', ttl: int = 3600, host: str = '::'):
        """Create an AAAA record.

        Raises:
            InvalidDataException: If *host* is not a valid IPv6 address.
        """
        super().__init__(rtype='AAAA', name=name, data=_require_ip(host, 6),
                         ttl=ttl)


@dataclass
class CNAME(Record):
    """Represents a 'CNAME' (Canonical Name) record."""

    def __init__(self, name: str = '@', ttl: int = 3600,
                 target: str = 'example.com'):
        """Create a CNAME record.

        Raises:
            InvalidDataException: If *target* is not a valid FQDN.
        """
        super().__init__(rtype='CNAME', name=name, data=_require_fqdn(target),
                         ttl=ttl)


@dataclass
class MX(Record):
    """Represents an 'MX' (Mail Exchange) record."""

    def __init__(self, name: str = '@', ttl: int = 3600, priority: int = 10,
                 host: str = 'example.com'):
        """Create an MX record whose data is ``"<priority> <host>"``.

        Raises:
            InvalidDataException: If *host* is not a valid FQDN.
        """
        self.priority = priority
        self.host = _require_fqdn(host)
        super().__init__(rtype='MX', name=name,
                         data=f"{self.priority} {self.host}", ttl=ttl)


@dataclass
class NS(Record):
    """Represents an 'NS' (Name Server) record."""

    def __init__(self, name: str = '@', ttl: int = 3600,
                 target: str = 'example.com'):
        """Create an NS record.

        Raises:
            InvalidDataException: If *target* is not a valid FQDN.
        """
        self.target = _require_fqdn(target)
        super().__init__(rtype='NS', name=name, data=self.target, ttl=ttl)


@dataclass
class PTR(Record):
    """Represents a 'PTR' (Pointer) record."""

    def __init__(self, name: str = '@', ttl: int = 3600,
                 host: str = 'example.com'):
        """Create a PTR record.

        Raises:
            InvalidDataException: If *host* is not a valid FQDN.
        """
        super().__init__(rtype='PTR', name=name, data=_require_fqdn(host),
                         ttl=ttl)


@dataclass
class SOA(Record):
    """Represents an 'SOA' (Start of Authority) record."""

    def __init__(self, name: str = '@', mname: str = 'ns1.example.com',
                 rname: str = 'admin.example.com',
                 serial: Optional[int] = None, refresh: int = 86400,
                 retry: int = 7200, expire: int = 15552000, ttl: int = 21700,
                 minimum: int = 3600):
        """Create an SOA record.

        Args:
            name: Owner name of the record.
            mname: Primary name server.
            rname: Mailbox of the responsible person, in domain-name form.
            serial: Zone serial number. ``None`` (the default) uses the
                current Unix time, evaluated on each call rather than once
                at import time.
            refresh: Refresh interval in seconds.
            retry: Retry interval in seconds.
            expire: Expiry interval in seconds.
            ttl: TTL of the SOA record itself.
            minimum: MINIMUM field, the last of the seven SOA RDATA fields.
        """
        if serial is None:
            serial = int(time.time())
        self.mname = mname
        self.rname = rname
        self.serial = serial
        self.refresh = refresh
        self.retry = retry
        self.expire = expire
        self.minimum = minimum
        # Only the RDATA goes here: Record.__str__ already emits
        # "name ttl class type" in front of it.
        rdata = (f"{mname} {rname} {serial} {refresh} {retry} {expire} "
                 f"{minimum}")
        super().__init__(rtype='SOA', name=name, data=rdata, ttl=ttl)


@dataclass
class SRV(Record):
    """Represents an 'SRV' (Service) record."""

    def __init__(self, name: str = '@', ttl: int = 3600,
                 service: str = "service", protocol: str = 'proto',
                 priority: int = 10, weight: int = 10, port: int = 0,
                 target: str = 'example.com'):
        """Create an SRV record owned by ``_<service>._<protocol>.<name>``.

        The record data is ``"<priority> <weight> <port> <target>"``.

        Raises:
            InvalidDataException: If *target* is not a valid FQDN.
        """
        self.service = service
        self.protocol = protocol
        self.priority = priority
        self.weight = weight
        self.port = port
        self.target = _require_fqdn(target)
        owner = f"_{service}._{protocol}"
        # '@' only denotes the origin when it is the entire owner name, so
        # leave the owner relative instead of emitting "_svc._proto.@".
        if name != '@':
            owner = f"{owner}.{name}"
        rdata = f"{self.priority} {self.weight} {self.port} {self.target}"
        super().__init__(rtype='SRV', name=owner, data=rdata, ttl=ttl)


@dataclass
class Zone:
    """Represents a DNS zone containing multiple records.

    Attributes:
        origin: Zone origin; a trailing dot is appended if missing.
        records: Records in insertion order. Each zone gets its own list.
    """

    origin: str
    records: list = field(default_factory=list)

    def __post_init__(self):
        """Ensure the origin ends with a dot."""
        if not self.origin.endswith('.'):
            self.origin += '.'

    def __str__(self):
        """Return all records, one per line, each followed by a newline."""
        return ''.join(f"{record}\n" for record in self.records)

    def __mkfqdn(self, name: str) -> str:
        """Convert a name to a fully qualified domain name (FQDN).

        ``'@'`` and the empty string map to the origin itself, names that
        already end with a dot are returned unchanged, and any other name
        is treated as relative to the origin.
        """
        if name in ('', '@'):
            return self.origin
        if name.endswith('.'):
            return name
        if self.origin == '.':
            # Root zone: avoid producing a doubled trailing dot.
            return name + '.'
        return name + '.' + self.origin

    def new_A(self, name: str = '@', ttl: int = 3600, data: str = '0.0.0.0'):
        """Create an A record for IPv4 address *data* and add it."""
        name = self.__mkfqdn(name)
        # A() names its address parameter ``host``.
        self.add(A(name=name, ttl=ttl, host=data))

    def new_AAAA(self, name: str = '@', ttl: int = 3600, data: str = '::'):
        """Create an AAAA record for IPv6 address *data* and add it."""
        name = self.__mkfqdn(name)
        # AAAA() names its address parameter ``host``.
        self.add(AAAA(name=name, ttl=ttl, host=data))

    def new_CNAME(self, name: str = '@', ttl: int = 3600,
                  data: str = 'example.com'):
        """Create a CNAME record pointing at *data* and add it."""
        name = self.__mkfqdn(name)
        self.add(CNAME(name=name, ttl=ttl, target=data))

    def new_MX(self, name: str = '@', ttl: int = 3600, priority: int = 10,
               host: str = 'example.com'):
        """Create an MX record and add it to the zone."""
        name = self.__mkfqdn(name)
        self.add(MX(name=name, ttl=ttl, priority=priority, host=host))

    def new_NS(self, name: str = '@', ttl: int = 3600,
               target: str = 'example.com'):
        """Create an NS record and add it to the zone."""
        name = self.__mkfqdn(name)
        self.add(NS(name=name, ttl=ttl, target=target))

    def new_PTR(self, name: str = '@', ttl: int = 3600,
                host: str = 'example.com'):
        """Create a PTR record and add it to the zone."""
        name = self.__mkfqdn(name)
        self.add(PTR(name=name, ttl=ttl, host=host))

    def new_soa(self, mname: str = 'ns1.example.com',
                rname: str = 'admin.example.com',
                serial: Optional[int] = None, refresh: int = 86400,
                retry: int = 7200, expire: int = 15552000, ttl: int = 21700,
                minimum: int = 3600):
        """Create an SOA record and add it to the zone.

        *mname* is qualified against the origin unless it already ends with
        a dot. ``serial=None`` lets ``SOA`` use the current Unix time.
        """
        mname = self.__mkfqdn(mname)
        self.add(SOA(mname=mname, rname=rname, serial=serial, refresh=refresh,
                     retry=retry, expire=expire, ttl=ttl, minimum=minimum))

    # Alias matching the capitalisation of the other ``new_*`` methods.
    new_SOA = new_soa

    def new_SRV(self, name: str = '@', ttl: int = 3600,
                service: str = 'service', protocol: str = 'proto',
                priority: int = 10, weight: int = 10,
                target: str = 'example.com', port: int = 0):
        """Create an SRV record and add it to the zone.

        ``port`` is placed last so existing positional calls keep working.
        """
        name = self.__mkfqdn(name)
        self.add(SRV(name=name, ttl=ttl, service=service, protocol=protocol,
                     priority=priority, weight=weight, port=port,
                     target=target))

    def new_record(self, name: str = '@', ttl: int = 3600, rtype: str = 'A',
                   data: str = '0.0.0.0'):
        """Create a generic record (no data validation) and add it."""
        name = self.__mkfqdn(name)
        self.add(Record(name=name, ttl=ttl, rtype=rtype, data=data))

    def add(self, record: Record):
        """Append *record* to the zone."""
        self.records.append(record)

    def save_file(self, filepath: str):
        """Write every record to *filepath*, one per line.

        Each record is also echoed to stdout as it is written.
        """
        with open(filepath, 'w', encoding='utf-8') as file:
            for record in self.records:
                line = str(record)
                file.write(line + '\n')
                print(line)