aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFranck Cuny <franck@fcuny.net>2022-10-23 13:20:03 -0700
committerFranck Cuny <franck@fcuny.net>2022-10-23 13:29:37 -0700
commit21e9b655e9e82be8775e9375ce113858dc5791dc (patch)
tree533e9390eef5f09133f8c8687c79a129da29c55b
parentref(tools/git-bootstrap): this is replaced by terraform (diff)
downloadinfra-21e9b655e9e82be8775e9375ce113858dc5791dc.tar.gz
feat(tools/git-broom): CLI to delete local and remote branches
This tool helps to keep only the branches that are relevant: the ones that have not been merged yet into the main branch on the principal remote repository.
-rw-r--r--tools/default.nix1
-rw-r--r--tools/git-broom/default.nix25
-rwxr-xr-xtools/git-broom/git-broom.py350
3 files changed, 376 insertions, 0 deletions
diff --git a/tools/default.nix b/tools/default.nix
index 4078f35..543882f 100644
--- a/tools/default.nix
+++ b/tools/default.nix
@@ -6,5 +6,6 @@ pkgs.lib.makeScope pkgs.newScope (pkgs: {
ipconverter = pkgs.callPackage ./ipconverter { };
seqstat = pkgs.callPackage ./seqstat { };
git-blame-stats = pkgs.callPackage ./git-blame-stats { };
+ git-broom = pkgs.callPackage ./git-broom { };
sendsms = pkgs.callPackage ./sendsms { };
})
diff --git a/tools/git-broom/default.nix b/tools/git-broom/default.nix
new file mode 100644
index 0000000..e25c6ec
--- /dev/null
+++ b/tools/git-broom/default.nix
@@ -0,0 +1,25 @@
+{ self, lib, python3, stdenvNoCC, pkgs }:
+
+stdenvNoCC.mkDerivation rec {
+ pname = "git-broom";
+ src = ./git-broom.py;
+ version = "0.1.0";
+
+ nativeBuildInputs = with pkgs; [ python3 ];
+
+ dontUnpack = true;
+ dontBuild = true;
+
+ installPhase = ''
+ mkdir -p $out/bin
+ cp $src $out/bin/${pname}
+ '';
+
+
+ meta = with pkgs.lib; {
+ description = "CLI to delete local and remote git branches that have been merged.";
+ license = licenses.mit;
+ platforms = platforms.unix;
+ maintainers = [ ];
+ };
+}
diff --git a/tools/git-broom/git-broom.py b/tools/git-broom/git-broom.py
new file mode 100755
index 0000000..8721b3c
--- /dev/null
+++ b/tools/git-broom/git-broom.py
@@ -0,0 +1,350 @@
+#!/usr/bin/env python3
+
+import argparse
+import os
+import re
+import subprocess
+import sys
+from typing import List, Dict
+
+import logging
+
+logging.basicConfig(format="[%(asctime)s]%(levelname)s:%(message)s", level=logging.INFO)
+
+# regular expression to find the name of the main branch on the remote
+re_match_remote_branch = re.compile(r"ref: refs/heads/(?P<branch>\S+)\tHEAD")
+
+# never delete any branches or references with one of these names
+immortal_ref = ["main", "master", "HEAD"]
+
+# that's how my remotes are usually named, and in that order of preference.
+preferred_remotes = ["origin", "github", "work"]
+
+
+class GitConfig(object):
+ """Represent the configuration for the git repository."""
+
+ def __init__(self) -> None:
+ self.guess_remote()
+ self.guess_primary_branch()
+ self.remote_ref = f"{self.remote_name}/{self.primary_branch}"
+ self.me = os.getenv("USER")
+
+ def guess_remote(self) -> None:
+ """Guess the name and URL for the remote repository.
+
+ If the name of the remote is from the list of preferred remote, we
+ return the name and URL.
+
+ If we don't have a remote set, throw an exception.
+ If we don't find any remote, throw an exception.
+ """
+ candidates = subprocess.run(
+ ["git", "config", "--get-regexp", "remote\.[a-z0-9]+.url"],
+ capture_output=True,
+ check=True,
+ encoding="utf-8",
+ ).stdout.splitlines()
+
+ if len(candidates) == 0:
+ raise ValueError("No remote is defined.")
+
+ remotes = dict()
+
+ for candidate in candidates:
+ parts = candidate.split(" ")
+ remote = parts[0].split(".")[1]
+ url = parts[1]
+ remotes[remote] = url
+
+ for remote in preferred_remotes:
+ if remote in remotes:
+ self.remote_name = remote
+ self.remote_url = remotes[remote]
+ return
+
+ raise ValueError("can't find the preferred remote.")
+
+ def guess_primary_branch(self) -> None:
+ """Guess the primary branch on the remote.
+
+ If we can't figure out the default branch, thrown an exception.
+ """
+ remote_head = subprocess.run(
+ ["git", "ls-remote", "--symref", self.remote_name, "HEAD"],
+ capture_output=True,
+ check=True,
+ encoding="utf-8",
+ ).stdout.splitlines()
+
+ for l in remote_head:
+ m = re_match_remote_branch.match(l)
+ if m:
+ self.primary_branch = m.group("branch")
+ return
+
+ raise ValueError(
+ f"can't find the name of the remote branch for {self.remote_name}"
+ )
+
+
+def is_git_repository() -> bool:
+ """Check if we are inside a git repository.
+
+ Return True if we are, false otherwise."""
+ res = subprocess.run(
+ ["git", "rev-parse", "--show-toplevel"], check=False, capture_output=True
+ )
+ return not res.returncode
+
+
+def fetch(remote: str):
+ """Fetch updates from the remote repository."""
+ subprocess.run(["git", "fetch", remote, "--prune"], capture_output=True, check=True)
+
+
+def ref_sha(ref: str) -> str:
+ """Get the sha from a ref."""
+ res = subprocess.run(
+ ["git", "show-ref", ref], capture_output=True, check=True, encoding="utf-8"
+ )
+ return res.stdout.rstrip()
+
+
+def get_branches(options: List[str]) -> List[str]:
+ """Get a list of branches."""
+ return subprocess.run(
+ ["git", "branch", "--format", "%(refname:short)"] + options,
+ capture_output=True,
+ check=True,
+ encoding="utf-8",
+ ).stdout.splitlines()
+
+
+def ref_tree(ref: str) -> str:
+ """Get the reference from a tree."""
+ return subprocess.run(
+ ["git", "rev-parse", f"{ref}^{{tree}}"],
+ check=True,
+ capture_output=True,
+ encoding="utf-8",
+ ).stdout.rstrip()
+
+
+def rebase_local_branches(config: GitConfig, local_rebase_tree_id: dict) -> None:
+ """Try to rebase the local branches that have been not been merged."""
+ for branch in get_branches(["--list", "--no-merged"]):
+ _rebase_local_branch(branch, config, local_rebase_tree_id)
+
+
+def _rebase_local_branch(
+ branch: str, config: GitConfig, local_rebase_tree_id: dict
+) -> None:
+ res = subprocess.run(
+ [
+ "git",
+ "merge-base",
+ "--is-ancestor",
+ config.remote_ref,
+ branch,
+ ],
+ check=False,
+ capture_output=True,
+ )
+ if res.returncode == 0:
+ logging.info(
+ f"local branch {branch} is already a descendant of {config.remote_ref}."
+ )
+ local_rebase_tree_id[branch] = ref_tree(branch)
+ return
+
+ logging.info(f"local branch {branch} will be rebased on {config.remote_ref}.")
+ subprocess.run(
+ ["git", "checkout", "--force", branch], check=True, capture_output=True
+ )
+ res = subprocess.run(
+ ["git", "rebase", config.remote_ref], check=True, capture_output=True
+ )
+ if res.returncode == 0:
+ logging.info(f"local branch {branch} has been rebased")
+ local_rebase_tree_id[branch] = ref_tree(branch)
+ else:
+ logging.error(f"failed to rebase local branch {branch}.")
+ subprocess.run(["git", "rebase", "--abort"], check=True)
+ subprocess.run(
+ ["git", "checkout", "--force", config.primary_branch], check=True
+ )
+ subprocess.run(["git", "reset", "--hard"], check=True)
+
+
+def rebase_remote_branches(
+ config: GitConfig, local_rebase_tree_id: dict, main_sha: str
+) -> None:
+ for branch in get_branches(
+ ["--list", "-r", f"{config.me}/*", "--no-merged", config.remote_ref]
+ ):
+ _rebase_remote_branches(branch, config, local_rebase_tree_id, main_sha)
+
+
+def _rebase_remote_branches(
+ branch: str, config: GitConfig, local_rebase_tree_id: dict, main_sha: str
+) -> None:
+ remote, head = branch.split("/")
+ if head in immortal_ref:
+ return
+
+ res = subprocess.run(
+ ["git", "merge-base", "--is-ancestor", config.remote_ref, branch],
+ check=False,
+ capture_output=True,
+ )
+ if res.returncode == 0:
+ logging.info(
+ f"local branch {branch} is already a descendant of {config.remote_ref}."
+ )
+ return
+
+ logging.info(f"remote branch {branch} will be rebased on {config.remote_ref}.")
+
+ sha = ref_sha(branch)
+ subprocess.run(["git", "checkout", "--force", sha], capture_output=True, check=True)
+ res = subprocess.run(
+ ["git", "rebase", config.remote_ref],
+ capture_output=True,
+ check=True,
+ )
+ if res.returncode == 0:
+ new_sha = ref_sha("--head")
+ short_sha = new_sha[0:8]
+ logging.info(f"remote branch {branch} at {sha} rebased to {new_sha}.")
+ if new_sha == main_sha:
+ logging.info(f"remote branch {branch}, when rebased, is already merged!")
+ logging.info(f"would run `git push {remote} :{head}'")
+ elif new_sha == sha:
+ logging.info(f"remote branch {branch}, when rebased, is unchanged!")
+ elif ref_tree(new_sha) == local_rebase_tree_id.get(head, ""):
+ logging.info(f"remote branch {branch}, when rebased, same as local branch!")
+ logging.info(f"would run `git push --force-with-lease {remote} {head}'")
+ else:
+ logging.info(
+ f"remote branch {branch} has been rebased to create {short_sha}!"
+ )
+ logging.info(
+ f"would run `git push --force-with-lease {remote} {new_sha}:{head}'"
+ )
+ else:
+ logging.error(f"failed to rebase remote branch {branch}.")
+ subprocess.run(["git", "rebase", "--abort"], check=True)
+ subprocess.run(
+ ["git", "checkout", "--force", config.primary_branch], check=True
+ )
+ subprocess.run(["git", "reset", "--hard"], check=True)
+
+
+def destroy_remote_merged_branches(config: GitConfig, dry_run: bool) -> None:
+ """Destroy remote branches that have been merged."""
+ for branch in get_branches(
+ ["--list", "-r", f"{config.me}/*", "--merged", config.remote_ref]
+ ):
+ remote, head = branch.split("/")
+ if head in immortal_ref:
+ continue
+ logging.info(f"remote branch {branch} has been merged")
+ if dry_run:
+ logging.info(f"would have run git push {remote} :{head}")
+ else:
+ subprocess.run(
+ ["git", "push", remote, f":{head}"], check=True, encoding="utf-8"
+ )
+
+
+def destroy_local_merged_branches(config: GitConfig, dry_run: bool) -> None:
+ """Destroy local branches that have been merged."""
+ for branch in get_branches(["--list", "--merged", config.remote_ref]):
+ if branch in immortal_ref:
+ continue
+
+ logging.info(f"local branch {branch} has been merged")
+ if dry_run:
+ logging.info(f"would have run git branch --delete --force {branch}")
+ else:
+ subprocess.run(
+ ["git", "branch", "--delete", "--force", branch],
+ check=True,
+ encoding="utf-8",
+ )
+
+
+def workdir_is_clean() -> bool:
+ """Check the git workdir is clean."""
+ res = subprocess.run(
+ ["git", "status", "--porcelain"],
+ check=True,
+ capture_output=True,
+ encoding="utf-8",
+ ).stdout.splitlines()
+ return not len(res)
+
+
+def main(dry_run: bool) -> bool:
+ if not is_git_repository():
+ logging.error("error: run this inside a git repository")
+ return False
+
+ if not workdir_is_clean():
+ logging.error("the git workdir is not clean, commit or stash your changes.")
+ return False
+
+ config = GitConfig()
+
+ # what's our current sha ?
+ origin_main_sha = ref_sha(config.remote_ref)
+
+ # let's get everything up to date
+ fetch(config.remote_name)
+
+ # let's get the new sha
+ main_sha = ref_sha(config.remote_ref)
+
+ if origin_main_sha != main_sha:
+ logging.info(f"we started with {origin_main_sha} and now we have {main_sha}")
+
+ local_rebase_tree_id: Dict[str, str] = dict()
+
+ # try to rebase local branches that have been not been merged
+ rebase_local_branches(config, local_rebase_tree_id)
+
+ # try to rebase remote branches that have been not been merged
+ rebase_remote_branches(config, local_rebase_tree_id, main_sha)
+
+ # let's checkout to main now and see what left to do
+ subprocess.run(
+ ["git", "checkout", "--force", config.primary_branch],
+ check=True,
+ capture_output=True,
+ )
+
+ # branches on the remote that have been merged can be destroyed.
+ destroy_remote_merged_branches(config, dry_run)
+
+ # local branches that have been merged can be destroyed.
+ destroy_local_merged_branches(config, dry_run)
+
+ # TODO: restore to the branch I was on before ?
+ return True
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(
+ description="delete local and remote branches that have been merged."
+ )
+ parser.add_argument(
+ "--dry-run",
+ action=argparse.BooleanOptionalAction,
+ help="when set to True, do not execute the destructive actions",
+ default=True,
+ )
+ args = parser.parse_args()
+
+ if not main(args.dry_run):
+ sys.exit(1)