#!/usr/bin/env python3
import datetime
import hashlib
import json
import os
import subprocess

import click
import requests
# ID of the 1Password vault from which the LDAP password is read.
op_vault = "v4mof5qwozyvob2utdk3crwxnu"

# Base URL of the vault server for each data center that has one.
vault_addrs = {
    "chi1": "https://chi1-vault.simulprod.com:8200",
    "ash1": "https://ash1-vault.simulprod.com:8200",
}

# Directory holding the cached tokens. When $HOME is unset or empty the path
# is relative to the current working directory.
creds_cache = os.path.join(os.environ.get("HOME") or "", ".local/state/rbxenv")

# Data centers accepted by the `setenv` command.
valid_dcs = ["ash1", "chi1"]

# Edge locations accepted by the `setpop` command.
valid_edges = [
    "bom1",
    "ams1",
    "atl1",
    "dwf1",
    "fra2",
    "hkg1",
    "iad1",
    "lax1",
    "lga1",
    "lhr1",
    "mia1",
    "nrt1",
    "ord1",
    "ord2",
    "sjfc1",
]
def _path_cached_file(val: str) -> str:
    """Return the path of the cache file associated with ``val``.

    The file name is the SHA-256 hex digest of ``val`` (callers pass the URL
    of the vault server) followed by ``.json``, inside ``creds_cache``.
    """
    digest = hashlib.sha256(val.encode("utf-8")).hexdigest()
    return os.path.join(creds_cache, digest + ".json")
def get_ldap_password() -> str:
    """Return the LDAP password.

    The password is expected to be in 1password, under `LDAP'. It is read by
    running the 1Password CLI at ``/usr/local/bin/op``; trailing whitespace
    is stripped from its output.

    Raises:
        FileNotFoundError: if ``/usr/local/bin/op`` does not exist.
        subprocess.CalledProcessError: if ``op`` exits with a non-zero
            status. Failing here avoids attempting a login with an empty
            password.
    """
    # An argument list (no shell) keeps the reference from being parsed by a
    # shell. Only stdout is captured so that anything `op` writes to stderr
    # still reaches the terminal.
    result = subprocess.run(
        ["/usr/local/bin/op", "read", f"op://{op_vault}/LDAP/password"],
        stdout=subprocess.PIPE,
        text=True,
        check=True,
    )
    return result.stdout.rstrip()
def _get_token_from_cache(addr: str) -> str | None:
    """Return the token from the cache if it is still valid.

    Returns ``None`` when there is no cache file for ``addr``, when the file
    cannot be read or parsed, when it lacks the ``until``/``token`` entries,
    or when the recorded expiry time has passed.
    """
    cached_path = _path_cached_file(addr)
    try:
        with open(cached_path) as f:
            d = json.load(f)
        expires_after = datetime.datetime.fromtimestamp(int(d["until"]))
        token = d["token"]
    except (OSError, ValueError, KeyError, TypeError, OverflowError):
        # A missing, unreadable or corrupt entry is simply a cache miss: the
        # caller logs in again, which rewrites the file.
        return None

    # `until` is written from a naive local datetime, so compare against the
    # naive local "now".
    if datetime.datetime.now() > expires_after:
        return None
    return token
def _set_token_to_cache(
    addr: str, token: str, expires_after: datetime.datetime
) -> None:
    """Set the token in the cache.

    The cache also contains the time after which the token won't be valid
    anymore, stored as an integer Unix timestamp under ``until``.

    The cache directory is created if needed. Because the file contains a
    secret, the directory is created with mode 0700 and the file is
    restricted to mode 0600 (owner read/write only).
    """
    # exist_ok avoids the check-then-create race of isdir() + makedirs().
    os.makedirs(creds_cache, mode=0o700, exist_ok=True)

    cache = {
        "until": int(expires_after.timestamp()),
        "token": token,
    }

    cached_path = _path_cached_file(addr)
    # Create the file owner-only from the start instead of relying on the
    # process umask.
    fd = os.open(cached_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        # The mode given to os.open() only applies to newly created files;
        # tighten a pre-existing file before the token is written to it.
        os.chmod(cached_path, 0o600)
        json.dump(cache, f)
def _vault_login(addr: str) -> str:
    """Log into vault to get a token.

    If we get a token, we store it in the cache so we don't need to request
    it again until it expires.

    The LDAP username is taken from the ``USER`` environment variable and the
    password from ``get_ldap_password()``.

    Raises:
        click.ClickException: if ``USER`` is not set, if the HTTP request
            fails or returns an error status, or if the response does not
            contain the expected ``auth`` fields.
    """
    ldap_username = os.getenv("USER")
    if not ldap_username:
        # Without this check the literal string "None" would end up in the URL.
        raise click.ClickException(
            "USER is not set, cannot determine the LDAP username"
        )
    ldap_password = get_ldap_password()

    url = f"{addr}/v1/auth/ldap/login/{ldap_username}"
    obj = {"method": "push", "password": ldap_password}

    # NOTE(review): no timeout is set on this request; presumably the "push"
    # method blocks until the MFA prompt is answered -- confirm before adding
    # one.
    try:
        resp = requests.post(url, json=obj)
        resp.raise_for_status()
        auth = resp.json()["auth"]  # parse the body once
        token = auth["client_token"]
        expires_after = datetime.datetime.now() + datetime.timedelta(
            seconds=auth["lease_duration"]
        )
    except requests.RequestException as e:
        # Stop here: there is no usable response to read a token from.
        raise click.ClickException(f"{url} returned {e}") from e
    except (ValueError, KeyError, TypeError) as e:
        raise click.ClickException(
            f"{url} returned an unexpected response ({e!r})"
        ) from e

    _set_token_to_cache(addr, token, expires_after)
    return token
def get_vault_token(addr: str) -> str:
    """Return a vault token for ``addr``.

    The cached token is used when there is one; otherwise a new login is
    performed.
    """
    cached = _get_token_from_cache(addr)
    if cached is not None:
        return cached
    return _vault_login(addr)
def vault_read(path: str, addr: str, token: str, dc: str) -> str:
    """Read some values from a path in vault.

    Sends a GET to ``{addr}/v1/{path}`` authenticated with ``token`` and
    returns the entry stored under the key ``dc`` of the response's ``data``
    object.

    Raises:
        click.ClickException: if the HTTP request fails, times out or returns
            an error status, or if the response has no ``data`` entry for
            ``dc``.
    """
    url = f"{addr}/v1/{path}"
    headers = {"x-vault-token": token}
    try:
        # Bound the wait so an unreachable server cannot hang the CLI forever.
        resp = requests.get(url, headers=headers, timeout=30)
        resp.raise_for_status()
        return resp.json()["data"][dc]
    except requests.RequestException as e:
        # Stop here: there is no usable response to read the value from.
        raise click.ClickException(f"{url} returned {e}") from e
    except (ValueError, KeyError, TypeError) as e:
        raise click.ClickException(
            f"{url} returned an unexpected response ({e!r})"
        ) from e
@click.command()
@click.argument("dc")
def setpop(dc: str):
    """Print some information regarding hashi stack in a POP."""
    # Guard clause: only known edge locations are accepted.
    if dc not in valid_edges:
        print(f"{dc} is not a valid edge location")
        return

    # Every secret is read from the chi1 vault, using one token for all reads.
    addr = vault_addrs["chi1"]
    token = get_vault_token(addr)

    # All three values are fetched, in this order, before anything is printed.
    secret_paths = (
        "secret/teams/neteng/traffic/consul",
        "secret/teams/neteng/traffic/nomad",
        "secret/teams/neteng/traffic/vault",
    )
    consul_token, nomad_token, vault_token = (
        vault_read(secret_path, addr=addr, token=token, dc=dc)
        for secret_path in secret_paths
    )

    print(f"consul token: {consul_token}")
    print(f"nomad token: {nomad_token}")
    print(f"vault token: {vault_token}")
    print(f"https://{dc}-vault.simulprod.com/ui/vault/auth?with=token")
@click.command()
@click.argument("dc")
def setenv(dc: str):
    """Print environment variables for the URL and tokens to various components.

    This command can be passed to `export` in order to export variables.

    For example:

    ```
    export (robloxenv setenv chi1)
    ```
    """
    # stdout is meant to be consumed by `export`, so problems are reported
    # through ClickException (written to stderr, non-zero exit status)
    # instead of being printed where the shell would try to export them.
    if dc not in valid_dcs:
        raise click.ClickException(f"{dc} is not a valid data center")
    if dc not in vault_addrs:
        # Stop here; otherwise the lookup below fails with a KeyError.
        raise click.ClickException(
            f"{dc} doesn't have an associated vault address"
        )

    addr = vault_addrs[dc]
    token = get_vault_token(addr)
    print(f"VAULT_ADDR={addr}")
    print(f"VAULT_TOKEN={token}")
@click.group(help="CLI tool to manage environment variables for hashi things at Roblox")
def cli():
    """Root command group; it only dispatches to the subcommands."""


# Attach the subcommands to the root group.
cli.add_command(setpop)
cli.add_command(setenv)


# Run the CLI only when the file is executed as a script.
if __name__ == "__main__":
    cli()