"""Temporary files and filenames.""" # XXX This tries to be not UNIX specific, but I don't know beans about # how to choose a temp directory or filename on MS-DOS or other # systems so it may have to be changed... import os __all__ = ["mktemp", "TemporaryFile", "tempdir", "gettempprefix"] # Parameters that the caller may set to override the defaults tempdir = None template = None def gettempdir(): """Function to calculate the directory to use.""" global tempdir if tempdir is not None: return tempdir # _gettempdir_inner deduces whether a candidate temp dir is usable by # trying to create a file in it, and write to it. If that succeeds, # great, it closes the file and unlinks it. There's a race, though: # the *name* of the test file it tries is the same across all threads # under most OSes (Linux is an exception), and letting multiple threads # all try to open, write to, close, and unlink a single file can cause # a variety of bogus errors (e.g., you cannot unlink a file under # Windows if anyone has it open, and two threads cannot create the # same file in O_EXCL mode under Unix). The simplest cure is to serialize # calls to _gettempdir_inner. This isn't a real expense, because the # first thread to succeed sets the global tempdir, and all subsequent # calls to gettempdir() reuse that without trying _gettempdir_inner. _tempdir_lock.acquire() try: return _gettempdir_inner() finally: _tempdir_lock.release() def _gettempdir_inner(): """Function to calculate the directory to use.""" global tempdir if tempdir is not None: return tempdir try: pwd = os.getcwd() except (AttributeError, os.error): pwd = os.curdir attempdirs = ['/tmp', '/var/tmp', '/usr/tmp', pwd] if os.name == 'nt': attempdirs.insert(0, 'C:\\TEMP') attempdirs.insert(0, '\\TEMP') elif os.name == 'mac': import macfs, MACFS try: refnum, dirid = macfs.FindFolder(MACFS.kOnSystemDisk, MACFS.kTemporaryFolderType, 1) dirname = macfs.FSSpec((refnum, dirid, '')).as_pathname() attempdirs.insert(0, dirname) except macfs.error: pass elif os.name == 'riscos': scrapdir = os.getenv('Wimp$ScrapDir') if scrapdir: attempdirs.insert(0, scrapdir) for envname in 'TMPDIR', 'TEMP', 'TMP': if os.environ.has_key(envname): attempdirs.insert(0, os.environ[envname]) testfile = gettempprefix() + 'test' for dir in attempdirs: try: filename = os.path.join(dir, testfile) if os.name == 'posix': try: fd = os.open(filename, os.O_RDWR | os.O_CREAT | os.O_EXCL, 0700) except OSError: pass else: fp = os.fdopen(fd, 'w') fp.write('blat') fp.close() os.unlink(filename) del fp, fd tempdir = dir break else: fp = open(filename, 'w') fp.write('blat') fp.close() os.unlink(filename) tempdir = dir break except IOError: pass if tempdir is None: msg = "Can't find a usable temporary directory amongst " + `attempdirs` raise IOError, msg return tempdir # template caches the result of gettempprefix, for speed, when possible. # XXX unclear why this isn't "_template"; left it "template" for backward # compatibility. if os.name == "posix": # We don't try to cache the template on posix: the pid may change on us # between calls due to a fork, and on Linux the pid changes even for # another thread in the same process. Since any attempt to keep the # cache in synch would have to call os.getpid() anyway in order to make # sure the pid hasn't changed between calls, a cache wouldn't save any # time. In addition, a cache is difficult to keep correct with the pid # changing willy-nilly, and earlier attempts proved buggy (races). template = None # Else the pid never changes, so gettempprefix always returns the same # string. elif os.name == "nt": template = '~' + `os.getpid()` + '-' elif os.name in ('mac', 'riscos'): template = 'Python-Tmp-' else: template = 'tmp' # XXX might choose a better one def gettempprefix(): """Function to calculate a prefix of the filename to use. This incorporates the current process id on systems that support such a notion, so that concurrent processes don't generate the same prefix. """ global template if template is None: return '@' + `os.getpid()` + '.' else: return template def mktemp(suffix=""): """User-callable function to return a unique temporary file name.""" dir = gettempdir() pre = gettempprefix() while 1: i = _counter.get_next() file = os.path.join(dir, pre + str(i) + suffix) if not os.path.exists(file): return file class TemporaryFileWrapper: """Temporary file wrapper This class provides a wrapper around files opened for temporary use. In particular, it seeks to automatically remove the file when it is no longer needed. """ # Cache the unlinker so we don't get spurious errors at shutdown # when the module-level "os" is None'd out. Note that this must # be referenced as self.unlink, because the name TemporaryFileWrapper # may also get None'd out before __del__ is called. unlink = os.unlink def __init__(self, file, path): self.file = file self.path = path self.close_called = 0 def close(self): if not self.close_called: self.close_called = 1 self.file.close() self.unlink(self.path) def __del__(self): self.close() def __getattr__(self, name): file = self.__dict__['file'] a = getattr(file, name) if type(a) != type(0): setattr(self, name, a) return a def TemporaryFile(mode='w+b', bufsize=-1, suffix=""): """Create and return a temporary file (opened read-write by default).""" name = mktemp(suffix) if os.name == 'posix': # Unix -- be very careful fd = os.open(name, os.O_RDWR|os.O_CREAT|os.O_EXCL, 0700) try: os.unlink(name) return os.fdopen(fd, mode, bufsize) except: os.close(fd) raise else: # Non-unix -- can't unlink file that's still open, use wrapper file = open(name, mode, bufsize) return TemporaryFileWrapper(file, name) # In order to generate unique names, mktemp() uses _counter.get_next(). # This returns a unique integer on each call, in a threadsafe way (i.e., # multiple threads will never see the same integer). The integer will # usually be a Python int, but if _counter.get_next() is called often # enough, it will become a Python long. # Note that the only names that survive this next block of code # are "_counter" and "_tempdir_lock". class _ThreadSafeCounter: def __init__(self, mutex, initialvalue=0): self.mutex = mutex self.i = initialvalue def get_next(self): self.mutex.acquire() result = self.i try: newi = result + 1 except OverflowError: newi = long(result) + 1 self.i = newi self.mutex.release() return result try: import thread except ImportError: class _DummyMutex: def acquire(self): pass release = acquire _counter = _ThreadSafeCounter(_DummyMutex()) _tempdir_lock = _DummyMutex() del _DummyMutex else: _counter = _ThreadSafeCounter(thread.allocate_lock()) _tempdir_lock = thread.allocate_lock() del thread del _ThreadSafeCounter