Label Studio S3 to YOLOv5
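This gist exports an annotated Label Studio project whose images are stored on S3 into a local YOLOv5 dataset: it downloads the YOLO-format labels and the task images, splits them into train/val/test file lists, and writes a dataset_config.yml for training. The script reads three environment variables via python-dotenv; a minimal .env sketch (all values are placeholders):

    S3_ENDPOINT=https://s3.example.com
    LABEL_STUDIO_TOKEN=your-label-studio-api-token
    LABEL_STUDIO_HOST=https://label-studio.example.com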
#!/usr/bin/env python
# coding: utf-8
import argparse
import json
import os
import shutil
import signal
import sys
import textwrap
from pathlib import Path
from typing import Union

import ray
import requests
from dotenv import load_dotenv
from tqdm import tqdm

from split_dataset import autosplit
class PrepareDataset:
    """Download a Label Studio project's images and YOLO annotations,
    split them into train/val/test sets, and write a YOLOv5 dataset config."""

    def __init__(self,
                 project_id: int,
                 dataset_path: str = './dataset',
                 weights: Union[list, tuple] = (0.8, 0.2, 0.0)):
        self.project_id = project_id
        self.dataset_path = Path(dataset_path)
        self.weights = weights

    @staticmethod
    def keyboard_interrupt_handler(sig: int, _) -> None:
        print(f'\nKeyboardInterrupt (id: {sig}) has been caught...')
        print('Terminating the session gracefully...')
        sys.exit(1)
    def get_project_data(self) -> None:
        """Export annotations from Label Studio and download the task images."""

        @ray.remote
        def download(task: dict) -> None:
            img_url = task['data']['image'].replace('s3://', s3_endpoint)
            fname = self.dataset_path / 'images' / Path(img_url).name
            if fname.exists():
                return
            res = requests.get(img_url)
            res.raise_for_status()
            with open(fname, 'wb') as fp:
                fp.write(res.content)

        load_dotenv()
        s3_endpoint = os.environ['S3_ENDPOINT'].rstrip('/') + '/'
        headers = {
            'Authorization': f'Token {os.environ["LABEL_STUDIO_TOKEN"]}'
        }
        label_studio_host = os.environ['LABEL_STUDIO_HOST'].rstrip('/')

        # Export the project in YOLO format (labels and classes.txt).
        r = requests.get(
            f'{label_studio_host}/api/projects/{self.project_id}/export?exportType=YOLO',  # noqa: E501
            headers=headers)
        r.raise_for_status()
        with open(f'{self.dataset_path}.zip', 'wb') as f:
            f.write(r.content)
        shutil.unpack_archive(f'{self.dataset_path}.zip', self.dataset_path)
        Path(f'{self.dataset_path}.zip').unlink()

        # Export the same project as JSON to get the tasks' S3 image URLs.
        r = requests.get(
            f'{label_studio_host}/api/projects/{self.project_id}/export?exportType=JSON',  # noqa: E501
            headers=headers)
        r.raise_for_status()
        data = r.json()
        with open(f'{self.dataset_path}/annotated_tasks.json', 'w') as j:
            json.dump(data, j)

        # Download the images in parallel with ray.
        (self.dataset_path / 'images').mkdir(parents=True, exist_ok=True)
        futures = [download.remote(task) for task in data]
        for future in tqdm(futures):
            ray.get(future)
    def create_dataset_config(self) -> None:
        """Write the dataset_config.yml file that YOLOv5 expects."""
        with open(self.dataset_path / 'classes.txt') as f:
            classes = f.read().splitlines()
        num_classes = len(classes)
        content = textwrap.dedent(f'''\
            path: {self.dataset_path.absolute()}
            train: autosplit_train.txt
            val: autosplit_val.txt
            test:
            nc: {num_classes}
            names: {classes}
            ''')
        with open(self.dataset_path / 'dataset_config.yml', 'w') as f:
            f.write(content)
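    # For illustration, with classes.txt listing two classes, 'cat' and
    # 'dog', and a dataset at /home/user/dataset (both hypothetical), the
    # generated dataset_config.yml would read:
    #
    #   path: /home/user/dataset
    #   train: autosplit_train.txt
    #   val: autosplit_val.txt
    #   test:
    #   nc: 2
    #   names: ['cat', 'dog']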
    def run_pipeline(self) -> None:
        """Run the full export -> download -> split -> config pipeline."""
        signal.signal(signal.SIGINT, self.keyboard_interrupt_handler)
        self.get_project_data()
        splits = [
            'autosplit_train.txt', 'autosplit_val.txt', 'autosplit_test.txt'
        ]
        # autosplit() writes the split files to the current directory;
        # move them into the dataset directory, replacing any stale copies.
        autosplit(self.dataset_path, self.weights)
        for split in splits:
            if Path(split).exists():
                if (self.dataset_path / split).exists():
                    (self.dataset_path / split).unlink()
                shutil.move(split, str(self.dataset_path))
        self.create_dataset_config()
def _opts() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('-p',
                        '--project-id',
                        help='Label Studio project ID',
                        type=int,
                        required=True)
    parser.add_argument('-d',
                        '--dataset-path',
                        help='Path to the output dataset '
                        '(if it exists, the dataset will be updated)',
                        type=str,
                        default='./dataset')
    parser.add_argument(
        '-w',
        '--weights',
        help='Split weights: train val test (default: 0.8 0.2 0.0)',
        type=float,
        default=[0.8, 0.2, 0.0],
        nargs=3)
    return parser.parse_args()
if __name__ == '__main__':
    args = _opts()
    pd = PrepareDataset(project_id=args.project_id,
                        dataset_path=args.dataset_path,
                        weights=args.weights)
    pd.run_pipeline()
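Assuming the script above is saved as prepare_dataset.py (the filename is an assumption) and the .env file sits in the working directory, a typical invocation would be:

    python prepare_dataset.py --project-id 1 --dataset-path ./dataset --weights 0.8 0.2 0.0

A test weight of 0.0 means autosplit_test.txt is never written, since no image is ever assigned to the test split.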
ray>=1.13.0
requests>=2.28.1
python-dotenv>=0.20.0
tqdm>=4.64.0
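Assuming this list is saved as requirements.txt, the dependencies can be installed with pip install -r requirements.txt. Note that split_dataset is the local module below, not a PyPI package.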
#!/usr/bin/env python
# coding: utf-8
"""
Source: https://github.com/ultralytics/yolov5/blob/master/utils/dataloaders.py
"""
import os
import random
from pathlib import Path

from tqdm import tqdm


def img2label_paths(img_paths) -> list:
    # Define label paths as a function of image paths
    sa = os.sep + 'images' + os.sep  # /images/ substring
    sb = os.sep + 'labels' + os.sep  # /labels/ substring
    return [
        sb.join(x.rsplit(sa, 1)).rsplit('.', 1)[0] + '.txt' for x in img_paths
    ]
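# Example (hypothetical POSIX paths):
#   img2label_paths(['dataset/images/cat_01.jpg'])
#   -> ['dataset/labels/cat_01.txt']
# The last /images/ path component is swapped for /labels/ and the image
# extension for .txt, matching YOLO's side-by-side images/labels layout.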
def autosplit(path: str,
              weights: tuple = (0.9, 0.1, 0.0),
              annotated_only: bool = False) -> None:
    """Autosplit a dataset

    Autosplit a dataset into train/val/test splits and save
    path/autosplit_*.txt files

    Args:
        path (str): Path to the dataset directory containing the images
        weights (tuple): Train, val, test weights
        annotated_only (bool): Only use images with an annotated txt file
    """
    IMG_FORMATS = [
        'bmp', 'jpg', 'jpeg', 'png', 'tif', 'tiff', 'dng', 'webp', 'mpo'
    ]  # acceptable image suffixes
    path = Path(path)  # dataset dir
    files = sorted(x for x in path.rglob('*.*')
                   if x.suffix[1:].lower() in IMG_FORMATS)  # image files only
    n = len(files)  # number of files
    random.seed(0)  # for reproducibility
    indices = random.choices([0, 1, 2], weights=weights,
                             k=n)  # assign each image to a split
    txt = ['autosplit_train.txt', 'autosplit_val.txt',
           'autosplit_test.txt']  # 3 txt files
    for x in txt:  # remove existing split files
        if (path.parent / x).exists():
            (path.parent / x).unlink()
    print(f'Autosplitting images from {path}' +
          ', using *.txt labeled images only' * annotated_only)
    for i, img in tqdm(zip(indices, files), total=n):
        if not annotated_only or Path(img2label_paths(
                [str(img)])[0]).exists():  # check label
            with open(path.parent / txt[i], 'a') as f:
                f.write(f'./{Path(*img.parts[1:])}' +
                        '\n')  # add image to txt file
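For illustration, autosplit() can also be run on its own; a minimal sketch, assuming an existing ./dataset directory with images under ./dataset/images and labels under ./dataset/labels (paths are hypothetical):

    from split_dataset import autosplit

    # Writes autosplit_train.txt and autosplit_val.txt into the parent of
    # ./dataset (here, the working directory); with a 0.0 test weight,
    # autosplit_test.txt is never created.
    autosplit('./dataset', weights=(0.8, 0.2, 0.0), annotated_only=True)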