Staging
v0.5.1
https://github.com/python/cpython
Raw File
Tip revision: b6b43e00f8ba7ffd142795d2ed2e340e4a897a95 authored by cvs2svn on 08 August 1996, 19:05:09 UTC
This commit was manufactured by cvs2svn to create tag 'r14beta2'.
Tip revision: b6b43e0
dnslib.py
# Domain Name Server (DNS) interface
#
# See RFC 1035:
# ------------------------------------------------------------------------
# Network Working Group                                     P. Mockapetris
# Request for Comments: 1035                                           ISI
#                                                            November 1987
# Obsoletes: RFCs 882, 883, 973
# 
#             DOMAIN NAMES - IMPLEMENTATION AND SPECIFICATION
# ------------------------------------------------------------------------


import string

import dnstype
import dnsclass
import dnsopcode


# Low-level 16 and 32 bit integer packing and unpacking

def pack16bit(n):
	return chr((n>>8)&0xFF) + chr(n&0xFF)

def pack32bit(n):
	return chr((n>>24)&0xFF) + chr((n>>16)&0xFF) \
		  + chr((n>>8)&0xFF) + chr(n&0xFF)

def unpack16bit(s):
	return (ord(s[0])<<8) | ord(s[1])

def unpack32bit(s):
	return (ord(s[0])<<24) | (ord(s[1])<<16) \
		  | (ord(s[2])<<8) | ord(s[3])

def addr2bin(addr):
	if type(addr) == type(0):
		return addr
	bytes = string.splitfields(addr, '.')
	if len(bytes) != 4: raise ValueError, 'bad IP address'
	n = 0
	for byte in bytes: n = n<<8 | string.atoi(byte)
	return n

def bin2addr(n):
	return '%d.%d.%d.%d' % ((n>>24)&0xFF, (n>>16)&0xFF,
		  (n>>8)&0xFF, n&0xFF)


# Packing class

class Packer:
	def __init__(self):
		self.buf = ''
		self.index = {}
	def getbuf(self):
		return self.buf
	def addbyte(self, c):
		if len(c) != 1: raise TypeError, 'one character expected'
		self.buf = self.buf + c
	def addbytes(self, bytes):
		self.buf = self.buf + bytes
	def add16bit(self, n):
		self.buf = self.buf + pack16bit(n)
	def add32bit(self, n):
		self.buf = self.buf + pack32bit(n)
	def addaddr(self, addr):
		n = addr2bin(addr)
		self.buf = self.buf + pack32bit(n)
	def addstring(self, s):
		self.addbyte(chr(len(s)))
		self.addbytes(s)
	def addname(self, name):
		# Domain name packing (section 4.1.4)
		# Add a domain name to the buffer, possibly using pointers.
		# The case of the first occurrence of a name is preserved.
		# Redundant dots are ignored.
		list = []
		for label in string.splitfields(name, '.'):
			if label:
				if len(label) > 63:
					raise PackError, 'label too long'
				list.append(label)
		keys = []
		for i in range(len(list)):
			key = string.upper(string.joinfields(list[i:], '.'))
			keys.append(key)
			if self.index.has_key(key):
				pointer = self.index[key]
				break
		else:
			i = len(list)
			pointer = None
		# Do it into temporaries first so exceptions don't
		# mess up self.index and self.buf
		buf = ''
		offset = len(self.buf)
		index = []
		for j in range(i):
			label = list[j]
			n = len(label)
			if offset + len(buf) < 0x3FFF:
				index.append(keys[j], offset + len(buf))
			else:
				print 'dnslib.Packer.addname:',
				print 'warning: pointer too big'
			buf = buf + (chr(n) + label)
		if pointer:
			buf = buf + pack16bit(pointer | 0xC000)
		else:
			buf = buf + '\0'
		self.buf = self.buf + buf
		for key, value in index:
			self.index[key] = value
	def dump(self):
		keys = self.index.keys()
		keys.sort()
		print '-'*40
		for key in keys:
			print '%20s %3d' % (key, self.index[key])
		print '-'*40
		space = 1
		for i in range(0, len(self.buf)+1, 2):
			if self.buf[i:i+2] == '**':
				if not space: print
				space = 1
				continue
			space = 0
			print '%4d' % i,
			for c in self.buf[i:i+2]:
				if ' ' < c < '\177':
					print ' %c' % c,
				else:
					print '%2d' % ord(c),
			print
		print '-'*40


# Unpacking class

UnpackError = 'dnslib.UnpackError'	# Exception

class Unpacker:
	def __init__(self, buf):
		self.buf = buf
		self.offset = 0
	def getbyte(self):
		c = self.buf[self.offset]
		self.offset = self.offset + 1
		return c
	def getbytes(self, n):
		s = self.buf[self.offset : self.offset + n]
		if len(s) != n: raise UnpackError, 'not enough data left'
		self.offset = self.offset + n
		return s
	def get16bit(self):
		return unpack16bit(self.getbytes(2))
	def get32bit(self):
		return unpack32bit(self.getbytes(4))
	def getaddr(self):
		return bin2addr(self.get32bit())
	def getstring(self):
		return self.getbytes(ord(self.getbyte()))
	def getname(self):
		# Domain name unpacking (section 4.1.4)
		c = self.getbyte()
		i = ord(c)
		if i & 0xC0 == 0xC0:
			d = self.getbyte()
			j = ord(d)
			pointer = ((i<<8) | j) & ~0xC000
			save_offset = self.offset
			try:
				self.offset = pointer
				domain = self.getname()
			finally:
				self.offset = save_offset
			return domain
		if i == 0:
			return ''
		domain = self.getbytes(i)
		remains = self.getname()
		if not remains:
			return domain
		else:
			return domain + '.' + remains


# Test program for packin/unpacking (section 4.1.4)

def testpacker():
	N = 25
	R = range(N)
	import timing
	# See section 4.1.4 of RFC 1035
	timing.start()
	for i in R:
		p = Packer()
		p.addbytes('*' * 20)
		p.addname('f.ISI.ARPA')
		p.addbytes('*' * 8)
		p.addname('Foo.F.isi.arpa')
		p.addbytes('*' * 18)
		p.addname('arpa')
		p.addbytes('*' * 26)
		p.addname('')
	timing.finish()
	print round(timing.milli() * 0.001 / N, 3), 'seconds per packing'
	p.dump()
	u = Unpacker(p.buf)
	u.getbytes(20)
	u.getname()
	u.getbytes(8)
	u.getname()
	u.getbytes(18)
	u.getname()
	u.getbytes(26)
	u.getname()
	timing.start()
	for i in R:
		u = Unpacker(p.buf)
		res = (u.getbytes(20),
		       u.getname(),
		       u.getbytes(8),
		       u.getname(),
		       u.getbytes(18),
		       u.getname(),
		       u.getbytes(26),
		       u.getname())
	timing.finish()
	print round(timing.milli() * 0.001 / N, 3), 'seconds per unpacking'
	for item in res: print item


# Pack/unpack RR toplevel format (section 3.2.1)

class RRpacker(Packer):
	def __init__(self):
		Packer.__init__(self)
		self.rdstart = None
	def addRRheader(self, name, type, klass, ttl, *rest):
		self.addname(name)
		self.add16bit(type)
		self.add16bit(klass)
		self.add32bit(ttl)
		if rest:
			if res[1:]: raise TypeError, 'too many args'
			rdlength = rest[0]
		else:
			rdlength = 0
		self.add16bit(rdlength)
		self.rdstart = len(self.buf)
	def patchrdlength(self):
		rdlength = unpack16bit(self.buf[self.rdstart-2:self.rdstart])
		if rdlength == len(self.buf) - self.rdstart:
			return
		rdata = self.buf[self.rdstart:]
		save_buf = self.buf
		ok = 0
		try:
			self.buf = self.buf[:self.rdstart-2]
			self.add16bit(len(rdata))
			self.buf = self.buf + rdata
			ok = 1
		finally:
			if not ok: self.buf = save_buf
	def endRR(self):
		if self.rdstart is not None:
			self.patchrdlength()
		self.rdstart = None
	def getbuf(self):
		if self.rdstart is not None: self.patchrdlenth()
		return Packer.getbuf(self)
	# Standard RRs (section 3.3)
	def addCNAME(self, name, klass, ttl, cname):
		self.addRRheader(name, dnstype.CNAME, klass, ttl)
		self.addname(cname)
		self.endRR()
	def addHINFO(self, name, klass, ttl, cpu, os):
		self.addRRheader(name, dnstype.HINFO, klass, ttl)
		self.addstring(cpu)
		self.addstring(os)
		self.endRR()
	def addMX(self, name, klass, ttl, preference, exchange):
		self.addRRheader(name, dnstype.MX, klass, ttl)
		self.add16bit(preference)
		self.addname(exchange)
		self.endRR()
	def addNS(self, name, klass, ttl, nsdname):
		self.addRRheader(name, dnstype.NS, klass, ttl)
		self.addname(nsdname)
		self.endRR()
	def addPTR(self, name, klass, ttl, ptrdname):
		self.addRRheader(name, dnstype.PTR, klass, ttl)
		self.addname(ptrdname)
		self.endRR()
	def addSOA(self, name, klass, ttl,
		  mname, rname, serial, refresh, retry, expire, minimum):
		self.addRRheader(name, dnstype.SOA, klass, ttl)
		self.addname(mname)
		self.addname(rname)
		self.add32bit(serial)
		self.add32bit(refresh)
		self.add32bit(retry)
		self.add32bit(expire)
		self.add32bit(minimum)
		self.endRR()
	def addTXT(self, name, klass, ttl, list):
		self.addRRheader(name, dnstype.TXT, klass, ttl)
		for txtdata in list:
			self.addstring(txtdata)
		self.endRR()
	# Internet specific RRs (section 3.4) -- class = IN
	def addA(self, name, ttl, address):
		self.addRRheader(name, dnstype.A, dnsclass.IN, ttl)
		self.addaddr(address)
		self.endRR()
	def addWKS(self, name, ttl, address, protocol, bitmap):
		self.addRRheader(name, dnstype.WKS, dnsclass.IN, ttl)
		self.addaddr(address)
		self.addbyte(chr(protocol))
		self.addbytes(bitmap)
		self.endRR()


class RRunpacker(Unpacker):
	def __init__(self, buf):
		Unpacker.__init__(self, buf)
		self.rdend = None
	def getRRheader(self):
		name = self.getname()
		type = self.get16bit()
		klass = self.get16bit()
		ttl = self.get32bit()
		rdlength = self.get16bit()
		self.rdend = self.offset + rdlength
		return (name, type, klass, ttl, rdlength)
	def endRR(self):
		if self.offset != self.rdend:
			raise UnpackError, 'end of RR not reached'
	def getCNAMEdata(self):
		return self.getname()
	def getHINFOdata(self):
		return self.getstring(), self.getstring()
	def getMXdata(self):
		return self.get16bit(), self.getname()
	def getNSdata(self):
		return self.getname()
	def getPTRdata(self):
		return self.getname()
	def getSOAdata(self):
		return self.getname(), \
		       self.getname(), \
		       self.get32bit(), \
		       self.get32bit(), \
		       self.get32bit(), \
		       self.get32bit(), \
		       self.get32bit()
	def getTXTdata(self):
		list = []
		while self.offset != self.rdend:
			list.append(self.getstring())
		return list
	def getAdata(self):
		return self.getaddr()
	def getWKSdata(self):
		address = self.getaddr()
		protocol = ord(self.getbyte())
		bitmap = self.getbytes(self.rdend - self.offset)
		return address, protocol, bitmap


# Pack/unpack Message Header (section 4.1)

class Hpacker(Packer):
	def addHeader(self, id, qr, opcode, aa, tc, rd, ra, z, rcode,
		  qdcount, ancount, nscount, arcount):
		self.add16bit(id)
		self.add16bit((qr&1)<<15 | (opcode*0xF)<<11 | (aa&1)<<10
			  | (tc&1)<<9 | (rd&1)<<8 | (ra&1)<<7
			  | (z&7)<<4 | (rcode&0xF))
		self.add16bit(qdcount)
		self.add16bit(ancount)
		self.add16bit(nscount)
		self.add16bit(arcount)

class Hunpacker(Unpacker):
	def getHeader(self):
		id = self.get16bit()
		flags = self.get16bit()
		qr, opcode, aa, tc, rd, ra, z, rcode = (
			  (flags>>15)&1,
			  (flags>>11)&0xF,
			  (flags>>10)&1,
			  (flags>>9)&1,
			  (flags>>8)&1,
			  (flags>>7)&1,
			  (flags>>4)&7,
			  (flags>>0)&0xF)
		qdcount = self.get16bit()
		ancount = self.get16bit()
		nscount = self.get16bit()
		arcount = self.get16bit()
		return (id, qr, opcode, aa, tc, rd, ra, z, rcode,
			  qdcount, ancount, nscount, arcount)


# Pack/unpack Question (section 4.1.2)

class Qpacker(Packer):
	def addQuestion(self, qname, qtype, qclass):
		self.addname(qname)
		self.add16bit(qtype)
		self.add16bit(qclass)

class Qunpacker(Unpacker):
	def getQuestion(self):
		return self.getname(), self.get16bit(), self.get16bit()


# Pack/unpack Message(section 4)
# NB the order of the base classes is important for __init__()!

class Mpacker(RRpacker, Qpacker, Hpacker):
	pass

class Munpacker(RRunpacker, Qunpacker, Hunpacker):
	pass


# Routines to print an unpacker to stdout, for debugging.
# These affect the unpacker's current position!

