08771733b1
Before you had to do it through vanilla git
1646 lines
51 KiB
Python
1646 lines
51 KiB
Python
import argparse
|
|
import collections
|
|
import configparser
|
|
from datetime import datetime
|
|
from fnmatch import fnmatch
|
|
import hashlib
|
|
from math import ceil
|
|
import os
|
|
import re
|
|
import sys
|
|
import zlib
|
|
|
|
def join_path(first, *rest):
    """Join path components with forward slashes.

    An empty ``first`` component is dropped so the result does not start
    with a separator.
    """
    parts = list(rest) if first == "" else [first, *rest]
    return "/".join(parts)
|
|
|
|
# Top-level command-line parser; every sub-command below registers itself
# on ``argsubparsers``.
argparser = argparse.ArgumentParser(description="The stupidest content tracker")
argsubparsers = argparser.add_subparsers(
    title="Commands",
    dest="command",
    required=True,
)
|
|
|
|
def main(argv=sys.argv[1:]):
|
|
args = argparser.parse_args(argv)
|
|
match args.command:
|
|
case "add" : cmd_add(args)
|
|
case "cat-file" : cmd_cat_file(args)
|
|
case "check-ignore" : cmd_check_ignore(args)
|
|
case "checkout" : cmd_checkout(args)
|
|
case "commit" : cmd_commit(args)
|
|
case "hash-object" : cmd_hash_object(args)
|
|
case "init" : cmd_init(args)
|
|
case "log" : cmd_log(args)
|
|
case "ls-files" : cmd_ls_files(args)
|
|
case "ls-tree" : cmd_ls_tree(args)
|
|
case "rev-parse" : cmd_rev_parse(args)
|
|
case "rm" : cmd_rm(args)
|
|
case "show-ref" : cmd_show_ref(args)
|
|
case "status" : cmd_status(args)
|
|
case "tag" : cmd_tag(args)
|
|
case "graph-objects" : cmd_graph_objects(args)
|
|
case _ : print("Bad command.")
|
|
|
|
class GitRepository(object):
    """A git repo.

    ``worktree`` is the checked-out directory, ``gitdir`` the
    ``.subcommit-git`` directory inside it and ``conf`` the parsed
    ``config`` file.
    """

    worktree = None
    gitdir = None
    conf = None

    def __init__(self, path, force=False):
        """Open the repository at ``path``.

        With ``force=True`` the existence checks (git directory, config
        file, format version) are skipped, which is what repository
        creation needs.

        Raises:
            Exception: if ``path`` is not a repository, the configuration
                file is missing, or the format version is not 0.
        """
        self.worktree = path.replace("\\", "/")
        self.gitdir = join_path(path, ".subcommit-git")

        if not (force or os.path.isdir(self.gitdir)):
            raise Exception(f"Not a git repository {path}")

        # Read the configuration file in <gitdir>/config
        self.conf = configparser.ConfigParser()
        cf = GitRepository.repo_file(self, "config")

        if cf and os.path.exists(cf):
            self.conf.read([cf])
        elif not force:
            raise Exception("Configuration file missing")

        if not force:
            vers = int(self.conf.get("core", "repositoryformatversion"))
            if vers != 0:
                raise Exception(f"Unsupported repositoryformatversion {vers}")

    def repo_path(repo, *path):
        """Compute path under repo's gitdir."""
        return join_path(repo.gitdir, *path)

    def repo_file(repo, *path, mkdir=False):
        """Same as repo_path, but create dirname(*path) if absent.

        For example, repo_file(r, "refs", "remotes", "origin", "HEAD")
        will create <gitdir>/refs/remotes/origin when ``mkdir`` is true.
        Returns None when the parent directory does not exist and was not
        created.
        """
        if GitRepository.repo_dir(repo, *path[:-1], mkdir=mkdir):
            return GitRepository.repo_path(repo, *path)
        return None

    def repo_dir(repo, *path, mkdir=False):
        """Same as repo_path, but mkdir *path if absent and ``mkdir`` is set.

        Returns the directory path, or None when it does not exist and
        ``mkdir`` is false.

        Raises:
            Exception: if the path exists but is not a directory.
        """
        path = GitRepository.repo_path(repo, *path)

        if os.path.exists(path):
            if os.path.isdir(path):
                return path
            raise Exception(f"Not a directory {path}")

        if mkdir:
            os.makedirs(path)
            return path
        return None
|
|
|
|
def repo_create(path):
    """Create a new repository at path and return it.

    Raises:
        Exception: if ``path`` exists and is not a directory, or if its
            git directory already exists and is not empty.
    """
    repo = GitRepository(path, True)

    # First, we make sure the path either doesn't exist or is a directory
    # whose git directory is absent or empty.
    if os.path.exists(repo.worktree):
        if not os.path.isdir(repo.worktree):
            raise Exception(f"{path} is not a directory!")
        if os.path.exists(repo.gitdir) and os.listdir(repo.gitdir):
            raise Exception(f"{path} is not empty!")
    else:
        os.makedirs(repo.worktree)

    # The directory creation must not live inside an ``assert``: asserts are
    # stripped under ``python -O`` and the layout would silently be skipped.
    for parts in (("branches",), ("objects",), ("refs", "tags"), ("refs", "heads")):
        if not GitRepository.repo_dir(repo, *parts, mkdir=True):
            raise Exception(f"Could not create {'/'.join(parts)} in {repo.gitdir}")

    # <gitdir>/description
    with open(GitRepository.repo_file(repo, "description"), "w") as f:
        f.write("Unnamed repository; edit this file 'description' to name the repository")

    # <gitdir>/HEAD
    with open(GitRepository.repo_file(repo, "HEAD"), "w") as f:
        f.write("ref: refs/heads/master\n")

    # <gitdir>/config
    with open(GitRepository.repo_file(repo, "config"), "w") as f:
        config = repo_default_config()
        config.write(f)

    return repo
|
|
|
|
def repo_default_config():
    """Return a ConfigParser holding the default ``[core]`` section."""
    ret = configparser.ConfigParser()

    ret.add_section("core")
    # Option names are lower-cased by ConfigParser, so this is read back as
    # "repositoryformatversion".
    ret.set("core", "repositoryformatVersion", "0")
    ret.set("core", "filemode", "false")
    ret.set("core", "bare", "false")

    return ret
|
|
|
|
# "init" sub-command: optional target directory, defaulting to the current one.
argsp = argsubparsers.add_parser("init", help="Initialize a new, empty repository.")
argsp.add_argument(
    "path",
    metavar="directory",
    nargs="?",
    default=".",
    help="Where to create the repository.",
)
|
|
|
|
def cmd_init(args):
    """Handle ``init``: create a repository at ``args.path``."""
    repo_create(args.path)
|
|
|
|
def repo_find(path=".", required=True):
|
|
path = os.path.realpath(path)
|
|
|
|
if os.path.isdir(join_path(path, ".subcommit-git")):
|
|
return GitRepository(path)
|
|
|
|
# If we haven't returned, recurse in parent
|
|
parent = os.path.realpath(join_path(path, ".."))
|
|
|
|
match os.name:
|
|
case "nt":
|
|
is_root = os.path.splitdrive(path)[1] == "\\"
|
|
case "posix":
|
|
# If parent==path, then path is root.
|
|
is_root = parent == path
|
|
case _:
|
|
raise Exception(f"Unsupported os {os.name}")
|
|
|
|
if is_root:
|
|
if required:
|
|
raise Exception("No git directory.")
|
|
else:
|
|
return None
|
|
|
|
# Recursive case
|
|
return repo_find(parent, required)
|
|
|
|
# NOTE: the "status" sub-command is registered further down in this file,
# next to the full cmd_status implementation.  Registering it here as well
# makes argparse raise "conflicting subparser: status" at import time on
# Python 3.11+ (and on older versions the later registration silently
# replaced this one), so the early registration is intentionally omitted.
|
|
|
|
def cmd_status(args):
    """Early placeholder for ``status``: print the enclosing work tree.

    NOTE: a later definition of ``cmd_status`` in this file rebinds the
    name, so this version is not the one dispatched by ``main``.
    """
    # TODO: actually get status
    print(repo_find(args.path).worktree)
|
|
|
|
class GitObject(object):
    """Base class of the object types stored in the object database."""

    def __init__(self, data=None):
        """Load ``data`` through deserialize(), or call init() when no data is given."""
        if data is not None:
            self.deserialize(data)
        else:
            self.init()

    def serialize(self, repo=None):
        """This function MUST be implemented by subclasses.

        It must return the object's contents as a byte string.  ``repo``
        is optional so that the base method can be called the same way
        the subclass overrides are (with no argument).
        """
        raise NotImplementedError("Unimplemented!")

    def deserialize(self, repo=None):
        """This function MUST be implemented by subclasses.

        It receives the raw byte string (the parameter keeps its
        historical name ``repo``) and must convert it into a meaningful
        representation.  What exactly that means depends on each subclass.
        """
        raise NotImplementedError("Unimplemented!")

    def init(self):
        """Set up an empty object; subclasses may override."""
        pass  # other implementations might do something here
|
|
|
|
def object_read(repo, sha):
|
|
"""Read object sha from Git repository repo. Return a
|
|
GitObject whose exact type depends on the object."""
|
|
|
|
path = GitRepository.repo_file(repo, "objects", sha[0:2], sha[2:])
|
|
|
|
if not os.path.isfile(path):
|
|
return None
|
|
|
|
with open(path, "rb") as f:
|
|
raw = zlib.decompress(f.read())
|
|
|
|
# Read object type
|
|
x = raw.find(b' ')
|
|
fmt = raw[0:x]
|
|
|
|
# Read and validate object size
|
|
y = raw.find(b'\x00', x)
|
|
size = int(raw[x:y].decode("ascii"))
|
|
if size != len(raw)-y-1:
|
|
raise Exception(f"Malformed object {sha}: bad length")
|
|
|
|
match fmt:
|
|
case b'commit' : c=GitCommit
|
|
case b'tree' : c=GitTree
|
|
case b'tag' : c=GitTag
|
|
case b'blob' : c=GitBlob
|
|
case _:
|
|
raise Exception(f"Unknown type {fmt.decode('ascii')} for object {sha}")
|
|
|
|
return c(raw[y+1:])
|
|
|
|
def object_write(obj, repo=None):
    """Hash ``obj`` and, when ``repo`` is given, store it in the object database.

    Returns the hex SHA-1 of ``<fmt> <length>\\x00<payload>``.
    """
    data = obj.serialize()
    # Add header
    result = obj.fmt + b' ' + str(len(data)).encode() + b'\x00' + data
    # Compute hash
    sha = hashlib.sha1(result).hexdigest()

    if repo:
        # Compute path
        path = GitRepository.repo_file(repo, "objects", sha[0:2], sha[2:], mkdir=True)

        # Existing objects are left untouched.
        if not os.path.exists(path):
            with open(path, 'wb') as f:
                f.write(zlib.compress(result))
    return sha
|
|
|
|
class GitBlob(GitObject):
    """A blob: the payload is kept verbatim in ``blobdata``."""

    fmt = b'blob'

    def serialize(self):
        """Return the raw blob bytes."""
        return self.blobdata

    def deserialize(self, data):
        """Store ``data`` unchanged as the blob contents."""
        self.blobdata = data
|
|
|
|
# "cat-file" sub-command: <type> <object>
argsp = argsubparsers.add_parser(
    "cat-file",
    help="Provide content of repository objects",
)
argsp.add_argument(
    "type",
    metavar="type",
    choices=["blob", "commit", "tag", "tree"],
    help="Specify the type",
)
argsp.add_argument(
    "object",
    metavar="object",
    help="The object to display",
)
|
|
|
|
def cmd_cat_file(args):
    """Handle ``cat-file``: print ``args.object`` interpreted as ``args.type``."""
    repo = repo_find()
    cat_file(repo, args.object, fmt=args.type.encode())
|
|
|
|
def cat_file(repo, obj, fmt=None):
    """Write the object named ``obj`` to standard output.

    Trees are printed one entry per line (mode, path, sha); every other
    type is written as its serialized bytes.
    """
    obj = object_read(repo, object_find(repo, obj, fmt=fmt))
    if fmt == b'tree':
        for item in obj.items:
            print(f"{item.mode.decode('ascii')} {item.path} {item.sha}")
    else:
        sys.stdout.buffer.write(obj.serialize())
|
|
|
|
def object_find(repo, name, fmt=None, follow=True):
    """Resolve ``name`` to a single object hash, optionally of type ``fmt``.

    When ``fmt`` is given and ``follow`` is true, tags are followed to
    the object they point at and commits to their tree (only when a tree
    is requested).  Returns None when the requested type cannot be
    reached.

    Raises:
        Exception: if ``name`` resolves to nothing or to several candidates.
    """
    sha = object_resolve(repo, name)

    if not sha:
        raise Exception(f"No such reference {name}")

    if len(sha) > 1:
        # The original mixed an f-string with str.format, which printed the
        # literal "0" and "1" instead of the name and the candidates.
        candidates = "\n - ".join(sha)
        raise Exception(f"Ambiguous reference {name}: Candidates are:\n - {candidates}.")

    sha = sha[0]

    if not fmt:
        return sha

    while True:
        obj = object_read(repo, sha)

        if obj.fmt == fmt:
            return sha

        if not follow:
            return None

        # Follow tags
        if obj.fmt == b'tag':
            sha = obj.kvlm[b'object'].decode("ascii")
        elif obj.fmt == b'commit' and fmt == b'tree':
            sha = obj.kvlm[b'tree'].decode("ascii")
        else:
            return None
|
|
|
|
# "hash-object" sub-command: [-t type] [-w] path
argsp = argsubparsers.add_parser(
    "hash-object",
    help="Compute object ID an optionally creates a blob from a file",
)
argsp.add_argument(
    "-t",
    metavar="type",
    dest="type",
    choices=["blob", "commit", "tag", "tree"],
    default="blob",
    help="Specify the type",
)
argsp.add_argument(
    "-w",
    dest="write",
    action="store_true",
    help="Actually write the object into the database",
)
argsp.add_argument(
    "path",
    help="Read object from <file>",
)
|
|
|
|
def cmd_hash_object(args):
    """Handle ``hash-object``: print the hash of the file at ``args.path``.

    The object is only stored when ``-w`` was given.
    """
    repo = repo_find() if args.write else None

    with open(args.path, "rb") as fd:
        sha = object_hash(fd, args.type.encode(), repo)
        print(sha)
|
|
|
|
def object_hash(fd, fmt, repo=None):
|
|
"""Hash object, writing it to repo if provided"""
|
|
data = fd.read()
|
|
|
|
match fmt:
|
|
case b'commit' : obj=GitCommit(data)
|
|
case b'tree' : obj=GitTree(data)
|
|
case b'tag' : obj=GitTag(data)
|
|
case b'blob' : obj=GitBlob(data)
|
|
case _ : raise Exception(f"Unknown type {fmt}")
|
|
|
|
return object_write(obj, repo)
|
|
|
|
def kvlm_parse(raw, start=0, dct=None):
    """Parse a "key-value list with message" (commit/tag payload).

    ``raw`` is scanned from ``start``.  Header fields are stored in
    ``dct`` under their bytes key (repeated keys become a list, in order
    of appearance) and the message under the key ``None``.  ``dct`` is
    created when not supplied and is returned.

    Raises:
        Exception: if a header value is not terminated by a newline.
    """
    # You CANNOT declare the argument as dct=OrderedDict() or all calls to
    # the function will endlessly grow the same dict.  Test against None
    # (not truthiness) so that a caller-supplied empty dict is filled too.
    if dct is None:
        dct = collections.OrderedDict()

    pos = start
    while True:
        # We search for the next space and the next newline.
        spc = raw.find(b' ', pos)
        nl = raw.find(b'\n', pos)

        # If newline appears first (or there's no space at all), we assume
        # a blank line.  A blank line means the remainder of the data is
        # the message.  We store it with None as the key, and return.
        if (spc < 0) or (nl < spc):
            assert nl == pos
            dct[None] = raw[pos+1:]
            return dct

        # Otherwise we are at a keyword: read one key/value pair.
        key = raw[pos:spc]

        # Find the end of the value.  Continuation lines begin with a
        # space, so we loop until we find a "\n" not followed by a space.
        end = pos
        while True:
            end = raw.find(b'\n', end+1)
            if end < 0:
                raise Exception("Malformed key-value data: unterminated value")
            if raw[end+1] != ord(' '):
                break

        # Drop the leading space on continuation lines
        value = raw[spc+1:end].replace(b'\n ', b'\n')

        # Don't overwrite existing data contents
        if key in dct:
            if isinstance(dct[key], list):
                dct[key].append(value)
            else:
                dct[key] = [dct[key], value]
        else:
            dct[key] = value

        # Continue with the next field (this used to be a recursive call).
        pos = end + 1
|
|
|
|
def kvlm_serialize(kvlm):
    """Serialize a key-value list with message back to bytes.

    Each field is written as ``key value\\n`` (list values produce one
    line per element, embedded newlines get a continuation space),
    followed by a blank line, the message stored under ``None`` and a
    final newline.
    """
    chunks = []

    # Output fields
    for key, val in kvlm.items():
        # Skip the message itself
        if key is None:
            continue
        # Normalize to a list
        values = val if isinstance(val, list) else [val]

        for v in values:
            chunks.append(key + b' ' + v.replace(b'\n', b'\n ') + b'\n')

    # NOTE(review): a newline is always appended after the message, while
    # kvlm_parse keeps the message's own trailing newline - a parse/serialize
    # round trip therefore grows by one byte.  Left as-is because callers
    # may rely on it; confirm before changing.
    chunks.append(b'\n' + kvlm[None] + b'\n')

    return b''.join(chunks)
|
|
|
|
class GitCommit(GitObject):
    """A commit, held as a key-value list with message in ``kvlm``."""

    fmt = b'commit'

    def deserialize(self, data):
        """Parse ``data`` into ``self.kvlm``."""
        self.kvlm = kvlm_parse(data)

    def serialize(self):
        """Return ``self.kvlm`` serialized to bytes."""
        return kvlm_serialize(self.kvlm)

    def init(self):
        """Start with an empty field mapping."""
        self.kvlm = dict()
|
|
|
|
# "log" sub-command: [--show-objects] [commit]
argsp = argsubparsers.add_parser("log", help="Display history of a given commit.")
argsp.add_argument(
    "commit",
    default="HEAD",
    nargs="?",
    help="Commit to start at.",
)
argsp.add_argument(
    "--show-objects",
    action="store_true",
    help="Show objects along with commit DAG",
)
|
|
|
|
def cmd_log(args):
    """Handle ``log``: print the history of ``args.commit`` as a Graphviz graph."""
    repo = repo_find()

    print(log_graphviz(repo, args.commit, args.show_objects))
|
|
|
|
def log_graphviz_recurse(repo, sha, seen, show_objects):
    """Yield Graphviz lines for commit ``sha`` and all of its ancestors.

    ``seen`` is the set of hashes already emitted; it is shared across
    the whole walk so that a commit reachable through several parents is
    only output once.  With ``show_objects`` the lines for the commit
    come from graph_objects() instead of a single labelled node.
    """
    if sha in seen:
        return

    commit = object_read(repo, sha)
    message = commit.kvlm[None].decode("utf8").strip()
    # Escape what would break the double-quoted Graphviz label.
    message = message.replace("\\", "\\\\")
    message = message.replace("\"", "\\\"")

    if "\n" in message:  # keep only the first line
        message = message[:message.index("\n")]

    if show_objects:
        for line in graph_objects(repo, sha, seen, True):
            yield f" {line}\n"
    else:
        yield f" c_{sha} [label=\"{sha[0:7]}: {message}\"]"
    assert commit.fmt == b'commit'

    # Record the commit as visited.  The original never added anything to
    # ``seen`` here, so the guard above could not prevent repeated walks of
    # shared history.  This is done after graph_objects() has run in case
    # that helper performs its own membership check on the same set.
    seen.add(sha)

    if b'parent' not in commit.kvlm.keys():
        # Base case: the initial commit.
        return

    parents = commit.kvlm[b'parent']

    if not isinstance(parents, list):
        parents = [parents]

    for p in parents:
        p = p.decode("ascii")
        # Commits are also linked to parents in graph_objects
        if not show_objects:
            yield f" c_{sha} -> c_{p}"
        yield from log_graphviz_recurse(repo, p, seen, show_objects)
|
|
|
|
def log_graphviz(repo, sha, show_objects):
    """Return the Graphviz ``digraph`` source for the history of ``sha``."""
    seen = set()
    start = object_find(repo, sha)

    pieces = ["digraph wyaglog{\n", " node[shape=rect]\n"]
    for line in log_graphviz_recurse(repo, start, seen, show_objects):
        pieces.append(" " + line + "\n")
    pieces.append("}")

    return "".join(pieces)
|
|
|
|
class GitTreeLeaf(object):
    """One entry of a tree object: mode, path and object hash."""

    def __init__(self, mode, path, sha):
        self.mode = mode
        self.path = path
        self.sha = sha
|
|
|
|
def tree_parse_one(raw, start=0):
    """Parse the tree entry that begins at ``raw[start]``.

    Returns ``(next_position, GitTreeLeaf)``.  The mode is normalized to
    six bytes, the path decoded as UTF-8 and the 20-byte hash rendered
    as 40 lowercase hex digits.
    """
    # Find the space terminator of the mode
    x = raw.find(b' ', start)
    assert x - start == 5 or x - start == 6

    mode = raw[start:x]
    if len(mode) == 5:
        # Normalize to six bytes.
        mode = b"0" + mode

    # Find the NULL terminator of the path
    y = raw.find(b'\x00', x)
    path = raw[x+1:y]

    # Read the binary hash that follows the terminator
    sha = format(int.from_bytes(raw[y+1:y+21], "big"), "040x")
    return y + 21, GitTreeLeaf(mode, path.decode('utf8'), sha)
|
|
|
|
def tree_parse(raw):
    """Parse a whole tree payload into a list of GitTreeLeaf entries."""
    pos = 0
    end = len(raw)  # renamed from ``max`` to stop shadowing the builtin
    ret = list()
    while pos < end:
        pos, data = tree_parse_one(raw, pos)
        ret.append(data)

    return ret
|
|
|
|
# Notice: this isn't a comparison function, but a conversion function.
# Python's default sort doesn't accept a custom comparison function,
# like in most languages, but a 'key' argument that returns a new
# value, which is compared using the default rules. So we just return
# the leaf name, with an extra '/' if it's a directory.
def tree_leaf_sort_key(leaf):
    """Return the sort key of ``leaf``: its path, plus "/" unless the mode starts with b"10"."""
    if leaf.mode.startswith(b"10"):
        return leaf.path
    return leaf.path + "/"
|
|
|
|
def tree_serialize(obj):
    """Serialize the entries of tree ``obj`` to bytes.

    ``obj.items`` is sorted in place first; each entry is written as
    ``<mode> <path>\\x00`` followed by its 20-byte binary hash.
    """
    obj.items.sort(key=tree_leaf_sort_key)
    chunks = []
    for leaf in obj.items:
        chunks.append(leaf.mode)
        chunks.append(b' ')
        chunks.append(leaf.path.encode("utf8"))
        chunks.append(b'\x00')
        chunks.append(int(leaf.sha, 16).to_bytes(20, byteorder="big"))
    return b''.join(chunks)
|
|
|
|
class GitTree(GitObject):
    """A tree object: a list of GitTreeLeaf entries in ``items``."""

    fmt = b'tree'

    def deserialize(self, data):
        """Parse ``data`` into ``self.items``."""
        self.items = tree_parse(data)

    def serialize(self):
        """Return the entries serialized to bytes."""
        return tree_serialize(self)

    def init(self):
        """Start with no entries."""
        self.items = list()
|
|
|
|
# "ls-tree" sub-command: [-r] tree
argsp = argsubparsers.add_parser("ls-tree", help="Pretty-print a tree object.")
argsp.add_argument(
    "-r",
    dest="recursive",
    action="store_true",
    help="Recurse into sub-trees",
)
argsp.add_argument(
    "tree",
    help="A tree-ish object.",
)
|
|
|
|
def cmd_ls_tree(args):
    """Handle ``ls-tree``: list ``args.tree``, recursing when ``-r`` is set."""
    repo = repo_find()
    ls_tree(repo, args.tree, args.recursive)
|
|
|
|
def ls_tree(repo, ref, recursive=None, prefix=""):
|
|
sha = object_find(repo, ref, fmt=b"tree")
|
|
obj = object_read(repo, sha)
|
|
for item in obj.items:
|
|
if len(item.mode) == 5:
|
|
type = item.mode[0:1]
|
|
else:
|
|
type = item.mode[0:2]
|
|
|
|
match type:
|
|
case b'04': type = "tree"
|
|
case b'10': type = "blob"
|
|
case b'12': type = "blob" # a symlink
|
|
case b'16': type = "commit" # a submodule
|
|
case b'sc': type = "subcommit"
|
|
case _: raise Exception(f"Weird tree leaf mode {item.mode}")
|
|
|
|
if not (recursive and type=='tree'): # This is a leaf
|
|
print("{0} {1} {2}\t{3}".format(
|
|
"0" * (6 - len(item.mode)) + item.mode.decode("ascii"),
|
|
# Git's ls-tree displays the type
|
|
# of the object pointed to.
|
|
type,
|
|
item.sha,
|
|
join_path(prefix, item.path)
|
|
))
|
|
if type=='subcommit':
|
|
commit_obj = object_read(repo, item.sha)
|
|
tree_sha = commit_obj.kvlm[b'tree'].decode("ascii")
|
|
ls_tree(repo, tree_sha, recursive, join_path(prefix, item.path))
|
|
else: # This is a tree
|
|
ls_tree(repo, item.sha, recursive, join_path(prefix, item.path))
|
|
|
|
|
|
# "checkout" sub-command: commit path
argsp = argsubparsers.add_parser("checkout", help="Checkout a commit inside of a directory.")
argsp.add_argument(
    "commit",
    help="The commit or tree to checkout.",
)
argsp.add_argument(
    "path",
    help="The EMPTY directory to checkout on.",
)
|
|
|
|
def cmd_checkout(args):
    """Handle ``checkout``: materialize ``args.commit`` into ``args.path``.

    Raises:
        Exception: if ``args.path`` exists and is not a directory, or is
            a non-empty directory.
    """
    repo = repo_find()

    obj = object_read(repo, object_find(repo, args.commit))

    # If the object is a commit, we grab its tree
    if obj.fmt == b'commit':
        obj = object_read(repo, obj.kvlm[b'tree'].decode("ascii"))

    # Verify the path is an empty directory
    if os.path.exists(args.path):
        if not os.path.isdir(args.path):
            # (was ``ags.path``: a NameError that masked this message)
            raise Exception(f"This is not a directory: '{args.path}'")
        if os.listdir(args.path):
            raise Exception(f"Directory '{args.path}' is not empty")
    else:
        os.makedirs(args.path)

    tree_checkout(repo, obj, os.path.realpath(args.path))
|
|
|
|
def tree_checkout(repo, tree, path):
    """Write the contents of ``tree`` below the directory ``path``.

    Sub-trees become directories and blobs become files; entries whose
    object is of any other type are skipped.
    """
    for item in tree.items:
        obj = object_read(repo, item.sha)
        dest = join_path(path, item.path)

        if obj.fmt == b'tree':
            os.mkdir(dest)
            tree_checkout(repo, obj, dest)
        elif obj.fmt == b'blob':
            # TODO: Support symlinks (identified by mode 12*****)
            with open(dest, 'wb') as f:
                f.write(obj.blobdata)
|
|
|
|
def ref_resolve(repo, ref):
    """Resolve ``ref`` (a path relative to the git directory) to a hash.

    Indirect references ("ref: <other>") are followed.  Returns None
    when the reference file does not exist.
    """
    # repo_path rather than repo_file: repo_file returns None when the
    # parent directory is missing, which made os.path.isfile raise.
    path = GitRepository.repo_path(repo, ref)

    # Sometimes, an indirect reference may be broken.  This is normal
    # in one specific case: we're looking for HEAD on a new repository
    # with no commits.  In that case, HEAD points to "ref:
    # refs/heads/master", but refs/heads/master doesn't exist yet
    # (since there's no commit for it to refer to).
    if not os.path.isfile(path):
        return None

    with open(path, "r") as fp:
        # Drop the final "\n" (only if it is actually there).
        data = fp.read().rstrip("\n")

    if data.startswith("ref: "):
        return ref_resolve(repo, data[len("ref: "):])
    return data
|
|
|
|
def ref_list(repo, path=None):
    """Return an ordered mapping of every reference below ``path``.

    ``path`` is a filesystem directory and defaults to the repository's
    ``refs`` directory.  Directories map to nested mappings, files to
    the hash they resolve to.
    """
    if not path:
        # repo_dir lives on GitRepository; the bare name was a NameError.
        path = GitRepository.repo_dir(repo, "refs")

    ret = collections.OrderedDict()

    for f in sorted(os.listdir(path)):
        can = join_path(path, f)
        if os.path.isdir(can):
            ret[f] = ref_list(repo, can)
        else:
            # ref_resolve expects a name relative to the git directory, and
            # join_path (unlike os.path.join) does not restart at an
            # absolute component, so strip the git directory prefix first.
            ref_name = os.path.relpath(can, repo.gitdir).replace(os.sep, "/")
            ret[f] = ref_resolve(repo, ref_name)

    return ret
|
|
|
|
# "tag" sub-command: [-a] [name] [object]
argsp = argsubparsers.add_parser(
    "tag",
    help="List and create tags",
)
argsp.add_argument(
    "-a",
    action="store_true",
    dest="create_tag_object",
    help="Whether to create a tag object",
)
argsp.add_argument(
    "name",
    nargs="?",
    help="The new tag's name",
)
argsp.add_argument(
    "object",
    default="HEAD",
    nargs="?",
    help="The object the new tag will point to",
)
|
|
|
|
def cmd_tag(args):
    """Handle ``tag``: create a tag when a name is given, otherwise list tags."""
    repo = repo_find()

    if args.name:
        # tag_create has no ``type`` parameter; the flag is passed under
        # the name the function actually declares.
        tag_create(repo,
                   args.name,
                   args.object,
                   create_tag_object=args.create_tag_object)
    else:
        refs = ref_list(repo)
        show_ref(repo, refs["tags"], with_hash=False)
|
|
|
|
def tag_create(repo, name, ref, create_tag_object=False):
    """Create the tag ``name`` pointing at ``ref``.

    With ``create_tag_object`` a tag object is written to the repository
    and the reference points at it; otherwise the reference points
    directly at the resolved object.
    """
    # Get the GitObject from the object reference
    sha = object_find(repo, ref)

    if create_tag_object:
        # Create an empty tag object.  GitObject's constructor treats its
        # argument as raw data to parse, so the repository must not be
        # passed here.  (GitTag is defined outside this excerpt; presumably
        # it follows the same constructor contract - verify.)
        tag = GitTag()
        tag.kvlm = collections.OrderedDict()
        tag.kvlm[b'object'] = sha.encode()
        tag.kvlm[b'type'] = b'commit'
        tag.kvlm[b'tag'] = name.encode()
        tag.kvlm[b'tagger'] = b'Wyag <wyag@example.com>'
        tag.kvlm[None] = b'Tag generated by wyag'
        # Pass the repository so the object is actually stored; without it
        # object_write only computes the hash and the ref would dangle.
        tag_sha = object_write(tag, repo)
        # Create the reference to the tag object
        ref_create(repo, "tags/" + name, tag_sha)
    else:
        # Create a lightweight tag (a plain reference)
        ref_create(repo, "tags/" + name, sha)
|
|
|
|
def ref_create(repo, ref_name, sha):
    """Write ``sha`` to the reference file ``refs/<ref_name>``."""
    # repo_file lives on GitRepository; the bare name was a NameError.
    with open(GitRepository.repo_file(repo, "refs/" + ref_name), "w") as fp:
        fp.write(sha + "\n")
|
|
|
|
def object_resolve(repo, name):
    """Resolve a name to a list of candidate object hashes in repo.

    This function is aware of:

    - the HEAD literal
    - short and long hashes
    - tags
    - branches

    Returns None for an empty name.  For "HEAD" the list holds whatever
    ref_resolve returns, which may be None on a repository without
    commits.
    """
    candidates = list()
    hashRE = re.compile(r"[0-9A-Fa-f]{4,40}$")

    # Abort on empty string
    if not name.strip():
        return None

    # HEAD is unambiguous
    if name == "HEAD":
        return [ref_resolve(repo, "HEAD")]

    # If it looks like a hex string, try it as a (possibly short) hash.
    if hashRE.match(name):
        name = name.lower()
        prefix = name[0:2]
        path = GitRepository.repo_dir(repo, "objects", prefix, mkdir=False)
        if path:
            rem = name[2:]
            candidates.extend(
                prefix + f for f in os.listdir(path) if f.startswith(rem)
            )

    # Try for references
    as_tag = ref_resolve(repo, "refs/tags/" + name)
    if as_tag:
        candidates.append(as_tag)

    as_branch = ref_resolve(repo, "refs/heads/" + name)
    if as_branch:
        candidates.append(as_branch)

    return candidates
|
|
|
|
# "rev-parse" sub-command: [--wyag-type type] name
argsp = argsubparsers.add_parser(
    "rev-parse",
    help="Parse revision (or other objects) identifiers",
)
argsp.add_argument(
    "--wyag-type",
    metavar="type",
    dest="type",
    choices=["blob", "commit", "tag", "tree"],
    default=None,
    help="Specify the expected type",
)
argsp.add_argument(
    "name",
    help="The name to parse",
)
|
|
|
|
def cmd_rev_parse(args):
    """Handle ``rev-parse``: print the hash ``args.name`` resolves to."""
    fmt = args.type.encode() if args.type else None

    repo = repo_find()

    print(object_find(repo, args.name, fmt, follow=True))
|
|
|
|
class GitIndexEntry(object):
    """One entry of the index file; every field defaults to None."""

    def __init__(self, ctime=None, mtime=None, dev=None, ino=None,
                 mode_type=None, mode_perms=None, uid=None, gid=None,
                 fsize=None, sha=None, flag_assume_valid=None,
                 flag_stage=None, name=None):
        # The last time a file's metadata changed.  This is a pair
        # (timestamp in seconds, nanoseconds)
        self.ctime = ctime
        # The last time a file's data changed.  This is a pair
        # (timestamp in seconds, nanoseconds)
        self.mtime = mtime
        # The ID of device containing this file
        self.dev = dev
        # The file's inode number
        self.ino = ino
        # The object type, either b1000 (regular), b1010 (symlink),
        # b1110 (gitlink).
        self.mode_type = mode_type
        # The object permissions, an integer
        self.mode_perms = mode_perms
        # User ID of the owner
        self.uid = uid
        # Group ID of the owner
        self.gid = gid
        # Size of this object, in bytes
        self.fsize = fsize
        # The object's SHA
        self.sha = sha
        self.flag_assume_valid = flag_assume_valid
        self.flag_stage = flag_stage
        # Name of the object (full path)
        self.name = name
|
|
|
|
class GitIndex(object):
    """In-memory index: a format version and a list of GitIndexEntry."""

    version = None
    # No shared mutable default at class level; every instance receives its
    # own list in __init__.
    entries = None
    # ext = None
    # sha = None

    def __init__(self, version=2, entries=None):
        # Test against None so that a caller-supplied (possibly empty) list
        # is kept and can be filled through the caller's reference.
        if entries is None:
            entries = list()

        self.version = version
        self.entries = entries
|
|
|
|
def index_read(repo):
    """Read the repository's ``index`` file into a GitIndex.

    Returns an empty GitIndex when the file does not exist.  Only index
    format version 2 without extended flags is accepted (enforced with
    assertions).
    """
    index_file = GitRepository.repo_file(repo, "index")

    # New repositories have no index!
    if not os.path.exists(index_file):
        return GitIndex()

    with open(index_file, 'rb') as f:
        raw = f.read()

    header = raw[:12]
    signature = header[:4]
    assert signature == b"DIRC"  # Stands for "DIR Cache"
    version = int.from_bytes(header[4:8], "big")
    assert version == 2, "wyag only supports index file version 2"
    count = int.from_bytes(header[8:12], "big")

    entries = list()

    content = raw[12:]

    def field(base, lo, hi):
        """Decode content[base+lo:base+hi] as a big-endian integer."""
        return int.from_bytes(content[base + lo: base + hi], "big")

    idx = 0
    for _ in range(0, count):
        # Read creation time, as a unix timestamp (seconds since
        # 1970-01-01 00:00:00, the "epoch")
        ctime_s = field(idx, 0, 4)
        # Read creation time, as nanoseconds after unix seconds
        ctime_ns = field(idx, 4, 8)
        # Modification time, unix timestamp
        mtime_s = field(idx, 8, 12)
        # Modification time, nanoseconds
        mtime_ns = field(idx, 12, 16)
        # Device ID
        dev = field(idx, 16, 20)
        # Inode
        ino = field(idx, 20, 24)
        # Ignored
        unused = field(idx, 24, 26)
        assert 0 == unused
        mode = field(idx, 26, 28)
        mode_type = mode >> 12
        assert mode_type in [0b1000, 0b1010, 0b1110]
        mode_perms = mode & 0b0000000111111111
        # User ID
        uid = field(idx, 28, 32)
        # Group ID
        gid = field(idx, 32, 36)
        # Size
        fsize = field(idx, 36, 40)
        # SHA (object ID).  We'll store it as a lowercase hex string
        # for consistency
        sha = format(field(idx, 40, 60), "040x")
        # Flags
        flags = field(idx, 60, 62)
        # Parse flags
        flag_assume_valid = (flags & 0b1000000000000000) != 0
        flag_extended = (flags & 0b0100000000000000) != 0
        assert not flag_extended
        flag_stage = flags & 0b0011000000000000
        # Length of the name.  This is stored on 12 bits, so the max value
        # is 0xFFF, 4095.
        name_length = flags & 0b0000111111111111

        idx += 62

        if name_length < 0xFFF:
            assert content[idx + name_length] == 0x00
            raw_name = content[idx:idx + name_length]
            idx += name_length + 1
        else:
            print(f"Notice: Name is 0x{name_length:X} bytes long")
            # TODO: This probably wasn't tested enough.  It works with a
            # path of exactly 0xFFF bytes.  Any extra bytes broke
            # something between git, my shell and my filesystem
            null_idx = content.find(b'\x00', idx + 0xFFF)
            raw_name = content[idx: null_idx]
            idx = null_idx + 1

        name = raw_name.decode("utf8")

        # Data is padded on multiples of eight bytes for pointer
        # alignment, so we skip as many bytes as we need for the next
        # read to start at the right position.  Integer arithmetic: round
        # up to the next multiple of 8 without going through floats.
        idx = (idx + 7) // 8 * 8

        entries.append(GitIndexEntry(ctime=(ctime_s, ctime_ns),
                                     mtime=(mtime_s, mtime_ns),
                                     dev=dev,
                                     ino=ino,
                                     mode_type=mode_type,
                                     mode_perms=mode_perms,
                                     uid=uid,
                                     gid=gid,
                                     fsize=fsize,
                                     sha=sha,
                                     flag_assume_valid=flag_assume_valid,
                                     flag_stage=flag_stage,
                                     name=name))

    return GitIndex(version=version, entries=entries)
|
|
|
|
# "ls-files" sub-command: [--verbose]
argsp = argsubparsers.add_parser("ls-files", help="List all the staged files")
argsp.add_argument(
    "--verbose",
    action="store_true",
    help="Show everything.",
)
|
|
|
|
def cmd_ls_files(args):
    """Handle ``ls-files``: print the name of every index entry.

    With ``--verbose`` a header line and the metadata of each entry are
    printed as well.
    """
    repo = repo_find()
    index = index_read(repo)
    if args.verbose:
        print(f"Index file format v{index.version}, containing {len(index.entries)} entries.")

    # Human-readable names of the index entry types.
    type_names = {
        0b1000: "regular file",
        0b1010: "symlink",
        0b1110: "git link",
    }

    for e in index.entries:
        print(e.name)
        if not args.verbose:
            continue
        print(" {} with perms: {:o}".format(type_names[e.mode_type], e.mode_perms))
        print(f" on blob: {e.sha}")
        print(" created: {}.{}, modified: {}.{}".format(
            datetime.fromtimestamp(e.ctime[0]),
            e.ctime[1],
            datetime.fromtimestamp(e.mtime[0]),
            e.mtime[1]))
        print(f" device: {e.dev}, inode: {e.ino}")
        print(f" uid: {e.uid} group: {e.gid}")
        print(f" flags: stage={e.flag_stage} assume_valid={e.flag_assume_valid}")
|
|
|
|
# "check-ignore" sub-command: path [path ...]
argsp = argsubparsers.add_parser("check-ignore", help="Check path(s) against ignore rules.")
argsp.add_argument(
    "path",
    nargs="+",
    help="Paths to check",
)
|
|
|
|
def cmd_check_ignore(args):
    """Handle ``check-ignore``: print each given path that is ignored."""
    repo = repo_find()
    rules = gitignore_read(repo)
    for path in args.path:
        if check_ignore(rules, path):
            print(path)
|
|
|
|
def gitignore_parse1(raw):
    """Parse one ignore-file line.

    Returns None for blank lines and comments, otherwise a pair
    ``(pattern, ignored)``: ``ignored`` is False for "!" negations and
    True otherwise; a leading backslash escape is stripped.
    """
    raw = raw.strip()  # Remove leading/trailing spaces

    if not raw or raw[0] == "#":
        return None
    if raw[0] == "!":
        return (raw[1:], False)
    if raw[0] == "\\":
        return (raw[1:], True)
    return (raw, True)
|
|
|
|
def gitignore_parse(lines):
    """Parse ignore-file ``lines`` into a list of ``(pattern, ignored)`` rules."""
    ret = list()

    for line in lines:
        parsed = gitignore_parse1(line)
        if parsed:
            ret.append(parsed)

    return ret
|
|
|
|
class GitIgnore(object):
    """Ignore rules: ``absolute`` rule sets plus ``scoped`` per-directory rule sets."""

    absolute = None
    scoped = None

    def __init__(self, absolute, scoped):
        self.absolute = absolute
        self.scoped = scoped
|
|
|
|
def gitignore_read(repo):
    """Collect every ignore rule that applies to ``repo``.

    Absolute rule sets come from ``<gitdir>/info/exclude`` and from
    ``git/ignore`` below ``$XDG_CONFIG_HOME`` (``~/.config`` when
    unset); scoped rule sets come from the ``.gitignore`` files recorded
    in the index, keyed by their directory.
    """
    ret = GitIgnore(absolute=list(), scoped=dict())

    # Read local configuration
    exclude_file = join_path(repo.gitdir, "info/exclude")
    if os.path.exists(exclude_file):
        with open(exclude_file, "r") as f:
            ret.absolute.append(gitignore_parse(f.readlines()))

    # Global configuration
    if "XDG_CONFIG_HOME" in os.environ:
        # (was ``==``: a comparison that left config_home undefined)
        config_home = os.environ["XDG_CONFIG_HOME"]
    else:
        config_home = os.path.expanduser("~/.config")
    global_file = join_path(config_home, "git/ignore")

    if os.path.exists(global_file):
        with open(global_file, "r") as f:
            ret.absolute.append(gitignore_parse(f.readlines()))

    # .gitignore files in the index
    index = index_read(repo)

    for entry in index.entries:
        if entry.name == ".gitignore" or entry.name.endswith("/.gitignore"):
            dir_name = os.path.dirname(entry.name)
            contents = object_read(repo, entry.sha)
            lines = contents.blobdata.decode("utf8").splitlines()
            # (was ``dirname``: an undefined name)
            ret.scoped[dir_name] = gitignore_parse(lines)
    return ret
|
|
|
|
def check_ignore1(rules, path):
    """Match ``path`` against one rule set.

    Returns the value of the last matching rule, or None when no rule
    matches.
    """
    result = None
    for (pattern, value) in rules:
        if fnmatch(path, pattern):
            result = value
    return result
|
|
|
|
def check_ignore_scoped(rules, path):
    """Match ``path`` against the per-directory rule sets.

    Starting at the directory of ``path`` and walking up to "", the
    first rule set that yields a decision wins.  Returns None when none
    does.
    """
    parent = os.path.dirname(path)
    while True:
        if parent in rules:
            result = check_ignore1(rules[parent], path)
            if result is not None:
                return result
        if parent == "":
            break
        parent = os.path.dirname(parent)
    return None
|
|
|
|
def check_ignore_absolute(rules, path):
    """Match ``path`` against the absolute rule sets, in order.

    Returns the decision of the first rule set that has one, or False
    when none matches.
    """
    for ruleset in rules:
        result = check_ignore1(ruleset, path)
        if result is not None:
            return result
    return False  # This is a reasonable default at this point.
|
|
|
|
def check_ignore(rules, path):
    """Return whether ``path`` (relative to the repository root) is ignored.

    Scoped rules take precedence over absolute ones; anything starting
    with ".subcommit-git" is always ignored.

    Raises:
        Exception: if ``path`` is absolute.
    """
    if os.path.isabs(path):
        raise Exception("This function requires path to be relative to the repository's root")

    # Eh, just hardcode it
    if path.startswith(".subcommit-git"):
        return True

    result = check_ignore_scoped(rules.scoped, path)
    if result is not None:
        return result

    return check_ignore_absolute(rules.absolute, path)
|
|
|
|
# "status" sub-command; it takes no arguments of its own.
argsp = argsubparsers.add_parser("status", help="Show the working tree status.")
|
|
|
|
def cmd_status(_):
    """Entry point of the "status" command; the parsed arguments are unused.

    Prints, in order: the active branch line, the HEAD-vs-index report, a
    blank line, then the index-vs-worktree report.
    """
    repository = repo_find()
    staged = index_read(repository)

    cmd_status_branch(repository)
    cmd_status_head_index(repository, staged)
    print()
    cmd_status_index_worktree(repository, staged)
|
|
|
|
def branch_get_active(repo):
    """Return the name of the branch HEAD points at, or False if HEAD is detached.

    HEAD is considered to be on a branch when its content starts with
    "ref: refs/heads/"; everything after that prefix, with surrounding
    whitespace removed, is the branch name.
    """
    with open(GitRepository.repo_file(repo, "HEAD"), "r") as f:
        head = f.read()

    prefix = "ref: refs/heads/"
    if head.startswith(prefix):
        # Strip instead of chopping exactly one character: the file may end
        # with "\n", "\r\n", or no newline at all.
        return head[len(prefix):].strip()
    return False
|
|
|
|
def cmd_status_branch(repo):
    """Print which branch is checked out, or the object HEAD is detached at."""
    active = branch_get_active(repo)
    if not active:
        print(f"HEAD detached at {object_find(repo, 'HEAD')}")
        return
    print(f"On branch {active}.")
|
|
|
|
def tree_to_dict(repo, ref, prefix=""):
    """Flatten the tree designated by `ref` into a {path: sha} dictionary.

    Each leaf's path is joined onto `prefix`. A leaf whose mode starts with
    b'sc' is a subcommit: the commit is read, and the tree it references is
    flattened recursively under the leaf's path. Any other leaf is recorded
    as path -> leaf SHA.

    Raises:
        Exception: if a leaf's mode starts with b'04' (a tree nested directly
            inside a tree).
    """
    root_sha = object_find(repo, ref, fmt=b"tree")
    root = object_read(repo, root_sha)

    flattened = {}
    for leaf in root.items:
        leaf_path = join_path(prefix, leaf.path)

        if leaf.mode.startswith(b'04'):
            raise Exception("Tree should not be child of tree")

        if not leaf.mode.startswith(b'sc'):
            # Regular entry: remember where it lives.
            flattened[leaf_path] = leaf.sha
            continue

        # Subcommit: descend into the tree that commit points at.
        subcommit = object_read(repo, leaf.sha)
        subtree_sha = subcommit.kvlm[b'tree'].decode("ascii")
        flattened.update(tree_to_dict(repo, subtree_sha, leaf_path))

    return flattened
|
|
|
|
def cmd_status_head_index(repo, index):
    """Print the differences between HEAD's tree and the index.

    Index entries whose SHA differs from HEAD's are reported as modified,
    entries absent from HEAD as added, and HEAD paths that never show up in
    the index as deleted.
    """
    print("Changes to be committed:")

    head = tree_to_dict(repo, "HEAD")
    for entry in index.entries:
        if entry.name in head:
            if head[entry.name] != entry.sha:
                print(" modified:", entry.name)
            # Seen in the index, so it is not a deletion.
            del head[entry.name]
        else:
            print(" added: ", entry.name)

    # Whatever is left in `head` was never met in the index, and has
    # therefore been deleted.
    for path in head:
        print(" deleted: ", path)
|
|
|
|
def cmd_status_index_worktree(repo, index):
    """Print the differences between the index and the working tree.

    Reports index entries whose file is missing (deleted) or whose content
    hash differs (modified), then lists files found on disk that are neither
    in the index nor ignored.
    """
    print("Changes not staged for commit:")

    ignore = gitignore_read(repo)

    # Compare with forward slashes on both sides: os.walk() reports roots with
    # the platform separator, and repo.gitdir may mix separators.
    gitdir = repo.gitdir.replace("\\", "/")
    gitdir_prefix = gitdir + "/"

    # A dict serves as an insertion-ordered set: O(1) removal while keeping
    # the order in which the files were discovered.
    untracked = dict()

    # We begin by walking the filesystem
    for (root, _, files) in os.walk(repo.worktree, True):
        root = root.replace("\\", "/")
        if root == gitdir or root.startswith(gitdir_prefix):
            continue
        for f in files:
            full_path = join_path(root, f)
            rel_path = os.path.relpath(full_path, repo.worktree).replace("\\", "/")
            untracked[rel_path] = None

    # We now traverse the index, and compare real files with the cached
    # versions.
    for entry in index.entries:
        full_path = join_path(repo.worktree, entry.name)

        if not os.path.exists(full_path):
            print(" deleted: ", entry.name)
        else:
            stat = os.stat(full_path)

            # Compare metadata first; only hash the file if it differs.
            ctime_ns = entry.ctime[0] * 10**9 + entry.ctime[1]
            mtime_ns = entry.mtime[0] * 10**9 + entry.mtime[1]
            if (stat.st_ctime_ns != ctime_ns) or (stat.st_mtime_ns != mtime_ns):
                # @FIXME This *will* crash on symlinks to dir.
                with open(full_path, "rb") as fd:
                    new_sha = object_hash(fd, b"blob", None)
                # If the hashes are the same, the files are actually the same.
                if entry.sha != new_sha:
                    print(" modified:", entry.name)

        # Tracked, hence not a candidate for the untracked list.
        untracked.pop(entry.name, None)

    print()
    print("Untracked files:")

    for f in untracked:
        # @TODO If a full directory is untracked, we should display
        # its name without its contents.
        if not check_ignore(ignore, f):
            print(" ", f)
|
|
|
|
def index_write(repo, index):
    """Serialise `index` into the repository's "index" file.

    Layout written: the b"DIRC" magic, the version and the entry count (4
    bytes each, big-endian), then one record per entry. Each record is padded
    with NUL bytes so that the running entry-data size stays a multiple of 8.
    """
    def u32(value):
        # Every fixed-width integer field is 4 bytes, big-endian.
        return value.to_bytes(4, "big")

    with open(GitRepository.repo_file(repo, "index"), "wb") as out:
        # Header
        out.write(b"DIRC")
        out.write(u32(index.version))
        out.write(u32(len(index.entries)))

        # Entries
        written = 0
        for entry in index.entries:
            out.write(u32(entry.ctime[0]))
            out.write(u32(entry.ctime[1]))
            out.write(u32(entry.mtime[0]))
            out.write(u32(entry.mtime[1]))
            # Two fields are always written as zero (presumably the dev and
            # ino slots of the index format; confirm against index_read).
            out.write(u32(0))
            out.write(u32(0))

            out.write(u32((entry.mode_type << 12) | entry.mode_perms))

            out.write(u32(entry.uid))
            out.write(u32(entry.gid))

            out.write(u32(entry.fsize))
            # The SHA is held as a hex string; store it as 20 raw bytes.
            out.write(int(entry.sha, 16).to_bytes(20, "big"))

            assume_valid_bit = 0x1 << 15 if entry.flag_assume_valid else 0

            encoded_name = entry.name.encode("utf8")
            # The length field saturates at 0xFFF.
            name_length = min(len(encoded_name), 0xFFF)

            out.write((assume_valid_bit | entry.flag_stage | name_length).to_bytes(2, "big"))

            out.write(encoded_name)
            # Null-terminate the name string
            out.write(b"\x00")

            # 62 = ten 4-byte fields + 20-byte SHA + 2-byte flags.
            written += 62 + len(encoded_name) + 1

            # Add padding if necessary
            remainder = written % 8
            if remainder:
                padding = 8 - remainder
                out.write(b"\x00" * padding)
                written += padding
|
|
|
|
# Register the "rm" subcommand, which takes one or more paths.
argsp = argsubparsers.add_parser(
    "rm",
    help="Remove files from the working tree and the index.",
)
argsp.add_argument(
    "path",
    nargs="+",
    help="Files to remove",
)
|
|
|
|
def cmd_rm(args):
    """Entry point of the "rm" command: remove `args.path` via `rm`."""
    rm(repo_find(), args.path)
|
|
|
|
def rm(repo, paths, delete=True, skip_missing=False):
    """Remove `paths` from the index and, optionally, from the worktree.

    Args:
        repo: the repository to operate on.
        paths: paths to remove; they are resolved with os.path.abspath().
        delete: when true, also unlink the matching files from disk.
        skip_missing: when true, paths that are not in the index are
            silently ignored instead of raising.

    Raises:
        Exception: if a path lies outside the worktree, or if a path is not
            in the index and `skip_missing` is false.
    """
    index = index_read(repo)

    worktree = repo.worktree.replace("\\", "/") + "/"

    # Make paths absolute. A set gives O(1) lookups and means a path passed
    # twice is only looked for once.
    pending = set()
    for path in paths:
        abspath = os.path.abspath(path).replace("\\", "/")
        if not abspath.startswith(worktree):
            raise Exception(f"Cannot remove paths outside of worktree: {path}")
        pending.add(abspath)

    kept_entries = list()
    remove = list()

    for e in index.entries:
        full_path = join_path(repo.worktree, e.name)

        if full_path in pending:
            remove.append(full_path)
            pending.remove(full_path)
        else:
            kept_entries.append(e)

    # Anything still pending was never found in the index.
    if pending and not skip_missing:
        raise Exception(f"Cannot remove paths not in the index: {sorted(pending)}")

    if delete:
        for path in remove:
            os.unlink(path)

    index.entries = kept_entries
    index_write(repo, index)
|
|
|
|
# Register the "add" subcommand, which takes one or more paths.
argsp = argsubparsers.add_parser(
    "add",
    help="Add files' contents to the index.",
)
argsp.add_argument(
    "path",
    nargs="+",
    help="Files to add",
)
|
|
|
|
def cmd_add(args):
    """Entry point of the "add" command: stage `args.path` via `add`."""
    add(repo_find(), args.path)
|
|
|
|
def add(repo, paths, delete=True, skip_missing=False):
    """Stage the files in `paths`: hash them as blobs and record them in the index.

    Args:
        repo: the repository to operate on.
        paths: files to stage; each must be a regular file inside the worktree.
        delete, skip_missing: accepted for signature compatibility; this
            function does not read them.

    Raises:
        Exception: if a path is not a file or lies outside the worktree.
    """
    # First drop any existing index entries for these paths (the files
    # themselves are kept on disk).
    rm(repo, paths, delete=False, skip_missing=True)

    worktree = repo.worktree + "/"

    # Convert the paths to pairs: (absolute, relative_to_worktree), skipping
    # repeats so that a path given twice yields a single index entry.
    clean_paths = list()
    seen = set()
    for path in paths:
        abspath = os.path.abspath(path).replace("\\", "/")
        if not (abspath.startswith(worktree) and os.path.isfile(abspath)):
            raise Exception(f"Not a file, or outside the worktree: {path}")
        if abspath in seen:
            continue
        seen.add(abspath)
        relpath = os.path.relpath(abspath, repo.worktree).replace("\\", "/")
        clean_paths.append((abspath, relpath))

    # Find and read the index. It was modified by rm. (This isn't
    # optimal, but good enough)
    #
    # FIXME: we could pass the index between commands instead of reading
    # and writing it over and over again.
    index = index_read(repo)

    for (abspath, relpath) in clean_paths:
        with open(abspath, "rb") as fd:
            sha = object_hash(fd, b"blob", repo)

        stat = os.stat(abspath)

        # Split the integer nanosecond timestamps directly, so the stored
        # (seconds, nanoseconds) pair always recombines to st_*time_ns,
        # which is what the status command compares against.
        ctime_s, ctime_ns = divmod(stat.st_ctime_ns, 10**9)
        mtime_s, mtime_ns = divmod(stat.st_mtime_ns, 10**9)

        entry = GitIndexEntry(ctime=(ctime_s, ctime_ns), mtime=(mtime_s, mtime_ns), ino=stat.st_ino,
                              mode_type=0b1000, mode_perms=0o644, uid=stat.st_uid, gid=stat.st_gid,
                              fsize=stat.st_size, sha=sha, flag_assume_valid=False,
                              flag_stage=False, name=relpath)
        index.entries.append(entry)

    index_write(repo, index)
|
|
|
|
# Register the "commit" subcommand. The message is supplied with -m and is
# stored on the parsed arguments as `message`.
argsp = argsubparsers.add_parser("commit", help="Record changes to the repository.")

argsp.add_argument("-m",
                   metavar="message",
                   dest="message",
                   help="Message to associate with this commit.")
|
|
|
|
def gitconfig_read():
    """Load the user's git configuration and return it as a ConfigParser.

    Only "<config home>/git/config" is read, where the config home is
    $XDG_CONFIG_HOME if that variable is set and "~/.config" otherwise.
    "~/.gitconfig" is not consulted. Missing files are skipped by
    ConfigParser.read(), so the result may be empty.
    """
    config_root = os.environ.get("XDG_CONFIG_HOME", "~/.config")
    user_config = os.path.expanduser(join_path(config_root, "git/config"))

    parser = configparser.ConfigParser()
    parser.read([user_config.replace("\\", "/")])
    return parser
|
|
|
|
def gitconfig_user_get(config):
    """Build the "Name <email>" identity string from `config`.

    Returns None unless `config` has a "user" section containing both a
    "name" and an "email" key.
    """
    if "user" not in config:
        return None

    user = config["user"]
    if "name" not in user or "email" not in user:
        return None

    return f"{user['name']} <{user['email']}>"
|
|
|
|
def create_commit_map(repo, commit):
    """Return a dict describing every subcommit reachable from `commit`.

    The dict is filled in by `create_commit_map_recurse`, starting with an
    empty path prefix.
    """
    subcommits = {}
    create_commit_map_recurse(repo, commit, subcommits, "")
    return subcommits
|
|
|
|
def create_commit_map_recurse(repo, commit, commit_map, path):
    """Walk down the tree of subcommits below `commit`, filling `commit_map`.

    For every subcommit leaf found, `commit_map[directory path]` is set to
    the pair (subcommit hash, subcommit object), where the directory path is
    `path` joined with the leaf's own path by "/". The walk then continues
    inside that subcommit. Blob, symlink and submodule leaves are skipped.

    `commit` itself is not added to the map. Nothing is returned; the
    result is the mutation of `commit_map`.

    Raises:
        Exception: if a leaf is a plain tree, or has an unrecognised mode.
    """
    tree_sha = commit.kvlm[b"tree"].decode('ascii')
    obj = object_read(repo, tree_sha)
    for item in obj.items:
        # A 5-byte mode is a 6-byte mode whose leading zero was dropped; put
        # it back so the two-byte type prefix can be compared.
        mode = b"0" + item.mode if len(item.mode) == 5 else item.mode
        kind = mode[0:2]

        if kind == b'04':
            raise Exception("Trees should point to subcommits, not other trees")
        if kind in (b'10', b'12', b'16'):
            # Blob, symlink or submodule: nothing to record.
            continue
        if kind != b'sc':
            raise Exception(f"Weird tree leaf mode {item.mode}")

        fullpath = item.path if path == "" else f"{path}/{item.path}"
        subcommit = object_read(repo, item.sha)
        commit_map[fullpath] = (item.sha, subcommit)

        create_commit_map_recurse(repo, subcommit, commit_map, fullpath)
|
|
|
|
|
|
def tree_from_index(repo, index, commit_map, author, commit_time, message):
    """Write the trees (and subcommits) described by `index`; return the root tree SHA.

    Every directory that appears in the index gets a tree object. Every
    directory other than the root is additionally wrapped in a subcommit,
    which is what the parent directory's tree points at (mode b"sc0000").
    An existing subcommit from `commit_map` is reused when it already
    references the freshly written tree; otherwise a new one is created with
    the old subcommit (if any) as its parent.

    Args:
        repo: the repository to write objects into.
        index: the index whose entries are turned into trees.
        commit_map: directory path -> (subcommit hash, subcommit object), as
            built by `create_commit_map`.
        author, commit_time, message: passed on to `commit_create` for any
            new subcommit.

    Returns:
        The SHA of the root tree. Wrapping it in a commit is left to the caller.
    """
    contents = dict()
    contents[""] = list()

    # Convert entries to dictionary where keys are directories, and values are
    # lists of directory contents.
    for entry in index.entries:
        dirname = os.path.dirname(entry.name).replace("\\", "/")

        # We create all dictionary entries up to root (""). We need them *all*
        # because even if a directory holds no files it will contain at least
        # a subdirectory.
        key = dirname
        while key != "":
            if key not in contents:
                contents[key] = list()
            key = os.path.dirname(key).replace("\\", "/")

        contents[dirname].append(entry)

    # Sort keys (= directories) by length, descending. This means that we'll
    # always encounter a given path before its parent, which is all we need,
    # since for each directory D we'll need to modify its parent P to add
    # D's subcommit.
    sorted_paths = sorted(contents.keys(), key=len, reverse=True)

    sha = None

    for path in sorted_paths:
        tree = GitTree()

        for entry in contents[path]:
            # An entry can be a normal GitIndexEntry read from the index, or
            # a subdirectory we've already processed.
            if isinstance(entry, GitIndexEntry):
                leaf_mode = f"{entry.mode_type:02o}{entry.mode_perms:04o}".encode("ascii")
                leaf = GitTreeLeaf(mode=leaf_mode, path=os.path.basename(entry.name), sha=entry.sha)
            else:  # Subdirectory, stored as a pair: (basename, subcommit SHA)
                leaf = GitTreeLeaf(mode=b"sc0000", path=entry[0], sha=entry[1])

            tree.items.append(leaf)

        sha = object_write(tree, repo)

        if path == "":
            # The root tree has no parent directory to register with, and the
            # caller creates its commit; making one here would only leave a
            # stray commit object behind.
            continue

        (subcommit_hash, subcommit) = commit_map.get(path, (None, None))

        if subcommit is None or subcommit.kvlm[b'tree'] != sha.encode('ascii'):
            # New directory, or its content changed: record a new subcommit.
            new_subcommit = commit_create(repo,
                                          sha,
                                          subcommit_hash,
                                          author,
                                          commit_time,
                                          message)
        else:
            new_subcommit = subcommit_hash

        parent = os.path.dirname(path).replace("\\", "/")
        base = os.path.basename(path)
        contents[parent].append((base, new_subcommit))

    return sha
|
|
|
|
def commit_create(repo, tree, parent, author, timestamp, message):
    """Create a commit object, write it to `repo`, and return what object_write returns.

    Args:
        repo: the repository to write into.
        tree: hex SHA (str) of the tree the commit points at.
        parent: hex SHA (str) of the parent commit, or a falsy value for none.
        author: identity string, or None for an empty identity. It is used
            for both the author and the committer field.
        timestamp: a datetime; a naive value is interpreted as local time.
        message: the commit message (str).
    """
    commit = GitCommit()
    commit.kvlm[b"tree"] = tree.encode("ascii")
    if parent:
        commit.kvlm[b'parent'] = parent.encode("ascii")

    # Format the UTC offset as +HHMM / -HHMM. Sign and magnitude are handled
    # separately so that negative offsets do not pick up a second minus sign.
    local_time = timestamp.astimezone()
    offset = int(local_time.utcoffset().total_seconds())
    sign = '+' if offset >= 0 else '-'
    hours, remainder = divmod(abs(offset), 3600)
    minutes = remainder // 60
    tz = f"{sign}{hours:02}{minutes:02}"

    if author is None:
        author = ""
    # The identity is followed by the time in seconds since the epoch
    # (not the seconds-of-the-minute field) and the timezone.
    author = f"{author} {int(local_time.timestamp())} {tz}"

    commit.kvlm[b"author"] = author.encode("utf8")
    commit.kvlm[b"committer"] = author.encode("utf8")
    commit.kvlm[None] = message.encode("utf8")

    return object_write(commit, repo)
|
|
|
|
def cmd_commit(args):
    """Entry point of the "commit" command.

    Builds trees and subcommits from the index, creates the root commit with
    the current HEAD commit (if any) as parent, then moves the active branch
    (or HEAD itself, when detached) to the new commit.

    Raises:
        Exception: if no commit message was given.
    """
    # Fail before any object is written rather than deep inside commit_create.
    if args.message is None:
        raise Exception("A commit message is required (use -m).")

    repo = repo_find()
    index = index_read(repo)

    root_commit_sha = object_find(repo, "HEAD")
    root_commit = object_read(repo, root_commit_sha) if root_commit_sha else None

    commit_time = datetime.now()
    author = gitconfig_user_get(gitconfig_read())

    # Existing subcommits, so unchanged directories can keep theirs.
    commit_map = create_commit_map(repo, root_commit) if root_commit else dict()

    tree = tree_from_index(repo,
                           index,
                           commit_map,
                           author,
                           commit_time,
                           args.message)

    commit = commit_create(repo,
                           tree,
                           root_commit_sha,
                           author,
                           commit_time,
                           args.message)

    # Update HEAD so our commit is now the tip of the active branch.
    active_branch = branch_get_active(repo)
    if active_branch:  # If on a branch, update that branch
        ref_path = GitRepository.repo_file(repo, join_path("refs/heads", active_branch))
    else:  # Otherwise, we update HEAD itself.
        ref_path = GitRepository.repo_file(repo, "HEAD")

    with open(ref_path, "w") as fd:
        fd.write(commit + "\n")
|
|
|
|
# Register the "graph-objects" subcommand; the start object is optional.
argsp = argsubparsers.add_parser(
    "graph-objects",
    help="Show git objects in a dot graph",
)
argsp.add_argument(
    "object",
    nargs="?",
    metavar="object",
    help="The object the graph will start from",
)
|
|
|
|
def graph_objects_tree(repo, sha, seen, commit_parents=False):
    """Yield the dot statements describing the tree `sha` and what it reaches.

    Args:
        repo: the repository to read objects from.
        sha: hex SHA of the tree to describe.
        seen: set of SHAs already emitted; updated in place so that every
            object is described at most once.
        commit_parents: forwarded to `graph_objects` for any commit or
            subcommit reached through this tree.

    Raises:
        Exception: if a leaf has an unrecognised mode.
    """
    if sha in seen:
        return
    seen.add(sha)

    yield f"t_{sha} [label=\"tree {sha[0:7]}\"]"
    # TODO: inefficient
    obj = object_read(repo, sha)

    kinds = {
        b'04': "tree",
        b'10': "blob",
        b'12': "blob",       # a symlink
        b'16': "commit",     # a submodule
        b'sc': "subcommit",
    }

    for item in obj.items:
        # A 5-byte mode is a 6-byte mode whose leading zero was dropped.
        mode = b"0" + item.mode if len(item.mode) == 5 else item.mode
        kind = kinds.get(mode[0:2])
        if kind is None:
            raise Exception(f"Weird tree leaf mode {item.mode}")

        if kind == "tree":
            yield f"t_{sha} -> t_{item.sha}"
            yield from graph_objects_tree(repo, item.sha, seen, commit_parents)
        elif kind == "blob":
            yield f"b_{item.sha} [label=\"{item.sha[0:7]} {item.path}\"]"
            yield f"t_{sha} -> b_{item.sha}"
        else:  # commit or subcommit
            yield f"t_{sha} -> c_{item.sha}"
            yield from graph_objects(repo, item.sha, seen, commit_parents)
|
|
|
|
|
|
def graph_objects(repo, sha, seen, commit_parents=False):
|
|
if sha in seen:
|
|
return
|
|
seen.add(sha)
|
|
|
|
obj = object_read(repo, sha)
|
|
|
|
match obj.fmt:
|
|
case b"commit":
|
|
message = obj.kvlm[None]
|
|
yield f"c_{sha} [label=\"commit {sha[0:7]}\n {message.decode('utf8')}\"]"
|
|
tree_sha = obj.kvlm[b"tree"].decode("ascii")
|
|
yield f"c_{sha} -> t_{tree_sha}"
|
|
yield from graph_objects_tree(repo, tree_sha, seen, commit_parents)
|
|
if commit_parents and b'parent' in obj.kvlm.keys():
|
|
yield f"c_{sha} -> c_{obj.kvlm[b'parent'].decode('ascii')}"
|
|
case b"tree":
|
|
yield from graph_objects_tree(repo, sha, seen, commits_parents)
|
|
|
|
def cmd_graph_objects(args):
    """Entry point of the "graph-objects" command.

    Prints a dot digraph of the objects reachable from `args.object`, or
    from HEAD when no object was given.
    """
    repo = repo_find()

    obj_name = "HEAD" if args.object is None else args.object
    obj_sha = object_find(repo, obj_name)

    lines = ["digraph objectgraph{", " node[shape=rect]"]
    lines.extend(" " + statement for statement in graph_objects(repo, obj_sha, set()))
    lines.append("}")

    print("\n".join(lines))
|