Skip to content

Instantly share code, notes, and snippets.

@davidlu1001
Last active May 14, 2025 07:32
Show Gist options
  • Save davidlu1001/671802b2653b5398eb4c0512baa281f6 to your computer and use it in GitHub Desktop.
Save davidlu1001/671802b2653b5398eb4c0512baa281f6 to your computer and use it in GitHub Desktop.
convert_immuta_tags.py
#!/usr/bin/env python3
"""
Immuta Tag Converter - Converts Immuta Tag API GET response to POST /tag API format.
This script processes tag data from Immuta's GET /tag API response and converts it into
the format required for POST /tag API requests, handling tag hierarchies and structure
according to Immuta API requirements.
Features:
- Support for tag hierarchies with custom separators
- Include/exclude pattern filtering for targeted tag processing
- Proper handling of top-level tags (no rootTag as per API requirements)
- Preservation of tag descriptions and metadata
- Option to control the deleteHierarchy flag
Reference: https://documentation.immuta.com/latest/developer-guides/api-intro/immuta-v1-api/configure-your-instance-of-immuta/tagging
"""
import json
import logging
import sys
import re
from pathlib import Path
from typing import List, Dict, Optional, Set
def setup_logging(verbose: bool = True, log_file: Optional[str] = None) -> None:
    """Configure root logging to stdout and, optionally, to a log file.

    Args:
        verbose: Log at DEBUG level when true, at INFO level otherwise.
        log_file: Optional path of a file that also receives the log records.

    Note:
        ``logging.basicConfig`` does nothing when the root logger already has
        handlers, so only the first effective call in a process configures
        logging.
    """
    level = logging.DEBUG if verbose else logging.INFO
    handlers: List[logging.Handler] = [logging.StreamHandler(sys.stdout)]
    if log_file:
        # Log records embed tag names and JSON payloads that may contain
        # non-ASCII text. Write the log as UTF-8, like the JSON files this
        # script reads and writes, instead of the platform default encoding.
        handlers.append(logging.FileHandler(log_file, encoding="utf-8"))
    logging.basicConfig(
        level=level,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=handlers,
    )
def validate_tag_name(tag_name: str, separator: str) -> None:
    """
    Validate a tag name against the constraints enforced by this converter.

    Args:
        tag_name: The full (hierarchical) tag name to validate.
        separator: String used to denote hierarchy in tag names.

    Raises:
        ValueError: If the separator is empty, or if the name is not a string,
            is empty, has an empty hierarchy segment (leading, trailing or
            consecutive separators), contains one of the characters
            ``< > [ ] { } |`` or a backslash, or is longer than 255 characters.
    """
    if not separator:
        raise ValueError("Hierarchy separator cannot be empty")
    # Names come straight from parsed JSON, so they may be numbers, null, etc.
    # Report those as ValueError (the documented failure mode) rather than
    # letting str-only operations raise AttributeError/TypeError.
    if not isinstance(tag_name, str):
        raise ValueError(
            f"Tag name must be a string, got {type(tag_name).__name__}: {tag_name!r}"
        )
    # An empty segment after splitting means a leading, trailing or doubled
    # separator (or a completely empty name).
    if not tag_name or not all(tag_name.split(separator)):
        raise ValueError(
            f"Tag name '{tag_name}' cannot be empty or contain empty segments "
            "(leading, trailing or consecutive separators)"
        )
    if any(c in "<>[]{}|\\" for c in tag_name):
        raise ValueError(f"Tag name '{tag_name}' contains invalid characters (<>[]{{}}|\\)")
    if len(tag_name) > 255:
        raise ValueError(f"Tag name '{tag_name}' exceeds maximum length of 255 characters")
def _compile_tag_pattern(pattern: Optional[str], kind: str) -> Optional["re.Pattern"]:
    """Compile an optional include/exclude regex; raise ValueError if it is invalid."""
    if not pattern:
        return None
    try:
        return re.compile(pattern)
    except re.error as e:
        logging.error("Invalid %s pattern '%s': %s", kind, pattern, e)
        raise ValueError(f"Invalid {kind} pattern: {e}") from e


def _tag_ancestors(tag_name: str, separator: str) -> List[str]:
    """Return every proper ancestor name of ``tag_name``, shortest first."""
    parts = tag_name.split(separator)
    return [separator.join(parts[:i]) for i in range(1, len(parts))]


def _tag_payload(name: str, tag: Dict) -> Dict:
    """Build one entry of a request's 'tags' array, copying description/metadata if present."""
    payload = {"name": name}
    for field in ("description", "metadata"):
        if field in tag:
            payload[field] = tag[field]
    return payload


def process_tag_hierarchy(
    tags_data: List[Dict],
    separator: str = ".",
    delete_hierarchy: bool = True,
    exclude_pattern: Optional[str] = None,
    include_pattern: Optional[str] = None
) -> List[Dict]:
    """
    Convert tag objects from a GET /tag response into a list of POST /tag request bodies.

    Args:
        tags_data: List of tag objects from GET /tag response. Entries that are
            not objects, or whose 'name' is missing or not a string, are
            skipped with a warning.
        separator: String used to denote hierarchy in tag names.
        delete_hierarchy: Value written to 'deleteHierarchy' in every rootTag.
        exclude_pattern: Regular expression (``re.search``); matching tags and
            all of their descendants are dropped.
        include_pattern: Regular expression (``re.search``); only matching tags
            and the ancestors of matching tags are kept. Descendants of a
            matching tag are kept only if they match themselves.

    Returns:
        List of request dicts in submission order:
        - one ``{"tags": [...]}`` request with all top-level tags (if any),
        - then one ``{"rootTag": {...}, "tags": [...]}`` request per parent,
          ordered by parent depth and then by parent name, so a parent is
          always created by an earlier request than its children.
        ``[{"tags": []}]`` is returned when the input is empty or no tag
        survives validation and filtering.

    Raises:
        ValueError: If ``exclude_pattern`` or ``include_pattern`` is not a
            valid regular expression.
    """
    if not tags_data:
        logging.warning("No tags found in input data")
        return [{"tags": []}]

    exclude_regex = _compile_tag_pattern(exclude_pattern, "exclude")
    include_regex = _compile_tag_pattern(include_pattern, "include")

    # Initial filtering: drop malformed entries and tags matching the exclude pattern.
    filtered_tags = []
    excluded_by_pattern: Set[str] = set()
    for tag in tags_data:
        if not isinstance(tag, dict):
            logging.warning("Skipping malformed tag entry (expected an object): %s", tag)
            continue
        if "name" not in tag:
            logging.warning("Skipping tag with missing 'name' field: %s", tag)
            continue
        tag_name = tag["name"]
        if not isinstance(tag_name, str):
            # A non-string name would make the regex search below raise TypeError.
            logging.warning("Skipping tag with non-string 'name' field: %s", tag)
            continue
        if exclude_regex and exclude_regex.search(tag_name):
            logging.info("Excluding tag '%s' due to matching exclude pattern", tag_name)
            excluded_by_pattern.add(tag_name)
            continue
        filtered_tags.append(tag)

    # Build the name -> tag map; invalid names and duplicates are logged and
    # skipped (the first occurrence of a duplicated name wins).
    tag_map: Dict[str, Dict] = {}
    for tag in filtered_tags:
        tag_name = tag["name"]
        try:
            validate_tag_name(tag_name, separator)
        except ValueError as e:
            logging.error("Invalid tag name: %s", e)
            continue
        if tag_name in tag_map:
            logging.error("Invalid tag name: Duplicate tag name '%s' detected", tag_name)
            continue
        tag_map[tag_name] = tag

    # Include filtering: keep matching tags plus their ancestors, so that the
    # parents needed to create a matching tag are still generated.
    if include_regex:
        keep: Set[str] = set()
        for tag_name in tag_map:
            if include_regex.search(tag_name):
                keep.add(tag_name)
                keep.update(_tag_ancestors(tag_name, separator))
        for tag_name in list(tag_map):
            if tag_name not in keep:
                logging.info("Excluding tag '%s' due to not matching include pattern", tag_name)
                del tag_map[tag_name]

    # Exclusion is hierarchical: drop tags whose ancestor was excluded by pattern.
    if excluded_by_pattern:
        for tag_name in list(tag_map):
            for ancestor in _tag_ancestors(tag_name, separator):
                if ancestor in excluded_by_pattern:
                    logging.info("Excluding tag '%s' because ancestor '%s' was excluded",
                                 tag_name, ancestor)
                    del tag_map[tag_name]
                    break

    # Split into top-level tags and children grouped by their direct parent.
    top_level_tags: List[str] = []
    child_tags_by_parent: Dict[str, List[str]] = {}
    for tag_name in tag_map:
        parts = tag_name.split(separator)
        if len(parts) == 1:
            top_level_tags.append(tag_name)
        else:
            parent = separator.join(parts[:-1])
            child_tags_by_parent.setdefault(parent, []).append(tag_name)

    if not top_level_tags and not child_tags_by_parent:
        logging.warning("No valid tags remain after filtering")
        return [{"tags": []}]

    requests: List[Dict] = []
    # Full names of tags that some generated request creates. A rootTag request
    # is emitted only for parents in this set.
    created: Set[str] = set()

    # 1. Top-level tags request (no rootTag)
    if top_level_tags:
        top_level_request = {
            "tags": [_tag_payload(name, tag_map[name]) for name in sorted(top_level_tags)]
        }
        requests.append(top_level_request)
        created.update(top_level_tags)
        logging.info("Created request with %d top-level tags", len(top_level_request["tags"]))

    # 2. Child requests (with rootTag), shallowest parents first so that every
    #    parent is created by an earlier request than its children.
    parents_in_order = sorted(
        child_tags_by_parent, key=lambda name: (name.count(separator), name)
    )
    for parent in parents_in_order:
        children = sorted(child_tags_by_parent[parent])
        if parent not in created:
            # The parent is absent from the input, was filtered out, or has a
            # broken ancestor chain; a request with it as rootTag would refer
            # to a tag that no generated request creates.
            logging.warning(
                "Skipping %d child tag(s) under '%s' because no generated request creates that parent: %s",
                len(children), parent, ", ".join(children)
            )
            continue
        child_requests = [
            # Children are named relative to rootTag: last segment only.
            _tag_payload(child.split(separator)[-1], tag_map[child])
            for child in children
        ]
        requests.append({
            "rootTag": {"name": parent, "deleteHierarchy": delete_hierarchy},
            "tags": child_requests
        })
        created.update(children)
        logging.info("Created request with %d child tags under parent '%s'",
                     len(child_requests), parent)

    return requests
def convert_immuta_tags(
    input_file: str,
    output_file: Optional[str] = None,
    verbose: bool = True,
    dry_run: bool = False,
    separator: str = ".",
    delete_hierarchy: bool = True,
    log_file: Optional[str] = None,
    exclude_pattern: Optional[str] = None,
    include_pattern: Optional[str] = None
) -> bool:
    """
    Convert Immuta tag data from GET /tag API format to POST /tag API format.

    Args:
        input_file: Path to the JSON file containing tag data from GET /tag API.
            The top-level value must be a list of tags or an object with a
            'tags' list.
        output_file: Optional output path. Each request is written to its own
            file named ``<stem>_<n>.json`` in the directory of this path. When
            omitted, the requests are logged instead.
        verbose: Enable detailed (DEBUG) logging
        dry_run: Log the requests instead of writing files, even if
            output_file is given
        separator: String used to denote hierarchy in tag names
        delete_hierarchy: Value of the deleteHierarchy flag in each rootTag
        log_file: Optional path to save log output
        exclude_pattern: Regular expression pattern to exclude tags
        include_pattern: Regular expression pattern to include only matching tags

    Returns:
        True on success; False if any step failed (the error is logged with
        its traceback rather than raised).
    """
    setup_logging(verbose, log_file)
    logging.info("Starting tag conversion process for file: %s", input_file)
    if delete_hierarchy:
        logging.info("deleteHierarchy is enabled: existing tag hierarchies in the target environment will be deleted before import")
    else:
        logging.warning("deleteHierarchy is disabled: import may fail if tags already exist in the target environment")
    if exclude_pattern:
        logging.info("Exclude pattern '%s' will be applied to tags", exclude_pattern)
    if include_pattern:
        logging.info("Include pattern '%s' will be applied to tags", include_pattern)

    try:
        input_path = Path(input_file)
        if not input_path.is_file():
            raise FileNotFoundError(f"Input file '{input_file}' does not exist")

        logging.debug("Reading tag data from %s", input_file)
        # 'utf-8-sig' decodes plain UTF-8 and also strips a leading BOM, which
        # json.load would otherwise reject.
        with input_path.open('r', encoding='utf-8-sig') as f:
            try:
                data = json.load(f)
            except json.JSONDecodeError as e:
                logging.error("Invalid JSON format in input file: %s", e)
                raise ValueError("Input file contains invalid JSON") from e

        if isinstance(data, list):
            tags_data = data
        elif isinstance(data, dict) and 'tags' in data:
            tags_data = data['tags']
        else:
            raise ValueError("Invalid JSON format: expected a list of tags or an object with a 'tags' property")
        # Reject e.g. {"tags": {...}} or {"tags": null} up front instead of
        # iterating over something that is not a list of tag objects.
        if not isinstance(tags_data, list):
            raise ValueError(
                f"Invalid JSON format: 'tags' must be a list, got {type(tags_data).__name__}"
            )
        logging.info("Found %d tags in the input file", len(tags_data))

        output_data = process_tag_hierarchy(
            tags_data,
            separator,
            delete_hierarchy,
            exclude_pattern,
            include_pattern
        )

        if output_file and not dry_run:
            # One file per request: <stem>_1.json, <stem>_2.json, ...
            output_base = Path(output_file).stem
            output_dir = Path(output_file).parent
            for idx, request in enumerate(output_data):
                formatted_json = json.dumps(request, indent=2)
                output_path = output_dir / f"{output_base}_{idx + 1}.json"
                logging.debug("Writing output to %s", output_path)
                with output_path.open('w', encoding='utf-8') as f:
                    f.write(formatted_json)
                logging.info("Converted tag data saved to %s", output_path)
        else:
            # Dry run, or no output file requested: emit the requests via the log.
            if dry_run:
                logging.info("Dry run mode: skipping file write")
            for idx, request in enumerate(output_data):
                formatted_json = json.dumps(request, indent=2)
                logging.info("=== JSON REQUEST %d (COPY THIS TO IMMUTA API) ===\n%s\n=== END OF JSON ===", idx + 1, formatted_json)

        logging.info(
            "Generated %d separate requests for import. Submit each request to the Immuta API in order (1, 2, 3, ...) to ensure parent tags are created before children.",
            len(output_data)
        )
        return True
    except Exception as e:
        # Top-level boundary: report the failure through the return value.
        logging.error("Conversion failed: %s", e, exc_info=True)
        return False