def dumpM(u):
	print 'HEADER:',
	(id, qr, opcode, aa, tc, rd, ra, z, rcode,
		  qdcount, ancount, nscount, arcount) = u.getHeader()
	print 'id=%d,' % id,
	print 'qr=%d, opcode=%d, aa=%d, tc=%d, rd=%d, ra=%d, z=%d, rcode=%d,' \
		  % (qr, opcode, aa, tc, rd, ra, z, rcode)
	if tc: print '*** response truncated! ***'
	if rcode: print '*** nonzero error code! (%d) ***' % rcode
	print '  qdcount=%d, ancount=%d, nscount=%d, arcount=%d' \
		  % (qdcount, ancount, nscount, arcount)
	for i in range(qdcount):
		print 'QUESTION %d:' % i,
		dumpQ(u)
	for i in range(ancount):
		print 'ANSWER %d:' % i,
		dumpRR(u)
	for i in range(nscount):
		print 'AUTHORITY RECORD %d:' % i,
		dumpRR(u)
	for i in range(arcount):
		print 'ADDITIONAL RECORD %d:' % i,
		dumpRR(u)

def dumpQ(u):
	qname, qtype, qclass = u.getQuestion()
	print 'qname=%s, qtype=%d(%s), qclass=%d(%s)' \
		  % (qname,
		     qtype, dnstype.typestr(qtype),
		     qclass, dnsclass.classstr(qclass))

def dumpRR(u):
	name, type, klass, ttl, rdlength = u.getRRheader()
	typename = dnstype.typestr(type)
	print 'name=%s, type=%d(%s), class=%d(%s), ttl=%d' \
		  % (name,
		     type, typename,
		     klass, dnsclass.classstr(klass),
		     ttl)
	mname = 'get%sdata' % typename
	if hasattr(u, mname):
		print '  formatted rdata:', getattr(u, mname)()
	else:
		print '  binary rdata:', u.getbytes(rdlength)


# Test program

def test():
	import sys
	import getopt
	import socket
	protocol = 'udp'
	server = 'cnri.reston.va.us' # XXX adapt this to your local 
	port = 53
	opcode = dnsopcode.QUERY
	rd = 0
	qtype = dnstype.MX
	qname = 'cwi.nl'
	try:
		opts, args = getopt.getopt(sys.argv[1:], 'Trs:tu')
		if len(args) > 2: raise getopt.error, 'too many arguments'
	except getopt.error, msg:
		print msg
		print 'Usage: python dnslib.py',
		print '[-T] [-r] [-s server] [-t] [-u]',
		print '[qtype [qname]]'
		print '-T:        run testpacker() and exit'
		print '-r:        recursion desired (default not)'
		print '-s server: use server (default %s)' % server
		print '-t:        use TCP protocol'
		print '-u:        use UDP protocol (default)'
		print 'qtype:     query type (default %s)' % \
			  dnstype.typestr(qtype)
		print 'qname:     query name (default %s)' % qname
		print 'Recognized qtype values:'
		qtypes = dnstype.typemap.keys()
		qtypes.sort()
		n = 0
		for qtype in qtypes:
			n = n+1
			if n >= 8: n = 1; print
			print '%s = %d' % (dnstype.typemap[qtype], qtype),
		print
		sys.exit(2)
	for o, a in opts:
		if o == '-T': testpacker(); return
		if o == '-t': protocol = 'tcp'
		if o == '-u': protocol = 'udp'
		if o == '-s': server = a
		if o == '-r': rd = 1
	if args[0:]:
		try:
			qtype = eval(string.upper(args[0]), dnstype.__dict__)
		except (NameError, SyntaxError):
			print 'bad query type:', `args[0]`
			sys.exit(2)
	if args[1:]:
		qname = args[1]
	if qtype == dnstype.AXFR:
		print 'Query type AXFR, protocol forced to TCP'
		protocol = 'tcp'
	print 'QTYPE %d(%s)' % (qtype, dnstype.typestr(qtype))
	m = Mpacker()
	m.addHeader(0,
		  0, opcode, 0, 0, rd, 0, 0, 0,
		  1, 0, 0, 0)
	m.addQuestion(qname, qtype, dnsclass.IN)
	request = m.getbuf()
	if protocol == 'udp':
		s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
		s.connect((server, port))
		s.send(request)
		reply = s.recv(1024)
	else:
		s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		s.connect((server, port))
		s.send(pack16bit(len(request)) + request)
		s.shutdown(1)
		f = s.makefile('r')
		header = f.read(2)
		if len(header) < 2:
			print '*** EOF ***'
			return
		count = unpack16bit(header)
		reply = f.read(count)
		if len(reply) != count:
			print '*** Incomplete reply ***'
			return
	u = Munpacker(reply)
	dumpM(u)
	if protocol == 'tcp' and qtype == dnstype.AXFR:
		while 1:
			header = f.read(2)
			if len(header) < 2:
				print '========== EOF =========='
				break
			count = unpack16bit(header)
			if not count:
				print '========== ZERO COUNT =========='
				break
			print '========== NEXT =========='
			reply = f.read(count)
			if len(reply) != count:
				print '*** Incomplete reply ***'
				break
			u = Munpacker(reply)
			dumpM(u)


# Run test program when called as a script

if __name__ == '__main__':
	test()
back to top