Staging
v0.5.1
https://github.com/python/cpython
Raw File
Tip revision: 6c3a3aa17b028f6b93067083d32c7eaa4338757c authored by cvs2svn on 25 October 1996, 15:40:20 UTC
This commit was manufactured by cvs2svn to create tag 'release14'.
Tip revision: 6c3a3aa
xdrlib.py
"""Implements (a subset of) Sun XDR -- eXternal Data Representation.

See: RFC 1014

This module will conditionally use the _xdrmodule.so module to get
support for those representations we can't do much with from Python.

"""

import struct
from types import LongType

# Optional C helper: when the _xdr extension module can be imported it is
# used for packing/unpacking floats and doubles; otherwise _xdr is None
# and those operations raise ConversionError.
try:
    import _xdr
except ImportError:
    _xdr = None

# True when struct's native 'l' format lays out the value 1 as the four
# bytes 00 00 00 01, i.e. the machine representation of a long is the
# same as the network representation.  In that case module struct is
# used for packing and unpacking 32-bit integers.
_USE_MACHINE_REP = (struct.pack('l', 1) == '\0\0\0\1')

# exceptions
class Error:
    """Base exception raised by this module.

    Typical use:

        except xdrlib.Error, var:
            # var is bound to the Error instance that was raised

    Public instance variables:
        msg -- the message handed to the constructor

    """

    def __init__(self, msg):
        self.msg = msg

    def __str__(self):
        # Printing the exception shows the message text itself.
        return str(self.msg)

    def __repr__(self):
        # The repr of the exception is simply the repr of its message.
        return repr(self.msg)


class ConversionError(Error):
    """Raised when an item cannot be packed or unpacked.

    Examples in this module: float/double support is unavailable, the
    C layer reports an error, or a list marker is neither 0 nor 1.
    """



class Packer:
    """Pack various data representations into a buffer."""

    def __init__(self):
	self.reset()

    def reset(self):
	self.__buf = ''

    def get_buffer(self):
	return self.__buf
    # backwards compatibility
    get_buf = get_buffer

    def pack_uint(self, x):
	self.__buf = self.__buf + \
		     (chr(int(x>>24 & 0xff)) + chr(int(x>>16 & 0xff)) + \
		      chr(int(x>>8 & 0xff)) + chr(int(x & 0xff)))
    if _USE_MACHINE_REP:
	def pack_uint(self, x):
	    if type(x) == LongType:
		x = int((x + 0x80000000L) % 0x100000000L - 0x80000000L)
	    self.__buf = self.__buf + struct.pack('l', x)

    pack_int = pack_uint
    pack_enum = pack_int

    def pack_bool(self, x):
	if x: self.__buf = self.__buf + '\0\0\0\1'
	else: self.__buf = self.__buf + '\0\0\0\0'

    def pack_uhyper(self, x):
	self.pack_uint(int(x>>32 & 0xffffffff))
	self.pack_uint(int(x & 0xffffffff))

    pack_hyper = pack_uhyper

    def pack_float(self, x):
	raise ConversionError('Not supported')
    def pack_double(self, x):
	raise ConversionError('Not supported')
    # get these from the C layer if available
    if _xdr:
	def pack_float(self, x):
	    try: self.__buf = self.__buf + _xdr.pack_float(x)
	    except _xdr.error, msg:
		raise ConversionError(msg)
	def pack_double(self, x):
	    try: self.__buf = self.__buf + _xdr.pack_double(x)
	    except _xdr.error, msg:
		raise ConversionError(msg)

    def pack_fstring(self, n, s):
	if n < 0:
	    raise ValueError, 'fstring size must be nonnegative'
	n = ((n+3)/4)*4
	data = s[:n]
	data = data + (n - len(data)) * '\0'
	self.__buf = self.__buf + data

    pack_fopaque = pack_fstring

    def pack_string(self, s):
	n = len(s)
	self.pack_uint(n)
	self.pack_fstring(n, s)

    pack_opaque = pack_string
    pack_bytes = pack_string

    def pack_list(self, list, pack_item):
	for item in list:
	    self.pack_uint(1)
	    pack_item(item)
	self.pack_uint(0)

    def pack_farray(self, n, list, pack_item):
	if len(list) <> n:
	    raise ValueError, 'wrong array size'
	for item in list:
	    pack_item(item)

    def pack_array(self, list, pack_item):
	n = len(list)
	self.pack_uint(n)
	self.pack_farray(n, list, pack_item)