def print_usage() -> None:
    """Write the command-line usage text for this script to stdout."""
    # Entries starting with "\n" produce a blank line before that entry once
    # everything is joined with newlines.
    usage_lines = (
        "Immuta Tag Converter",
        "====================",
        "Converts Immuta Tag API GET response to POST /tag API format.",
        "\nUsage: python convert_immuta_tags.py input_file.json [output_file.json] [options]",
        "\nOptions:",
        " input_file.json - Path to the JSON file from GET /tag API",
        " output_file.json - (Optional) Path to save the output JSON",
        " --verbose - Enable detailed logging (default: true)",
        " --no-verbose - Disable detailed logging",
        " --dry-run - Preview output without writing to file",
        " --separator STR - Hierarchy separator (default: '.')",
        " --no-delete-hierarchy - Disable deleteHierarchy for child tags",
        " --exclude-tags REGEX - Exclude tags matching the regular expression pattern (e.g., '^Discovered($|\\..*)')",
        " --include-tags REGEX - Include only tags matching the regular expression pattern",
        " --log-file FILE - Save logs to a file",
        "\nIf output_file is not provided, the result will be printed to console.",
        "\nExamples:",
        " python convert_immuta_tags.py tags.json --dry-run",
        " python convert_immuta_tags.py tags.json output.json --include-tags '^Data'",
        " python convert_immuta_tags.py tags.json --exclude-tags 'Test' --separator '/'",
    )
    print("\n".join(usage_lines))
def main(argv: Optional[List[str]] = None) -> int:
    """Parse command-line arguments, run the conversion and return the exit code.

    Args:
        argv: Argument list without the program name; ``None`` makes argparse
            read ``sys.argv[1:]``.

    Returns:
        0 on success, 1 if the input file is missing or the conversion failed.

    Raises:
        SystemExit: Raised by argparse on invalid arguments (code 2), and with
            code 0 after the usage text is printed for ``-h``/``--help``.
    """
    import argparse

    class _UsageAction(argparse.Action):
        """Print the custom usage text and exit as soon as -h/--help is parsed."""

        def __call__(self, parser, namespace, values, option_string=None):
            print_usage()
            parser.exit(0)

    parser = argparse.ArgumentParser(
        description="Convert Immuta Tag API GET response to POST /tag API format",
        add_help=False
    )
    parser.add_argument("input_file", help="Path to the JSON file from GET /tag API")
    parser.add_argument("output_file", nargs="?", default=None, help="Path to save the output JSON (optional)")
    parser.add_argument("--verbose", action="store_true", default=True, help="Enable detailed logging")
    parser.add_argument("--no-verbose", dest="verbose", action="store_false", help="Disable detailed logging")
    parser.add_argument("--dry-run", action="store_true", help="Preview output without writing to file")
    parser.add_argument("--separator", default=".", help="Hierarchy separator (default: '.')")
    parser.add_argument("--no-delete-hierarchy", dest="delete_hierarchy", action="store_false", default=True,
                        help="Disable deleteHierarchy for child tags")
    parser.add_argument("--exclude-tags", help="Regular expression pattern to exclude tags (e.g., '^Discovered($|\\..*)')")
    parser.add_argument("--include-tags", help="Regular expression pattern to include only matching tags")
    parser.add_argument("--log-file", help="Save logs to a file")
    # Help must be an Action that fires while parsing. A plain store_true flag
    # is only inspected after parse_args() returns, and parse_args() exits with
    # "arguments are required: input_file" first when help is requested alone.
    parser.add_argument("-h", "--help", action=_UsageAction, nargs=0, default=argparse.SUPPRESS,
                        help="Show this help message and exit")
    args = parser.parse_args(argv)

    # Fail fast with a short message before logging is configured.
    if not Path(args.input_file).is_file():
        print(f"❌ Error: Input file '{args.input_file}' not found")
        return 1

    success = convert_immuta_tags(
        input_file=args.input_file,
        output_file=args.output_file,
        verbose=args.verbose,
        dry_run=args.dry_run,
        separator=args.separator,
        delete_hierarchy=args.delete_hierarchy,
        log_file=args.log_file,
        exclude_pattern=args.exclude_tags,
        include_pattern=args.include_tags
    )
    return 0 if success else 1


if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment