subcommit-git/libwyag.py
Nathan McRae 08771733b1 Allow first commit in repo
Before you had to do it through vanilla git
2024-07-20 09:29:52 -07:00


import argparse
import collections
import configparser
from datetime import datetime
from fnmatch import fnmatch
import hashlib
from math import ceil
import os
import re
import sys
import zlib
def join_path(first, *rest):
if (first == ""):
return "/".join(list(rest))
return "/".join([first] + list(rest))
argparser = argparse.ArgumentParser(description="The stupidest content tracker")
argsubparsers = argparser.add_subparsers(title="Commands", dest="command")
argsubparsers.required = True
def main(argv=sys.argv[1:]):
args = argparser.parse_args(argv)
match args.command:
case "add" : cmd_add(args)
case "cat-file" : cmd_cat_file(args)
case "check-ignore" : cmd_check_ignore(args)
case "checkout" : cmd_checkout(args)
case "commit" : cmd_commit(args)
case "hash-object" : cmd_hash_object(args)
case "init" : cmd_init(args)
case "log" : cmd_log(args)
case "ls-files" : cmd_ls_files(args)
case "ls-tree" : cmd_ls_tree(args)
case "rev-parse" : cmd_rev_parse(args)
case "rm" : cmd_rm(args)
case "show-ref" : cmd_show_ref(args)
case "status" : cmd_status(args)
case "tag" : cmd_tag(args)
case "graph-objects" : cmd_graph_objects(args)
case _ : print("Bad command.")
class GitRepository (object):
"""A git repo"""
worktree = None
gitdir = None
conf = None
def __init__(self, path, force=False):
self.worktree = path.replace("\\", "/")
self.gitdir = join_path(self.worktree, ".subcommit-git")
if not (force or os.path.isdir(self.gitdir)):
raise Exception(f"Not a git repository {path}")
# Read configuration file in .git/config
self.conf = configparser.ConfigParser()
cf = GitRepository.repo_file(self, "config")
if cf and os.path.exists(cf):
self.conf.read([cf])
elif not force:
raise Exception("Configuration file missing")
if not force:
vers = int(self.conf.get("core", "repositoryformatversion"))
if vers != 0:
raise Exception(f"Unsupported repositoryformatversion {vers}")
def repo_path(repo, *path):
"""Compute path under repo's gitdir."""
return join_path(repo.gitdir, *path)
def repo_file(repo, *path, mkdir=False):
"""Same as repo_path, but create dirname(*path) if absent. For
example, repo_file(r, \"refs\", \"remotes\", \"origin\", \"HEAD\") will create
.git/refs/remotes/origin."""
if GitRepository.repo_dir(repo, *path[:-1], mkdir=mkdir):
return GitRepository.repo_path(repo, *path)
def repo_dir(repo, *path, mkdir=False):
"""Same as repo_path, bt mkdir *path if absent if mkdir"""
path = GitRepository.repo_path(repo, *path)
if (os.path.exists(path)):
if (os.path.isdir(path)):
return path
else:
raise Exception(f"Not a directory {path}")
if mkdir:
os.makedirs(path)
return path
else:
return None
def repo_create(path):
"""Create a new repository at path."""
repo = GitRepository(path, True)
# First, we make sure the path either doesn't exist or is an
# empty dir.
if os.path.exists(repo.worktree):
if not os.path.isdir(repo.worktree):
raise Exception(f"{path} is not a directory!")
if os.path.exists(repo.gitdir) and os.listdir(repo.gitdir):
raise Exception(f"{path} is not empty!")
else:
os.makedirs(repo.worktree)
assert GitRepository.repo_dir(repo, "branches", mkdir=True)
assert GitRepository.repo_dir(repo, "objects", mkdir=True)
assert GitRepository.repo_dir(repo, "refs", "tags", mkdir=True)
assert GitRepository.repo_dir(repo, "refs", "heads", mkdir=True)
# .git/description
with open(GitRepository.repo_file(repo, "description"), "w") as f:
f.write("Unnamed repository; edit this file 'description' to name the repository")
with open(GitRepository.repo_file(repo, "HEAD"), "w") as f:
f.write("ref: refs/heads/master\n")
with open(GitRepository.repo_file(repo, "config"), "w") as f:
config = repo_default_config()
config.write(f)
return repo
def repo_default_config():
ret = configparser.ConfigParser()
ret.add_section("core")
ret.set("core", "repositoryformatVersion", "0")
ret.set("core", "filemode", "false")
ret.set("core", "bare", "false")
return ret
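# For reference, the config written by repo_default_config comes out of
# ConfigParser looking roughly like this (a sketch of the expected file, not
# copied from a real run):
#
#   [core]
#   repositoryformatversion = 0
#   filemode = false
#   bare = false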
argsp = argsubparsers.add_parser("init", help="Initialize a new, empty repository.")
argsp.add_argument("path",
metavar="directory",
nargs="?",
default=".",
help="Where to create the repository.")
def cmd_init(args):
repo_create(args.path)
def repo_find(path=".", required=True):
path = os.path.realpath(path)
if os.path.isdir(join_path(path, ".subcommit-git")):
return GitRepository(path)
# If we haven't returned, recurse in parent
parent = os.path.realpath(join_path(path, ".."))
match os.name:
case "nt":
is_root = os.path.splitdrive(path)[1] == "\\"
case "posix":
# If parent==path, then path is root.
is_root = parent == path
case _:
raise Exception(f"Unsupported os {os.name}")
if is_root:
if required:
raise Exception("No git directory.")
else:
return None
# Recursive case
return repo_find(parent, required)
argsp = argsubparsers.add_parser("status", help="Get the status of the repo")
argsp.add_argument("path",
metavar="directory",
nargs="?",
default=".",
help="Which directory")
def cmd_status(args):
# TODO: actually get status
print(repo_find(args.path).worktree)
class GitObject (object):
def __init__(self, data=None):
if data != None:
self.deserialize(data)
else:
self.init()
def serialize(self, repo):
"""This function MUST be implemented by subclasses.
It must read the object's contents from self.data, a byte string, and
do whatever it takes to convert it into a meaningful representation. What exactly that means depends on each subclass."""
raise Exception("Unimplemented!")
def deserialize(self, repo):
raise Exception("Unimplemented!")
def init(self):
pass # other implementations might do something here
def object_read(repo, sha):
"""Read object sha from Git repository repo. Return a
GitObject whose exact type depends on the object."""
path = GitRepository.repo_file(repo, "objects", sha[0:2], sha[2:])
if not os.path.isfile(path):
return None
with open(path, "rb") as f:
raw = zlib.decompress(f.read())
# Read object type
x = raw.find(b' ')
fmt = raw[0:x]
# Read and validate object size
y = raw.find(b'\x00', x)
size = int(raw[x:y].decode("ascii"))
if size != len(raw)-y-1:
raise Exception(f"Malformed object {sha}: bad length")
match fmt:
case b'commit' : c=GitCommit
case b'tree' : c=GitTree
case b'tag' : c=GitTag
case b'blob' : c=GitBlob
case _:
raise Exception(f"Unknown type {fmt.decode('ascii')} for object {sha}")
return c(raw[y+1:])
def object_write(obj, repo=None):
data = obj.serialize()
# Add header
result = obj.fmt + b' ' + str(len(data)).encode() + b'\x00' + data
# Compute hash
sha = hashlib.sha1(result).hexdigest()
if repo:
# Compute path
path=GitRepository.repo_file(repo, "objects", sha[0:2], sha[2:], mkdir=True)
if not os.path.exists(path):
with open(path, 'wb') as f:
f.write(zlib.compress(result))
return sha
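# To make the header logic above concrete: a loose object is
# "<type> <size-in-ascii>\x00<payload>", zlib-compressed, and stored under
# objects/<first two hex chars>/<remaining 38 chars>. A worked example
# (the hash is the well-known SHA-1 of a "hello\n" blob, shown for
# illustration rather than taken from a run of this code):
#
#   payload = b"hello\n"
#   stored  = b"blob 6\x00hello\n"   # what object_write hashes and compresses
#   sha     = "ce013625030ba8dba906f756967f9e9ca394464a"
#   path    = .subcommit-git/objects/ce/013625030ba8dba906f756967f9e9ca394464a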
class GitBlob(GitObject):
fmt=b'blob'
def serialize(self):
return self.blobdata
def deserialize(self, data):
self.blobdata = data
argsp = argsubparsers.add_parser("cat-file",
help="Provide content of repository objects")
argsp.add_argument("type",
metavar="type",
choices=["blob", "commit", "tag", "tree"],
help="Specify the type")
argsp.add_argument("object",
metavar="object",
help="The object to display")
def cmd_cat_file(args):
repo = repo_find()
cat_file(repo, args.object, fmt=args.type.encode())
def cat_file(repo, obj, fmt=None):
obj = object_read(repo, object_find(repo, obj, fmt=fmt))
if fmt == b'tree':
for item in obj.items:
print(f"{item.mode.decode('ascii')} {item.path} {item.sha}")
else:
sys.stdout.buffer.write(obj.serialize())
def object_find(repo, name, fmt=None, follow=True):
sha = object_resolve(repo, name)
if not sha:
raise Exception(f"No such reference {name}")
if len(sha) > 1:
raise Exception(f"Ambiguous reference {0}: Candidates are:\n - {1}.".format(name, "\n - ".join(sha)))
sha = sha[0]
if not fmt:
return sha
while True:
obj = object_read(repo, sha)
if obj.fmt == fmt:
return sha
if not follow:
return None
if obj.fmt == b'tag':
sha = obj.kvlm[b'object'].decode("ascii")
elif obj.fmt == b'commit' and fmt == b'tree':
sha = obj.kvlm[b'tree'].decode("ascii")
else:
return None
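# Sketch of how the follow loop behaves (hypothetical objects): asking for a
# tag with fmt=b'tree' first dereferences the tag to the commit it points to,
# then the commit to its tree, and returns the tree's SHA; asking for a blob
# with fmt=b'tree' returns None because a blob cannot be peeled any further.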
argsp = argsubparsers.add_parser(
"hash-object",
help="Compute object ID an optionally creates a blob from a file")
argsp.add_argument("-t",
metavar="type",
dest="type",
choices=["blob", "commit", "tag", "tree"],
default="blob",
help="Specify the type")
argsp.add_argument("-w",
dest="write",
action="store_true",
help="Actually write the object into the database")
argsp.add_argument("path",
help="Read object from <file>")
def cmd_hash_object(args):
if args.write:
repo = repo_find()
else:
repo = None
with open(args.path, "rb") as fd:
sha = object_hash(fd, args.type.encode(), repo)
print(sha)
def object_hash(fd, fmt, repo=None):
"""Hash object, writing it to repo if provided"""
data = fd.read()
match fmt:
case b'commit' : obj=GitCommit(data)
case b'tree' : obj=GitTree(data)
case b'tag' : obj=GitTag(data)
case b'blob' : obj=GitBlob(data)
case _ : raise Exception(f"Unknown type {fmt}")
return object_write(obj, repo)
def kvlm_parse(raw, start=0, dct=None):
if not dct:
dct = collections.OrderedDict()
# You CANNOT declare the argument as dct=OrderedDict() or every
# call to the function would endlessly grow the same dict.
# This function is recursive: it reads a key/value pair, then calls
# itself back with the new position. So we first need to know
# where we are: at a keyword, or already in the message?
# We search for the next space and the next newline.
spc = raw.find(b' ', start)
nl = raw.find(b'\n', start)
# If space appears before newline, we have a keyword. Otherwise,
# it's the final message, which we just read to the end of the file.
# Base case
# =========
#
# If newline appears first (or there's no space at all), we assume
# a blank line. A blank line means the remainder of the data is the
# message. We store it in the dictionary, with None as the key, and
# return.
if (spc < 0) or (nl < spc):
assert nl == start
dct[None] = raw[start+1:]
return dct
# Recursive case
# ==============
#
# We read a key-value pair and recurse for the next.
key = raw[start:spc]
# Find the end of the value. Continuation lines begin with a
# space, so we loop until we find a "\n" not followed by a space.
end = start
while True:
end = raw.find(b'\n', end+1)
if raw[end+1] != ord(' '): break
value = raw[spc+1:end].replace(b'\n ', b'\n')
# Don't overwrite existing data contents
if key in dct:
if type(dct[key]) == list:
dct[key].append(value)
else:
dct[key] = [ dct[key], value ]
else:
dct[key] = value
return kvlm_parse(raw, start=end+1, dct=dct)
def kvlm_serialize(kvlm):
ret = b''
# Output fields
for k in kvlm.keys():
# Skip the message itself
if k == None: continue
val = kvlm[k]
# Normalize to a list
if type(val) != list:
val = [ val ]
for v in val:
ret += k + b' ' + (v.replace(b'\n', b'\n ')) + b'\n'
ret += b'\n' + kvlm[None] + b'\n'
return ret
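# Worked example of the kvlm format (an abbreviated commit; the hashes and
# identity are made up for illustration). The raw text
#
#   tree 29ff16c9c14e2652b22f8b78bb08a5a07930c147
#   parent 206941306e8a8af65b66eaaaea388a7ae24d49a0
#   author Alice <alice@example.com> 1527025023 +0200
#
#   Initial commit
#
# parses to an OrderedDict whose keys are b'tree', b'parent' and b'author'
# (values are the bytes after the first space, with "\n " continuation lines
# unfolded), plus a None key holding the message. kvlm_serialize is the
# inverse and re-folds continuation lines.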
class GitCommit(GitObject):
fmt = b'commit'
def deserialize(self, data):
self.kvlm = kvlm_parse(data)
def serialize(self):
return kvlm_serialize(self.kvlm)
def init(self):
self.kvlm = dict()
argsp = argsubparsers.add_parser("log", help="Display history of a given commit.")
argsp.add_argument("commit",
default="HEAD",
nargs="?",
help="Commit to start at.")
argsp.add_argument("--show-objects", action="store_true", help="Show objects along with commit DAG")
def cmd_log(args):
repo = repo_find()
print(log_graphviz(repo, args.commit, args.show_objects))
def log_graphviz_recurse(repo, sha, seen, show_objects):
if sha in seen:
return
commit = object_read(repo, sha)
short_hash = sha[0:8]
message = commit.kvlm[None].decode("utf8").strip()
message = message.replace("\\", "\\\\")
message = message.replace("\"", "\\\"")
if "\n" in message: # keep only the first line
message = message[:message.index("\n")]
if show_objects:
for line in graph_objects(repo, sha, seen, True):
yield f" {line}\n"
else:
yield f" c_{sha} [label=\"{sha[0:7]}: {message}\"]"
assert commit.fmt==b'commit'
if not b'parent' in commit.kvlm.keys():
# Base case: the initial commit.
return
parents = commit.kvlm[b'parent']
if type(parents) != list:
parents = [ parents ]
for p in parents:
p = p.decode("ascii")
# Commits are also linked to parents in graph_objects
if not show_objects:
yield f" c_{sha} -> c_{p}"
yield from log_graphviz_recurse(repo, p, seen, show_objects)
def log_graphviz(repo, sha, show_objects):
seen = set()
graph = "digraph wyaglog{\n"
graph += " node[shape=rect]\n"
for line in log_graphviz_recurse(repo, object_find(repo, sha), seen, show_objects):
graph += " " + line + "\n"
graph += "}"
return graph
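# The return value is a plain Graphviz document. A usage sketch (assuming the
# command is run through a wrapper script that calls main(), as in the
# original wyag tutorial, and that Graphviz's `dot` tool is installed):
#
#   wyag log > log.dot
#   dot -Tsvg log.dot -o log.svg
#
# Each commit becomes a c_<sha> node labelled with its short hash and the
# first line of its message; parent links become edges.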
class GitTreeLeaf (object):
def __init__(self, mode, path, sha):
self.mode = mode
self.path = path
self.sha = sha
def tree_parse_one(raw, start=0):
# Find the space terminator of the mode
x = raw.find(b' ', start)
assert x-start == 5 or x-start == 6
mode = raw[start:x]
if len(mode) == 5:
# Normalize to six bytes.
mode = b"0" + mode
# Find the NULL terminator of the path
y = raw.find(b'\x00', x)
path = raw[x+1:y]
sha = format(int.from_bytes(raw[y+1:y+21], "big"), "040x")
return y+21, GitTreeLeaf(mode, path.decode('utf8'), sha)
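# Byte layout of a single tree entry, as parsed above (illustrative):
#
#   b"100644 hello.txt\x00" + <20 raw SHA-1 bytes>
#
# The mode is 5 or 6 ASCII characters (normalized to 6 by prefixing "0"), the
# path is NUL-terminated UTF-8, and the SHA is 20 raw bytes that are
# re-encoded here as a 40-character lowercase hex string.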
def tree_parse(raw):
pos = 0
max = len(raw)
ret = list()
while pos < max:
pos, data = tree_parse_one(raw, pos)
ret.append(data)
return ret
# Notice: this isn't a comparison function, but a conversion function.
# Python's default sort doesn't accept a custom comparison function,
# like in most languages, but a 'key' argument that returns a new
# value, which is compared using the default rules. So we just return
# the leaf name, with an extra '/' if it's a directory.
def tree_leaf_sort_key(leaf):
if leaf.mode.startswith(b"10"):
return leaf.path
else:
return leaf.path + "/"
def tree_serialize(obj):
obj.items.sort(key=tree_leaf_sort_key)
ret = b''
for i in obj.items:
ret += i.mode
ret += b' '
ret += i.path.encode("utf8")
ret += b'\x00'
sha = int(i.sha, 16)
ret += sha.to_bytes(20, byteorder="big")
return ret
class GitTree(GitObject):
fmt=b'tree'
def deserialize(self, data):
self.items = tree_parse(data)
def serialize(self):
return tree_serialize(self)
def init(self):
self.items = list()
argsp = argsubparsers.add_parser("ls-tree", help="Pretty-print a tree object.")
argsp.add_argument("-r",
dest="recursive",
action="store_true",
help="Recurse into sub-trees")
argsp.add_argument("tree",
help="A tree-ish object.")
def cmd_ls_tree(args):
repo = repo_find()
ls_tree(repo, args.tree, args.recursive)
def ls_tree(repo, ref, recursive=None, prefix=""):
sha = object_find(repo, ref, fmt=b"tree")
obj = object_read(repo, sha)
for item in obj.items:
if len(item.mode) == 5:
type = item.mode[0:1]
else:
type = item.mode[0:2]
match type:
case b'04': type = "tree"
case b'10': type = "blob"
case b'12': type = "blob" # a symlink
case b'16': type = "commit" # a submodule
case b'sc': type = "subcommit"
case _: raise Exception(f"Weird tree leaf mode {item.mode}")
if not (recursive and type=='tree'): # This is a leaf
print("{0} {1} {2}\t{3}".format(
"0" * (6 - len(item.mode)) + item.mode.decode("ascii"),
# Git's ls-tree displays the type
# of the object pointed to.
type,
item.sha,
join_path(prefix, item.path)
))
if type=='subcommit':
commit_obj = object_read(repo, item.sha)
tree_sha = commit_obj.kvlm[b'tree'].decode("ascii")
ls_tree(repo, tree_sha, recursive, join_path(prefix, item.path))
else: # This is a tree
ls_tree(repo, item.sha, recursive, join_path(prefix, item.path))
argsp = argsubparsers.add_parser("checkout", help="Checkout a commit inside of a directory.")
argsp.add_argument("commit",
help="The commit or tree to checkout.")
argsp.add_argument("path",
help="The EMPTY directory to checkout on.")
def cmd_checkout(args):
repo = repo_find()
obj = object_read(repo, object_find(repo, args.commit))
# If the object is a commit, we grab its tree
if obj.fmt == b'commit':
obj = object_read(repo, obj.kvlm[b'tree'].decode("ascii"))
# Verify the path is an empty directory
if os.path.exists(args.path):
if not os.path.isdir(args.path):
raise Exception(f"This is not a directory: '{ags.path}'")
if os.listdir(args.path):
raise Exception(f"Directory '{args.path}' is not empty")
else:
os.makedirs(args.path)
tree_checkout(repo, obj, os.path.realpath(args.path))
def tree_checkout(repo, tree, path):
for item in tree.items:
obj = object_read(repo, item.sha)
dest = join_path(path, item.path)
if obj.fmt == b'tree':
os.mkdir(dest)
tree_checkout(repo, obj, dest)
elif obj.fmt == b'blob':
# TODO: Support symlinks (identified by mode 12*****)
with open(dest, 'wb') as f:
f.write(obj.blobdata)
def ref_resolve(repo, ref):
path = GitRepository.repo_file(repo, ref)
# Sometimes, an indirect reference may be broken. This is normal
# in one specific case: we're looking for HEAD on a new repository
# with no commits. In that case, .git/HEAD points to "ref:
# refs/heads/master", but .git/refs/heads/master doesn't exist yet
# (since there's no commit for it to refer to).
if not os.path.isfile(path):
return None
with open(path, "r") as fp:
data = fp.read()[:-1] # Drop final "\n"
if data.startswith("ref: "):
return ref_resolve(repo, data[len("ref: "):])
else:
return data
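# A quick trace of indirect resolution (hypothetical hash): with
#   .subcommit-git/HEAD              containing "ref: refs/heads/master"
#   .subcommit-git/refs/heads/master containing "6071c08bcb4757d8c89a30d9755d2466cef8c1de"
# ref_resolve(repo, "HEAD") follows the "ref: " prefix once and returns the
# hash; on a brand-new repository the second file does not exist yet, so it
# returns None.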
def ref_list(repo, path=None):
if not path:
path = repo_dir(repo, "refs")
ret = collections.OrderedDict()
for f in sorted(os.listdir(path)):
can = join_path(path, f)
if os.path.isdir(can):
ret[f] = ref_list(repo, can)
else:
ret[f] = ref_resolve(repo, can)
return ret
argsp = argsubparsers.add_parser(
"tag",
help="List and create tags")
argsp.add_argument("-a",
action="store_true",
dest="create_tag_object",
help="Whether to create a tag object")
argsp.add_argument("name",
nargs="?",
help="The new tag's name")
argsp.add_argument("object",
default="HEAD",
nargs="?",
help="The object the new tag will point to")
def cmd_tag(args):
repo = repo_find()
if args.name:
tag_create(repo,
args.name,
args.object,
create_tag_object=args.create_tag_object)
else:
refs = ref_list(repo)
show_ref(repo, refs["tags"], with_hash=False)
def tag_create(repo, name, ref, create_tag_object=False):
sha = object_find(repo, ref)
if create_tag_object:
tag = GitTag()
tag.kvlm = collections.OrderedDict()
tag.kvlm[b'object'] = sha.encode()
tag.kvlm[b'type'] = b'commit'
tag.kvlm[b'tag'] = name.encode()
tag.kvlm[b'tagger'] = b'Wyag <wyag@example.com>'
tag.kvlm[None] = b'Tag generated by wyag'
tag_sha = object_write(tag)
ref_create(repo, "tags/" + name, tag_sha)
else:
ref_create(repo, "tags/" + name, sha)
def ref_create(repo, ref_name, sha):
with open(repo_file(repo, "refs/" + ref_name), "w") as fp:
fp.write(sha + "\n")
def object_resolve(repo, name):
"""Resolve a name to an object hash in repo.
This function is aware of:
- the HEAD literal
- short and long hashes
- tags
- branches
- remote branches"""
candidates = list()
hashRE = re.compile(r"[0-9A-Fa-f]{4,40}$")
# Abort on empty string
if not name.strip():
return None
if name == "HEAD":
return [ref_resolve(repo, "HEAD")]
if hashRE.match(name):
name = name.lower()
prefix = name[0:2]
path = GitRepository.repo_dir(repo, "objects", prefix, mkdir=False)
if path:
rem = name[2:]
for f in os.listdir(path):
if f.startswith(rem):
candidates.append(prefix + f)
# Try for references
as_tag = ref_resolve(repo, "refs/tags/" + name)
if as_tag:
candidates.append(as_tag)
as_branch = ref_resolve(repo, "refs/heads/" + name)
if as_branch:
candidates.append(as_branch)
return candidates
argsp = argsubparsers.add_parser(
"rev-parse",
help="Parse revision (or other objects) identifiers")
argsp.add_argument("--wyag-type",
metavar="type",
dest="type",
choices=["blob", "commit", "tag", "tree"],
default=None,
help="Specify the expected type")
argsp.add_argument("name",
help="The name to parse")
def cmd_rev_parse(args):
if args.type:
fmt = args.type.encode()
else:
fmt = None
repo = repo_find()
print(object_find(repo, args.name, fmt, follow=True))
class GitIndexEntry (object):
def __init__(self, ctime=None, mtime=None, dev=None, ino=None,
mode_type=None, mode_perms=None, uid=None, gid=None,
fsize=None, sha=None, flag_assume_valid=None,
flag_stage=None, name=None):
# The last time a file's metadata changed. This is a pair
# (timestamp in seconds, nanoseconds)
self.ctime = ctime
# The last time a file's data changed. This is a pair
# (timestamp in seconds, nanoseconds)
self.mtime = mtime
# The ID of device containing this file
self.dev = dev
# The file's inode number
self.ino = ino
# The object type, either b1000 (regular), b1010 (symlink),
# b1110 (gitlink).
self.mode_type = mode_type
# The object permissions, an integer
self.mode_perms = mode_perms
# User ID of the owner
self.uid = uid
# Group ID of the owner
self.gid = gid
# Size of this object, in bytes
self.fsize = fsize
# The object's SHA
self.sha = sha
self.flag_assume_valid = flag_assume_valid
self.flag_stage = flag_stage
# Name of the object (full path)
self.name = name
class GitIndex (object):
version = None
entries = []
# ext = None
# sha = None
def __init__(self, version=2, entries=None):
if not entries:
entries = list()
self.version = version
self.entries = entries
def index_read(repo):
index_file = GitRepository.repo_file(repo, "index")
# New repositories have no index!
if not os.path.exists(index_file):
return GitIndex()
with open(index_file, 'rb') as f:
raw = f.read()
header = raw[:12]
signature = header[:4]
assert signature == b"DIRC" # Stands for DIR Cache"
version = int.from_bytes(header[4:8], "big")
assert version == 2, "wyag only supports index file version 2"
count = int.from_bytes(header[8:12], "big")
entries = list()
content = raw[12:]
idx = 0
for i in range(0, count):
# Read creation time, as a unix timestamp (seconds since
# 1970-01-01 00:00:00, the "epoch")
ctime_s = int.from_bytes(content[idx: idx+4], "big")
# Read creation time, as nanoseconds after unix seconds
ctime_ns = int.from_bytes(content[idx+4: idx+8], "big")
# Modification time, unix timestamp
mtime_s = int.from_bytes(content[idx+8: idx+12], "big")
# Modification time, nanoseconds
mtime_ns = int.from_bytes(content[idx+12: idx+16], "big")
# Device ID
dev = int.from_bytes(content[idx+16: idx+20], "big")
# Inode
ino = int.from_bytes(content[idx+20: idx+24], "big")
# Ignored
unused = int.from_bytes(content[idx+24: idx+26], "big")
assert 0 == unused
mode = int.from_bytes(content[idx+26: idx+28], "big")
mode_type = mode >> 12
assert mode_type in [0b1000, 0b1010, 0b1110]
mode_perms = mode & 0b0000000111111111
# User ID
uid = int.from_bytes(content[idx+28: idx+32], "big")
# Group ID
gid = int.from_bytes(content[idx+32: idx+36], "big")
# Size
fsize = int.from_bytes(content[idx+36: idx+40], "big")
# SHA (object ID). We'll store it as a lowercase hex string
# for consistency
sha = format(int.from_bytes(content[idx+40: idx+60], "big"), "040x")
# Flags we're going to ignore
flags = int.from_bytes(content[idx+60: idx+62], "big")
# Parse flags
flag_assume_valid = (flags & 0b1000000000000000) != 0
flag_extended = (flags & 0b0100000000000000) != 0
assert not flag_extended
flag_stage = flags & 0b0011000000000000
# Length of the name. This is stored on 12 bits, so the max value
# is 0xFFF, 4095.
name_length = flags & 0b0000111111111111
idx += 62
if name_length < 0xFFF:
assert content[idx + name_length] == 0x00
raw_name = content[idx:idx + name_length]
idx += name_length + 1
else:
print(f"Notice: Name is 0x{name_length:X} bytes long")
# TODO: This probably wasn't tested enough. It works with a
# path of exactly 0xFFF bytes. Any extra bytes broke
# something between git, my shell and my filesystem
null_idx = content.find(b'\x00', idx + 0xFFF)
raw_name = content[idx: null_idx]
idx = null_idx + 1
name = raw_name.decode("utf8")
# Data is padded on multiples of eight bytes for pointer
# alignment, so we skip as many bytes as we need for the next
# read to start at the right position.
idx = 8 * ceil(idx / 8)
entries.append(GitIndexEntry(ctime=(ctime_s, ctime_ns),
mtime=(mtime_s, mtime_ns),
dev=dev,
ino=ino,
mode_type=mode_type,
mode_perms=mode_perms,
uid=uid,
gid=gid,
fsize=fsize,
sha=sha,
flag_assume_valid=flag_assume_valid,
flag_stage=flag_stage,
name=name))
return GitIndex(version=version, entries=entries)
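# Recap of the fixed 62-byte prefix parsed above (all fields big-endian):
# ctime(4+4) mtime(4+4) dev(4) ino(4) mode(4) uid(4) gid(4) size(4)
# sha(20) flags(2), followed by the path name, a NUL terminator, and zero
# padding so the next entry starts on an 8-byte boundary. This matches index
# format v2, the only version this code accepts.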
argsp = argsubparsers.add_parser("ls-files", help="List all the staged files")
argsp.add_argument("--verbose", action="store_true", help="Show everything.")
def cmd_ls_files(args):
repo = repo_find()
index = index_read(repo)
if args.verbose:
print(f"Index file format v{index.version}, containing {len(index.entries)} entries.")
for e in index.entries:
print(e.name)
if args.verbose:
print(" {} with perms: {:o}".format(
{0b1000: "regular file",
0b1010: "symlink",
0b1110: "git link" }[e.mode_type],
e.mode_perms))
print(f" on blob: {e.sha}")
print(" created: {}.{}, modified: {}.{}".format(
datetime.fromtimestamp(e.ctime[0]),
e.ctime[1],
datetime.fromtimestamp(e.mtime[0]),
e.mtime[1]))
print(f" device: {e.dev}, inode: {e.ino}")
print(f" uid: {e.uid} group: {e.gid}")
print(f" flags: stage={e.flag_stage} assume_valid={e.flag_assume_valid}")
argsp = argsubparsers.add_parser("check-ignore", help="Check path(s) against ignore rules.")
argsp.add_argument("path", nargs="+", help="Paths to check")
def cmd_check_ignore(args):
repo = repo_find()
rules = gitignore_read(repo)
for path in args.path:
if check_ignore(rules, path):
print(path)
def gitignore_parse1(raw):
raw = raw.strip()
if not raw or raw[0] == "#":
return None
elif raw[0] == "!":
return (raw[1:], False)
elif raw[0] == "\\":
return (raw[1:], True)
else:
return (raw, True)
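# Examples of what gitignore_parse1 returns (illustrative lines):
#   "# build artifacts" -> None                 (comment, ignored)
#   "*.o"               -> ("*.o", True)        (a match means "ignore")
#   "!keep.o"           -> ("keep.o", False)    (negated rule)
#   "\#literal"         -> ("#literal", True)   (escaped leading character)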
def gitignore_parse(lines):
ret = list()
for line in lines:
parsed = gitignore_parse1(line)
if parsed:
ret.append(parsed)
return ret
class GitIgnore(object):
absolute = None
scoped = None
def __init__(self, absolute, scoped):
self.absolute = absolute
self.scoped = scoped
def gitignore_read(repo):
ret = GitIgnore(absolute=list(), scoped=dict())
# Read local configuration
repo_file = join_path(repo.gitdir, "info/exclude")
if os.path.exists(repo_file):
with open(repo_file, "r") as f:
ret.absolute.append(gitignore_parse(f.readlines()))
# Global configuration
if "XDG_CONFIG_HOME" in os.environ:
config_home = os.environ["XDG_CONFIG_HOME"]
else:
config_home = os.path.expanduser("~/.config")
global_file = join_path(config_home, "git/ignore")
if os.path.exists(global_file):
with open(global_file, "r") as f:
ret.absolute.append(gitignore_parse(f.readlines()))
# .gitignore files in the index
index = index_read(repo)
for entry in index.entries:
if entry.name == ".gitignore" or entry.name.endswith("/.gitignore"):
dir_name = os.path.dirname(entry.name)
contents = object_read(repo, entry.sha)
lines = contents.blobdata.decode("utf8").splitlines()
ret.scoped[dir_name] = gitignore_parse(lines)
return ret
def check_ignore1(rules, path):
result = None
for (pattern, value) in rules:
if fnmatch(path, pattern):
result = value
return result
def check_ignore_scoped(rules, path):
parent = os.path.dirname(path)
while True:
if parent in rules:
result = check_ignore1(rules[parent], path)
if result != None:
return result
if parent == "":
break
parent = os.path.dirname(parent)
return None
def check_ignore_absolute(rules, path):
parent = os.path.dirname(path)
for ruleset in rules:
result = check_ignore1(ruleset, path)
if result != None:
return result
return False
def check_ignore(rules, path):
if os.path.isabs(path):
raise Exception("This function requires path to be relative to the repository's root")
# Eh, just hardcode it
if (path.startswith(".subcommit-git")):
return True
result = check_ignore_scoped(rules.scoped, path)
if result != None:
return result
return check_ignore_absolute(rules.absolute, path)
argsp = argsubparsers.add_parser("status", help="Show the working tree status.")
def cmd_status(_):
repo = repo_find()
index = index_read(repo)
cmd_status_branch(repo)
cmd_status_head_index(repo, index)
print()
cmd_status_index_worktree(repo, index)
def branch_get_active(repo):
with open(GitRepository.repo_file(repo, "HEAD"), "r") as f:
head = f.read()
if head.startswith("ref: refs/heads/"):
return(head[16:-1])
else:
return False
def cmd_status_branch(repo):
branch = branch_get_active(repo)
if branch:
print(f"On branch {branch}.")
else:
print("HEAD detached at {}".format(object_find(repo, "HEAD")))
def tree_to_dict(repo, ref, prefix=""):
ret = dict()
tree_sha = object_find(repo, ref, fmt=b"tree")
tree = object_read(repo, tree_sha)
for leaf in tree.items:
full_path = join_path(prefix, leaf.path)
# We read the object to extract its type (this is uselessly
# expensive: we could just open it as a file and read the
# first few bytes)
if (leaf.mode.startswith(b'04')):
raise Exception("Tree should not be child of tree")
is_subcommit = leaf.mode.startswith(b'sc')
# Depending on the type, we either store the path (if it's a
# blob, so a regular file), or recurse (if it's another tree,
# so a subdir)
if is_subcommit:
commit_obj = object_read(repo, leaf.sha)
tree_sha = commit_obj.kvlm[b'tree'].decode("ascii")
ret.update(tree_to_dict(repo, tree_sha, full_path))
else:
ret[full_path] = leaf.sha
return ret
def cmd_status_head_index(repo, index):
print("Changes to be commited:")
head = tree_to_dict(repo, "HEAD")
for entry in index.entries:
if entry.name in head:
if head[entry.name] != entry.sha:
print(" modified:", entry.name)
del head[entry.name]
else:
print(" added: ", entry.name)
# Keys still in HEAD are files that we haven't met in the index,
# and thus have been deleted
for entry in head.keys():
print(" deleted: ", entry)
def cmd_status_index_worktree(repo, index):
print("Changes not staged for commit:")
ignore = gitignore_read(repo)
gitdir_prefix = repo.gitdir + "/"
all_files = list()
# We begin by walking the filesystem
for (root, _, files) in os.walk(repo.worktree, True):
if root==repo.gitdir or root.startswith(gitdir_prefix):
continue
for f in files:
full_path = join_path(root, f)
rel_path = os.path.relpath(full_path, repo.worktree).replace("\\", "/")
all_files.append(rel_path)
# We now traverse the index, and compare real files with the cached
# versions.
for entry in index.entries:
full_path = join_path(repo.worktree, entry.name)
# That file *name* is in the index
if not os.path.exists(full_path):
print(" deleted: ", entry.name)
else:
stat = os.stat(full_path)
# Compare metadata
ctime_ns = entry.ctime[0] * 10**9 + entry.ctime[1]
mtime_ns = entry.mtime[0] * 10**9 + entry.mtime[1]
if (stat.st_ctime_ns != ctime_ns) or (stat.st_mtime_ns != mtime_ns):
# If different, deep compare.
# @FIXME This *will* crash on symlinks to dir.
with open(full_path, "rb") as fd:
new_sha = object_hash(fd, b"blob", None)
# If the hashes are the same, the files are actually the same.
same = entry.sha == new_sha
if not same:
print(" modified:", entry.name)
if entry.name in all_files:
all_files.remove(entry.name)
print()
print("Untracked files:")
for f in all_files:
# @TODO If a full directory is untracked, we should display
# its name without its contents.
if not check_ignore(ignore, f):
print(" ", f)
def index_write(repo, index):
with open(GitRepository.repo_file(repo, "index"), "wb") as f:
# Header
# Magic bytes which identify the file type.
f.write(b"DIRC")
f.write(index.version.to_bytes(4, "big"))
f.write(len(index.entries).to_bytes(4, "big"))
# Entries
idx = 0
for e in index.entries:
f.write(e.ctime[0].to_bytes(4, "big"))
f.write(e.ctime[1].to_bytes(4, "big"))
f.write(e.mtime[0].to_bytes(4, "big"))
f.write(e.mtime[1].to_bytes(4, "big"))
f.write((0).to_bytes(4, "big"))
f.write((0).to_bytes(4, "big"))
mode = (e.mode_type << 12) | e.mode_perms
f.write(mode.to_bytes(4, "big"))
f.write(e.uid.to_bytes(4, "big"))
f.write(e.gid.to_bytes(4, "big"))
f.write(e.fsize.to_bytes(4, "big"))
# Convert the hex SHA string back to 20 raw bytes.
f.write(int(e.sha, 16).to_bytes(20, "big"))
flag_assume_valid = 0x1 << 15 if e.flag_assume_valid else 0
name_bytes = e.name.encode("utf8")
bytes_len = len(name_bytes)
if bytes_len >= 0xFFF:
name_length = 0xFFF
else:
name_length = bytes_len
f.write((flag_assume_valid | e.flag_stage | name_length).to_bytes(2, "big"))
f.write(name_bytes)
# Null-terminate the name string
f.write((0).to_bytes(1, "big"))
idx += 62 + len(name_bytes) + 1
# Add padding if necessary
if idx % 8 != 0:
pad = 8 - (idx % 8)
f.write((0).to_bytes(pad, "big"))
idx += pad
argsp = argsubparsers.add_parser("rm", help="Remove files from the working tree and the index.")
argsp.add_argument("path", nargs="+", help="Files to remove")
def cmd_rm(args):
repo = repo_find()
rm(repo, args.path)
def rm(repo, paths, delete=True, skip_missing=False):
index = index_read(repo)
worktree = repo.worktree.replace("\\", "/") + "/"
# Make paths absolute
abspaths = list()
for path in paths:
abspath = os.path.abspath(path).replace("\\", "/")
if abspath.startswith(worktree):
abspaths.append(abspath)
else:
raise Exception(f"Cannot remove paths outside of worktree: {path}")
kept_entries = list()
remove = list()
for e in index.entries:
full_path = join_path(repo.worktree, e.name)
if full_path in abspaths:
remove.append(full_path)
abspaths.remove(full_path)
else:
kept_entries.append(e)
if len(abspaths) > 0 and not skip_missing:
raise Exception(f"Cannot remove paths not in the index: {abspaths}")
if delete:
for path in remove:
os.unlink(path)
index.entries = kept_entries
index_write(repo, index)
argsp = argsubparsers.add_parser("add", help="Add files' contents to the index.")
argsp.add_argument("path", nargs="+", help="Files to add")
def cmd_add(args):
repo = repo_find()
add(repo, args.path)
def add(repo, paths, delete=True, skip_missing=False):
rm(repo, paths, delete=False, skip_missing=True)
worktree = repo.worktree + "/"
# Convert the paths to pairs: (absolute, relative_to_worktree).
# Also delete them from the index if they're present.
clean_paths = list()
for path in paths:
abspath = os.path.abspath(path).replace("\\", "/")
if not (abspath.startswith(worktree) and os.path.isfile(abspath)):
raise Exception(f"Not a file, or outside the worktree: {paths}")
relpath = os.path.relpath(abspath, repo.worktree).replace("\\", "/")
clean_paths.append((abspath, relpath))
# Find and read the index. It was modified by rm. (This isn't
# optimal, but good enough)
#
# FIXME, though: we could just pass the index along between commands instead
# of reading and writing it over again.
index = index_read(repo)
for (abspath, relpath) in clean_paths:
with open(abspath, "rb") as fd:
sha = object_hash(fd, b"blob", repo)
stat = os.stat(abspath)
ctime_s = int(stat.st_ctime)
ctime_ns = stat.st_ctime_ns % 10**9
mtime_s = int(stat.st_mtime)
mtime_ns = stat.st_mtime_ns % 10**9
entry = GitIndexEntry(ctime=(ctime_s, ctime_ns), mtime=(mtime_s, mtime_ns), ino=stat.st_ino,
mode_type=0b1000, mode_perms=0o644, uid=stat.st_uid, gid=stat.st_gid,
fsize=stat.st_size, sha=sha, flag_assume_valid=False,
flag_stage=False, name=relpath)
index.entries.append(entry)
index_write(repo, index)
argsp = argsubparsers.add_parser("commit", help="Record changes to the repository.")
argsp.add_argument("-m",
metavar="mesaage",
dest="message",
help="Message to associate with this commit.")
def gitconfig_read():
xdg_config_home = os.environ["XDG_CONFIG_HOME"] if "XDG_CONFIG_HOME" in os.environ else "~/.config"
config_files = [
os.path.expanduser(join_path(xdg_config_home, "git/config")).replace("\\", "/"),
#os.path.expanduser("~/.gitconfig").replace("\\", "/")
]
config = configparser.ConfigParser()
config.read(config_files)
return config
def gitconfig_user_get(config):
if "user" in config:
if "name" in config["user"] and "email" in config["user"]:
return f"{config['user']['name']} <{config['user']['email']}>"
return None
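# Example of the config this reads (a sketch; the path and values are
# illustrative): ~/.config/git/config containing
#
#   [user]
#   name = Alice Example
#   email = alice@example.com
#
# makes gitconfig_user_get return "Alice Example <alice@example.com>"; if the
# [user] section or either key is missing it falls through to None.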
def create_commit_map(repo, commit):
commit_map = dict()
create_commit_map_recurse(repo, commit, commit_map, "")
return commit_map
def create_commit_map_recurse(repo, commit, commit_map, path):
"""From a root commit, walk down the tree of subcommits. Returns a dict mapping
directory paths to (commit hash, commit objects).
Note: doesn't include root commit."""
tree_sha = commit.kvlm[b"tree"].decode('ascii')
obj = object_read(repo, tree_sha)
for item in obj.items:
if len(item.mode) == 5:
type = item.mode[0:1]
else:
type = item.mode[0:2]
match type:
case b'04': raise Exception("Trees should point to subcommits, not other trees")
case b'10': type = "blob"
case b'12': type = "blob" # a symlink
case b'16': type = "commit" # a submodule
case b'sc': type = "subcommit"
case _: raise Exception(f"Weird tree leaf mode {item.mode}")
if type == "subcommit":
if path == "":
fullpath = item.path
else:
fullpath = f"{path}/{item.path}"
subcommit = object_read(repo, item.sha)
commit_map[fullpath] = (item.sha, subcommit)
create_commit_map_recurse(repo, subcommit, commit_map, fullpath)
def tree_from_index(repo, index, commit_map, author, commit_time, message):
contents = dict()
contents[""] = list()
# Convert entries to dictionary where keys are directories, and values are
# lists of directory contents.
for entry in index.entries:
dirname = os.path.dirname(entry.name).replace("\\", "/")
# We create all dictionary entries up to root (""). We need them *all*
# because even if a directory holds no files it will contain at least
# a tree.
key = dirname
while key != "":
if not key in contents:
contents[key] = list()
key = os.path.dirname(key).replace("\\", "/")
contents[dirname].append(entry)
# Sort keys (= directories) by length, descending. This means that we'll
# always encounter a given path before its parent, which is all we need,
# since for each directory D we'll need to modify its parent P to add
# D's tree.
sorted_paths = sorted(contents.keys(), key=len, reverse=True)
sha = None
for path in sorted_paths:
tree = GitTree()
for entry in contents[path]:
# An entry can be a normal GitIndexEntry read from the index, or
# a tree we've created.
if isinstance(entry, GitIndexEntry):
leaf_mode = f"{entry.mode_type:02o}{entry.mode_perms:04o}".encode("ascii")
leaf = GitTreeLeaf(mode=leaf_mode, path=os.path.basename(entry.name), sha=entry.sha)
else: # Tree. We've stored it as a pair: (basename, SHA)
leaf = GitTreeLeaf(mode=b"sc0000", path=entry[0], sha=entry[1])
tree.items.append(leaf)
sha = object_write(tree, repo)
if path in commit_map:
(subcommit_hash, subcommit) = commit_map[path]
else:
subcommit_hash = None
subcommit = None
if subcommit == None or subcommit.kvlm[b'tree'] != sha.encode('ascii'):
new_subcommit = commit_create(repo,
sha,
subcommit_hash,
author,
commit_time,
message)
else:
new_subcommit = subcommit_hash
parent = os.path.dirname(path).replace("\\", "/")
base = os.path.basename(path)
contents[parent].append((base, new_subcommit))
return sha
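# Walk-through with a tiny index (made-up names): entries "a.txt" and
# "dir/b.txt" give contents = {"": [a.txt entry], "dir": [b.txt entry]}.
# Paths are handled longest-first, so "dir" is turned into a tree, wrapped in
# a subcommit (leaf mode b"sc0000"), and ("dir", <subcommit sha>) is appended
# to contents[""]; the root tree then holds the a.txt blob plus that subcommit
# leaf, and its SHA is what gets returned. An existing subcommit from
# commit_map is reused when its tree hash is unchanged.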
def commit_create(repo, tree, parent, author, timestamp, message):
commit = GitCommit()
commit.kvlm[b"tree"] = tree.encode("ascii")
if parent:
commit.kvlm[b'parent'] = parent.encode("ascii")
offset = int(timestamp.astimezone().utcoffset().total_seconds())
hours = abs(offset) // 3600
minutes = (abs(offset) % 3600) // 60
tz = f"{'+' if offset >= 0 else '-'}{hours:02}{minutes:02}"
if author == None:
author = ""
# Git wants the author date as a Unix timestamp, not strftime's seconds field.
author = author + f" {int(timestamp.timestamp())} " + tz
commit.kvlm[b"author"] = author.encode("utf8")
commit.kvlm[b"committer"] = author.encode("utf8")
commit.kvlm[None] = message.encode("utf8")
return object_write(commit, repo)
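# Serialized, the commit produced here looks like this (hashes, identity and
# timestamp are illustrative):
#
#   tree 29ff16c9c14e2652b22f8b78bb08a5a07930c147
#   parent 206941306e8a8af65b66eaaaea388a7ae24d49a0
#   author Alice <alice@example.com> 1721486992 -0700
#   committer Alice <alice@example.com> 1721486992 -0700
#
#   Allow first commit in repo
#
# i.e. exactly the kvlm layout that kvlm_serialize writes out.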
def cmd_commit(args):
repo = repo_find()
index = index_read(repo)
root_commit_sha = object_find(repo, "HEAD")
if root_commit_sha:
root_commit = object_read(repo, root_commit_sha)
else:
root_commit = None
commit_time = datetime.now()
author = gitconfig_user_get(gitconfig_read())
if root_commit:
commit_map = create_commit_map(repo, root_commit)
else:
commit_map = dict()
tree = tree_from_index(repo,
index,
commit_map,
author,
commit_time,
args.message)
commit = commit_create(repo,
tree,
root_commit_sha,
author,
commit_time,
args.message)
# Update HEAD so our commit is now the tip of the active branch.
active_branch = branch_get_active(repo)
if active_branch: # If on a branch, update that branch
with open(GitRepository.repo_file(repo, join_path("refs/heads", active_branch)), "w") as fd:
fd.write(commit + "\n")
else: # Otherwise, we update HEAD itself.
with open(repo_file(repo, "HEAD"), "w") as fd:
fd.write(commit + "\n")
argsp = argsubparsers.add_parser("graph-objects", help="Show git objects in a dot graph")
argsp.add_argument("object",
nargs="?",
metavar="object",
help="The object the graph will start from")
def graph_objects_tree(repo, sha, seen, commit_parents=False):
if sha in seen:
return
seen.add(sha)
yield f"t_{sha} [label=\"tree {sha[0:7]}\"]"
# TODO: inefficient
obj = object_read(repo, sha)
for item in obj.items:
if len(item.mode) == 5:
type = item.mode[0:1]
else:
type = item.mode[0:2]
match type:
case b'04': type = "tree"
case b'10': type = "blob"
case b'12': type = "blob" # a symlink
case b'16': type = "commit" # a submodule
case b'sc': type = "subcommit"
case _: raise Exception(f"Weird tree leaf mode {item.mode}")
if type == "tree":
yield f"t_{sha} -> t_{item.sha}"
yield from graph_objects_tree(repo, item.sha, seen)
if type == "commit" or type == "subcommit":
yield f"t_{sha} -> c_{item.sha}"
yield from graph_objects(repo, item.sha, seen, commit_parents)
if type == "blob":
yield f"b_{item.sha} [label=\"{item.sha[0:7]} {item.path}\"]"
yield f"t_{sha} -> b_{item.sha}"
def graph_objects(repo, sha, seen, commit_parents=False):
if sha in seen:
return
seen.add(sha)
obj = object_read(repo, sha)
match obj.fmt:
case b"commit":
message = obj.kvlm[None]
yield f"c_{sha} [label=\"commit {sha[0:7]}\n {message.decode('utf8')}\"]"
tree_sha = obj.kvlm[b"tree"].decode("ascii")
yield f"c_{sha} -> t_{tree_sha}"
yield from graph_objects_tree(repo, tree_sha, seen, commit_parents)
if commit_parents and b'parent' in obj.kvlm.keys():
yield f"c_{sha} -> c_{obj.kvlm[b'parent'].decode('ascii')}"
case b"tree":
yield from graph_objects_tree(repo, sha, seen, commit_parents)
def cmd_graph_objects(args):
repo = repo_find()
if args.object == None:
obj_name = "HEAD"
else:
obj_name = args.object
obj_sha = object_find(repo, obj_name)
graph = "digraph objectgraph{\n"
graph += " node[shape=rect]\n"
for line in graph_objects(repo, obj_sha, set()):
graph += " " + line + "\n"
graph += "}"
print(graph)