Skip to content

Instantly share code, notes, and snippets.

@JohnyDeath
Created March 16, 2020 17:03
Show Gist options
  • Save JohnyDeath/c7b04f25f186e85aedd5fbfd3b65aae9 to your computer and use it in GitHub Desktop.
Save JohnyDeath/c7b04f25f186e85aedd5fbfd3b65aae9 to your computer and use it in GitHub Desktop.
import os
import re
import time
import random
from enum import Enum
from lib.common import bootstrap
from lib.common.logger import global_logger
from lib.common.config import StrPathExpanded
from lib.common.base_scenario import BaseScenario
from lib.common.errors import SACError
from lib.utils import KNOWN_ARCHIVE_EXTENSIONS, splitext_archive, \
unpack_archive, pack_archive
from lib.utils.fs import copy_file_or_directory, remove_file_or_directory
## Kinds of source a filesystem path can refer to.
class PathType(Enum):
    FILE = "file"
    DIRECTORY = "directory"
    ARCHIVE = "archive"

    ## Classify a path as a directory, an archive or a plain file.
    # @param cls Class.
    # @param path Path to source.
    # @return Enumeration member.
    # @exception SACError "ARGS_ERROR" when the path is neither a directory
    #            nor a regular file.
    @classmethod
    def detect(cls, path):
        # directories are recognised first, no name inspection required
        if os.path.isdir(path):
            return cls.DIRECTORY

        # anything that is neither a directory nor a regular file is rejected
        if not os.path.isfile(path):
            raise SACError(
                "ARGS_ERROR", "Specified path not a directory nor a file",
                path=path
            )

        # only the file name (without its parent folders) is inspected
        stem, extension = splitext_archive(os.path.basename(path))

        # second part of the split is a known archive extension
        if extension in KNOWN_ARCHIVE_EXTENSIONS:
            return cls.ARCHIVE

        # first part is itself a known archive extension and nothing
        # follows it
        if stem in KNOWN_ARCHIVE_EXTENSIONS and extension == "":
            return cls.ARCHIVE

        return cls.FILE
