Last active
April 21, 2021 11:41
-
-
Save DDoSolitary/0c79a61132e43c66a07e6570979d2883 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import asyncio | |
import copy | |
import random | |
import re | |
from abc import ABC | |
from argparse import ArgumentParser | |
from asyncio import AbstractEventLoop, StreamReader, StreamWriter | |
from asyncio.subprocess import Process | |
from collections.abc import Coroutine | |
from dataclasses import dataclass | |
from decimal import Decimal | |
from typing import Optional | |
MIN_FLOOR = 1 | |
MAX_FLOOR = 20 | |
MAX_ID = 10000 | |
PREQ_COUNT = 50 | |
MAX_REQ_INTERVAL = 5 | |
TEST_COUNT = 20 | |
@dataclass | |
class TimedMessage(ABC): | |
time: Decimal | |
real_time: Decimal | |
@dataclass | |
class TimedRequest(TimedMessage, ABC): | |
def __repr__(self): | |
return f'[{self.real_time}][{self.time}]{self}' | |
@dataclass | |
class ModeRequest(TimedRequest): | |
mode: str | |
def __repr__(self): | |
return super().__repr__() | |
def __str__(self): | |
return self.mode | |
@dataclass | |
class PersonRequest(TimedRequest): | |
pid: int | |
from_floor: int | |
to_floor: int | |
def __repr__(self): | |
return super().__repr__() | |
def __str__(self): | |
return f'{self.pid}-FROM-{self.from_floor}-TO-{self.to_floor}' | |
@dataclass | |
class ElevatorRequest(TimedRequest): | |
eid: int | |
model: str | |
def __repr__(self): | |
return super().__repr__() | |
def __str__(self): | |
return f'ADD-{self.eid}-{self.model}' | |
@dataclass | |
class TimedResponse(TimedMessage, ABC): | |
eid: int | |
op: str | |
floor: int | |
def __repr__(self): | |
return f'[{self.real_time}][{self.time}]{self}' | |
@dataclass | |
class ElevatorResponse(TimedResponse): | |
def __repr__(self): | |
return super().__repr__() | |
def __str__(self): | |
return f'{self.op}-{self.floor}-{self.eid}' | |
@dataclass | |
class PersonResponse(TimedResponse): | |
pid: int | |
def __repr__(self): | |
return TimedResponse.__repr__(self) | |
def __str__(self): | |
return f'{self.op}-{self.pid}-{self.floor}-{self.eid}' | |
class TestError(Exception): | |
def __init__(self, msg: str): | |
super().__init__(msg) | |
class ElevatorState: | |
_eid: int | |
_stops: list[int] | |
_move_delay: Decimal | |
_capacity: int | |
_floor: int | |
_door_open: bool | |
_last_time: Optional[Decimal] | |
_last_real_time: Decimal | |
_cabin: dict[int, PersonRequest] | |
def __init__(self, eid: int, model: str, real_time: Decimal): | |
self._eid = eid | |
self._floor = 1 | |
self._door_open = False | |
self._last_time = None | |
self._last_real_time = real_time | |
self._cabin = dict() | |
if model == 'A': | |
self._stops = list(range(1, 21)) | |
self._move_delay = Decimal('0.6') | |
self._capacity = 8 | |
elif model == 'B': | |
self._stops = list(i * 2 + 1 for i in range(10)) | |
self._move_delay = Decimal('0.4') | |
self._capacity = 6 | |
elif model == 'C': | |
self._stops = [1, 2, 3, 18, 19, 20] | |
self._move_delay = Decimal('0.2') | |
self._capacity = 4 | |
else: | |
raise TestError("invalid elevator model") | |
def check_delay(self, delay: Decimal, new_time: Decimal, new_real_time: Decimal) -> None: | |
if self._last_time is None: | |
if new_real_time - self._last_real_time < delay: | |
raise TestError('delay too short (real time)') | |
elif new_time - self._last_time < delay: | |
raise TestError('delay too short (output time)') | |
self._last_time = new_time | |
self._last_real_time = new_real_time | |
def check_arrive(self, new_floor: int, time: Decimal, real_time: Decimal) -> None: | |
self.check_delay(self._move_delay, time, real_time) | |
if abs(new_floor - self._floor) != 1 or new_floor not in range(MIN_FLOOR, MAX_FLOOR + 1): | |
raise TestError('invalid new floor') | |
self._floor = new_floor | |
def check_open(self, floor: int, time: Decimal, real_time: Decimal) -> None: | |
self.check_delay(Decimal(0), time, real_time) | |
if floor != self._floor: | |
raise TestError('not at that floor') | |
if floor not in self._stops: | |
raise TestError('not allowed to stop here') | |
if self._door_open: | |
raise TestError('door already open') | |
self._door_open = True | |
def check_close(self, floor: int, time: Decimal, real_time: Decimal) -> None: | |
self.check_delay(Decimal('0.4'), time, real_time) | |
if floor != self._floor: | |
raise TestError('invalid floor') | |
if not self._door_open: | |
raise TestError('door already closed') | |
self._door_open = False | |
def check_in(self, floor: int, req: PersonRequest) -> None: | |
if floor != self._floor: | |
raise TestError('invalid floor') | |
if req.from_floor != floor: | |
raise TestError('request floor mismatch') | |
if req.pid in self._cabin: | |
raise TestError('request already in cabin') | |
if len(self._cabin) == self._capacity: | |
raise TestError('cabin full') | |
self._cabin[req.pid] = req | |
def check_out(self, floor: int, pid: int) -> PersonRequest: | |
if floor != self._floor: | |
raise TestError('invalid floor') | |
req: Optional[PersonRequest] = self._cabin.pop(pid) | |
if req is None: | |
raise TestError('request not in cabin') | |
return req | |
def check_final(self): | |
if self._cabin: | |
raise TestError('cabin not empty') | |
if self._door_open: | |
raise TestError('door still open') | |
class SystemState: | |
_pending_requests: dict[int, PersonRequest] | |
_elevators: dict[int, ElevatorState] | |
def __init__(self, real_time: Decimal): | |
self._pending_requests = dict() | |
self._elevators = dict() | |
self.add_elevator(1, 'A', real_time) | |
self.add_elevator(2, 'B', real_time) | |
self.add_elevator(3, 'C', real_time) | |
def add_pending(self, req: PersonRequest) -> None: | |
self._pending_requests[req.pid] = req | |
def add_elevator(self, eid: int, model: str, real_time: Decimal) -> None: | |
self._elevators[eid] = ElevatorState(eid, model, real_time) | |
def check_res(self, res: TimedResponse) -> None: | |
elevator: Optional[ElevatorState] = self._elevators.get(res.eid) | |
if elevator is None: | |
raise TestError('elevator not found') | |
if isinstance(res, ElevatorResponse): | |
if res.op == 'ARRIVE': | |
elevator.check_arrive(res.floor, res.time, res.real_time) | |
elif res.op == 'OPEN': | |
elevator.check_open(res.floor, res.time, res.real_time) | |
elif res.op == 'CLOSE': | |
elevator.check_close(res.floor, res.time, res.real_time) | |
else: | |
raise TestError('invalid op') | |
elif isinstance(res, PersonResponse): | |
if res.op == 'IN': | |
req: Optional[PersonRequest] = self._pending_requests.pop(res.pid) | |
if req is None: | |
raise TestError('request not found') | |
elevator.check_in(res.floor, req) | |
elif res.op == 'OUT': | |
req: PersonRequest = elevator.check_out(res.floor, res.pid) | |
if req.to_floor != res.floor: | |
req = copy.copy(req) | |
req.from_floor = res.floor | |
self.add_pending(req) | |
else: | |
raise TestError('invalid op') | |
else: | |
assert False | |
def check_final(self): | |
for elevator in self._elevators.values(): | |
elevator.check_final() | |
if self._pending_requests: | |
raise TestError('pending requests not processed') | |
@dataclass | |
class TestResult: | |
err_msg: Optional[str] | |
responses: list[TimedResponse] | |
raw_responses: list[str] | |
loop: AbstractEventLoop = asyncio.get_event_loop() | |
def gen_requests() -> list[TimedRequest]: | |
mode: str = random.choice(('Random', 'Morning', 'Night')) | |
requests: list[TimedRequest] = [ModeRequest(time=Decimal(1), real_time=Decimal(0), mode=mode)] | |
current_time: Decimal = Decimal(1) | |
for pid in random.sample(range(MAX_ID), PREQ_COUNT): | |
if mode != 'Night': | |
max_interval: int = MAX_REQ_INTERVAL | |
if mode == 'MORNING' and max_interval > 2: | |
max_interval = 2 | |
current_time += round(Decimal(random.uniform(0, max_interval)), 1) | |
from_floor: int | |
to_floor: int | |
if mode == 'Random': | |
from_floor = random.randrange(MIN_FLOOR, MAX_FLOOR + 1) | |
to_floor = random.randrange(MIN_FLOOR, MAX_FLOOR) | |
if to_floor >= from_floor: | |
to_floor += 1 | |
elif mode == 'Morning': | |
from_floor = 1 | |
to_floor = random.randrange(MIN_FLOOR + 1, MAX_FLOOR + 1) | |
elif mode == 'Night': | |
from_floor = random.randrange(MIN_FLOOR + 1, MAX_FLOOR + 1) | |
to_floor = 1 | |
else: | |
assert False | |
req: PersonRequest = PersonRequest( | |
time=current_time, | |
real_time=Decimal(0), | |
pid=pid, | |
from_floor=from_floor, | |
to_floor=to_floor) | |
requests.append(req) | |
max_time: Decimal = requests[-1].time | |
ereq_count: int = random.randrange(3) | |
for eid in random.sample(range(4, MAX_ID), ereq_count): | |
req: ElevatorRequest = ElevatorRequest( | |
time=round(Decimal(random.uniform(1, float(max_time) + MAX_REQ_INTERVAL)), 1), # noqa | |
real_time=Decimal(0), | |
eid=eid, | |
model=random.choice(('A', 'B', 'C'))) | |
requests.append(req) | |
requests.sort(key=lambda x: x.time) | |
return requests | |
def parse_requests(path: str) -> list[TimedRequest]: | |
requests: list[TimedRequest] = [] | |
with open(path) as f: | |
while True: | |
line: str = f.readline() | |
if not line: | |
break | |
if not line.strip(): | |
continue | |
match: re.Match = re.match(r'\[(.*)](.*)\n?', line) | |
time: Decimal = Decimal(match.group(1)) | |
req_str = match.group(2) | |
if req_str in ('Random', 'Morning', 'Night'): | |
requests.append(ModeRequest(time=time, real_time=Decimal(0), mode=req_str)) | |
elif req_str.startswith('ADD-'): | |
fields: list[str] = req_str.split('-') | |
requests.append(ElevatorRequest(time=time, real_time=Decimal(0), eid=int(fields[1]), model=fields[2])) | |
else: | |
fields: list[str] = req_str.split('-') | |
assert fields[1] == 'FROM' and fields[3] == 'TO' | |
requests.append(PersonRequest( | |
time=time, | |
real_time=Decimal(0), | |
pid=int(fields[0]), | |
from_floor=int(fields[2]), | |
to_floor=int(fields[4]))) | |
return requests | |
def get_loop_time() -> Decimal: | |
return round(Decimal(loop.time()), 9) # noqa | |
def parse_response(line: str) -> TimedResponse: | |
try: | |
match: re.Match = re.match(r'\[(.*)](.*)', line) | |
fields: list[str] = match.group(2).split('-') | |
if fields[0] == 'IN' or fields[0] == 'OUT': | |
return PersonResponse( | |
time=Decimal(match.group(1)), | |
real_time=get_loop_time(), | |
op=fields[0], | |
pid=int(fields[1]), | |
floor=int(fields[2]), | |
eid=int(fields[3])) | |
else: | |
return ElevatorResponse( | |
time=Decimal(match.group(1)), | |
real_time=get_loop_time(), | |
op=fields[0], | |
floor=int(fields[1]), | |
eid=int(fields[2])) | |
except: # noqa | |
raise TestError(f'failed to parse response {line!r}') | |
async def write_requests(writer: StreamWriter, requests: list[TimedRequest], state: SystemState) -> None: | |
start_time: Decimal = get_loop_time() | |
try: | |
for req in requests: | |
await asyncio.sleep(float(start_time + req.time - get_loop_time())) | |
req.real_time = get_loop_time() | |
if isinstance(req, PersonRequest): | |
state.add_pending(req) | |
elif isinstance(req, ElevatorRequest): | |
state.add_elevator(req.eid, req.model, req.real_time) | |
writer.write(f'{req}\n'.encode()) | |
await writer.drain() | |
writer.write_eof() | |
await writer.drain() | |
except ConnectionResetError: | |
pass | |
async def check_stdout(reader: StreamReader, state: SystemState) -> TestResult: | |
err_msg: Optional[str] = None | |
responses: list[TimedResponse] = [] | |
raw_responses: list[str] = [] | |
idx: int = 0 | |
while True: | |
try: | |
idx += 1 | |
line: bytes = await reader.readline() | |
if not line: | |
break | |
line_str: str = line.decode().strip() | |
raw_responses.append(line_str) | |
if err_msg is None: | |
res: TimedResponse = parse_response(line_str) | |
responses.append(res) | |
state.check_res(res) | |
except TestError as e: # noqa | |
err_msg = f'invalid response at line {idx}: {e}' | |
if err_msg is None: | |
try: | |
state.check_final() | |
except TestError as e: | |
err_msg = f'final check failed: {e}' | |
return TestResult(err_msg, responses, raw_responses) | |
async def collect_stderr(reader: StreamReader) -> list[str]: | |
lines: list[str] = [] | |
while True: | |
line: bytes = await reader.readline() | |
if not line: | |
break | |
lines.append(line.decode()) | |
return lines | |
async def wait_proc(proc: Process, timeout: Decimal) -> None: | |
try: | |
await asyncio.wait_for(proc.wait(), float(timeout)) | |
except asyncio.TimeoutError: | |
proc.terminate() | |
async def do_test(idx: int, cmd: list[str], requests: list[TimedRequest], verbose: bool) -> None: | |
timeout: int = sum(1 if isinstance(req, PersonRequest) else 0 for req in requests) * 10 | |
proc: Process = await asyncio.create_subprocess_exec( | |
cmd[0], | |
*cmd[1:], | |
stdin=asyncio.subprocess.PIPE, | |
stdout=asyncio.subprocess.PIPE, | |
stderr=asyncio.subprocess.PIPE) | |
start_time: Decimal = get_loop_time() | |
state: SystemState = SystemState(get_loop_time()) | |
test_res: TestResult | |
stderr: list[str] | |
_, test_res, stderr, _ = await asyncio.gather( | |
write_requests(proc.stdin, requests, state), | |
check_stdout(proc.stdout, state), | |
collect_stderr(proc.stderr), | |
wait_proc(proc, Decimal(timeout))) | |
elapsed_time: Decimal = get_loop_time() - start_time | |
print(f'---TEST {idx} RESULT BEGIN---') | |
print(f'EXIT CODE: {proc.returncode}') | |
print(f'ELAPSED TIME: {elapsed_time}') | |
if stderr: | |
print('STDERR:') | |
for line in stderr: | |
print(line, end='') | |
if test_res.err_msg is not None: | |
print('ERROR:') | |
print(test_res.err_msg) | |
if verbose or proc.returncode != 0 or test_res.err_msg is not None: | |
print('REQUESTS:') | |
for req in requests: | |
print(repr(req)) | |
print('RESPONSE:') | |
for res in test_res.responses: | |
print(repr(res)) | |
print('RAW RESPONSES:') | |
for res in test_res.raw_responses: | |
print(res) | |
print(f'---TEST {idx} RESULT END---\n') | |
async def do_tests(cmd: list[str]) -> None: | |
test_tasks: list[Coroutine] = [] | |
for i in range(TEST_COUNT): | |
mode: str | |
requests: list[TimedRequest] = gen_requests() | |
test_tasks.append(do_test(i, cmd, requests, False)) | |
await asyncio.gather(*test_tasks) | |
def main() -> None: | |
parser: ArgumentParser = ArgumentParser() | |
parser.add_argument('--input', '-i') | |
parser.add_argument('cmd') | |
parser.add_argument('args', nargs='*') | |
args: argparse.Namespace = parser.parse_args() | |
requests: Optional[list[TimedRequest]] = \ | |
parse_requests(args.input) if args.input is not None else None | |
cmd = [args.cmd] + args.args | |
try: | |
if requests is None: | |
loop.run_until_complete(do_tests(cmd)) | |
else: | |
loop.run_until_complete(do_test(0, cmd, requests, True)) | |
finally: | |
loop.close() | |
if __name__ == '__main__': | |
main() | |
# vim: ts=4:sw=4:noet |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment