Last active
April 26, 2024 05:12
-
-
Save kishimotonico/54768276769544b4845f5102de554109 to your computer and use it in GitHub Desktop.
LinuxでSSH接続用のユーザを作成するユーティリティ
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
# linux-useradd-util.py: LinuxでSSH接続できるユーザを作成するユーティリティ | |
## 使い方 | |
ユーザの追加のため適切な権限が必要な点に注意してください。 | |
また、実行前にスクリプトが問題ないことを確認してください。 | |
```bash | |
# simple usage | |
sudo python3 linux-useradd-util.py username --password password | |
# with group & public key | |
sudo python3 linux-useradd-util.py username --password password --groups wheel --public-key-url https://github.com/kishimotonico.keys | |
``` | |
""" | |
import subprocess | |
import sys | |
from logging import getLogger | |
from pathlib import Path | |
from typing import List, Union, Optional | |
import urllib.request | |
logger = getLogger(__name__) | |
def _commandExec( | |
command: Union[str, List[str]], | |
checkReturn: bool = True, | |
dontLogging: Union[str, bool] = False, | |
shell: bool = False, | |
) -> subprocess.CompletedProcess: | |
if isinstance(command, list): | |
loggingCommand = " ".join(command) | |
else: | |
loggingCommand = command | |
if isinstance(dontLogging, str): | |
loggingCommand = loggingCommand.replace(dontLogging, "******") | |
elif dontLogging: | |
loggingCommand = "*************************" | |
logger.info(f"$ {loggingCommand}") | |
if sys.version_info >= (3, 7): | |
process = subprocess.run(command, capture_output=True, text=True, shell=shell) | |
else: | |
# alternative for python 3.6 | |
process = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=shell) | |
logger.debug(f"(stdout){process.stdout}") | |
if checkReturn and process.returncode != 0: | |
logger.error(f"(stderr){process.stderr}") | |
raise Exception(f"command '{loggingCommand}' failed") | |
return process | |
def _useraddExec( | |
username: str, | |
uid: Optional[int] = None, | |
gid: Optional[int] = None, | |
groups: Optional[List[str]] = None, | |
home: Optional[str] = None, | |
shell: Optional[str] = None, | |
makeHome: bool = True, | |
) -> None: | |
command = ["useradd", username] | |
if uid is not None: | |
command += ["-u", str(uid)] | |
if gid is not None: | |
command += ["-g", str(gid)] | |
if groups is not None: | |
command += ["-G", ",".join(groups)] | |
if home is not None: | |
command += ["-d", home] | |
if shell is not None: | |
command += ["-s", shell] | |
if makeHome: | |
command += ["-m"] | |
else: | |
command += ["-M"] | |
_commandExec(command) | |
def _passwdExec(username: str, password: str) -> None: | |
_commandExec(f"echo {username}:{password} | chpasswd", dontLogging=password, shell=True) | |
def _isUserExists(username: str) -> bool: | |
p = _commandExec(["id", username], False) | |
return p.returncode == 0 | |
def _downloadFile(url: str, path: Path) -> None: | |
with urllib.request.urlopen(url) as response: | |
with path.open("wb") as f: | |
f.write(response.read()) | |
def createUser( | |
username: str, | |
groups: List[str] = [], | |
password: Optional[str] = None, | |
sshPublicKeyUrl: Optional[str] = None, | |
groupOwn: bool = False, | |
existsOk: bool = False, | |
) -> None: | |
""" | |
SSH接続できるユーザを作成する | |
""" | |
if _isUserExists(username): | |
if existsOk: | |
logger.warning("skipping useradd because user already exists") | |
else: | |
raise Exception(f"user '{username}' already exists") | |
_useraddExec(username, groups=groups) | |
homePath = f"/home/{username}" | |
owner = f"{username}:{username}" | |
if groupOwn and groups: | |
# ユーザのグループを所有者にする | |
owner = f"{username}:{groups[0]}" | |
_commandExec(["chown", "-R", owner, homePath]) | |
if password: | |
_passwdExec(username, password) | |
if sshPublicKeyUrl: | |
sshDirectory = f"/home/{username}/.ssh" | |
_commandExec(["mkdir", "-p", sshDirectory]) | |
_commandExec(["chown", owner, sshDirectory]) | |
_commandExec(["chmod", "700", sshDirectory]) | |
authorizedKeys = f"/home/{username}/.ssh/authorized_keys" | |
_downloadFile(sshPublicKeyUrl, Path(authorizedKeys)) | |
_commandExec(["chown", owner, authorizedKeys]) | |
_commandExec(["chmod", "600", authorizedKeys]) | |
# for CLI | |
if __name__ == "__main__": | |
import argparse | |
from logging import StreamHandler, DEBUG, INFO, WARNING, ERROR | |
parser = argparse.ArgumentParser() | |
parser.add_argument("username", help="ユーザ名") | |
parser.add_argument("-g", "--groups", help="グループ名", nargs="+", default=[]) | |
parser.add_argument("-p", "--password", help="パスワード") | |
parser.add_argument("--public-key-url", help="SSH公開鍵のURL") | |
parser.add_argument("--group-own", help="-gオプションで最初に指定したグループを、ユーザのホームディレクトリの所有者にする。-gオプション未指定時は無視される", action="store_true", default=False) | |
parser.add_argument("-q", "--quiet", help="ログを出力しない", action="count" ,default=0) | |
parser.add_argument("-v", "--verbose", help="ログを詳細に出力する", action="count", default=0) | |
args = parser.parse_args() | |
logger.setLevel(DEBUG) | |
streamHandler = StreamHandler(sys.stdout) | |
streamHandler.setLevel(INFO) | |
if args.verbose > 0: | |
streamHandler.setLevel(DEBUG) | |
if args.quiet == 1: | |
streamHandler.setLevel(WARNING) | |
if args.quiet >= 2: | |
streamHandler.setLevel(ERROR) | |
logger.addHandler(streamHandler) | |
try: | |
createUser( | |
args.username, | |
args.groups, | |
args.password, | |
args.public_key_url, | |
args.group_own, | |
) | |
exit(0) | |
except Exception as e: | |
logger.exception(e) | |
exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment