Skip to content

Instantly share code, notes, and snippets.

@tserong
Last active August 25, 2022 04:16
Show Gist options
  • Save tserong/ce7307860a07ff726618d80a3a7d7093 to your computer and use it in GitHub Desktop.
Save tserong/ce7307860a07ff726618d80a3a7d7093 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
#
# This script sets the osdspec_affinity (i.e. drive group) for the given OSDs
# on the host on which it is executed.
#
# For example, to set osdspec_affinity=foo for OSDs 1, 2 and 3, you'd run:
#
# ./set-osdspec-affinity.py foo 1 2 3
#
# For each OSD specified, it will:
# - Remove the old ceph.osdspec_affinity LVM tag if present from each device
# - Add a new, updated ceph.osdspec_affinity LVM tag to each device
# - Use ceph-bluestore-tool to set the osdspec_affinity label in the OSD
#   metadata (this is where the orchestrator actually gets osdspec_affinity
#   from, to correlate with defined drive groups)
#
# To be safe, we're not allowing changes to running OSDs. If an OSD is
# already stopped, we go ahead and make the change. If an OSD is currently
# running, it's skipped, *unless* the --restart flag is specified, in which
# case this script will take care of stopping the OSD first, and starting it
# again once the changes have been made.
#
# Bonus: you can also use this to remove the osdspec_affinity, by passing
# an empty string (e.g. `./set-osdspec-affinity.py '' 1 2 3`), although I'm
# currently struggling to think why anyone would actually want to do that,
# aside from when manually testing the operation of this script.
#
import argparse
import logging
import json
import subprocess
import sys
from typing import Dict, List, Optional, Tuple
from logging import Logger
# Absolute paths of the external tools this script shells out to.
CEPHADM = "/usr/sbin/cephadm"
LVCHANGE = "/sbin/lvchange"

# Log at INFO by default using a terse "LEVEL: message" format; the module
# logger is the one every function in this script writes to.
logging.basicConfig(
    format='%(levelname)s: %(message)s',
    level=logging.INFO,
)
logger: Logger = logging.getLogger(__name__)
def run(cmd: List[str]) -> Tuple[int, str, str]:
    """Run *cmd* and return ``(returncode, stdout, stderr)``.

    The command is given as an argument list (no shell is involved).  Both
    output streams are captured and decoded as UTF-8; bytes that are not
    valid UTF-8 are replaced rather than raising, so odd tool output cannot
    crash the script.

    If the command cannot be started at all (for example the binary does not
    exist or is not executable), the failure is reported as return code 127
    with empty stdout and the OS error text as stderr.  That way the callers'
    existing ``rc != 0`` handling logs the problem instead of the script
    dying with a traceback.
    """
    try:
        proc = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            encoding="utf-8",
            errors="replace",
        )
    except OSError as e:
        # 127 is the conventional "command not found" exit status.
        return 127, "", str(e)
    return proc.returncode, proc.stdout, proc.stderr
def osd_is_active(osd_id: str) -> bool:
    """Report whether OSD *osd_id* is currently active on this host.

    Asks ``cephadm unit --name osd.<id> is-active`` and treats a zero exit
    status as "active"; any other exit status counts as "not active".
    """
    unit_name = f"osd.{osd_id}"
    status, _stdout, _stderr = run([CEPHADM, "unit", "--name", unit_name, "is-active"])
    return status == 0
def osd_unit_command(osd_id: str, command: str) -> bool:
    """Run ``cephadm unit --name osd.<id> <command>`` (e.g. "stop" or "start").

    Returns True when the command exits with status zero.  On failure the
    command's stderr is logged as an error and False is returned.
    """
    unit_name = f"osd.{osd_id}"
    status, _stdout, stderr_text = run([CEPHADM, "unit", "--name", unit_name, command])
    succeeded = status == 0
    if not succeeded:
        logger.error(stderr_text)
    return succeeded
def set_osdspec_affinity_lvm(device: str, old_affinity: str, new_affinity: str) -> bool:
    """Swap the ``ceph.osdspec_affinity`` LVM tag on *device*.

    First deletes the tag carrying *old_affinity*, then adds the tag carrying
    *new_affinity*, both via lvchange.  The add step is only attempted when
    the delete step succeeded.  Returns True if both steps exit with status
    zero; otherwise the failing step's stderr is logged and False is returned.
    """
    logger.info(f"Deleting LVM tag 'ceph.osdspec_affinity={old_affinity}' from {device}")
    status, _stdout, stderr_text = run(
        [LVCHANGE, "--deltag", f"ceph.osdspec_affinity={old_affinity}", device]
    )
    if status != 0:
        logger.error(stderr_text)
        return False

    logger.info(f"Adding LVM tag 'ceph.osdspec_affinity={new_affinity}' to {device}")
    status, _stdout, stderr_text = run(
        [LVCHANGE, "--addtag", f"ceph.osdspec_affinity={new_affinity}", device]
    )
    if status != 0:
        logger.error(stderr_text)
        return False

    return True
def set_osdspec_affinity_osd(device: str, new_affinity: str, fsid: Optional[str] = None) -> bool:
    """Set or remove the ``osdspec_affinity`` label on an OSD's block device.

    Runs ceph-bluestore-tool inside ``cephadm shell`` against *device*:

    - if *new_affinity* is non-empty, ``set-label-key`` writes it as the
      value of the ``osdspec_affinity`` key;
    - if *new_affinity* is empty, ``rm-label-key`` removes the key.  A
      failure whose stderr says the key is not present is treated as
      success, since the key is already gone.

    *fsid*, when given, is passed to ``cephadm shell`` via ``--fsid``.

    Returns True on success.  On failure the tool's stderr is logged as an
    error and False is returned.
    """
    # Use the same absolute cephadm path as every other invocation in this
    # script rather than relying on PATH lookup.
    cmd = [CEPHADM, "shell"]
    if fsid:
        cmd.extend(["--fsid", fsid])
    cmd.extend(["ceph-bluestore-tool", "--dev", device])

    if new_affinity:
        logger.info(f"Setting osdspec_affinity label '{new_affinity}' on {device}")
        cmd.extend(["--command", "set-label-key", "-k", "osdspec_affinity", "-v", new_affinity])
    else:
        logger.info(f"Removing osdspec_affinity label from {device}")
        cmd.extend(["--command", "rm-label-key", "-k", "osdspec_affinity"])

    rc, _, err = run(cmd)
    if rc != 0 and not new_affinity and "key 'osdspec_affinity' not present" in err:
        # Removing a key that already doesn't exist counts as success.
        return True
    if rc != 0:
        logger.error(err)
    return rc == 0
def _current_affinity(device: Dict) -> str:
    """Return the ``ceph.osdspec_affinity`` LVM tag of *device*, or "" if it has none."""
    return device["tags"].get("ceph.osdspec_affinity", "")


def set_osdspec_affinity(new_affinity: str, osd_ids: List, restart: bool, fsid: Optional[str] = None) -> bool:
    """Set the osdspec_affinity of the given OSDs to *new_affinity*.

    The OSDs on this host are discovered with
    ``cephadm ceph-volume lvm list --format=json`` (with ``--fsid`` when
    *fsid* is given).  Then, for each ID in *osd_ids*, in order:

    - an active OSD is skipped unless *restart* is true, in which case it is
      stopped first;
    - the ``ceph.osdspec_affinity`` LVM tag is replaced on every device of
      the OSD;
    - the ``osdspec_affinity`` label is set on the OSD's block device;
    - an OSD that this function stopped is started again.

    Returns True only if everything succeeded.  Failing to list OSDs, an
    unknown OSD ID, or a failure to stop an OSD or to update its tags/label
    aborts immediately with False (OSDs processed earlier stay changed, and
    an OSD stopped for the failed update is left stopped).  A missing block
    device or a failure to start an OSD again is logged, processing of the
    remaining OSDs continues, and False is returned at the end.
    """
    cmd = [CEPHADM, "ceph-volume"]
    if fsid:
        cmd.extend(["--fsid", fsid])
    cmd.extend(["lvm", "list", "--format=json"])
    rc, out, err = run(cmd)
    if rc != 0:
        logger.error(err)
        return False

    try:
        osds = json.loads(out)
    except ValueError as e:
        logger.error(f"Error parsing `{' '.join(cmd)}` output")
        logger.error(out)
        logger.error(e)
        return False

    # Debug dump of everything ceph-volume reported, before touching anything.
    for osd_id, devices in sorted(osds.items()):
        logger.debug(f"OSD {osd_id}:")
        for device in devices:
            logger.debug(f" {device['lv_path']}")
            logger.debug(f" type='{device['type']}'")
            logger.debug(f" ceph.osdspec_affinity='{_current_affinity(device)}'")

    success = True
    for osd_id in osd_ids:
        if osd_id not in osds:
            logger.error(f"error: OSD {osd_id} not found (have {', '.join(sorted(osds.keys()))})")
            return False

        was_active = osd_is_active(osd_id)
        if was_active:
            if not restart:
                logger.info(f"OSD {osd_id} is active but --restart not specified, skipping")
                continue
            logger.info(f"Stopping OSD {osd_id}")
            if not osd_unit_command(osd_id, "stop"):
                logger.error(f"Error stopping OSD {osd_id}, bailing out")
                return False

        logger.info(f"Updating osdspec_affinity for OSD {osd_id}")
        block_device = None
        for device in osds[osd_id]:
            if not set_osdspec_affinity_lvm(device["lv_path"], _current_affinity(device), new_affinity):
                logger.error(f"Error updating LVM tags for OSD {osd_id}, bailing out")
                return False
            if device["type"] == "block":
                block_device = device["lv_path"]

        if block_device:
            if not set_osdspec_affinity_osd(block_device, new_affinity, fsid):
                logger.error(f"Error setting osdspec_affinity label for OSD {osd_id}, bailing out")
                return False
        else:
            # The label could not be written, so this OSD was not fully
            # updated: make sure the overall result reflects that.
            logger.error(f"OSD {osd_id} has no block device (this should be impossible)")
            success = False

        if was_active:
            logger.info(f"Starting OSD {osd_id}")
            if not osd_unit_command(osd_id, "start"):
                # Keep going with the remaining OSDs, but don't report
                # success when an OSD we stopped could not be brought back.
                logger.error(f"Error starting OSD {osd_id}")
                success = False

    return success
if __name__ == "__main__":
    # Command-line entry point: parse the arguments, optionally enable debug
    # logging, apply the change, and exit 0 on success or 1 on failure.
    cli = argparse.ArgumentParser(
        description="Set the osdspec_affinity LVM tag and OSD label for the given OSDs."
    )
    cli.add_argument(
        "osdspec_affinity",
        help="value to set (i.e. name of OSD Service Spec / Drive Group)",
    )
    cli.add_argument(
        "osd_ids",
        metavar="ID",
        nargs="+",
        help="ID of OSD to set osdspec_affinity on",
    )
    cli.add_argument("-r", "--restart", help="restart running OSDs", action="store_true")
    cli.add_argument("-d", "--debug", help="show debug output", action="store_true")
    cli.add_argument("--fsid", help="cluster FSID (usually not necessary to specify)")
    options = cli.parse_args()

    if options.debug:
        logger.setLevel(logging.DEBUG)

    succeeded = set_osdspec_affinity(
        options.osdspec_affinity,
        options.osd_ids,
        options.restart,
        options.fsid,
    )
    sys.exit(0 if succeeded else 1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment