import argparse import collections import configparser from datetime import datetime from fnmatch import fnmatch import hashlib from math import ceil import os import re import sys import zlib argparser = argparse.ArgumentParser(description="The stupidest content tracker") argsubparsers = argparser.add_subparsers(title="Commands", dest="command") argsubparsers.required = True def main(argv=sys.argv[1:]): args = argparser.parse_args(argv) match args.command: case "add" : cmd_add(args) case "cat-file" : cmd_cat_file(args) case "check-ignore" : cmd_check_ignore(args) case "checkout" : cmd_checkout(args) case "commit" : cmd_commit(args) case "hash-object" : cmd_hash_object(args) case "init" : cmd_init(args) case "log" : cmd_log(args) case "ls-files" : cmd_ls_files(args) case "ls-tree" : cmd_ls_tree(args) case "rev-parse" : cmd_rev_parse(args) case "rm" : cmd_rm(args) case "show-ref" : cmd_show_ref(args) case "status" : cmd_status(args) case "tag" : cmd_tag(args) case _ : print("Bad command.") class GitRepository (object): """A git repo""" worktree = None gitdir = None conf = None def __init__(self, path, force=False): self.worktree = path self.gitdir = os.path.join(path, ".git") if not (force or os.path.isdir(self.gitdir)): raise Exception(f"Not a git repository {path}") # Read configuration file in .git/config self.conf = configparser.ConfigParser() cf = GitRepository.repo_file(self, "config") if cf and os.path.exists(cf): self.conf.read([cf]) elif not force: raise Exception("Configuration file missing") if not force: vers = int(self.conf.get("core", "repositoryformatversion")) if vers != 0: raise Exception(f"Unsupported repositoryformatversion {vers}") def repo_path(repo, *path): """Compute path under repo's gitdir.""" return os.path.join(repo.gitdir, *path) def repo_file(repo, *path, mkdir=False): """Same as repo_path, but create dirname(*path) if absent. For example, repo_file(r, \"refs\", \"remotes\", \"origin\", \"HEAD\") will create .git/refs/remotes/origin.""" if GitRepository.repo_dir(repo, *path[:-1], mkdir=mkdir): return GitRepository.repo_path(repo, *path) def repo_dir(repo, *path, mkdir=False): """Same as repo_path, bt mkdir *path if absent if mkdir""" path = GitRepository.repo_path(repo, *path) if (os.path.exists(path)): if (os.path.isdir(path)): return path else: raise Exception(f"Not a directory {path}") if mkdir: os.makedirs(path) return path else: return None def repo_create(path): """Create a new repository at path.""" repo = GitRepository(path, True) # First, we make sure the path either doesn't exist or is an # empty dir. if os.path.exists(repo.worktree): if not os.path.isdir(repo.worktree): raise Exception(f"{path} is not a directory!") if os.path.exists(repo.gitdir) and os.listdir(repo.gitdir): raise Exception(f"{path} is not empty!") else: os.makedirs(repo.worktree) assert GitRepository.repo_dir(repo, "branches", mkdir=True) assert GitRepository.repo_dir(repo, "objects", mkdir=True) assert GitRepository.repo_dir(repo, "refs", "tags", mkdir=True) assert GitRepository.repo_dir(repo, "refs", "heads", mkdir=True) # .git/description with open(GitRepository.repo_file(repo, "description"), "w") as f: f.write("Unnamed repository; edit this file 'description' to name the repository") with open(GitRepository.repo_file(repo, "HEAD"), "w") as f: f.write("ref: refs/heads/master\n") with open(GitRepository.repo_file(repo, "config"), "w") as f: config = repo_default_config() config.write(f) return repo def repo_default_config(): ret = configparser.ConfigParser() ret.add_section("core") ret.set("core", "repositoryformatVersion", "0") ret.set("core", "filemode", "false") ret.set("core", "bare", "false") return ret argsp = argsubparsers.add_parser("init", help="Initialize a new, empty repository.") argsp.add_argument("path", metavar="directory", nargs="?", default=".", help="Where to create the repository.") def cmd_init(args): repo_create(args.path) def repo_find(path=".", required=True): path = os.path.realpath(path) if os.path.isdir(os.path.join(path, ".git")): return GitRepository(path) # If we haven't returned, recurse in parent parent = os.path.realpath(os.path.join(path, "..")) match os.name: case "nt": is_root = os.path.splitdrive(path)[1] == "\\" case "posix": # If parent==path, then path is root. is_root = parent == path case _: raise Exception(f"Unsupported os {os.name}") if is_root: if required: raise Exception("No git directory.") else: return None # Recursive case return repo_find(parent, required) argsp = argsubparsers.add_parser("status", help="Get the status of the repo") argsp.add_argument("path", metavar="directory", nargs="?", default=".", help="Which directory") def cmd_status(args): # TODO: actually get status print(repo_find(args.path).worktree) class GitObject (object): def __init__(self, data=None): if data != None: self.deserialize(data) else: self.init() def serialize(self, repo): """This function MUST be implemented by subclasses. It must read the object's contents from self.data, a byte string, and do whatever it takes to convert it into a meaningful representation. What exactly that means depends on each subclass.""" raise Exception("Unimplemented!") def deserialize(self, repo): raise Exception("Unimplemented!") def init(self): pass # other implementations might do something here def object_read(repo, sha): """Read object sha from Git repository repo. Return a GitObject whose exact type depends on the object.""" path = GitRepository.repo_file(repo, "objects", sha[0:2], sha[2:]) if not os.path.isfile(path): return None with open(path, "rb") as f: raw = zlib.decompress(f.read()) # Read object type x = raw.find(b' ') fmt = raw[0:x] # Read and validate object size y = raw.find(b'\x00', x) size = int(raw[x:y].decode("ascii")) if size != len(raw)-y-1: raise Exception(f"Malformed object {sha}: bad length") match fmt: case b'commit' : c=GitCommit case b'tree' : c=GitTree case b'tag' : c=GitTag case b'blob' : c=GitBlob case _: raise Exception(f"Unknown type {fmt.decode('ascii')} for object {sha}") return c(raw[y+1:]) def object_write(obj, repo=None): data = obj.serialize() # Add header result = obj.fmt + b' ' + str(len(data)).encode() + b'\x00' + data # Compute hash sha = hashlib.sha1(result).hexdigest() if repo: # Compute path path=GitRepository.repo_file(repo, "objects", sha[0:2], sha[2:], mkdir=True) if not os.path.exists(path): with open(path, 'wb') as f: f.write(zlib.compress(result)) return sha class GitBlob(GitObject): fmt=b'blob' def serialize(self): return self.blobdata def deserialize(self, data): self.blobdata = data argsp = argsubparsers.add_parser("cat-file", help="Provide content of repository objects") argsp.add_argument("type", metavar="type", choices=["blob", "commit", "tag", "tree"], help="Specify the type") argsp.add_argument("object", metavar="object", help="The object to display") def cmd_cat_file(args): repo = repo_find() cat_file(repo, args.object, fmt=args.type.encode()) def cat_file(repo, obj, fmt=None): obj = object_read(repo, object_find(repo, obj, fmt=fmt)) sys.stdout.buffer.write(obj.serialize()) def object_find(repo, name, fmt=None, follow=True): return name argsp = argsubparsers.add_parser( "hash-object", help="Compute object ID an optionally creates a blob from a file") argsp.add_argument("-t", metavar="type", dest="type", choices=["blob", "commit", "tag", "tree"], default="blob", help="Specify the type") argsp.add_argument("-w", dest="write", action="store_true", help="Actually write the object into the database") argsp.add_argument("path", help="Read object from ") def cmd_hash_object(args): if args.write: repo = repo_find() else: repo = None with open(args.path, "rb") as fd: sha = object_hash(fd, args.type.encode(), repo) print(sha) def object_hash(fd, fmt, repo=None): """Hash object, writing it to repo if provided""" data = fd.read() match fmt: case b'commit' : obj=GitCommit(data) case b'tree' : obj=GitTree(data) case b'tag' : obj=GitTag(data) case b'blob' : obj=GitBlob(data) case _ : raise Exception(f"Unknown type {fmt}") return object_write(obj, repo) def kvlm_parse(raw, start=0, dct=None): if not dct: dct = collections.OrderedDict() # You CANNOT declare the argument as dct=OrderedDict() or all # call to the functions will endlessly grow the same dict. # This function is recursive: it reads a key/value pair, then call # itself back with the new position. So we first need to know # where we are: at a keyword, or already in the messageQ # We search for the next space and the next newline. spc = raw.find(b' ', start) nl = raw.find(b'\n', start) # If space appears before newline, we have a keyword. Otherwise, # it's the final message, which we just read to the end of the file. # Base case # ========= # # If newline appears first (or there's no space at all), we asume # a blank line. A blank line means the remainder of the data is the # message. We store it in the dictionary, with None as the key, and # return. if (spc < 0) or (nl < spc): assert nl == start dct[None] = raw[start+1:] return dct # Recursive case # ============== # # We read a key-value pair and recurse for the next. key = raw[start:spc] # Find the end of the value. Continuation lines begin with a # space, so we loop until we find a "\n" not followed by a space. end = start while True: end = raw.find(b'\n', end+1) if raw[end+1] != ord(' '): break value = raw[spc+1:end].replace(b'\n ', b'\n') # Don't overwrite existing data contents if key in dct: if type(dct[key]) == list: dct[key].append(value) else: dct[key] = [ dct[key], value ] else: dct[key] = value return kvlm_parse(raw, start=end+1, dct=dct) def kvlm_serialize(kvlm): ret = b'' # Output fiels for k in kvlm.keys(): # Skip the message itself if k == None: continue val = kvlm[k] # Normalize to a list if type(val) != list: val = [ val ] for v in val: ret += k + b' ' + (v.replace(b'\n', b'\n ')) + b'\n' ret += b'\n' + kvlm[None] + b'\n' return ret class GitCommit(GitObject): fmt = b'commit' def deserialize(self, data): self.kvlm = kvlm_parse(data) def serialize(self): return kvlm_serialize(self.kvlm) def init(self): self.kvlm = dict() argsp = argsubparsers.add_parser("log", help="Display history of a given commit.") argsp.add_argument("commit", default="HEAD", nargs="?", help="Commit to start at.") def cmd_log(args): repo = repo_find() print("digraph wyaglog{") print(" node[shape=rect]") log_graphviz(repo, object_find(repo, args.commit), set()) print("}") def log_graphviz(repo, sha, seen): if sha in seen: return seen.add(sha) commit = object_read(repo, sha) short_hash = sha[0:8] message = commit.kvlm[None].decode("utf8").strip() message = message.replace("\\", "\\\\") message = message.replace("\"", "\\\"") if "\n" in message: # keep only the first line message = message[:message.index("\n")] print(f" c_{sha} [label=\"{sha[0:7]}: {message}\"]") assert commit.fmt==b'commit' if not b'parent' in commit.kvlm.keys(): # Base case: the initial commit. return parents = commit.kvlm[b'parent'] if type(parents) != list: parents = [ parents ] for p in parents: p = p.decode("ascii") print (f" c_{sha} -> c_{p}") log_graphviz(repo, p, seen) class GitTreeLeaf (object): def __init__(self, mode, path, sha): self.mode = mode self.path = path self.sha = sha def tree_parse_one(raw, start=0): # Find the space terminator of the mode x = raw.find(b' ', start) assert x-start == 5 or x-start == 6 mode = raw[start:x] if len(mode) == 5: # Normalize to six bytes. mode = b"0" + mode # Find the NULL terminator of the path y = raw.find(b'\x00', x) path = raw[x+1:y] sha = format(int.from_bytes(raw[y+1:y+21], "big"), "040x") return y+21, GitTreeLeaf(mode, path.decode('utf8'), sha) def tree_parse(raw): pos = 0 max = len(raw) ret = list() while pos < max: pos, data = tree_parse_one(raw, pos) ret.append(data) return ret # Notice: this isn't a comparison function, but a conversion function. # Python's default sort doesn't accept a custom comparison function, # like in most languages, but a 'key' argument that returns a new # value, which is compared using the default rules. So we just return # the leaf name, with an extra '/' if it's a directory. def tree_leaf_sort_key(leaf): if leaf.mode.startswith(b"10"): return leaf.path else: return leaf.path + "/" def tree_serialize(obj): obj.items.sort(key=tree_leaf_sort_key) ret = b'' for i in obj.items: ret += i.mode ret += b' ' ret += i.path.encode("utf8") ret += b'\x00' sha = int(i.sha, 16) ret += sha.to_bytes(20, byteorder="big") return ret class GitTree(GitObject): fmt=b'tree' def deserialize(self, data): self.items = tree_parse(data) def serialize(self): return tree_serialize(self) def init(self): self.items = list() argsp = argsubparsers.add_parser("ls-tree", help="Pretty-print a tree object.") argsp.add_argument("-r", dest="recursive", action="store_true", help="Recurse into sub-trees") argsp.add_argument("tree", help="A tree-ish object.") def cmd_ls_tree(args): repo = repo_find() ls_tree(repo, args.tree, args.recursive) def ls_tree(repo, ref, recursive=None, prefix=""): sha = object_find(repo, ref, fmt=b"tree") obj = object_read(repo, sha) for item in obj.items: if len(item.mode) == 5: type = item.mode[0:1] else: type = item.mode[0:2] match type: case b'04': type = "tree" case b'10': type = "blob" case b'12': type = "blob" # a symlink case b'16': type = "commit" # a submodule case _: raise Exception(f"Weird tree leaf mode {item.mode}") if not (recursive and type=='tree'): # This is a leaf print("{0} {1} {2}\t{3}".format( "0" * (6 - len(item.mode)) + item.mode.decode("ascii"), # Git's ls-tree displays the type # of the object pointed to. type, item.sha, os.path.join(prefix, item.path) )) else: # This is a branch (vs. leaf), recurse ls_tree(repo, item.sha, recursive, os.path.join(prefix, item.path)) argsp = argsubparsers.add_parser("checkout", help="Checkout a commit inside of a directory.") argsp.add_argument("commit", help="The commit or tree to checkout.") argsp.add_argument("path", help="The EMPTY directory to checkout on.") def cmd_checkout(args): repo = repo_find() obj = object_read(repo, object_find(repo, args.commit)) # If the object is a commit, we grab its tree if obj.fmt == b'commit': obj = object_read(repo, obj.kvlm[b'tree'].decode("ascii")) # Verify the path is an empty directory if os.path.exists(args.path): if not os.path.isdir(args.path): raise Exception(f"This is not a directory: '{ags.path}'") if os.listdir(args.path): raise Exception(f"Directory '{args.path}' is not empty") else: os.makedirs(args.path) tree_checkout(repo, obj, os.path.realpath(args.path)) def tree_checkout(repo, tree, path): for item in tree.items: obj = object_read(repo, item.sha) dest = os.path.join(path, item.path) if obj.fmt == b'tree': os.mkdir(dest) tree_checkout(repo, obj, dest) elif obj.fmt == b'blob': # TODO: Support symlinks (identified by mode 12*****) with open(dest, 'wb') as f: f.write(obj.blobdata)