aboutsummaryrefslogblamecommitdiff
path: root/src/cli/hashienv.py
blob: 57708a843be819f7490247ee5aca398f4f23ba1e (plain) (tree)
1
2
3
4
5
6

                      
         


               






                                       




                                                    


                                                                          
                            
               
           






























                                                               
                                                                                            


















































                                                                                                      
                                                                                                               
































                                                                 

                                                   
 
                                                
                              



                                             

                             



                                            

                             



                                            







                                                                       





















                                                                                





                                                                                        
                       


                          
#!/usr/bin/env python3

import os
import json
import hashlib
import datetime

import click
import requests

# the ID of the vault in 1password
op_vault = "v4mof5qwozyvob2utdk3crwxnu"

vault_addrs = {
    "chi1": "https://chi1-vault.simulprod.com:8200",
    "ash1": "https://ash1-vault.simulprod.com:8200",
}

creds_cache = os.path.join(os.getenv("HOME") or "", ".local/state/rbxenv")


valid_dcs = ["ash1", "chi1"]
valid_edges = [
    "bom1",
    "ams1",
    "atl1",
    "dwf1",
    "fra2",
    "hkg1",
    "iad1",
    "lax1",
    "lga1",
    "lhr1",
    "mia1",
    "nrt1",
    "ord1",
    "ord2",
    "sjfc1",
]


def _path_cached_file(val: str) -> str:
    """The path to the cache file.
    The full path is created using the URL to the vault server.
    """
    m = hashlib.sha256()
    m.update(bytes(val, "utf-8"))
    val = m.hexdigest()
    return os.path.join(creds_cache, f"{val}.json")


def get_ldap_password() -> str:
    """Return the LDAP password.
    The password is expected to be in 1password, under `LDAP'.
    """
    return os.popen(f"/usr/local/bin/op read op://{op_vault}/LDAP/password").read().rstrip()


def _get_token_from_cache(addr: str) -> str | None:
    """Return the token from the cache if it is still valid."""
    cached_path = _path_cached_file(addr)
    if not os.path.isfile(cached_path):
        return None

    with open(cached_path) as f:
        d = json.load(f)
        expires_after = datetime.datetime.fromtimestamp(int(d["until"]))
        if datetime.datetime.now() > expires_after:
            return None
        return d["token"]

    return None


def _set_token_to_cache(addr: str, token: str, expires_after: datetime.datetime):
    """Set the token in the cache.
    The cache also contains the time after which the token won't be valid anymore."""
    if not os.path.isdir(creds_cache):
        os.makedirs(creds_cache)

    cache = {
        "until": int(expires_after.timestamp()),
        "token": token,
    }

    cached_path = _path_cached_file(addr)

    with open(cached_path, "w") as f:
        json.dump(cache, f)


def _vault_login(addr: str) -> str:
    """Log into vault to get a token.
    If we get a token, we store it in the cache so we don't need to request it again until it expires.
    """
    ldap_username = os.getenv("USER")
    ldap_password = get_ldap_password()

    url = "{}/v1/auth/ldap/login/{}".format(addr, ldap_username)
    obj = {"method": "push", "password": ldap_password}

    try:
        resp = requests.post(url, json=obj)
        resp.raise_for_status()
    except Exception as e:
        print("{} returned {}".format(url, str(e)))

    expires_after = datetime.datetime.now() + datetime.timedelta(seconds=resp.json()["auth"]["lease_duration"])
    token = resp.json()["auth"]["client_token"]

    _set_token_to_cache(addr, token, expires_after)

    return token


def get_vault_token(addr: str) -> str:
    """Get the token for vault."""
    token = _get_token_from_cache(addr)
    if token is None:
        token = _vault_login(addr)
    return token


def vault_read(path: str, addr: str, token: str, dc: str) -> str:
    """Read some values from a path in vault."""
    url = "{}/v1/{}".format(addr, path)
    headers = {"x-vault-token": token}
    try:
        resp = requests.get(url, headers=headers)
        resp.raise_for_status()
    except Exception as e:
        print("{} returned {}".format(url, str(e)))

    return resp.json()["data"][dc]


@click.command()
@click.argument("dc")
def setpop(dc: str):
    """Print some information regarding hashi stack in a POP."""
    if dc not in valid_edges:
        print(f"{dc} is not a valid edge location")
        return

    token = get_vault_token(vault_addrs["chi1"])
    consul_token = vault_read(
        "secret/teams/neteng/traffic/consul",
        addr=vault_addrs["chi1"],
        token=token,
        dc=dc,
    )
    nomad_token = vault_read(
        "secret/teams/neteng/traffic/nomad",
        addr=vault_addrs["chi1"],
        token=token,
        dc=dc,
    )
    vault_token = vault_read(
        "secret/teams/neteng/traffic/vault",
        addr=vault_addrs["chi1"],
        token=token,
        dc=dc,
    )

    print(f"consul token: {consul_token}")
    print(f"nomad token: {nomad_token}")
    print(f"vault token: {vault_token}")
    print(f"https://{dc}-vault.simulprod.com/ui/vault/auth?with=token")


@click.command()
@click.argument("dc")
def setenv(dc: str):
    """Print environment variables for the URL and tokens to various components.
    This command can be passed to `export` in order to export variables.
    For example:
    ```
    export (robloxenv setenv chi1)
    ```
    """
    if dc not in valid_dcs:
        print(f"{dc} is not a valid data center")
        return

    if dc not in vault_addrs:
        print(f"{dc} doesn't have an associated vault address")

    token = get_vault_token(vault_addrs[dc])
    print(f"VAULT_ADDR={vault_addrs[dc]}")
    print(f"VAULT_TOKEN={token}")


@click.group(help="CLI tool to manage environment variables for hashi things at Roblox")
def cli():
    pass


cli.add_command(setpop)
cli.add_command(setenv)

if __name__ == "__main__":
    cli()