Staging
v0.5.1
https://github.com/python/cpython
Raw File
Tip revision: 7a325c385bb547b481abfee8301e9d9f2e58990a authored by cvs2svn on 06 May 1994, 14:16:55 UTC
This commit was manufactured by cvs2svn to create tag 'release102'.
Tip revision: 7a325c3
VCR.py
import fcntl
import IOCTL
from IOCTL import *
import sys
import struct
import select
import posix
import time

DEVICE='/dev/ttyd2'

class UnixFile:
	def open(self, name, mode):
		self.fd = posix.open(name, mode)
		return self

	def read(self, len):
		return posix.read(self.fd, len)

	def write(self, data):
		dummy = posix.write(self.fd, data)

	def flush(self):
		pass

	def fileno(self):
		return self.fd

	def close(self):
		dummy = posix.close(self.fd)

def packttyargs(*args):
	if type(args) <> type(()):
		raise 'Incorrect argtype for packttyargs'
	if type(args[0]) == type(1):
		iflag, oflag, cflag, lflag, line, chars = args
	elif type(args[0]) == type(()):
		if len(args) <> 1:
			raise 'Only 1 argument expected'
		iflag, oflag, cflag, lflag, line, chars = args[0]
	elif type(args[0]) == type([]):
		if len(args) <> 1:
			raise 'Only 1 argument expected'
		[iflag, oflag, cflag, lflag, line, chars] = args[0]
	str = struct.pack('hhhhb', iflag, oflag, cflag, lflag, line)
	for c in chars:
		str = str + c
	return str

def nullttyargs():
	chars = ['\0']*IOCTL.NCCS
	return packttyargs(0, 0, 0, 0, 0, chars)

def unpackttyargs(str):
	args = str[:-IOCTL.NCCS]
	rawchars = str[-IOCTL.NCCS:]
	chars = []
	for c in rawchars:
		chars.append(c)
	iflag, oflag, cflag, lflag, line = struct.unpack('hhhhb', args)
	return (iflag, oflag, cflag, lflag, line, chars)

def initline(name):
	fp = UnixFile().open(name, 2)
	ofp = fp
	fd = fp.fileno()
	rv = fcntl.ioctl(fd, IOCTL.TCGETA, nullttyargs())
	iflag, oflag, cflag, lflag, line, chars = unpackttyargs(rv)
	iflag = iflag & ~(INPCK|ISTRIP|INLCR|IUCLC|IXON|IXOFF)
	oflag = oflag & ~OPOST
	cflag = B9600|CS8|CREAD|CLOCAL
	lflag = lflag & ~(ISIG|ICANON|ECHO|TOSTOP)
	chars[VMIN] = chr(1)
	chars[VTIME] = chr(0)
	arg = packttyargs(iflag, oflag, cflag, lflag, line, chars)
	dummy = fcntl.ioctl(fd, IOCTL.TCSETA, arg)
	return fp, ofp

#
#
error = 'VCR.error'

# Commands/replies:
COMPLETION = '\x01'
ACK  ='\x0a'
NAK  ='\x0b'

NUMBER_N = 0x30
ENTER  = '\x40'

EXP_7= '\xde'
EXP_8= '\xdf'

CL   ='\x56'
CTRL_ENABLE = EXP_8 + '\xc6'
SEARCH_DATA = EXP_8 + '\x93'
ADDR_SENSE = '\x60'

PLAY ='\x3a'
STOP ='\x3f'
EJECT='\x2a'
FF   ='\xab'
REW  ='\xac'
STILL='\x4f'
STEP_FWD ='\x2b'    # Was: '\xad'
FM_SELECT=EXP_8 + '\xc8'
FM_STILL=EXP_8 + '\xcd'
PREVIEW=EXP_7 + '\x9d'
REVIEW=EXP_7 + '\x9e'
DM_OFF=EXP_8 + '\xc9'
DM_SET=EXP_8 + '\xc4'
FWD_SHUTTLE='\xb5'
REV_SHUTTLE='\xb6'
EM_SELECT=EXP_8 + '\xc0'
N_FRAME_REC=EXP_8 + '\x92'
SEARCH_PREROLL=EXP_8 + '\x90'
EDIT_PB_STANDBY=EXP_8 + '\x96'
EDIT_PLAY=EXP_8 + '\x98'
AUTO_EDIT=EXP_7 + '\x9c'