class Unpacker:
    """Unpacks various data representations from the given buffer."""

    def __init__(self, data):
	self.reset(data)

    def reset(self, data):
	self.__buf = data
	self.__pos = 0

    def get_position(self):
	return self.__pos

    def set_position(self, position):
	self.__pos = position

    def done(self):
	if self.__pos < len(self.__buf):
	    raise Error('unextracted data remains')

    def unpack_uint(self):
	i = self.__pos
	self.__pos = j = i+4
	data = self.__buf[i:j]
	if len(data) < 4:
	    raise EOFError
	x = long(ord(data[0]))<<24 | ord(data[1])<<16 | \
	    ord(data[2])<<8 | ord(data[3])
	# Return a Python long only if the value is not representable
	# as a nonnegative Python int
	if x < 0x80000000L:
	    x = int(x)
	return x
    if _USE_MACHINE_REP:
	def unpack_uint(self):
	    i = self.__pos
	    self.__pos = j = i+4
	    data = self.__buf[i:j]
	    if len(data) < 4:
		raise EOFError
	    return struct.unpack('l', data)[0]

    def unpack_int(self):
	x = self.unpack_uint()
	if x >= 0x80000000L:
	    x = x - 0x100000000L
	return int(x)

    unpack_enum = unpack_int
    unpack_bool = unpack_int

    def unpack_uhyper(self):
	hi = self.unpack_uint()
	lo = self.unpack_uint()
	return long(hi)<<32 | lo

    def unpack_hyper(self):
	x = self.unpack_uhyper()
	if x >= 0x8000000000000000L:
	    x = x - 0x10000000000000000L
	return x

    def unpack_float(self):
	raise ConversionError('Not supported')
    def unpack_double(self):
	raise ConversionError('Not supported')
    # get these from the C layer if available
    if _xdr:
	def unpack_float(self):
	    i = self.__pos
	    self.__pos = j = i+4
	    data = self.__buf[i:j]
	    if len(data) < 4:
		raise EOFError
	    try: return _xdr.unpack_float(data)
	    except _xdr.error, msg:
		raise ConversionError(msg)

	def unpack_double(self):
	    i = self.__pos
	    self.__pos = j = i+8
	    data = self.__buf[i:j]
	    if len(data) < 8:
		raise EOFError
	    try: return _xdr.unpack_double(data)
	    except _xdr.error, msg:
		raise ConversionError(msg)

    def unpack_fstring(self, n):
	if n < 0:
	    raise ValueError, 'fstring size must be nonnegative'
	i = self.__pos
	j = i + (n+3)/4*4
	if j > len(self.__buf):
	    raise EOFError
	self.__pos = j
	return self.__buf[i:i+n]

    unpack_fopaque = unpack_fstring

    def unpack_string(self):
	n = self.unpack_uint()
	return self.unpack_fstring(n)

    unpack_opaque = unpack_string
    unpack_bytes = unpack_string

    def unpack_list(self, unpack_item):
	list = []
	while 1:
	    x = self.unpack_uint()
	    if x == 0: break
	    if x <> 1:
		raise ConversionError('0 or 1 expected, got ' + `x`)
	    item = unpack_item()
	    list.append(item)
	return list

    def unpack_farray(self, n, unpack_item):
	list = []
	for i in range(n):
	    list.append(unpack_item())
	return list

    def unpack_array(self, unpack_item):
	n = self.unpack_uint()
	return self.unpack_farray(n, unpack_item)


# test suite
def _test():
    p = Packer()
    packtest = [
	(p.pack_uint,    (9,)),
	(p.pack_bool,    (None,)),
	(p.pack_bool,    ('hello',)),
	(p.pack_uhyper,  (45L,)),
	(p.pack_float,   (1.9,)),
	(p.pack_double,  (1.9,)),
	(p.pack_string,  ('hello world',)),
	(p.pack_list,    (range(5), p.pack_uint)),
	(p.pack_array,   (['what', 'is', 'hapnin', 'doctor'], p.pack_string)),
	]
    succeedlist = [1] * len(packtest)
    count = 0
    for method, args in packtest:
	print 'pack test', count,
	try:
	    apply(method, args)
	    print 'succeeded'
	except ConversionError, var:
	    print 'ConversionError:', var.msg
	    succeedlist[count] = 0
	count = count + 1
    data = p.get_buffer()
    # now verify
    up = Unpacker(data)
    unpacktest = [
	(up.unpack_uint,   (), lambda x: x == 9),
	(up.unpack_bool,   (), lambda x: not x),
	(up.unpack_bool,   (), lambda x: x),
	(up.unpack_uhyper, (), lambda x: x == 45L),
	(up.unpack_float,  (), lambda x: 1.89 < x < 1.91),
	(up.unpack_double, (), lambda x: 1.89 < x < 1.91),
	(up.unpack_string, (), lambda x: x == 'hello world'),
	(up.unpack_list,   (up.unpack_uint,), lambda x: x == range(5)),
	(up.unpack_array,  (up.unpack_string,),
	 lambda x: x == ['what', 'is', 'hapnin', 'doctor']),
	]
    count = 0
    for method, args, pred in unpacktest:
	print 'unpack test', count,
	try:
	    if succeedlist[count]:
		x = apply(method, args)
		print pred(x) and 'succeeded' or 'failed', ':', x
	    else:
		print 'skipping'
	except ConversionError, var:
	    print 'ConversionError:', var.msg
	count = count + 1

# Run the self-test when this file is executed as a script.
if __name__ == '__main__':
    _test()
back to top