## Scenario that copies, packs or unpacks a source path into a destination
#  folder, driven by the values stored in self.config.
class CopyFilesScenario(BaseScenario):
    ## Available object properties:
    # self.config - ScenarioConfiguration object.
    # self.tmp - Path to temporary folder.

    ## Validate the scenario-specific configuration values.
    #  Passes the rule list below to self.config.validate and then checks
    #  that a non-empty "pattern" compiles as a regular expression.
    # @param self Pointer to object.
    # @exception SACError "ARGS_ERROR" when "pattern" is not a valid regex.
    def _validate_specific_data(self):
        # every rule starts with the option name and its type; any further
        # items are interpreted by self.config.validate
        validate_data = [
            ["source", StrPathExpanded],
            ["destination", StrPathExpanded],
            ["copy-with-root", bool],
            ["replace", bool],
            ["remove-source", bool],
            ["archive-type", str, str, ["zip", "tar", "tar.gz", "tar.xz",
                                        "tar.bz2"]],
            ["archive-action", str, str, ["pack", "unpack", "none"]],
            ["max-delay", int, int, lambda x: x >= 0],
            ["pattern", str],
            ["invert-pattern", bool]
        ]
        self.config.validate(validate_data)
        # check regex
        if self.config["pattern"]:
            try:
                re.compile(self.config["pattern"])
            except re.error as err:
                # keep the regex error attached as the cause for diagnostics
                raise SACError("ARGS_ERROR", "Invalid regular expression",
                               regex=self.config["pattern"]) from err

    ## Check destination location.
    #  Creates the destination directory when nothing exists at that path;
    #  otherwise rejects an existing non-directory unless "replace" is set.
    # @param self Pointer to object.
    # @exception SACError "ARGS_ERROR" when the directory cannot be created
    #            or the path is occupied and replace is not allowed.
    def __check_destination_folder(self):
        destination = self.config["destination"]
        # first, if destination directory doesn't exists, try to create it
        if not os.path.exists(destination):
            try:
                os.makedirs(destination)
            except (OSError, ValueError) as err:
                # OSError covers filesystem failures, ValueError covers
                # malformed paths (e.g. embedded null byte); interpreter
                # exits and keyboard interrupts are no longer swallowed
                raise SACError(
                    "ARGS_ERROR",
                    "Cannot create directory at specified location",
                    location=destination
                ) from err
        elif not os.path.isdir(destination) and not self.config["replace"]:
            raise SACError(
                "ARGS_ERROR",
                "Destination path already exists and replace is not "
                "allowed",
                destination_path=destination
            )

    ## Check source location existence.
    #  Also stores the detected source type in a private attribute.
    # @param self Pointer to object.
    # @exception SACError "ARGS_ERROR" when the source path does not exist.
    def __check_source_file(self):
        if not os.path.exists(self.config["source"]):
            raise SACError("ARGS_ERROR", "Source path not exists",
                           location=self.config["source"])
        self.__source_type = PathType.detect(self.config["source"])

    ## List the checks offered by this scenario.
    # @param self Pointer to object.
    # @return List of [name, callable, flag] entries; the meaning of the
    #         boolean flag is defined by BaseScenario (not visible here).
    def _get_available_tests(self):
        return [
            ["check-destination-folder", self.__check_destination_folder,
             False],
            ["check-source", self.__check_source_file, True]
        ]

    ## Default files copy.
    #  Copies the source to the destination and, when "remove-source" is
    #  set, removes the source using the same pattern settings.
    # @param self Pointer to object.
    def __default_copy(self):
        copy_file_or_directory(self.config["source"],
                               self.config["destination"],
                               self.config["copy-with-root"],
                               self.config["pattern"],
                               self.config["invert-pattern"],
                               self.config["replace"])
        if self.config["remove-source"]:
            remove_file_or_directory(self.config["source"],
                                     self.config["pattern"],
                                     self.config["invert-pattern"])

    ## Unpack source and copy it to destination folder.
    #  When "remove-source" is set the source is removed afterwards; no
    #  pattern arguments are passed to that removal call.
    # @param self Pointer to object.
    def __unpack_copy(self):
        unpack_archive(self.config["source"], self.config["destination"],
                       self.config["copy-with-root"], self.config["pattern"],
                       self.config["invert-pattern"],
                       self.config["replace"])
        if self.config["remove-source"]:
            remove_file_or_directory(self.config["source"])

    ## Pack required files to archive and place it in destination folder.
    #  When "remove-source" is set, the source is removed afterwards using
    #  the same pattern settings.
    # @param self Pointer to object.
    def __pack_copy(self):
        pack_archive(self.config["source"], self.config["destination"],
                     self.config["archive-type"],
                     self.config["copy-with-root"],
                     self.config["pattern"], self.config["invert-pattern"],
                     self.config["replace"])
        if self.config["remove-source"]:
            remove_file_or_directory(self.config["source"],
                                     self.config["pattern"],
                                     self.config["invert-pattern"])

    ## Run the scenario: optional random delay, destination preparation,
    #  then copy / pack / unpack depending on "archive-action".
    # @param self Pointer to object.
    # @exception SACError "ARGS_ERROR" when the destination exists, is not
    #            a directory and replace is not allowed.
    def _real(self):
        # random delay before copy (seconds, uniform in [0, max-delay])
        if self.config["max-delay"] > 0:
            delay = random.uniform(0, self.config["max-delay"])
            global_logger.info(message="Selected delay", delay=delay)
            time.sleep(delay)
        global_logger.info(message="Starting copy")
        # create destination folder
        destination = self.config["destination"]
        if os.path.exists(destination):
            if not os.path.isdir(destination):
                if self.config["replace"]:
                    # NOTE(review): the directory is not recreated after the
                    # removal; presumably the copy helpers create it -
                    # confirm
                    remove_file_or_directory(destination)
                else:
                    raise SACError(
                        "ARGS_ERROR",
                        "Destination path exists and not a directory",
                        path=destination
                    )
        else:
            os.makedirs(destination)
        # depends on source type and archive action, perform action
        if self.config["archive-action"] == "none":
            self.__default_copy()
        elif self.config["archive-action"] == "pack":
            self.__pack_copy()
        else:
            # remaining action: unpack only when the source really is an
            # archive, otherwise fall back to a plain copy
            source_type = PathType.detect(self.config["source"])
            if source_type != PathType.ARCHIVE:
                self.__default_copy()
            else:
                self.__unpack_copy()
# Script entry point: start the scenario only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    CopyFilesScenario.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment