# This file is part of maggit.
#
# Copyright 2015 Matthieu Gautier <dev@mgautier.fr>
#
# Pit is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pit is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# Additional permission under the GNU Affero GPL version 3 section 7:
#
# If you modify this Program, or any covered work, by linking or
# combining it with other code, such other code is not for that reason
# alone subject to any of the requirements of the GNU Affero GPL
# version 3.
#
# You should have received a copy of the GNU Affero General Public License
# along with maggit. If not, see http://www.gnu.org/licenses
#
# In summary:
# - You can use this program for no cost.
# - You can use this program for both personal and commercial reasons.
# - You do not have to share your own program's code which uses this program.
# - You have to share modifications (e.g bug-fixes, improvements) you've made to this program.
from pathlib import Path
import hashlib
from .io import loose, pack
from binascii import unhexlify, hexlify
class Gitdb:
    """Unified read/write access to the objects of a git repository.

    Combines the loose-object helpers (``loose``) and the pack helpers
    (``pack``) so callers can address an object by its sha without caring
    where it is stored.

    A Gitdb handles only git objects. Not the references, remotes, ...

    Arguments:
        rootdir(path): The path of the objects directory (.git/objects)
    """

    def __init__(self, rootdir):
        self.rootdir = Path(rootdir)
        self.gen_pack_list()

    def _get_pathfile(self, sha):
        """Return the loose-object path ``<rootdir>/<xx>/<rest>`` for a sha.

        Arguments:
            sha(bytes): The binary (unhexlified) sha of the object.
        """
        hex_sha = hexlify(sha).decode()
        return self.rootdir.joinpath(hex_sha[:2], hex_sha[2:])

    def gen_pack_list(self):
        """(Re)build ``self.packs`` from the ``*.idx`` files of the pack dir.

        ``self.packs`` maps the stem of each index file to a tuple
        ``(packIndex, packData)``. ``packData`` starts as ``None`` and is
        filled in by :meth:`get_pack` the first time the pack is needed.
        Index files without a sibling ``.pack`` file are reported and skipped.
        """
        self.packs = {}
        self.pack_dir = self.rootdir.joinpath("pack")
        for index_path in self.pack_dir.glob('*.idx'):
            pack_path = index_path.with_suffix(".pack")
            if not pack_path.exists():
                print(pack_path, "not exists")
                continue
            self.packs[index_path.stem] = (pack.GitPackIndex(index_path), None)

    def get_pack(self, sha):
        """Find the pack containing the sha.

        Arguments:
            sha : The sha of the object

        Returns:
            A tuple ``(packIndex, packData, offset)`` for the first pack whose
            index knows the sha, where ``packData`` is a
            :class:`~maggit.io.pack.GitPack` and ``offset`` is the value
            returned by the index for this sha.
            ``None`` if no pack contains the sha.
        """
        for pack_name, (pack_index, pack_data) in self.packs.items():
            try:
                offset = pack_index.get_offset(sha)
            except KeyError:
                # Not in this pack
                continue
            if pack_data is None:
                # Open the pack lazily and remember it for later lookups.
                pack_data = pack.GitPack(
                    self.pack_dir / (pack_name + '.pack'), pack_index)
                self.packs[pack_name] = (pack_index, pack_data)
            return pack_index, pack_data, offset
        return None

    def object_exists(self, sha):
        """Return True if the sha exists in the db (loose or packed)."""
        if self._get_pathfile(sha).exists():
            return True
        return self.get_pack(sha) is not None

    def get_full_sha(self, prefix):
        """Return the full sha matching a prefix.

        Packs are searched first, in ``self.packs`` order, and the first
        index that resolves the prefix wins. Loose objects are searched only
        if no pack resolves it.

        Arguments:
            prefix(str or bytes): The beginning of a sha, as hexadecimal
                digits (e.g. ``b'bf09f0a9'``). It is handed unchanged to the
                pack indexes; for the loose-object lookup a bytes prefix is
                decoded as ASCII.

        Returns:
            For a loose object, the binary (unhexlified) sha as bytes.
            For a packed object, whatever the pack index returns.

        Raises:
            KeyError: If no pack resolves the prefix and the number of loose
                objects matching it is not exactly one.
        """
        for pack_index, _ in self.packs.values():
            try:
                return pack_index.get_full_sha(prefix)
            except KeyError:
                # Not in this pack
                continue

        # Look in loose objects. Paths and glob patterns must be text, so a
        # bytes prefix is decoded here (it used to fail with TypeError).
        if isinstance(prefix, (bytes, bytearray)):
            hex_prefix = bytes(prefix).decode('ascii')
        else:
            hex_prefix = prefix

        directory = self.rootdir.joinpath(hex_prefix[:2])
        if not directory.exists():
            raise KeyError(prefix)

        matches = list(directory.glob(hex_prefix[2:] + '*'))
        if len(matches) != 1:
            # Either nothing matches or the prefix is ambiguous.
            raise KeyError(prefix)
        return unhexlify((hex_prefix[:2] + matches[0].name).encode('utf8'))

    def object_content(self, sha):
        """Return the raw content of the object associated to sha.

        Packs are consulted first, then the loose-object file.

        Arguments:
            sha: The sha of the object.

        Returns:
            The value produced by the pack reader or the loose reader. The
            other methods of this class use it as a ``(type, content)`` pair.

        Raises:
            KeyError: If the object is neither packed nor loose.
        """
        located = self.get_pack(sha)
        if located is not None:
            _, pack_data, offset = located
            return pack_data.read_object(offset)

        pathfile = self._get_pathfile(sha)
        if pathfile.exists():
            return loose.object_content(pathfile)
        raise KeyError(sha)

    def object_type(self, sha):
        """Return the type of the object associated to sha.

        Arguments:
            sha: The sha of the object.

        Returns:
            The type of the object.

        Raises:
            KeyError: If the object does not exist.
        """
        return self.object_content(sha)[0]

    def _object_write(self, sha, content):
        """Write content as a loose object unless the sha already exists."""
        if self.object_exists(sha):
            return
        loose.object_write(self._get_pathfile(sha), content)

    def blob_content(self, sha):
        """Read and parse a object assuming it is a blob.

        Arguments:
            sha : The sha of the object.

        Returns:
            The content of the blob (bytes).

        Raises:
            ValueError: If the object is not a blob.
            KeyError: If the object does not exist.
        """
        type_, content = self.object_content(sha)
        if type_ != b'blob':
            raise ValueError("Object %s is of type %s (not blob)"%(sha, type_))
        return content

    def blob_write(self, content):
        """Store content as a blob object and return its sha."""
        raw, sha = loose.object_rawsha(b'blob', content)
        self._object_write(sha, raw)
        return sha

    def tree_content(self, sha):
        """Read and parse a object assuming it is a tree.

        Arguments:
            sha : The sha of the object.

        Returns:
            List[Tuple[bytes, Tuple[bytes, bytes]]]:
            A list of (path, (mode, sha)) where :

            - path is the name of the entry.
            - mode is the git mode.
            - sha is the sha of the blob/tree object.

        Raises:
            ValueError: If the object is not a tree.
            KeyError: If the object does not exist.
        """
        type_, content = self.object_content(sha)
        if type_ != b'tree':
            raise ValueError("Object %s is of type %s (not tree)"%(sha, type_))
        return loose.tree_parse(content)

    def tree_write(self, entries):
        """Store entries as a tree object and return its sha."""
        raw, sha = loose.object_rawsha(b'tree', loose.tree_gen(entries))
        self._object_write(sha, raw)
        return sha

    def commit_content(self, sha):
        """Read and parse a object assuming it is a commit.

        Arguments:
            sha : The sha of the object.

        Returns:
            bytes, list[bytes], bytes, bytes, bytes:
            A tuple (tree, parents, message, author, committer) where:

            - tree is the sha of the tree object of the commit(unhexlified bytes).
            - parents is a list of sha of the parents commit.
            - message is the message of the commit.
            - author is the name (b'name <email> timestamp') of the author.
            - committer is the name (b'name <email> timestamp') of the committer.

        Raises:
            ValueError: If the object is not a commit.
            KeyError: If the object does not exist.
        """
        type_, content = self.object_content(sha)
        if type_ != b'commit':
            raise ValueError("Object %s is of type %s (not commit)"%(sha, type_))
        return loose.commit_parse(content)

    def commit_write(self, treesha, message, author, authorDate, committer, committerDate, parents=None):
        """Store a commit object and return its sha.

        Arguments:
            treesha: The sha of the tree of the commit.
            message: The message of the commit.
            author: The author of the commit.
            authorDate: The date associated to the author.
            committer: The committer of the commit.
            committerDate: The date associated to the committer.
            parents: The shas of the parent commits. Defaults to no parent
                (an empty list is passed to the commit generator).
        """
        if parents is None:
            # A fresh list per call; a shared ``[]`` default could be mutated.
            parents = []
        payload = loose.commit_gen(treesha, message, author, authorDate,
                                   committer, committerDate, parents)
        raw, sha = loose.object_rawsha(b'commit', payload)
        self._object_write(sha, raw)
        return sha

    def tag_content(self, sha):
        """Read and parse a object assuming it is a tag.

        Arguments:
            sha : The sha of the object.

        Returns:
            bytes, bytes, bytes, bytes, bytes:
            A tuple (object, objecttype, tag, tagger, message) where:

            - object is the sha of the tagged object (unhexlified bytes).
            - objecttype is the type of the tagged object.
            - tag is the name of the tag.
            - tagger is the name (b'name <email> timestamp') of the tagger.
            - message is the message of the tag.

        Raises:
            ValueError: If the object is not a tag.
            KeyError: If the object does not exist.
        """
        type_, content = self.object_content(sha)
        if type_ != b'tag':
            raise ValueError("Object %s is of type %s (not tag)"%(sha, type_))
        return loose.tag_parse(content)

    def tag_write(self, objectsha, tag, tagger, tagDate, message):
        """Store a tag object pointing to objectsha and return its sha.

        The type of the tagged object is read from the db, so objectsha must
        already exist (a KeyError is raised otherwise).
        """
        objecttype = self.object_type(objectsha)
        payload = loose.tag_gen(objectsha, objecttype, tag, tagger, tagDate, message)
        raw, sha = loose.object_rawsha(b'tag', payload)
        self._object_write(sha, raw)
        return sha