Created
March 16, 2020 17:03
-
-
Save JohnyDeath/c7b04f25f186e85aedd5fbfd3b65aae9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import re | |
import time | |
import random | |
from enum import Enum | |
from lib.common import bootstrap | |
from lib.common.logger import global_logger | |
from lib.common.config import StrPathExpanded | |
from lib.common.base_scenario import BaseScenario | |
from lib.common.errors import SACError | |
from lib.utils import KNOWN_ARCHIVE_EXTENSIONS, splitext_archive, \ | |
unpack_archive, pack_archive | |
from lib.utils.fs import copy_file_or_directory, remove_file_or_directory | |
## Enum with type of sources. | |
class PathType(Enum): | |
FILE = "file" | |
DIRECTORY = "directory" | |
ARCHIVE = "archive" | |
## Get source type by path. | |
# @param cls Class. | |
# @param path Path to source. | |
# @return Enumeration member. | |
@classmethod | |
def detect(cls, path): | |
# if path is directory, return DIRECTORY | |
if os.path.isdir(path): | |
return cls.DIRECTORY | |
else: | |
# if path no a file (and not a directory), raise an exception | |
if not os.path.isfile(path): | |
raise SACError( | |
"ARGS_ERROR", "Specified path not a directory nor a file", | |
path=path | |
) | |
# split file name to name and extension | |
h, t = splitext_archive(os.path.basename(path)) | |
# if head (and tail is empty) or tail in known archive extension, | |
# assume that path is archive, file otherwise | |
if t in KNOWN_ARCHIVE_EXTENSIONS or (h in KNOWN_ARCHIVE_EXTENSIONS \ | |
and t == ""): | |
return cls.ARCHIVE | |
else: | |
return cls.FILE | |
class CopyFilesScenario(BaseScenario): | |
## Available object properties: | |
# self.config - ScenarioConfiguration object. | |
# self.tmp - Path to temporary folder. | |
def _validate_specific_data(self): | |
validate_data = [ | |
["source", StrPathExpanded], | |
["destination", StrPathExpanded], | |
["copy-with-root", bool], | |
["replace", bool], | |
["remove-source", bool], | |
["archive-type", str, str, ["zip", "tar", "tar.gz", "tar.xz", | |
"tar.bz2"]], | |
["archive-action", str, str, ["pack", "unpack", "none"]], | |
["max-delay", int, int, lambda x: x >= 0], | |
["pattern", str], | |
["invert-pattern", bool] | |
] | |
self.config.validate(validate_data) | |
# check regex | |
if self.config["pattern"]: | |
try: | |
re.compile(self.config["pattern"]) | |
except re.error: | |
raise SACError("ARGS_ERROR", "Invalid regular expression", | |
regex=self.config["pattern"]) | |
## Check destination location. | |
# @param self Pointer to object. | |
def __check_destination_folder(self): | |
# first, if destination directory doesn't exists, try to create it | |
if not os.path.exists(self.config["destination"]): | |
try: | |
os.makedirs(self.config["destination"]) | |
except: | |
raise SACError( | |
"ARGS_ERROR", | |
"Cannot create directory at specified location", | |
location=self.config["destination"] | |
) | |
else: | |
if not os.path.isdir(self.config["destination"]) and \ | |
not self.config["replace"]: | |
raise SACError( | |
"ARGS_ERROR", | |
"Destination path already exists and replace is not " | |
"allowed", | |
destination_path=self.config["destination"] | |
) | |
## Check source location existence. | |
# @param self Pointer to object. | |
def __check_source_file(self): | |
if not os.path.exists(self.config["source"]): | |
raise SACError("ARGS_ERROR", "Source path not exists", | |
location=self.config["source"]) | |
self.__source_type = PathType.detect(self.config["source"]) | |
def _get_available_tests(self): | |
return [ | |
["check-destination-folder", self.__check_destination_folder, | |
False], | |
["check-source", self.__check_source_file, True] | |
] | |
## Default files copy. | |
# @param self Pointer to object. | |
def __default_copy(self): | |
copy_file_or_directory(self.config["source"], | |
self.config["destination"], | |
self.config["copy-with-root"], | |
self.config["pattern"], | |
self.config["invert-pattern"], | |
self.config["replace"]) | |
if self.config["remove-source"]: | |
remove_file_or_directory(self.config["source"], | |
self.config["pattern"], | |
self.config["invert-pattern"]) | |
## Unpack source and copy it to destination folder. | |
# @param self Pointer to object. | |
def __unpack_copy(self): | |
unpack_archive(self.config["source"], self.config["destination"], | |
self.config["copy-with-root"], self.config["pattern"], | |
self.config["invert-pattern"], | |
self.config["replace"]) | |
if self.config["remove-source"]: | |
remove_file_or_directory(self.config["source"]) | |
## Pack required files to archive and place it in destination folder. | |
# @param self Pointer to object. | |
def __pack_copy(self): | |
pack_archive(self.config["source"], self.config["destination"], | |
self.config["archive-type"], | |
self.config["copy-with-root"], | |
self.config["pattern"], self.config["invert-pattern"], | |
self.config["replace"]) | |
if self.config["remove-source"]: | |
remove_file_or_directory(self.config["source"], | |
self.config["pattern"], | |
self.config["invert-pattern"]) | |
def _real(self): | |
# random delay before copy | |
if self.config["max-delay"] > 0: | |
delay = random.uniform(0, self.config["max-delay"]) | |
global_logger.info(message="Selected delay", delay=delay) | |
time.sleep(delay) | |
global_logger.info(message="Starting copy") | |
# create destination folder | |
if os.path.exists(self.config["destination"]): | |
if not os.path.isdir(self.config["destination"]): | |
if self.config["replace"]: | |
remove_file_or_directory(self.config["destination"]) | |
else: | |
raise SACError( | |
"ARGS_ERROR", | |
"Destination path exists and not a directory", | |
path=self.config["destination"] | |
) | |
else: | |
os.makedirs(self.config["destination"]) | |
# depends on source type and archive action, perform action | |
if self.config["archive-action"] == "none": | |
self.__default_copy() | |
elif self.config["archive-action"] == "pack": | |
self.__pack_copy() | |
else: | |
source_type = PathType.detect(self.config["source"]) | |
if source_type != PathType.ARCHIVE: | |
self.__default_copy() | |
else: | |
self.__unpack_copy() | |
if __name__ == "__main__": | |
CopyFilesScenario.main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment