Source code for edzip

from abc import ABC, abstractmethod
from io import IOBase
from zipfile import ZipFile, ZipInfo, ZipExtFile, ZIP_STORED
from zipfile import _SharedFile, structFileHeader, sizeFileHeader, BadZipFile, _FH_SIGNATURE, stringFileHeader, _FH_FILENAME_LENGTH, _FH_EXTRA_FIELD_LENGTH # type: ignore
from stream_unzip import stream_unzip
import struct
import os
from typing import Generator, Optional, Sequence, Union

[docs] class ExternalDirectory(ABC): @property @abstractmethod def len(self) -> int: pass
[docs] @abstractmethod def namelist(self) -> Sequence[str]: pass
[docs] @abstractmethod def infolist(self) -> Sequence[ZipInfo]: pass
[docs] @abstractmethod def getinfo(self, name: str) -> ZipInfo: pass
[docs] class EDZipFile(ZipFile): """A subclass of ZipFile that reads the directory information from an external SQLite database. """ def __init__(self, file: Union[str, os.PathLike, IOBase], ed: ExternalDirectory): """Initializes a new instance of the class. Args: file (str or os.PathLike or BinaryIO): The ZIP file to read from. con (sqlite3.Connection): The SQLite3 database connection to the external directory. """ super().__init__(file, 'r', ZIP_STORED, True, None) # type: ignore self.ed = ed def __len__(self) -> int: """Return the number of items in the EDZip object. Returns: int: The number of items in the EDZip object. """ return self.ed.len def _RealGetContents(self): pass
[docs] def namelist(self) -> Sequence[str]: """Returns a sequence of filenames stored in the ZIP archive. Returns: Sequence[str]: A sequence of filenames. """ return self.ed.namelist()
[docs] def infolist(self) -> Sequence[ZipInfo]: """Return a sequence of ZipInfo objects for all files in the archive. Returns: Sequence[ZipInfo]: sequence of ZipInfo objects. Note that the ZipInfo objects returned have only offset info filled in. To get all info, call fillinfo() with each object. """ return self.ed.infolist()
[docs] def getinfo(self, name) -> ZipInfo: """Retrieves information about a file in the archive. Args: name (str): The name of the file to retrieve information for. Returns: ZipInfo: An object containing offset information for the specified file. Note that the object returned has only offset info filled in. To get all info, call fillinfo() with it. """ return self.ed.getinfo(name)
[docs] def fillinfo(self, zinfo: ZipInfo) -> ZipInfo: """Fill the given ZipInfo object with further information about the file in the archive. Args: zinfo (ZipInfo): The ZipInfo object to fill in with information. Returns: ZipInfo: The filled-in ZipInfo object. """ self.fp.seek(zinfo.header_offset) fheader = self.fp.read(sizeFileHeader) if len(fheader) != sizeFileHeader: raise BadZipFile("Truncated file header") fheader = struct.unpack(structFileHeader, fheader) if fheader[_FH_SIGNATURE] != stringFileHeader: raise BadZipFile("Bad magic number for file header") (zinfo.extract_version, zinfo.reserved, zinfo.flag_bits, zinfo.compress_type, t, d, zinfo.CRC, zinfo.compress_size, zinfo.file_size) = fheader[1:10] zinfo._raw_time = t zinfo.date_time = ((d >> 9) + 1980, (d >> 5) & 0xF, d & 0x1F, t >> 11, (t >> 5) & 0x3F, (t & 0x1F) * 2) zinfo.extra = self.fp.read(fheader[_FH_EXTRA_FIELD_LENGTH]) zinfo.orig_filename = self.fp.read(fheader[_FH_FILENAME_LENGTH]) if fheader[_FH_EXTRA_FIELD_LENGTH]: zinfo.extra = self.fp.read(fheader[_FH_EXTRA_FIELD_LENGTH]) zinfo._decodeExtra() return zinfo
[docs] def open(self, name: Union[str, ZipInfo], mode: str = "r", pwd: Optional[bytes] = None, *, force_zip6: bool = False) -> ZipExtFile: """Open the file specified by 'name' inside the ZIP archive for reading. Args: name (str or ZipInfo): The name of the file to open, or a ZipInfo object. mode (str): The mode to open the file in. Only 'r' is supported. pwd (bytes): The password to use for decrypting the file, if it is encrypted. force_zip64 (bool): Ignored, as this subclass can only be used for reading. Returns: ZipExtFile: A file-like object for reading the contents of the ZIP archive. """ if mode != "r": raise ValueError('This class does not support writing zip files') if not self.fp: raise ValueError( "Attempt to use ZIP archive that was already closed") if isinstance(name, str): name = self.getinfo(name) self.fillinfo(name) self._fileRefCnt += 1 zef_file = _SharedFile(self.fp, self.fp.tell(), self._fpclose, self._lock, lambda: self._writing) return ZipExtFile(zef_file, mode, name, pwd, True)
[docs] def stream_from(self, name: Optional[Union[str, ZipInfo]] = None) -> Generator[ tuple[str, int, Generator[bytes, None, None]], None, None]: """Returns a generator that yields a tuple of (filename, file size, file data) for each file in the archive, optionally starting with the specified file. Args: name (str or ZipInfo): Optional. The name of the file to start streaming from, or a ZipInfo object representing the file. Yields: (str,int,Generator[bytes,None,None]): tuple of (filename, file size, file data) for each file in the archive. """ if isinstance(name, str): # Get info object for name name = self.getinfo(name) if name is not None: self.fp.seek(name.header_offset) else: self.fp.seek(0) return stream_unzip(self.fp)