Instantly share code, notes, and snippets.
Last active
May 14, 2025 07:32
-
Star
0
(0)
You must be signed in to star a gist -
Fork
0
(0)
You must be signed in to fork a gist
-
Save davidlu1001/671802b2653b5398eb4c0512baa281f6 to your computer and use it in GitHub Desktop.
convert_immuta_tags.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Immuta Tag Converter - Converts Immuta Tag API GET response to POST /tag API format. | |
This script processes tag data from Immuta's GET /tag API response and converts it into | |
the format required for POST /tag API requests, handling tag hierarchies and structure | |
according to Immuta API requirements. | |
Features: | |
- Support for tag hierarchies with custom separators | |
- Include/exclude pattern filtering for targeted tag processing | |
- Proper handling of top-level tags (no rootTag as per API requirements) | |
- Preservation of tag descriptions and metadata | |
- Option to control the deleteHierarchy flag | |
Reference: https://documentation.immuta.com/latest/developer-guides/api-intro/immuta-v1-api/configure-your-instance-of-immuta/tagging | |
""" | |
import json | |
import logging | |
import sys | |
import re | |
from pathlib import Path | |
from typing import List, Dict, Optional, Set | |
def setup_logging(verbose: bool = True, log_file: Optional[str] = None) -> None:
    """Configure the root logger for console (and optionally file) output.

    Any handlers already attached to the root logger are removed first.
    ``logging.basicConfig`` is a silent no-op when the root logger already
    has handlers, which would otherwise make a second call (or a call made
    after some host application configured logging) ignore the requested
    level and log file.

    Args:
        verbose: Log at DEBUG level when true, INFO otherwise.
        log_file: Optional path of a file that also receives the log output.

    Raises:
        OSError: If ``log_file`` cannot be opened for writing.
    """
    level = logging.DEBUG if verbose else logging.INFO

    # Build the new handlers before touching the existing configuration so a
    # bad log_file path leaves the current logging setup intact.
    handlers: List[logging.Handler] = [logging.StreamHandler(sys.stdout)]
    if log_file:
        # Explicit UTF-8: logged tag names/descriptions may be non-ASCII and
        # the platform default encoding may not be able to represent them.
        handlers.append(logging.FileHandler(log_file, encoding="utf-8"))

    root_logger = logging.getLogger()
    for stale_handler in list(root_logger.handlers):
        root_logger.removeHandler(stale_handler)
        stale_handler.close()

    logging.basicConfig(
        level=level,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=handlers,
    )
def validate_tag_name(tag_name: str, separator: str) -> None:
    """Validate a (possibly hierarchical) tag name.

    A name is accepted when it is a non-empty string of at most 255
    characters, contains none of the characters ``<>[]{}|\\``, and every
    segment between separators is non-empty (so no leading, trailing or
    consecutive separators).

    Args:
        tag_name: The full tag name to validate.
        separator: String used to denote hierarchy in tag names.

    Raises:
        ValueError: If the name (or the separator) is invalid. The message
            always identifies the offending name so callers can log it.
    """
    # Names come straight from parsed JSON, so they may be numbers, lists, ...
    # Report those as ValueError like every other invalid name instead of
    # failing with AttributeError on .split().
    if not isinstance(tag_name, str):
        raise ValueError(
            f"Tag name {tag_name!r} must be a string, got {type(tag_name).__name__}"
        )
    if not tag_name:
        raise ValueError("Tag name cannot be empty")
    if not separator:
        # str.split("") would raise a ValueError with an unhelpful message.
        raise ValueError("Hierarchy separator cannot be empty")

    if not all(tag_name.split(separator)):
        raise ValueError(
            f"Tag name '{tag_name}' cannot contain leading, trailing or "
            f"consecutive separators ('{separator}')"
        )

    if any(char in "<>[]{}|\\" for char in tag_name):
        raise ValueError(
            f"Tag name '{tag_name}' contains invalid characters (<>[]{{}}|\\)"
        )

    if len(tag_name) > 255:
        raise ValueError(f"Tag name '{tag_name}' exceeds maximum length of 255 characters")
def _compile_tag_pattern(pattern: Optional[str], kind: str):
    """Compile an optional include/exclude regex; report bad patterns as ValueError."""
    if not pattern:
        return None
    try:
        return re.compile(pattern)
    except re.error as e:
        logging.error("Invalid %s pattern '%s': %s", kind, pattern, e)
        raise ValueError(f"Invalid {kind} pattern: {e}") from e


def _ancestor_names(tag_name: str, separator: str) -> List[str]:
    """Return the full names of all proper ancestors of a tag, outermost first."""
    parts = tag_name.split(separator)
    return [separator.join(parts[:i]) for i in range(1, len(parts))]


def _tag_payload(name: str, tag: Dict) -> Dict:
    """Build one entry of a request's 'tags' array, copying optional fields if present."""
    payload = {"name": name}
    for field in ("description", "metadata"):
        if field in tag:
            payload[field] = tag[field]
    return payload


def process_tag_hierarchy(
    tags_data: List[Dict],
    separator: str = ".",
    delete_hierarchy: bool = True,
    exclude_pattern: Optional[str] = None,
    include_pattern: Optional[str] = None
) -> List[Dict]:
    """
    Convert tag objects from a GET /tag response into POST /tag request bodies.

    Processing steps:
      1. Skip entries that are not objects, have no 'name', or whose name is
         not a string.
      2. Drop tags matching ``exclude_pattern`` (and, later, all their
         descendants).
      3. Drop tags failing ``validate_tag_name`` and duplicate names (the
         first occurrence wins).
      4. If ``include_pattern`` is given, keep only matching tags, their
         descendants, and the ancestors needed to reach them.
      5. Drop tags whose parent is not part of the output (with a warning),
         so no request ever references a rootTag that an earlier request
         does not create.
      6. Emit one request for all top-level tags, then one request per
         parent, ordered by depth and then by parent name.

    Args:
        tags_data: List of tag objects from GET /tag response.
        separator: String used to denote hierarchy in tag names.
        delete_hierarchy: Value written to ``rootTag.deleteHierarchy``.
        exclude_pattern: Regular expression (``re.search`` semantics); matching
            tags and their children are excluded.
        include_pattern: Regular expression (``re.search`` semantics); only
            matching tags, their children and their ancestors are kept.

    Returns:
        List of request dicts:
        - first, ``{"tags": [...]}`` for the top-level tags (no rootTag);
        - then ``{"rootTag": {"name", "deleteHierarchy"}, "tags": [...]}``
          per parent, where child names are the last path segment only.
        ``[{"tags": []}]`` is returned whenever nothing remains to emit.

    Raises:
        ValueError: If a pattern is not a valid regular expression or the
            separator is empty.
    """
    if not tags_data:
        logging.warning("No tags found in input data")
        return [{"tags": []}]

    exclude_regex = _compile_tag_pattern(exclude_pattern, "exclude")
    include_regex = _compile_tag_pattern(include_pattern, "include")

    if not separator:
        # Every split() below would fail; report it once instead of
        # rejecting each tag individually.
        raise ValueError("Hierarchy separator must be a non-empty string")

    # Steps 1-3: structural checks, exclude pattern, name validation.
    tag_map: Dict[str, Dict] = {}
    excluded_by_pattern: Set[str] = set()
    for tag in tags_data:
        if not isinstance(tag, dict) or "name" not in tag:
            logging.warning("Skipping tag with missing 'name' field: %s", tag)
            continue

        tag_name = tag["name"]
        if not isinstance(tag_name, str):
            # Guard before regex.search(), which raises TypeError on non-strings.
            logging.error("Invalid tag name: Tag name %r must be a string", tag_name)
            continue

        if exclude_regex and exclude_regex.search(tag_name):
            logging.info("Excluding tag '%s' due to matching exclude pattern", tag_name)
            excluded_by_pattern.add(tag_name)
            continue

        try:
            validate_tag_name(tag_name, separator)
            if tag_name in tag_map:
                raise ValueError(f"Duplicate tag name '{tag_name}' detected")
        except ValueError as e:
            logging.error("Invalid tag name: %s", e)
            continue
        tag_map[tag_name] = tag

    # Step 4: include pattern keeps matches, their descendants and their ancestors.
    if include_regex:
        matched = {name for name in tag_map if include_regex.search(name)}
        keep: Set[str] = set(matched)
        for name in matched:
            keep.update(_ancestor_names(name, separator))
        for name in tag_map:
            if name not in keep and any(
                ancestor in matched for ancestor in _ancestor_names(name, separator)
            ):
                keep.add(name)

        for name in list(tag_map):
            if name not in keep:
                logging.info("Excluding tag '%s' due to not matching include pattern", name)
                del tag_map[name]

    # Step 2 (continued): descendants of pattern-excluded tags go too.
    for name in list(tag_map):
        excluded_ancestor = next(
            (a for a in _ancestor_names(name, separator) if a in excluded_by_pattern),
            None,
        )
        if excluded_ancestor is not None:
            logging.info("Excluding tag '%s' because ancestor '%s' was excluded",
                         name, excluded_ancestor)
            del tag_map[name]

    def depth_then_name(name: str):
        return (name.count(separator), name)

    # Step 5: walk tags shallowest-first so a parent is always classified
    # before its children; a child is emitted only if its parent is.
    top_level_tags: List[str] = []
    children_by_parent: Dict[str, List[str]] = {}
    emitted: Set[str] = set()
    for name in sorted(tag_map, key=depth_then_name):
        parts = name.split(separator)
        if len(parts) == 1:
            top_level_tags.append(name)
            emitted.add(name)
            continue

        parent = separator.join(parts[:-1])
        if parent in emitted:
            children_by_parent.setdefault(parent, []).append(name)
            emitted.add(name)
        else:
            logging.warning(
                "Skipping tag '%s' because its parent '%s' is not part of the output",
                name, parent)

    if not emitted:
        logging.warning("No valid tags remain after filtering")
        return [{"tags": []}]

    # Step 6: build requests. Top-level first, then parents by depth so each
    # rootTag has been created by an earlier request in the list.
    requests: List[Dict] = []
    if top_level_tags:
        requests.append(
            {"tags": [_tag_payload(name, tag_map[name]) for name in top_level_tags]}
        )
        logging.info("Created request with %d top-level tags", len(top_level_tags))

    for parent in sorted(children_by_parent, key=depth_then_name):
        child_payloads = [
            # Children are named relative to rootTag: last path segment only.
            _tag_payload(child.split(separator)[-1], tag_map[child])
            for child in children_by_parent[parent]
        ]
        requests.append({
            "rootTag": {"name": parent, "deleteHierarchy": delete_hierarchy},
            "tags": child_payloads,
        })
        logging.info("Created request with %d child tags under parent '%s'",
                     len(child_payloads), parent)

    return requests
