# Source code for maggit.db.io.pack

# This file is part of maggit.
#
# Copyright 2015 Matthieu Gautier <dev@mgautier.fr>
#
# Pit is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pit is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# Additional permission under the GNU Affero GPL version 3 section 7:
#
# If you modify this Program, or any covered work, by linking or
# combining it with other code, such other code is not for that reason
# alone subject to any of the requirements of the GNU Affero GPL
# version 3.
#
# You should have received a copy of the GNU Affero General Public License
# along with maggit.  If not, see http://www.gnu.org/licenses
#
# In summary:
# - You can use this program for no cost.
# - You can use this program for both personal and commercial reasons.
# - You do not have to share your own program's code which uses this program.
# - You have to share modifications (e.g bug-fixes, improvements) you've made to this program.

from zlib import decompress
from binascii import unhexlify, hexlify, Error as binasciiError
from struct import unpack_from
import ctypes
from mmap import mmap, ACCESS_READ

__all__ = ('GitPack', 'GitPackIndex')

_wrapping_struct_cache = {}


def read_ctypes(buff, ctypes_):
    """Decode one value of type ``ctypes_`` from the front of ``buff``.

    ``ctypes_`` is wrapped as the single field ``v`` of a generated
    ``ctypes.BigEndianStructure``. The generated class and its size are
    stored in ``_wrapping_struct_cache`` so they are built once per type.

    Arguments:
        buff: A sliceable bytes-like object (``bytes``, ``memoryview``, ...).
        ctypes_: The ctypes type to decode.

    Returns:
        A tuple (value, rest) where rest is ``buff`` without the bytes
        that were consumed.
    """
    cached = _wrapping_struct_cache.get(ctypes_)
    if cached is None:
        wrapper = type('Wrap_%s'%ctypes_, (ctypes.BigEndianStructure, ), {'_fields_':(('v', ctypes_),)})
        cached = wrapper, ctypes.sizeof(wrapper)
        _wrapping_struct_cache[ctypes_] = cached
    wrapper, nbytes = cached

    # from_buffer_copy copies the bytes, so the value does not keep buff alive.
    value = wrapper.from_buffer_copy(buff[:nbytes]).v
    return value, buff[nbytes:]

class SizeChar(ctypes.BigEndianStructure):
    """One byte of a delta size: a 1-bit ``cont`` flag then a 7-bit ``size``."""

    _fields_ = [
        ('cont', ctypes.c_uint8, 1),
        ('size', ctypes.c_uint8, 7),
    ]

def read_delta_size(data):
    """Read a variable-length size from the front of ``data``.

    Each byte is decoded as a ``SizeChar``: its 7 ``size`` bits are added
    above the bits already read (least significant group first), and
    reading goes on while the ``cont`` flag is set.

    Arguments:
        data: A sliceable bytes-like object.

    Returns:
        A tuple (size, rest) where rest is ``data`` without the bytes
        that were consumed.
    """
    size = 0
    shift = 0
    while True:
        part, data = read_ctypes(data, SizeChar)
        size |= part.size << shift
        shift += 7
        if not part.cont:
            return size, data

def delta_chunk(base, data):
    """Yield the successive pieces produced by a stream of delta instructions.

    Every instruction starts with one command byte:

    - bit 7 set: *copy* from ``base``.  Bits 0-3 tell which of the four
      offset bytes follow and bits 4-6 which of the three size bytes
      follow, least significant byte first; absent bytes count as zero.
      A size of zero stands for 0x10000.
    - bit 7 clear and non-zero: *insert* the next ``cmd`` bytes of ``data``.
    - zero: reserved, rejected.

    Copies are plain slices of ``base``: a range reaching past its end is
    not rejected here, it simply yields fewer bytes.

    Arguments:
        base: The bytes-like object copy instructions read from.
        data: The instruction stream (bytes-like, sizes already stripped).

    Yields:
        Slices of ``base`` (copy) or of ``data`` (insert), in output order.

    Raises:
        ValueError: If the reserved command byte 0 is met.
        struct.error: If the stream ends inside a copy instruction.
    """
    pos = 0
    end = len(data)
    while pos < end:
        cmd = unpack_from("!B", data, pos)[0]
        pos += 1
        if cmd & 0x80:
            cp_offset = 0
            for index in range(4):
                if cmd & (0x01 << index):
                    # Every offset byte is used in full, including the top
                    # bit of the fourth one (offsets are 32-bit wide).
                    cp_offset |= unpack_from("!B", data, pos)[0] << (8 * index)
                    pos += 1
            cp_size = 0
            for index in range(3):
                if cmd & (0x10 << index):
                    cp_size |= unpack_from("!B", data, pos)[0] << (8 * index)
                    pos += 1
            if not cp_size:
                cp_size = 0x10000
            yield base[cp_offset:cp_offset+cp_size]
        elif cmd:
            yield data[pos:pos+cmd]
            pos += cmd
        else:
            raise ValueError("invalid delta instruction: reserved command byte 0")


def apply_patch(base, delta_data):
    """Rebuild an object from ``base`` and a delta.

    The delta starts with two variable-length sizes (source, then
    destination) followed by the instructions handed to ``delta_chunk``.

    Arguments:
        base: The content the delta applies to.
        delta_data: The (already decompressed) delta.

    Returns:
        bytes: The rebuilt content.
    """
    stream = memoryview(delta_data)
    # The source size is only read to move past it; it is not checked.
    _src_size, stream = read_delta_size(stream)
    dst_size, stream = read_delta_size(stream)
    pieces = delta_chunk(base, stream)
    content = b''.join(pieces)
    # NOTE: being an assert, this size check is skipped under ``python -O``.
    assert dst_size == len(content)
    return content

class pack_first_obj_header(ctypes.BigEndianStructure):
    """First header byte of a pack entry.

    Declared as a 1-bit ``cont`` flag, a 3-bit ``type`` and a 4-bit ``size``.
    """

    _fields_ = [
        ('cont', ctypes.c_uint8, 1),
        ('type', ctypes.c_uint8, 3),
        ('size', ctypes.c_uint8, 4),
    ]

class pack_other_obj_header(ctypes.BigEndianStructure):
    """Following header byte of a pack entry.

    Declared as a 1-bit ``cont`` flag and a 7-bit ``size``.
    """

    _fields_ = [
        ('cont', ctypes.c_uint8, 1),
        ('size', ctypes.c_uint8, 7),
    ]

# Type code read from a pack entry header -> name of the object type.
typeName = {
    1: b'commit',
    2: b'tree',
    3: b'blob',
    4: b'tag',
}

class GitPack:
    """A git pack file.

    The file is memory-mapped read-only; objects read from it are kept in
    an unbounded per-instance cache keyed by their offset.

    Arguments:
        packfile(path): The path of the packfile.
        idxfile(:class:`~maggit.io.pack.GitPackIndex`): The index associated to the pack.
    """

    def __init__(self, packfile, idxfile):
        self.__cache = {}
        self.idxfile = idxfile
        self.packfile = packfile.open("rb")
        self.mmap = mmap(self.packfile.fileno(), 0, access=ACCESS_READ)
        self.mview = memoryview(self.mmap)
        self.read_header()

    def close(self):
        """Release the memory view, then close the mapping and the file.

        Attributes that were never set (``__init__`` failed part-way) or
        that were reset to ``None`` are skipped, so this can be called on
        any instance and more than once.
        """
        view = getattr(self, 'mview', None)
        if view is not None:
            view.release()
        # The view has to be released before the mapping can be closed.
        for name in ('mmap', 'packfile'):
            resource = getattr(self, name, None)
            if resource is not None:
                resource.close()

    def __del__(self):
        self.close()
        self.packfile = None

    def read_header(self):
        """Read the signature, version and object count at the start of the pack.

        Sets ``type_``, ``version_number`` and ``nb_objects``.
        """
        # Both counters are read as unsigned 32-bit big-endian integers.
        self.type_, self.version_number, self.nb_objects = unpack_from("!4sII", self.mview)
        assert self.type_ == b'PACK', "not a pack file (bad signature)"

    def read_object(self, offset):
        """Return the content of a object at a offset.

        Arguments:
            offset: The offset to read from.

        Returns:
            bytes, bytes: A tuple (type, content) where:

            - type is the type of the object.
            - content is the content of the object (without header).

        Raises:
            ValueError: If the entry has a type code that is not handled.
            KeyError: If the base of a delta is looked up in the index
                and not found there.
        """
        try:
            return self.__cache[offset]
        except KeyError:
            pass

        data = self.mview[offset:]

        # Entry header: type and low 4 size bits in the first byte, then
        # 7 more size bits per byte while the continuation flag is set.
        header, data = read_ctypes(data, pack_first_obj_header)
        type_ = header.type
        size = header.size
        shift = 4
        while header.cont:
            header, data = read_ctypes(data, pack_other_obj_header)
            size += header.size << shift
            shift += 7

        if type_ in typeName:
            # Plain object: the compressed content follows the header.
            # decompress stops at the end of the zlib stream.
            out = decompress(data)
            assert size == len(out)
            self.__cache[offset] = ret = typeName[type_], out
            return ret

        if type_ in (6, 7):
            if type_ == 6:
                # OBJ_OFS_DELTA: the base is located by a distance counted
                # backward from this entry.
                header, data = read_ctypes(data, pack_other_obj_header)
                distance = header.size
                while header.cont:
                    distance += 1
                    header, data = read_ctypes(data, pack_other_obj_header)
                    distance = (distance << 7) + header.size
                base_offset = offset - distance
            else:
                # OBJ_REF_DELTA: the base is named by its 20-byte sha and
                # located through the index.
                base_offset = self.idxfile.get_offset(bytes(data[:20]))
                data = data[20:]
            delta_data = decompress(data)
            # For a delta entry the header size is the size of the delta.
            assert size == len(delta_data)
            typename, base = self.read_object(base_offset)
            content = apply_patch(base, delta_data)
            self.__cache[offset] = typename, content
            return typename, content

        # Any other type code used to fall through and return None.
        raise ValueError("unsupported object type %d at offset %d" % (type_, offset))
class index_header(ctypes.BigEndianStructure):
    """First 8 bytes of an index file: a 4-byte magic and a 32-bit version."""

    _fields_ = [
        ('magic', ctypes.c_uint8*4),
        ('version', ctypes.c_int32),
    ]


# The fanout table: 256 32-bit counters.
Fanouts = ctypes.c_int32*256
# A raw 20-byte sha.
Sha = ctypes.c_uint8*20
# A 32-bit pack offset.
Offset = ctypes.c_uint32
# A 64-bit pack offset.
LongOffset = ctypes.c_uint64


class OffsetSha(ctypes.BigEndianStructure):
    """A 24-byte record: a 32-bit offset followed by a 20-byte sha."""

    _fields_ = [
        ('offset', ctypes.c_uint32),
        ('sha', ctypes.c_uint8*20),
    ]
class GitPackIndex:
    """A pack index

    The file is memory-mapped read-only.  A file that does not start with
    the ``b'\\xfftOc'`` magic is handled as a version 1 index; otherwise
    the version is the one stored in the header.

    Arguments:
        indexfile(path): The path of the index file.

    Methods:
        get_offset(sha): Return the offset in the pack associated to the sha.
    """

    def __init__(self, indexfile):
        self.indexfile = indexfile.open(mode='rb')
        self.mmap = mmap(self.indexfile.fileno(), 0, access=ACCESS_READ)
        self.mview = memoryview(self.mmap)
        header, _ = read_ctypes(self.mview, index_header)
        if bytes(header.magic) != b'\xfftOc':
            self.version = 1
        else:
            self.version = header.version
        self.readFanout()
        count = self.nb_objects
        if self.version == 1:
            # 256 fanout counters, then `count` records of 24 bytes
            # (offset + sha).
            start = 256*4
            self.offsetshas = self.mview[start:start + 24*count]
        else:
            # 8-byte header and 256 fanout counters, then `count` shas of
            # 20 bytes, `count` 4-byte entries that are skipped, `count`
            # 4-byte offsets, and finally the 8-byte offsets.
            start = 258*4
            self.shas = self.mview[start:start + 20*count]
            self.offsets = self.mview[start + 24*count:start + 28*count]
            self.longOffsets = self.mview[start + 28*count:]

    def close(self):
        """Release every memory view, then close the mapping and the file.

        Attributes that were never set (``__init__`` failed part-way) or
        that were reset to ``None`` are skipped, so this can be called on
        any instance and more than once.
        """
        for name in ('offsetshas', 'shas', 'offsets', 'longOffsets', 'mview'):
            view = getattr(self, name, None)
            if view is not None:
                view.release()
        # All the views have to be released before the mapping is closed.
        for name in ('mmap', 'indexfile'):
            resource = getattr(self, name, None)
            if resource is not None:
                resource.close()

    def __del__(self):
        self.close()
        self.indexfile = None

    def readFanout(self):
        """Load the 256 fanout counters into ``self.fanouts`` (a list)."""
        # The table is at the very start in version 1, after the 8-byte
        # header otherwise.
        fanouts, _ = read_ctypes(self.mview[0 if self.version == 1 else 8:], Fanouts)
        self.fanouts = list(fanouts)

    @property
    def nb_objects(self):
        """The last fanout counter, used as the number of entries."""
        return self.fanouts[255]

    def _fanout_range(self, first_byte):
        """Return (start, end) entry indexes of the shas starting with first_byte."""
        start = self.fanouts[first_byte - 1] if first_byte else 0
        return start, self.fanouts[first_byte]

    def get_offset(self, sha):
        """Return the offset in the pack associated to the sha.

        A plain method (not a bound method stored on the instance) so the
        instance does not reference itself.

        Arguments:
            sha(bytes): The raw sha to look for.

        Raises:
            KeyError: If the sha is not in the index.
        """
        if self.version == 1:
            return self.get_offset1(sha)
        return self.get_offset2(sha)

    def get_offset1(self, sha):
        """Version 1 lookup: scan the records sharing the sha's first byte.

        Raises:
            KeyError: If the sha is not in the index.
        """
        assert self.version == 1
        start, end = self._fanout_range(sha[0])
        for index in range(start, end):
            record, _ = read_ctypes(self.offsetshas[24*index:], OffsetSha)
            candidate = bytes(record.sha)
            assert sha[0] == candidate[0]
            if sha == candidate:
                return record.offset
        raise KeyError(sha)

    def get_offset2(self, sha):
        """Version 2 lookup: binary search among the shas sharing the first byte.

        Relies on the shas being stored in increasing order.

        Raises:
            KeyError: If the sha is not in the index.
        """
        assert self.version == 2
        key = bytes(sha)
        low, high = self._fanout_range(key[0])
        while low < high:
            middle = (low + high) // 2
            # Only the first len(sha) bytes of the entry are compared.
            candidate = bytes(self.shas[20*middle:20*middle + len(key)])
            if candidate < key:
                low = middle + 1
            elif candidate > key:
                high = middle
            else:
                break
        else:
            raise KeyError(sha)

        offset, _ = read_ctypes(self.offsets[4*middle:], Offset)
        if offset & (1 << 31):
            # Top bit set: the remaining bits index the 8-byte offsets.
            long_index = offset - (1 << 31)
            offset, _ = read_ctypes(self.longOffsets[8*long_index:], LongOffset)
        return offset

    def get_full_sha(self, value):
        """Return the only sha of the index starting with a hexadecimal prefix.

        Arguments:
            value(str): The prefix, as an even number of hexadecimal digits.

        Returns:
            bytes: The raw 20-byte sha.

        Raises:
            KeyError: If ``value`` is empty or is not valid hexadecimal
                (an odd number of digits included), if no sha starts with
                it, or if more than one does.
        """
        try:
            prefix = unhexlify(value.encode())
        except binasciiError:
            raise KeyError(value)
        if not prefix:
            # Without a first byte there is no fanout slot to search.
            raise KeyError(value)

        if self.version == 1:
            # 24-byte records, the sha comes after the 4-byte offset.
            entry_size, sha_start, table = 24, 4, self.offsetshas
        else:
            entry_size, sha_start, table = 20, 0, self.shas

        start, end = self._fanout_range(prefix[0])
        found_sha = None
        for index in range(start, end):
            position = entry_size*index + sha_start
            candidate = bytes(table[position:position + 20])
            assert prefix[0] == candidate[0]
            if candidate.startswith(prefix):
                if found_sha is not None:
                    # Two shas start with the prefix: it is ambiguous.
                    raise KeyError(value)
                found_sha = candidate
            elif found_sha is not None:
                # Matching shas are contiguous; the run has just ended.
                break
        if found_sha is None:
            raise KeyError(value)
        return found_sha