IN_ENTRY=EXP_7 + '\x90'
IN_ENTRY_RESET=EXP_7 + '\x91'
IN_ENTRY_SET=EXP_7 + '\x98'
IN_ENTRY_INC=EXP_7 + '\x94'
IN_ENTRY_DEC=EXP_7 + '\x95'
IN_ENTRY_SENSE=EXP_7 + '\x9a'

OUT_ENTRY=EXP_7 + '\x92'
OUT_ENTRY_RESET=EXP_7 + '\x93'
OUT_ENTRY_SET=EXP_7 + '\x99'
OUT_ENTRY_INC=EXP_7 + '\x96'
OUT_ENTRY_DEC=EXP_7 + '\x98'
OUT_ENTRY_SENSE=EXP_7 + '\x9b'

MUTE_AUDIO = '\x24'
MUTE_AUDIO_OFF = '\x25'
MUTE_VIDEO = '\x26'
MUTE_VIDEO_OFF = '\x27'
MUTE_AV = EXP_7 + '\xc6'
MUTE_AV_OFF = EXP_7 + '\xc7'

DEBUG=0

class VCR:
	def __init__(self):
		self.ifp, self.ofp = initline(DEVICE)
		self.busy_cmd = None
		self.async = 0
		self.cb = None
		self.cb_arg = None

	def _check(self):
		if self.busy_cmd:
			raise error, 'Another command active: '+self.busy_cmd

	def _endlongcmd(self, name):
		if not self.async:
			self.waitready()
			return 1
		self.busy_cmd = name
		return 'VCR BUSY'

	def fileno(self):
		return self.ifp.fileno()

	def setasync(self, async):
		self.async = async

	def setcallback(self, cb, arg):
		self.setasync(1)
		self.cb = cb
		self.cb_arg = arg

	def poll(self):
		if not self.async:
			raise error, 'Can only call poll() in async mode'
		if not self.busy_cmd:
			return
		if self.testready():
			if self.cb:
				apply(self.cb, self.cb_arg)

	def _cmd(self, cmd):
		if DEBUG:
			print '>>>',`cmd`
		self.ofp.write(cmd)
		self.ofp.flush()

	def _waitdata(self, len, tout):
		rep = ''
		while len > 0:
			if tout == None:
				ready, d1, d2 = select.select( \
					  [self.ifp], [], [])
			else:
				ready, d1, d2 = select.select( \
					  [self.ifp], [], [], tout)
			if ready == []:
##				if rep:
##					print 'FLUSHED:', `rep`
				return None
			data = self.ifp.read(1)
			if DEBUG:
				print '<<<',`data`
			if data == NAK:
				return NAK
			rep = rep + data
			len = len - 1
		return rep

	def _reply(self, len):
		data = self._waitdata(len, 10)
		if data == None:
			raise error, 'Lost contact with VCR'
		return data

	def _getnumber(self, len):
		data = self._reply(len)
		number = 0
		for c in data:
			digit = ord(c) - NUMBER_N
			if digit < 0 or digit > 9:
				raise error, 'Non-digit in number'+`c`
			number = number*10 + digit
		return number

	def _iflush(self):
		dummy = self._waitdata(10000, 0)
##		if dummy:
##			print 'IFLUSH:', dummy

	def simplecmd(self,cmd):
		self._iflush()
		for ch in cmd:
			self._cmd(ch)
			rep = self._reply(1)
			if rep == NAK:
				return 0
			elif rep <> ACK:
				raise error, 'Unexpected reply:' + `rep`
		return 1

	def replycmd(self, cmd):
		if not self.simplecmd(cmd[:-1]):
			return 0
		self._cmd(cmd[-1])

	def _number(self, number, digits):
		if number < 0:
			raise error, 'Unexpected negative number:'+ `number`
		maxnum = pow(10, digits)
		if number > maxnum:
			raise error, 'Number too big'
		while maxnum > 1:
			number = number % maxnum
			maxnum = maxnum / 10
			digit = number / maxnum
			ok = self.simplecmd(chr(NUMBER_N + digit))
			if not ok:
				raise error, 'Error while transmitting number'

	def initvcr(self, *optarg):
		timeout = None
		if optarg <> ():
			timeout = optarg[0]
			starttime = time.time()
		self.busy_cmd = None
		self._iflush()
		while 1:
##			print 'SENDCL'
			self._cmd(CL)
			rep = self._waitdata(1, 2)
##			print `rep`
			if rep in ( None, CL, NAK ):
				if timeout:
					if time.time() - starttime > timeout:
						raise error, \
							  'No reply from VCR'
				continue
			break
		if rep <> ACK:
			raise error, 'Unexpected reply:' + `rep`
		dummy = self.simplecmd(CTRL_ENABLE)

	def waitready(self):
		rep = self._waitdata(1, None)
		if rep == None:
			raise error, 'Unexpected None reply from waitdata'
		if rep not in  (COMPLETION, ACK):
			self._iflush()
			raise error, 'Unexpected waitready reply:' + `rep`
		self.busy_cmd = None

	def testready(self):
		rep = self._waitdata(1, 0)
		if rep == None:
			return 0
		if rep not in  (COMPLETION, ACK):
			self._iflush()
			raise error, 'Unexpected waitready reply:' + `rep`
		self.busy_cmd = None
		return 1

	def play(self): return self.simplecmd(PLAY)
	def stop(self): return self.simplecmd(STOP)
	def ff(self):   return self.simplecmd(FF)
	def rew(self):  return self.simplecmd(REW)
	def eject(self):return self.simplecmd(EJECT)
	def still(self):return self.simplecmd(STILL)
	def step(self): return self.simplecmd(STEP_FWD)

	def goto(self, (h, m, s, f)):
		if not self.simplecmd(SEARCH_DATA):
			return 0
		self._number(h, 2)
		self._number(m, 2)
		self._number(s, 2)
		self._number(f, 2)
		if not self.simplecmd(ENTER):
			return 0
		return self._endlongcmd('goto')

	# XXXX TC_SENSE doesn't seem to work
	def faulty_where(self):
		self._check()
		self._cmd(TC_SENSE)
		h = self._getnumber(2)
		m = self._getnumber(2)
		s = self._getnumber(2)
		f = self._getnumber(2)
		return (h, m, s, f)

	def where(self):
		return self.addr2tc(self.sense())

	def sense(self):
		self._check()
		self._cmd(ADDR_SENSE)
		num = self._getnumber(5)
		return num

	def addr2tc(self, num):
		f = num % 25
		num = num / 25
		s = num % 60
		num = num / 60
		m = num % 60
		h = num / 60
		return (h, m, s, f)

	def tc2addr(self, (h, m, s, f)):
		return ((h*60 + m)*60 + s)*25 + f

	def fmmode(self, mode):
		self._check()
		if mode == 'off':
			arg = 0
		elif mode == 'buffer':
			arg = 1
		elif mode == 'dnr':
			arg = 2
		else:
			raise error, 'fmmode arg should be off, buffer or dnr'
		if not self.simplecmd(FM_SELECT):
			return 0
		self._number(arg, 1)
		if not self.simplecmd(ENTER):
			return 0
		return 1

	def mute(self, mode, value):
		self._check()
		if mode == 'audio':
			cmds = (MUTE_AUDIO_OFF, MUTE_AUDIO)
		elif mode == 'video':
			cmds = (MUTE_VIDEO_OFF, MUTE_VIDEO)
		elif mode == 'av':
			cmds = (MUTE_AV_OFF, MUTE_AV)
		else:
			raise error, 'mute type should be audio, video or av'
		cmd = cmds[value]
		return self.simplecmd(cmd)

	def editmode(self, mode):
		self._check()
		if mode == 'off':
			a0 = a1 = a2 = 0
		elif mode == 'format':
			a0 = 4
			a1 = 7
			a2 = 4
		elif mode == 'asmbl':
			a0 = 1
			a1 = 7
			a2 = 4
		elif mode == 'insert-video':
			a0 = 2
			a1 = 4
			a2 = 0
		else:
			raise 'editmode should be off,format,asmbl or insert-video'
		if not self.simplecmd(EM_SELECT):
			return 0
		self._number(a0, 1)
		self._number(a1, 1)
		self._number(a2, 1)
		if not self.simplecmd(ENTER):
			return 0
		return 1

	def autoedit(self):
		self._check()
		return self._endlongcmd(AUTO_EDIT)

	def nframerec(self, num):
		if not self.simplecmd(N_FRAME_REC):
			return 0
		self._number(num, 4)
		if not self.simplecmd(ENTER):
			return 0
		return self._endlongcmd('nframerec')

	def fmstill(self):
		if not self.simplecmd(FM_STILL):
			return 0
		return self._endlongcmd('fmstill')

	def preview(self):
		if not self.simplecmd(PREVIEW):
			return 0
		return self._endlongcmd('preview')

	def review(self):
		if not self.simplecmd(REVIEW):
			return 0
		return self._endlongcmd('review')

	def search_preroll(self):
		if not self.simplecmd(SEARCH_PREROLL):
			return 0
		return self._endlongcmd('search_preroll')

	def edit_pb_standby(self):
		if not self.simplecmd(EDIT_PB_STANDBY):
			return 0
		return self._endlongcmd('edit_pb_standby')

	def edit_play(self):
		if not self.simplecmd(EDIT_PLAY):
			return 0
		return self._endlongcmd('edit_play')

	def dmcontrol(self, mode):
		self._check()
		if mode == 'off':
			return self.simplecmd(DM_OFF)
		if mode == 'multi freeze':
			num = 1000
		elif mode == 'zoom freeze':
			num = 2000
		elif mode == 'digital slow':
			num = 3000
		elif mode == 'freeze':
			num = 4011
		else:
			raise error, 'unknown dmcontrol argument: ' + `mode`
		if not self.simplecmd(DM_SET):
			return 0
		self._number(num, 4)
		if not self.simplecmd(ENTER):
			return 0
		return 1

	def fwdshuttle(self, num):
		if not self.simplecmd(FWD_SHUTTLE):
			return 0
		self._number(num, 1)
		return 1

	def revshuttle(self, num):
		if not self.simplecmd(REV_SHUTTLE):
			return 0
		self._number(num, 1)
		return 1

	def getentry(self, which):
		self._check()
		if which == 'in':
			cmd = IN_ENTRY_SENSE
		elif which == 'out':
			cmd = OUT_ENTRY_SENSE
		self.replycmd(cmd)
		h = self._getnumber(2)
		m = self._getnumber(2)
		s = self._getnumber(2)
		f = self._getnumber(2)
		return (h, m, s, f)

	def inentry(self, arg):
		return self.ioentry(arg, (IN_ENTRY, IN_ENTRY_RESET, \
			  IN_ENTRY_SET, IN_ENTRY_INC, IN_ENTRY_DEC))

	def outentry(self, arg):
		return self.ioentry(arg, (OUT_ENTRY, OUT_ENTRY_RESET, \
			  OUT_ENTRY_SET, OUT_ENTRY_INC, OUT_ENTRY_DEC))

	def ioentry(self, arg, (Load, Clear, Set, Inc, Dec)):
		self._check()
		if type(arg) == type(()):
			h, m, s, f = arg
			if not self.simplecmd(Set):
				return 0
			self._number(h,2)
			self._number(m,2)
			self._number(s,2)
			self._number(f,2)
			if not self.simplecmd(ENTER):
				return 0
			return 1
		elif arg == 'reset':
			cmd = Clear
		elif arg == 'load':
			cmd = Load
		elif arg == '+':
			cmd = Inc
		elif arg == '-':
			cmd = Dec
		else:
			raise error, 'Arg should be +,-,reset,load or (h,m,s,f)'
		return self.simplecmd(cmd)

	def cancel(self):
		d = self.simplecmd(CL)
		self.busy_cmd = None
back to top