"""Functions that read and write gzipped files.

The user of the file doesn't have to worry about the compression,
but random access is not allowed."""

# based on Andrew Kuchling's minigzip.py distributed with the zlib module

# NOTE(review): the text this module was recovered from had lost its line
# structure, and the span from the end of write32u() up to the last quote of
# GzipFile.__repr__() was missing entirely.  read32(), open(), _PaddedFile,
# the GzipFile class header, __init__(), the filename property and
# __repr__() below were rebuilt from the way the surviving methods use them
# -- confirm them against the upstream CPython gzip module.

import builtins
import io
import os
import struct
import sys
import time
import zlib

__all__ = ["GzipFile", "open", "compress", "decompress"]

# Bit values of the FLG byte in a gzip member header.
FTEXT, FHCRC, FEXTRA, FNAME, FCOMMENT = 1, 2, 4, 8, 16

# Values of GzipFile.mode.
READ, WRITE = 1, 2


def U32(i):
    """Return i as an unsigned integer, assuming it fits in 32 bits.

    A negative i is interpreted as a two's-complement 32-bit value and
    mapped to the corresponding non-negative integer.
    """
    if i < 0:
        i += 1 << 32
    return i


def LOWU32(i):
    """Return the low-order 32 bits, as a non-negative int"""
    return i & 0xFFFFFFFF


def write32u(output, value):
    """Write value to output as a 4-byte little-endian unsigned integer."""
    # The L format writes the bit pattern correctly whether signed
    # or unsigned.
    output.write(struct.pack("<L", value))


def read32(input):
    """Read a 4-byte little-endian unsigned integer from input.

    Raises struct.error if fewer than four bytes are available.
    """
    return struct.unpack("<I", input.read(4))[0]


def open(filename, mode="rb", compresslevel=9):
    """Shorthand for GzipFile(filename, mode, compresslevel).

    The filename argument is required; mode defaults to 'rb'
    and compresslevel defaults to 9.
    """
    return GzipFile(filename, mode, compresslevel)


class _PaddedFile:
    """Minimal read-only file object that prepends bytes to the contents
    of an actual file.

    GzipFile uses it to push back data that was read past the end of a
    gzip member.  Shouldn't be used outside of this module, as it lacks
    essential functionality.
    """

    def __init__(self, f, prepend=b''):
        self._buffer = prepend
        self._length = len(prepend)
        self.file = f
        # Position inside _buffer, or None once the buffer is exhausted
        # and reads go straight to the underlying file.
        self._read = 0

    def read(self, size):
        """Return up to size bytes, draining the pushed-back buffer first."""
        if self._read is None:
            return self.file.read(size)
        if self._read + size <= self._length:
            start = self._read
            self._read += size
            return self._buffer[start:self._read]
        # The request runs past the buffer: hand out what is left of it
        # and take the remainder from the underlying file.
        start = self._read
        self._read = None
        return self._buffer[start:] + self.file.read(size - self._length + start)

    def prepend(self, prepend=b'', readprevious=False):
        """Push bytes back so that the next read() returns them first.

        With readprevious true, and if the bytes are exactly those most
        recently read from the buffer, the read position is simply moved
        back instead of copying.
        """
        if self._read is None:
            self._buffer = prepend
        elif readprevious and len(prepend) <= self._read:
            self._read -= len(prepend)
            return
        else:
            self._buffer = self._buffer[self._read:] + prepend
        self._length = len(self._buffer)
        self._read = 0

    def unused(self):
        """Return the pushed-back bytes that have not been read yet."""
        if self._read is None:
            return b''
        return self._buffer[self._read:]

    def seek(self, offset, whence=0):
        """Seek the underlying file, discarding any pushed-back bytes."""
        if whence == 1 and self._read is not None:
            if 0 <= offset + self._read <= self._length:
                self._read += offset
                return
            offset += self._length - self._read
        self._read = None
        self._buffer = None
        return self.file.seek(offset, whence)

    def __getattr__(self, name):
        # Everything else (flush, fileno, ...) is delegated to the file.
        return getattr(self.file, name)


class GzipFile(io.BufferedIOBase):
    """File-like object that transparently compresses or decompresses
    gzip data.

    In read mode the decompressed data of all members of the stream is
    returned back to back; in write mode a single gzip member is produced
    and completed by close().
    """

    # File object opened by the constructor itself (and therefore closed
    # by close()); stays None when the caller supplied fileobj.
    myfileobj = None
    # Upper bound for the size of a single read from the compressed file.
    max_read_chunk = 10 * 1024 * 1024   # 10Mb

    def __init__(self, filename=None, mode=None,
                 compresslevel=9, fileobj=None, mtime=None):
        """Create a GzipFile.

        At least one of filename and fileobj must be given.  When fileobj
        is None, filename is opened with the built-in open().  mode starts
        with 'r' for reading or 'w'/'a' for writing and defaults to the
        mode of fileobj, or 'rb'.  compresslevel is passed to zlib.  mtime
        is the timestamp stored in the header when writing; None means
        the current time.

        Raises IOError for an unsupported mode.
        """
        # guarantee the file is opened in binary mode on platforms
        # that care about that sort of thing
        if mode and 'b' not in mode:
            mode += 'b'
        if fileobj is None:
            fileobj = self.myfileobj = builtins.open(filename, mode or 'rb')
        if filename is None:
            if hasattr(fileobj, 'name') and isinstance(fileobj.name, str):
                filename = fileobj.name
            else:
                filename = ''
        if mode is None:
            if hasattr(fileobj, 'mode'):
                mode = fileobj.mode
            else:
                mode = 'rb'

        if mode[0:1] == 'r':
            self.mode = READ
            # Set flag indicating start of a new member
            self._new_member = True
            # Buffer data read from gzip file. extrastart is offset in
            # stream where buffer starts. extrasize is number of
            # bytes remaining in buffer from current stream position.
            self.extrabuf = b""
            self.extrasize = 0
            self.extrastart = 0
            self.name = filename
            # Starts small, scales exponentially
            self.min_readsize = 100
            fileobj = _PaddedFile(fileobj)
        elif mode[0:1] == 'w' or mode[0:1] == 'a':
            self.mode = WRITE
            self._init_write(filename)
            # Negative wbits: raw deflate data, the gzip framing is
            # written by this class.
            self.compress = zlib.compressobj(compresslevel,
                                             zlib.DEFLATED,
                                             -zlib.MAX_WBITS,
                                             zlib.DEF_MEM_LEVEL,
                                             0)
        else:
            raise IOError("Mode " + mode + " not supported")

        self.fileobj = fileobj
        self.offset = 0
        self.mtime = mtime

        if self.mode == WRITE:
            self._write_gzip_header()

    @property
    def filename(self):
        """Deprecated alias of the name attribute."""
        import warnings
        warnings.warn("use the name attribute", DeprecationWarning, 2)
        if self.mode == WRITE and self.name[-3:] != ".gz":
            return self.name + ".gz"
        return self.name

    def __repr__(self):
        fileobj = self.fileobj
        if isinstance(fileobj, _PaddedFile):
            fileobj = fileobj.file
        s = repr(fileobj)
        return '<gzip ' + s[1:-1] + ' ' + hex(id(self)) + '>'

    def _check_closed(self):
        """Raises a ValueError if the underlying file object has been closed.
        """
        if self.closed:
            raise ValueError('I/O operation on closed file.')

    def _init_write(self, filename):
        """Reset the per-member state used while writing."""
        self.name = filename
        self.crc = zlib.crc32(b"") & 0xffffffff
        self.size = 0
        self.writebuf = []
        self.bufsize = 0

    def _write_gzip_header(self):
        """Write the gzip member header to the underlying file."""
        self.fileobj.write(b'\037\213')             # magic header
        self.fileobj.write(b'\010')                 # compression method
        try:
            # RFC 1952 requires the FNAME field to be Latin-1. Do not
            # include filenames that cannot be represented that way.
            fname = os.path.basename(self.name)
            if not isinstance(fname, bytes):
                fname = fname.encode('latin-1')
            if fname.endswith(b'.gz'):
                fname = fname[:-3]
        except UnicodeEncodeError:
            fname = b''
        flags = 0
        if fname:
            flags = FNAME
        self.fileobj.write(chr(flags).encode('latin-1'))
        mtime = self.mtime
        if mtime is None:
            mtime = time.time()
        write32u(self.fileobj, int(mtime))
        self.fileobj.write(b'\002')                 # XFL byte
        self.fileobj.write(b'\377')                 # OS byte
        if fname:
            self.fileobj.write(fname + b'\000')

    def _init_read(self):
        """Reset the running CRC and size for the next member."""
        self.crc = zlib.crc32(b"") & 0xffffffff
        self.size = 0

    def _read_gzip_header(self):
        """Read and validate a member header, discarding optional fields.

        Raises EOFError if no more data is available and IOError if the
        data is not a gzip member using the deflate method.
        """
        magic = self.fileobj.read(2)
        if magic == b'':
            raise EOFError("Reached EOF")

        if magic != b'\037\213':
            raise IOError('Not a gzipped file')
        method = ord(self.fileobj.read(1))
        if method != 8:
            raise IOError('Unknown compression method')
        flag = ord(self.fileobj.read(1))
        self.mtime = read32(self.fileobj)
        # Skip the extra-flags and OS bytes.
        self.fileobj.read(2)

        if flag & FEXTRA:
            # Read & discard the extra field, if present
            xlen = ord(self.fileobj.read(1))
            xlen = xlen + 256 * ord(self.fileobj.read(1))
            self.fileobj.read(xlen)
        if flag & FNAME:
            # Read and discard a null-terminated string containing the filename
            while True:
                s = self.fileobj.read(1)
                if not s or s == b'\000':
                    break
        if flag & FCOMMENT:
            # Read and discard a null-terminated string containing a comment
            while True:
                s = self.fileobj.read(1)
                if not s or s == b'\000':
                    break
        if flag & FHCRC:
            self.fileobj.read(2)     # Read & discard the 16-bit header CRC

        # Pushed-back bytes that follow the header are offered to the
        # current decompressor; unused() does not consume them, so the
        # next fileobj.read() still returns them.
        unused = self.fileobj.unused()
        if unused:
            uncompress = self.decompress.decompress(unused)
            self._add_read_data(uncompress)

    def write(self, data):
        """Compress data and write it out; return the number of bytes
        of uncompressed data consumed.

        Raises ValueError on a closed file and IOError in read mode.
        """
        self._check_closed()
        if self.mode != WRITE:
            import errno
            raise IOError(errno.EBADF, "write() on read-only GzipFile object")

        # Convert data type if called by io.BufferedWriter.
        if isinstance(data, memoryview):
            data = data.tobytes()

        if len(data) > 0:
            self.size = self.size + len(data)
            self.crc = zlib.crc32(data, self.crc) & 0xffffffff
            self.fileobj.write(self.compress.compress(data))
            self.offset += len(data)

        return len(data)

    def read(self, size=-1):
        """Return up to size decompressed bytes, or everything up to EOF
        when size is negative.

        Raises ValueError on a closed file and IOError in write mode.
        """
        self._check_closed()
        if self.mode != READ:
            import errno
            raise IOError(errno.EBADF, "read() on write-only GzipFile object")

        readsize = 1024
        if size < 0:        # get the whole thing
            try:
                while True:
                    self._read(readsize)
                    readsize = min(self.max_read_chunk, readsize * 2)
            except EOFError:
                size = self.extrasize
        else:               # just get some more of it
            try:
                while size > self.extrasize:
                    self._read(readsize)
                    readsize = min(self.max_read_chunk, readsize * 2)
            except EOFError:
                if size > self.extrasize:
                    size = self.extrasize

        offset = self.offset - self.extrastart
        chunk = self.extrabuf[offset: offset + size]
        self.extrasize = self.extrasize - size

        self.offset += size
        return chunk

    def peek(self, n):
        """Return buffered decompressed bytes without advancing the
        position; may return more or fewer than n bytes."""
        if self.mode != READ:
            import errno
            raise IOError(errno.EBADF, "peek() on write-only GzipFile object")

        # Do not return ridiculously small buffers, for one common idiom
        # is to call peek(1) and expect more bytes in return.
        if n < 100:
            n = 100
        if self.extrasize == 0:
            if self.fileobj is None:
                return b''
            try:
                # Ensure that we don't return b"" if we haven't reached EOF.
                while self.extrasize == 0:
                    # 1024 is the same buffering heuristic used in read()
                    self._read(max(n, 1024))
            except EOFError:
                pass
        offset = self.offset - self.extrastart
        remaining = self.extrasize
        assert remaining == len(self.extrabuf) - offset
        return self.extrabuf[offset:offset + n]

    def _unread(self, buf):
        """Move the position back over buf, which must be the bytes most
        recently returned by read()."""
        self.extrasize = len(buf) + self.extrasize
        self.offset -= len(buf)

    def _read(self, size=1024):
        """Read size compressed bytes and append whatever they decompress
        to into the internal buffer.

        Raises EOFError when the end of the compressed stream is reached.
        """
        if self.fileobj is None:
            raise EOFError("Reached EOF")

        if self._new_member:
            # If the _new_member flag is set, we have to
            # jump to the next member, if there is one.
            self._init_read()
            self._read_gzip_header()
            self.decompress = zlib.decompressobj(-zlib.MAX_WBITS)
            self._new_member = False

        # Read a chunk of data from the file
        buf = self.fileobj.read(size)

        # If the EOF has been reached, flush the decompression object
        # and mark this object as finished.

        if buf == b"":
            uncompress = self.decompress.flush()
            # Prepend the already read bytes to the fileobj so they can be
            # seen by _read_eof()
            self.fileobj.prepend(self.decompress.unused_data, True)
            self._read_eof()
            self._add_read_data(uncompress)
            raise EOFError('Reached EOF')

        uncompress = self.decompress.decompress(buf)
        self._add_read_data(uncompress)

        if self.decompress.unused_data != b"":
            # Ending case: we've come to the end of a member in the file,
            # so push back the unused data, finish up this member, and
            # read a new gzip header.
            # Prepend the already read bytes to the fileobj so they can be
            # seen by _read_eof() and _read_gzip_header()
            self.fileobj.prepend(self.decompress.unused_data, True)
            # Check the CRC and file size, and set the flag so we read
            # a new member on the next call
            self._read_eof()
            self._new_member = True

    def _add_read_data(self, data):
        """Append decompressed data to the buffer and update CRC and size."""
        self.crc = zlib.crc32(data, self.crc) & 0xffffffff
        offset = self.offset - self.extrastart
        # Drop the part of the buffer that has already been consumed.
        self.extrabuf = self.extrabuf[offset:] + data
        self.extrasize = self.extrasize + len(data)
        self.extrastart = self.offset
        self.size = self.size + len(data)

    def _read_eof(self):
        """Verify the member trailer and skip any zero padding after it.

        Raises IOError if the stored CRC or length does not match the
        decompressed data.
        """
        # We've read to the end of the member.  Check that the computed
        # CRC and size of the uncompressed data match the stored values.
        # Note that the size stored is the true file size mod 2**32.
        crc32 = read32(self.fileobj)
        isize = read32(self.fileobj)  # may exceed 2GB
        if crc32 != self.crc:
            raise IOError("CRC check failed %s != %s" % (hex(crc32),
                                                         hex(self.crc)))
        elif isize != (self.size & 0xffffffff):
            raise IOError("Incorrect length of data produced")

        # Gzip files can be padded with zeroes and still have archives.
        # Consume all zero bytes and set the file position to the first
        # non-zero byte. See http://www.gzip.org/#faq8
        c = b"\x00"
        while c == b"\x00":
            c = self.fileobj.read(1)
        if c:
            self.fileobj.prepend(c, True)

    @property
    def closed(self):
        """True once close() has been called."""
        return self.fileobj is None

    def close(self):
        """Finish the gzip member (write mode) and release the file.

        A file object supplied by the caller is left open; one opened by
        the constructor is closed.  Calling close() again is a no-op.
        """
        fileobj = self.fileobj
        if fileobj is None:
            return
        # Mark the object closed first, so that a failure while writing
        # the trailer cannot leave it half-open (and be retried by the
        # io finalizer).
        self.fileobj = None
        try:
            if self.mode == WRITE:
                fileobj.write(self.compress.flush())
                write32u(fileobj, self.crc)
                # self.size may exceed 2GB, or even 4GB
                write32u(fileobj, self.size & 0xffffffff)
        finally:
            # Always release a file opened by the constructor, even if
            # writing the trailer failed.
            myfileobj = self.myfileobj
            if myfileobj:
                self.myfileobj = None
                myfileobj.close()

    def flush(self, zlib_mode=zlib.Z_SYNC_FLUSH):
        """Flush the compressor (write mode) and the underlying file."""
        self._check_closed()
        if self.mode == WRITE:
            # Ensure the compressor's buffer is flushed
            self.fileobj.write(self.compress.flush(zlib_mode))
        self.fileobj.flush()

    def fileno(self):
        """Invoke the underlying file object's fileno() method.

        This will raise AttributeError if the underlying file object
        doesn't support fileno().
        """
        return self.fileobj.fileno()

    def rewind(self):
        '''Return the uncompressed stream file position indicator to the
        beginning of the file'''
        if self.mode != READ:
            raise IOError("Can't rewind in write mode")
        self.fileobj.seek(0)
        self._new_member = True
        self.extrabuf = b""
        self.extrasize = 0
        self.extrastart = 0
        self.offset = 0

    def readable(self):
        return self.mode == READ

    def writable(self):
        return self.mode == WRITE

    def seekable(self):
        return True

    def seek(self, offset, whence=0):
        """Move to an uncompressed offset and return the new position.

        whence may be 0 (absolute) or 1 (relative); seeking from the end
        raises ValueError.  In write mode only forward seeks are possible
        (the gap is filled with zero bytes); in read mode a backward seek
        rewinds and decompresses again from the start.
        """
        if whence:
            if whence == 1:
                offset = self.offset + offset
            else:
                raise ValueError('Seek from end not supported')
        if self.mode == WRITE:
            if offset < self.offset:
                raise IOError('Negative seek in write mode')
            count = offset - self.offset
            chunk = bytes(1024)
            for i in range(count // 1024):
                self.write(chunk)
            self.write(bytes(count % 1024))
        elif self.mode == READ:
            if offset < self.offset:
                # for negative seek, rewind and do positive seek
                self.rewind()
            count = offset - self.offset
            for i in range(count // 1024):
                self.read(1024)
            self.read(count % 1024)

        return self.offset

    def readline(self, size=-1):
        """Read one line of at most size bytes (unlimited if negative),
        including the trailing newline if there is one."""
        if size < 0:
            # Shortcut common case - newline found in buffer.
            offset = self.offset - self.extrastart
            i = self.extrabuf.find(b'\n', offset) + 1
            if i > 0:
                self.extrasize -= i - offset
                self.offset += i - offset
                return self.extrabuf[offset: i]

            size = sys.maxsize
            readsize = self.min_readsize
        else:
            readsize = size
        bufs = []
        while size != 0:
            c = self.read(readsize)
            i = c.find(b'\n')

            # We set i=size-1 to break out of the loop under two
            # conditions: 1) there's no newline, and the chunk is
            # larger than size, or 2) there is a newline, but the
            # resulting line would be longer than 'size'.
            if (size <= i) or (i == -1 and len(c) > size):
                i = size - 1

            if i >= 0 or c == b'':
                bufs.append(c[:i + 1])    # Add portion of last chunk
                self._unread(c[i + 1:])   # Push back rest of chunk
                break

            # Append chunk to list, decrease 'size',
            bufs.append(c)
            size = size - len(c)
            readsize = min(size, readsize * 2)
        if readsize > self.min_readsize:
            self.min_readsize = min(readsize, self.min_readsize * 2, 512)
        return b''.join(bufs)  # Return resulting line


def compress(data, compresslevel=9):
    """Compress data in one shot and return the compressed string.
    Optional argument is the compression level, in range of 0-9.
    """
    buf = io.BytesIO()
    with GzipFile(fileobj=buf, mode='wb', compresslevel=compresslevel) as f:
        f.write(data)
    return buf.getvalue()


def decompress(data):
    """Decompress a gzip compressed string in one shot.
    Return the decompressed string.
    """
    with GzipFile(fileobj=io.BytesIO(data)) as f:
        return f.read()


def _test():
    """Command-line driver: compress the named files, or decompress them
    when the first argument is -d ("-" means stdin/stdout)."""
    # Act like gzip; with -d, act like gunzip.
    # The input file is not deleted, however, nor are any other gzip
    # options or features supported.
    args = sys.argv[1:]
    # Named so that it does not shadow the module-level decompress().
    decompressing = bool(args) and args[0] == "-d"
    if decompressing:
        args = args[1:]
    if not args:
        args = ["-"]
    for arg in args:
        if decompressing:
            if arg == "-":
                f = GzipFile(filename="", mode="rb", fileobj=sys.stdin.buffer)
                g = sys.stdout.buffer
            else:
                if arg[-3:] != ".gz":
                    print("filename doesn't end in .gz:", repr(arg))
                    continue
                f = open(arg, "rb")
                g = builtins.open(arg[:-3], "wb")
        else:
            if arg == "-":
                f = sys.stdin.buffer
                g = GzipFile(filename="", mode="wb", fileobj=sys.stdout.buffer)
            else:
                f = builtins.open(arg, "rb")
                g = open(arg + ".gz", "wb")
        while True:
            chunk = f.read(1024)
            if not chunk:
                break
            g.write(chunk)
        if g is not sys.stdout.buffer:
            g.close()
        if f is not sys.stdin.buffer:
            f.close()


if __name__ == '__main__':
    _test()