def convert_immuta_tags(
    input_file: str,
    output_file: Optional[str] = None,
    verbose: bool = True,
    dry_run: bool = False,
    separator: str = ".",
    delete_hierarchy: bool = True,
    log_file: Optional[str] = None,
    exclude_pattern: Optional[str] = None,
    include_pattern: Optional[str] = None
) -> bool:
    """
    Convert Immuta tag data from GET /tag API format to POST /tag API format.

    The input JSON must be either a list of tag objects or an object whose
    'tags' property is such a list. Each generated request is written to its
    own file named ``<stem>_<n>.json`` next to ``output_file`` (the directory
    is created if needed), or logged to the console when no output file is
    given or ``dry_run`` is set.

    Args:
        input_file: Path to the JSON file containing tag data from GET /tag API
        output_file: Optional path used to derive the numbered output files
        verbose: Enable detailed logging
        dry_run: Preview output without writing to file
        separator: String used to denote hierarchy in tag names
        delete_hierarchy: Whether to set deleteHierarchy flag for child tags
        log_file: Optional path to save log output
        exclude_pattern: Regular expression pattern to exclude tags
        include_pattern: Regular expression pattern to include only matching tags

    Returns:
        True on success; False if any step failed (the error is logged with
        its traceback rather than raised).
    """
    setup_logging(verbose, log_file)
    logging.info("Starting tag conversion process for file: %s", input_file)

    if delete_hierarchy:
        logging.info("deleteHierarchy is enabled: existing tag hierarchies in the target environment will be deleted before import")
    else:
        logging.warning("deleteHierarchy is disabled: import may fail if tags already exist in the target environment")

    if exclude_pattern:
        logging.info("Exclude pattern '%s' will be applied to tags", exclude_pattern)
    if include_pattern:
        logging.info("Include pattern '%s' will be applied to tags", include_pattern)

    try:
        input_path = Path(input_file)
        if not input_path.is_file():
            raise FileNotFoundError(f"Input file '{input_file}' does not exist")

        logging.debug("Reading tag data from %s", input_file)
        with input_path.open('r', encoding='utf-8') as f:
            try:
                data = json.load(f)
            except json.JSONDecodeError as e:
                logging.error("Invalid JSON format in input file: %s", e)
                # Keep the parser error as __cause__ so the logged traceback
                # shows where in the file decoding failed.
                raise ValueError("Input file contains invalid JSON") from e

        if isinstance(data, list):
            tags_data = data
        elif isinstance(data, dict) and 'tags' in data:
            tags_data = data['tags']
            if not isinstance(tags_data, list):
                # e.g. {"tags": {...}} would otherwise be iterated key by key.
                raise ValueError("Invalid JSON format: the 'tags' property must be a list of tags")
        else:
            raise ValueError("Invalid JSON format: expected a list of tags or an object with a 'tags' property")

        logging.info("Found %d tags in the input file", len(tags_data))

        output_data = process_tag_hierarchy(
            tags_data,
            separator,
            delete_hierarchy,
            exclude_pattern,
            include_pattern
        )

        if dry_run:
            logging.info("Dry run mode: skipping file write")

        if dry_run or not output_file:
            # Console output: one clearly delimited JSON document per request.
            for idx, request in enumerate(output_data, start=1):
                formatted_json = json.dumps(request, indent=2)
                logging.info("=== JSON REQUEST %d (COPY THIS TO IMMUTA API) ===\n%s\n=== END OF JSON ===", idx, formatted_json)
        else:
            target = Path(output_file)
            output_base = target.stem
            output_dir = target.parent
            # Opening a file inside a missing directory would fail.
            output_dir.mkdir(parents=True, exist_ok=True)
            for idx, request in enumerate(output_data, start=1):
                formatted_json = json.dumps(request, indent=2)
                output_path = output_dir / f"{output_base}_{idx}.json"
                logging.debug("Writing output to %s", output_path)
                with output_path.open('w', encoding='utf-8') as f:
                    f.write(formatted_json)
                logging.info("Converted tag data saved to %s", output_path)

        logging.info(
            "Generated %d separate requests for import. Submit each request to the Immuta API in order (1, 2, 3, ...) to ensure parent tags are created before children.",
            len(output_data)
        )
        return True

    except Exception as e:
        # Top-level boundary: report the failure to the caller via the return
        # value, with the full traceback in the log.
        logging.error("Conversion failed: %s", e, exc_info=True)
        return False
def print_usage() -> None:
    """Write the script's help text (usage, options, examples) to stdout."""
    # Entries starting with "\n" produce the blank line that separates sections.
    help_lines = (
        "Immuta Tag Converter",
        "====================",
        "Converts Immuta Tag API GET response to POST /tag API format.",
        "\nUsage: python convert_immuta_tags.py input_file.json [output_file.json] [options]",
        "\nOptions:",
        " input_file.json - Path to the JSON file from GET /tag API",
        " output_file.json - (Optional) Path to save the output JSON",
        " --verbose - Enable detailed logging (default: true)",
        " --no-verbose - Disable detailed logging",
        " --dry-run - Preview output without writing to file",
        " --separator STR - Hierarchy separator (default: '.')",
        " --no-delete-hierarchy - Disable deleteHierarchy for child tags",
        " --exclude-tags REGEX - Exclude tags matching the regular expression pattern (e.g., '^Discovered($|\\..*)')",
        " --include-tags REGEX - Include only tags matching the regular expression pattern",
        " --log-file FILE - Save logs to a file",
        "\nIf output_file is not provided, the result will be printed to console.",
        "\nExamples:",
        " python convert_immuta_tags.py tags.json --dry-run",
        " python convert_immuta_tags.py tags.json output.json --include-tags '^Data'",
        " python convert_immuta_tags.py tags.json --exclude-tags 'Test' --separator '/'",
    )
    print("\n".join(help_lines))
def build_arg_parser():
    """Build the command-line parser for the converter script.

    ``-h/--help`` is implemented as a parse-time action. A plain
    ``store_true`` flag is only inspected after parsing succeeds, so
    ``script.py -h`` on its own would fail with "the following arguments are
    required: input_file" before the help could be shown.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    # Imported here because the module's top-level imports do not include argparse.
    import argparse

    class _UsageAction(argparse.Action):
        """Print the script's usage text and exit as soon as the flag is seen."""

        def __call__(self, parser, namespace, values, option_string=None):
            print_usage()
            parser.exit(0)

    parser = argparse.ArgumentParser(
        description="Convert Immuta Tag API GET response to POST /tag API format",
        add_help=False  # replaced by the custom -h/--help below
    )
    parser.add_argument("input_file", help="Path to the JSON file from GET /tag API")
    parser.add_argument("output_file", nargs="?", default=None, help="Path to save the output JSON (optional)")
    parser.add_argument("--verbose", action="store_true", default=True, help="Enable detailed logging")
    parser.add_argument("--no-verbose", dest="verbose", action="store_false", help="Disable detailed logging")
    parser.add_argument("--dry-run", action="store_true", help="Preview output without writing to file")
    parser.add_argument("--separator", default=".", help="Hierarchy separator (default: '.')")
    parser.add_argument("--no-delete-hierarchy", dest="delete_hierarchy", action="store_false", default=True,
                        help="Disable deleteHierarchy for child tags")
    parser.add_argument("--exclude-tags", help="Regular expression pattern to exclude tags (e.g., '^Discovered($|\\..*)')")
    parser.add_argument("--include-tags", help="Regular expression pattern to include only matching tags")
    parser.add_argument("--log-file", help="Save logs to a file")
    parser.add_argument("-h", "--help", action=_UsageAction, nargs=0, default=False,
                        help="Show this help message and exit")
    return parser


def main(argv: Optional[List[str]] = None) -> int:
    """Run the converter from command-line arguments.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        Process exit code: 0 on success, 1 if the input file is missing or
        the conversion failed.
    """
    args = build_arg_parser().parse_args(argv)

    if not Path(args.input_file).is_file():
        print(f"❌ Error: Input file '{args.input_file}' not found")
        return 1

    success = convert_immuta_tags(
        input_file=args.input_file,
        output_file=args.output_file,
        verbose=args.verbose,
        dry_run=args.dry_run,
        separator=args.separator,
        delete_hierarchy=args.delete_hierarchy,
        log_file=args.log_file,
        exclude_pattern=args.exclude_tags,
        include_pattern=args.include_tags
    )
    return 0 if success else 1


if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment