Skip to content

Instantly share code, notes, and snippets.

@bitmingw
Last active June 11, 2020 02:36
Show Gist options
  • Save bitmingw/212f941207570a3a19de4d2d594b9ed1 to your computer and use it in GitHub Desktop.
Save bitmingw/212f941207570a3a19de4d2d594b9ed1 to your computer and use it in GitHub Desktop.
Scripts to increase system CPU and memory usage and to fill up the disk on Linux. Tested on an Ubuntu system.
#!/usr/bin/env python3
from argparse import ArgumentParser
from datetime import datetime, timedelta
from multiprocessing import Process
import random
def work(t):
    """Burn CPU with floating-point arithmetic for roughly t seconds.

    t -- running time in seconds; a non-positive value returns immediately.
    """
    deadline = datetime.now() + timedelta(seconds=t)
    while datetime.now() <= deadline:
        acc = random.randint(2, 10)
        # A long arithmetic burst between deadline checks keeps the core busy.
        for _ in range(1000000):
            acc *= random.randint(2, 10)
            acc /= random.randint(2, 10)
def main():
    """Parse CLI options and spawn the requested number of CPU workers."""
    parser = ArgumentParser()
    parser.add_argument("-p", "--process",
                        type=int, help="Number of processes", required=True)
    parser.add_argument("-t", "--time",
                        type=int, help="Running time in seconds", required=True)
    args = parser.parse_args()
    workers = [Process(target=work, args=(args.time,))
               for _ in range(args.process)]
    print("Starting...")
    for proc in workers:
        proc.start()
    # Block until every worker's time budget has elapsed.
    for proc in workers:
        proc.join()
    print("Complete!")


if __name__ == "__main__":
    main()
#!/bin/bash
# Fill up disk space: write 5 GiB of zeros (5120 blocks of 1 MiB each)
# to /root/file.txt. Requires root; remove the file to reclaim the space.
dd if=/dev/zero of=/root/file.txt count=5120 bs=1048576
#!/usr/bin/env python3
from argparse import ArgumentParser
from datetime import datetime, timedelta
from multiprocessing import Process
import os
import os.path
import random
import shutil
import subprocess
import time
_TEST_SUB_DIR = "storage_latency_test"  # directory created under the target disk
_FILE_SIZE = 64 * 1024  # bytes written per file (64 KB)
_BATCH_SIZE = 256  # run length of repeated bytes (256 bytes)
_NUM_OF_FILES = 1000  # size of the file-name pool


def reset_dir(disk: str) -> None:
    """
    Delete and recreate the test directory under *disk*.
    """
    target = os.path.join(disk, _TEST_SUB_DIR)
    shutil.rmtree(target, ignore_errors=True)
    os.mkdir(target)
    # Brief pause so the filesystem settles before workers start.
    time.sleep(1)
def task(d: str, p: int, t: int, i: int) -> None:
    """
    Worker i of p: repeatedly rewrite test files until time t elapses.

    Each iteration picks one of this worker's file slots (those congruent
    to i modulo p, so two workers never contend for the same file), writes
    64 KB of content to it, then sleeps for up to 0.5 second.

    d -- disk partition containing the test directory
    p -- total number of worker processes
    t -- running time in seconds
    i -- this worker's index in [0, p)
    """
    current_time = datetime.now()
    delta = timedelta(seconds=t)
    exit_time = current_time + delta
    test_dir = os.path.join(d, _TEST_SUB_DIR)
    while datetime.now() <= exit_time:
        # Snap a random slot onto this worker's residue class (mod p).
        next_file_num = random.randrange(0, _NUM_OF_FILES)
        next_file_num = next_file_num // p * p + i
        next_file = os.open(os.path.join(test_dir, "{0}.txt".format(str(next_file_num))),
                            flags=os.O_WRONLY | os.O_CREAT)
        try:
            os.write(next_file, generate_random_bytes(_FILE_SIZE))
        finally:
            # Always release the descriptor, even if the write fails;
            # the original leaked the fd on a write error.
            os.close(next_file)
        time.sleep(random.randrange(1, 50) / 100)
def generate_random_bytes(size: int, batch=None) -> bytearray:
    """
    Create a byte array of *size* bytes of cheap pseudo-random data.

    Only one byte per *batch* (default: _BATCH_SIZE, i.e. 256) is freshly
    random; the rest of the batch repeats it, keeping generation fast.

    size  -- number of bytes to produce
    batch -- run length of repeated bytes; None selects _BATCH_SIZE
    """
    if batch is None:
        batch = _BATCH_SIZE
    barr = bytearray(size)
    for j in range(size):
        if j % batch == 0:
            # randrange's upper bound is exclusive: use 256 so 0xFF can
            # occur (the original randrange(0, 255) never produced 255).
            barr[j] = random.randrange(0, 256)
        else:
            barr[j] = barr[j - 1]
    return barr
def main():
    """Parse CLI options, run the I/O workers, then clean the test dir up."""
    parser = ArgumentParser()
    parser.add_argument("-d", "--disk",
                        type=str, help="Disk partition", required=True)
    parser.add_argument("-p", "--process",
                        type=int, help="Number of processes", required=True)
    parser.add_argument("-t", "--time",
                        type=int, help="Running time in seconds", required=True)
    args = parser.parse_args()
    reset_dir(args.disk)
    workers = [Process(target=task, args=(args.disk, args.process, args.time, n))
               for n in range(args.process)]
    print("Starting I/O operations...")
    for proc in workers:
        proc.start()
    for proc in workers:
        proc.join()
    # Remove the generated files once all workers have finished.
    reset_dir(args.disk)
    print("Complete!")


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
from argparse import ArgumentParser
from datetime import datetime, timedelta
from multiprocessing import Process
import random
import time
def generate_numbers(s):
    """Allocate an s x 1000 x 1000 nested list (s million entries).

    Every entry references one random int in [20000, 1000000]; only the
    list structures themselves consume significant memory.
    """
    value = random.randint(20000, 1000000)
    return [[[value] * 1000 for _ in range(1000)] for _ in range(s)]
def use(res):
    """
    Touch the allocated data: sum 10 products of randomly chosen pairs.

    res -- non-empty 3-D nested list of numbers.

    Returns the accumulated sum so the work is observable; the original
    computed it and silently discarded it. Callers that only want the
    memory traffic may ignore the return value.
    """
    i = len(res)
    j = len(res[0])
    k = len(res[0][0])
    ans = 0
    for z in range(10):
        ans += res[random.randrange(0, i)][random.randrange(0, j)][random.randrange(0, k)] * \
            res[random.randrange(0, i)][random.randrange(0, j)][random.randrange(0, k)]
    return ans
def task(s, t, i):
    """Worker i: allocate s million ints, then keep touching them for t seconds."""
    print("Process", i, "loading data...")
    data = generate_numbers(s)
    print("Process", i, "loading complete.")
    deadline = datetime.now() + timedelta(seconds=t)
    while datetime.now() <= deadline:
        # Periodic random reads keep the pages resident.
        use(data)
        time.sleep(0.1)
    print("Process", i, "exit.")
def main():
    """Parse CLI options and spawn the memory-stress worker processes."""
    parser = ArgumentParser()
    parser.add_argument("-s", "--size",
                        type=int, help="Number of integers in million", required=True)
    parser.add_argument("-p", "--process",
                        type=int, help="Number of processes", required=True)
    parser.add_argument("-t", "--time",
                        type=int, help="Running time in seconds", required=True)
    args = parser.parse_args()
    workers = [Process(target=task, args=(args.size, args.time, n))
               for n in range(args.process)]
    print("Starting...")
    for proc in workers:
        proc.start()
    for proc in workers:
        proc.join()
    print("Complete!")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment