# Enterprise Immuta Column Tag Removal Tool
## 📋 Overview
The **Enterprise Immuta Column Tag Removal Tool** (`column_tag_del.py`) is a high-performance, production-ready Python script that removes tags from Immuta columns based on CSV input. It is the reverse operation of `column_tag_add.py` and features enterprise-grade reliability, comprehensive error handling, and advanced monitoring capabilities.
### 🎯 Key Features | |
- **🏢 Enterprise-Grade**: Circuit breaker, retry logic, adaptive rate limiting | |
- **📊 Progress Tracking**: SQLite-based persistence with resume functionality | |
- **🔄 Resume Operations**: Continue from where you left off after interruptions | |
- **🛡️ Robust Error Handling**: Comprehensive validation and graceful degradation | |
- **📈 Performance Optimized**: Intelligent caching and batched operations | |
- **🔍 Dry Run Mode**: Preview operations without making changes | |
- **📝 Detailed Logging**: Multi-level logging with rotation and error tracking | |
- **🚀 High Performance**: Parallel processing with optimized API usage | |
## 🏗️ Architecture | |
### Core Components | |
``` | |
┌─────────────────────────────────────────────────────────────┐ | |
│ Main Orchestrator │ | |
│ ┌─────────────────┐ ┌─────────────────┐ ┌──────────────┐ │ | |
│ │ CSV Loader │ │ Validator │ │ Processor │ │ | |
│ └─────────────────┘ └─────────────────┘ └──────────────┘ │ | |
└─────────────────────────────────────────────────────────────┘ | |
│ │ │ | |
▼ ▼ ▼ | |
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ | |
│ Progress Tracker│ │ Rate Limiter │ │ API Client │ | |
│ (SQLite DB) │ │ (Adaptive) │ │ (Circuit Breaker)│ | |
└─────────────────┘ └─────────────────┘ └─────────────────┘ | |
``` | |
### Enterprise Patterns
1. **Circuit Breaker**: Prevents cascade failures during API outages
2. **Adaptive Rate Limiting**: Adjusts speed based on error patterns
3. **Progress Persistence**: SQLite database tracks all operations
4. **Graceful Shutdown**: Signal handling for clean termination (see the sketch after this list)
5. **Health Monitoring**: Periodic API health checks
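The graceful-shutdown pattern is easiest to see in isolation. The sketch below is a simplified, standalone version of what the script's orchestrator does with signal handlers and a shutdown flag; names are shortened for clarity and it is illustrative, not a drop-in replacement.
```python
import signal
import time

shutdown_requested = False

def _signal_handler(signum, frame):
    """Mark shutdown as requested; the processing loop exits at the next safe point."""
    global shutdown_requested
    shutdown_requested = True

# Register handlers for Ctrl+C (SIGINT) and `kill` (SIGTERM)
signal.signal(signal.SIGINT, _signal_handler)
signal.signal(signal.SIGTERM, _signal_handler)

def process(records):
    for record in records:
        if shutdown_requested:
            print("Shutdown requested - stopping after the current record")
            break
        # ... remove the tag and persist progress for this record here ...
        time.sleep(0.1)
```
Because progress is persisted per record, a run interrupted this way can be continued later with `--resume`.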
## 📋 Requirements | |
### System Requirements | |
- **Python**: 3.8 or higher | |
- **Memory**: Minimum 512MB RAM (2GB+ recommended for large datasets) | |
- **Storage**: 100MB free space for logs and progress database | |
- **Network**: Stable connection to Immuta instance | |
### Python Dependencies | |
```bash | |
pip install requests | |
# All other dependencies are built-in Python modules | |
``` | |
### Immuta Requirements | |
- **API Version**: Immuta v1 API version 2024.2 | |
- **Permissions**: Tag management permissions on target data sources | |
- **Authentication**: Valid API key with appropriate access | |
## 📊 CSV Input Format | |
### Required Columns
| Column Name             | Description             | Example                   |
| ----------------------- | ----------------------- | ------------------------- |
| `database`              | Source database name    | `PROD_DB`                 |
| `schema`                | Schema name             | `SALES`                   |
| `table`                 | Table name              | `CUSTOMERS`               |
| `column`                | Column name             | `EMAIL`                   |
| `immuta_tag`            | Tag to remove           | `XXX-Classification.PII`  |
| `immuta_schema_project` | Immuta schema project   | `PROD_DB-SALES`           |
| `immuta_datasource`     | Immuta data source name | `PROD_DB-SALES-CUSTOMERS` |
| `immuta_sql_schema`     | SQL schema identifier   | `prod_db-sales`           |
### Sample CSV File
```csv
database,schema,table,column,immuta_tag,immuta_schema_project,immuta_datasource,immuta_sql_schema
PROD_DB,SALES,CUSTOMERS,EMAIL,XXX-Classification.PII,PROD_DB-SALES,PROD_DB-SALES-CUSTOMERS,prod_db-sales
PROD_DB,SALES,CUSTOMERS,PHONE,XXX-Classification.PII,PROD_DB-SALES,PROD_DB-SALES-CUSTOMERS,prod_db-sales
PROD_DB,HR,EMPLOYEES,SSN,XXX-Classification.Sensitive,PROD_DB-HR,PROD_DB-HR-EMPLOYEES,prod_db-hr
```
## 🚀 Usage | |
### Basic Usage
```bash
# Remove tags with default settings
python column_tag_del.py input.csv \
    --immuta-url https://immuta.company.com \
    --api-key YOUR_API_KEY
```
### Advanced Usage
```bash
# Dry run to preview operations
python column_tag_del.py input.csv \
    --immuta-url https://immuta.company.com \
    --api-key YOUR_API_KEY \
    --dry-run

# Custom rate limiting and progress tracking
python column_tag_del.py input.csv \
    --immuta-url https://immuta.company.com \
    --api-key YOUR_API_KEY \
    --sleep 0.5 \
    --progress-db custom_progress.db

# Resume from previous run
python column_tag_del.py input.csv \
    --immuta-url https://immuta.company.com \
    --api-key YOUR_API_KEY \
    --resume

# Export results for analysis
python column_tag_del.py input.csv \
    --immuta-url https://immuta.company.com \
    --api-key YOUR_API_KEY \
    --export-results removal_results.csv
```
## ⚙️ Configuration Options | |
### Command Line Arguments
| Argument                   | Type     | Default                   | Description                                |
| -------------------------- | -------- | ------------------------- | ------------------------------------------ |
| `csv_file`                 | Required | -                         | CSV file containing tag removal data       |
| `--immuta-url`             | Required | -                         | Immuta instance URL                        |
| `--api-key`                | Required | -                         | Immuta API key                             |
| `--dry-run`                | Flag     | False                     | Preview operations without changes         |
| `--sleep`                  | Float    | 0.2                       | Sleep interval between API calls (seconds) |
| `--adaptive-rate-limiting` | Flag     | True                      | Enable adaptive rate limiting              |
| `--progress-db`            | String   | `tag_removal_progress.db` | SQLite progress database                   |
| `--resume`                 | Flag     | False                     | Resume from previous run                   |
| `--log-level`              | Choice   | INFO                      | Logging level (DEBUG/INFO/WARNING/ERROR)   |
| `--log-dir`                | String   | `./logs`                  | Directory for log files                    |
| `--export-results`         | String   | -                         | Export results to CSV file                 |
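For orientation, this is roughly how the table above maps onto `argparse`. It is a trimmed sketch built only from the table, not the parser as it appears in the script, so names, defaults, and help strings should be treated as assumptions.
```python
import argparse

def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Remove tags from Immuta columns based on CSV input"
    )
    parser.add_argument("csv_file", help="CSV file containing tag removal data")
    # Marked Required in the table; they may also come from environment variables (see below)
    parser.add_argument("--immuta-url", help="Immuta instance URL")
    parser.add_argument("--api-key", help="Immuta API key")
    parser.add_argument("--dry-run", action="store_true", help="Preview operations without changes")
    parser.add_argument("--sleep", type=float, default=0.2,
                        help="Sleep interval between API calls (seconds)")
    parser.add_argument("--adaptive-rate-limiting", action="store_true", default=True,
                        help="Enable adaptive rate limiting (listed as on by default)")
    parser.add_argument("--progress-db", default="tag_removal_progress.db",
                        help="SQLite progress database")
    parser.add_argument("--resume", action="store_true", help="Resume from previous run")
    parser.add_argument("--log-level", choices=["DEBUG", "INFO", "WARNING", "ERROR"], default="INFO")
    parser.add_argument("--log-dir", default="./logs", help="Directory for log files")
    parser.add_argument("--export-results", help="Export results to CSV file")
    return parser
```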
### Environment Variables
```bash
# Optional: Set environment variables
export IMMUTA_URL="https://immuta.company.com"
export IMMUTA_API_KEY="your-api-key-here"

# Then use without --immuta-url and --api-key flags
python column_tag_del.py input.csv
```
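The fallback logic itself lives in the script; the hedged sketch below shows one common way such a fallback is wired, assuming the two flags are optional and `os.environ` is consulted when they are omitted.
```python
import argparse
import os

def resolve_connection(args: argparse.Namespace):
    """Use the CLI flags when given, otherwise fall back to IMMUTA_URL / IMMUTA_API_KEY."""
    url = args.immuta_url or os.environ.get("IMMUTA_URL", "")
    key = args.api_key or os.environ.get("IMMUTA_API_KEY", "")
    if not url or not key:
        raise SystemExit("Provide --immuta-url/--api-key or set IMMUTA_URL/IMMUTA_API_KEY")
    return url, key
```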
## 📊 Progress Tracking | |
### SQLite Database Schema | |
The tool creates a SQLite database to track progress: | |
```sql | |
-- Main progress tracking table | |
CREATE TABLE tag_removal_status ( | |
row_id INTEGER PRIMARY KEY, | |
database_name TEXT, | |
schema_name TEXT, | |
table_name TEXT, | |
column_name TEXT, | |
tag_name TEXT, | |
status TEXT, -- 'pending', 'success', 'failed', 'skipped', 'not_found' | |
error_message TEXT, | |
processed_at TEXT, | |
retry_count INTEGER DEFAULT 0, | |
tag_existed BOOLEAN DEFAULT FALSE | |
); | |
-- Execution metadata table | |
CREATE TABLE removal_execution_metadata ( | |
id INTEGER PRIMARY KEY, | |
session_id TEXT, | |
start_time TEXT, | |
end_time TEXT, | |
total_records INTEGER, | |
completed_records INTEGER, | |
failed_records INTEGER, | |
skipped_records INTEGER, | |
not_found_records INTEGER, | |
environment TEXT, | |
script_version TEXT | |
); | |
``` | |
### Status Values
| Status      | Description                                  |
| ----------- | -------------------------------------------- |
| `pending`   | Not yet processed                            |
| `success`   | Tag successfully removed                     |
| `failed`    | Removal failed with error                    |
| `skipped`   | Skipped due to validation failure            |
| `not_found` | Tag not found on column (considered success) |
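The same status breakdown can be pulled programmatically from the progress database. This is a minimal sketch that assumes the default `tag_removal_progress.db` path and the schema shown above.
```python
import sqlite3
from typing import Dict

def progress_summary(db_path: str = "tag_removal_progress.db") -> Dict[str, int]:
    """Return a {status: count} breakdown from the tag_removal_status table."""
    with sqlite3.connect(db_path) as conn:
        rows = conn.execute(
            "SELECT status, COUNT(*) FROM tag_removal_status GROUP BY status"
        ).fetchall()
    return dict(rows)

if __name__ == "__main__":
    for status, count in sorted(progress_summary().items()):
        print(f"{status:12s} {count}")
```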
## 📝 Logging | |
### Log Files | |
The tool creates multiple log files for different purposes: | |
``` | |
logs/ | |
├── immuta_tag_removal_20241128_143022.log # Main log with all details | |
├── immuta_tag_removal_errors_20241128_143022.log # Error-only log | |
└── ... | |
``` | |
### Log Levels | |
- **DEBUG**: Detailed API calls and internal operations | |
- **INFO**: Progress updates and general information | |
- **WARNING**: Non-critical issues and warnings | |
- **ERROR**: Failures and critical issues | |
### Sample Log Output
```
2024-11-28 14:30:22,580 - INFO - ================================================================================
2024-11-28 14:30:22,580 - INFO - ENTERPRISE IMMUTA COLUMN TAG REMOVAL TOOL
2024-11-28 14:30:22,580 - INFO - ================================================================================
2024-11-28 14:30:22,580 - INFO - CSV File: input.csv
2024-11-28 14:30:22,580 - INFO - Immuta URL: https://immuta.company.com
2024-11-28 14:30:22,580 - INFO - Dry Run: False
2024-11-28 14:30:22,580 - INFO - Sleep Interval: 0.2s
2024-11-28 14:30:22,580 - INFO - Progress Database: tag_removal_progress.db
2024-11-28 14:30:22,580 - INFO - ================================================================================
2024-11-28 14:30:22,581 - INFO - Successfully loaded 150 tag removal records from CSV
2024-11-28 14:30:22,582 - INFO - Starting comprehensive validation of reference data for tag removal...
2024-11-28 14:30:22,582 - INFO - Validating 5 schema projects, 12 data sources, 8 tags for removal
```
## 🔄 Operation Flow
### 1. Initialization Phase | |
``` | |
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ | |
│ Parse Args │ -> │ Setup Logging │ -> │ Load CSV Data │ | |
└─────────────────┘ └─────────────────┘ └─────────────────┘ | |
``` | |
### 2. Validation Phase | |
``` | |
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ | |
│ Validate Schema │ -> │ Validate Data │ -> │ Validate Tags │ | |
│ Projects │ │ Sources │ │ for Removal │ | |
└─────────────────┘ └─────────────────┘ └─────────────────┘ | |
``` | |
### 3. Processing Phase | |
``` | |
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ | |
│ Group by │ -> │ Process Each │ -> │ Update Progress│ | |
│ Column │ │ Tag Removal │ │ Database │ | |
└─────────────────┘ └─────────────────┘ └─────────────────┘ | |
``` | |
## 🛡️ Error Handling | |
### Error Categories | |
#### 1. **Validation Errors** | |
- **Schema Project Not Found**: Invalid or deleted schema project | |
- **Data Source Not Found**: Data source doesn't exist or is deleted | |
- **Column Not Found**: Column doesn't exist in the data source | |
- **Tag Not Found**: Tag doesn't exist in the system (warning, not failure) | |
#### 2. **API Errors** | |
- **Authentication Errors**: Invalid API key or expired token | |
- **Rate Limiting**: Too many requests (handled with backoff) | |
- **Server Errors**: Immuta server issues (retried with exponential backoff) | |
- **Network Errors**: Connection timeouts or network issues | |
#### 3. **System Errors** | |
- **File Not Found**: CSV file doesn't exist | |
- **Permission Errors**: Insufficient file system permissions | |
- **Memory Errors**: Insufficient system memory | |
### Error Recovery Strategies | |
#### Circuit Breaker Pattern
```python
# Automatic circuit breaker activation (defaults: 10 consecutive failures, 300-second timeout)
if consecutive_failures >= circuit_breaker_threshold:
    circuit_breaker_opened_at = time.time()   # reject further calls until the timeout expires

# Automatic recovery attempt
if time.time() - circuit_breaker_opened_at > circuit_breaker_timeout:
    circuit_breaker_opened_at = None
    consecutive_failures = 0
```
#### Retry Logic
```python
# Exponential backoff for retries
for attempt in range(max_retries):
    try:
        result = api_call()
        break
    except Exception:
        if attempt == max_retries - 1:
            raise
        wait_time = min(2 ** attempt, 60)  # cap the backoff at 60 seconds
        time.sleep(wait_time)
```
#### Adaptive Rate Limiting
```python
# Adjust rate based on the recent error rate
if error_rate > 0.10:        # more than 10% errors: slow down
    sleep_interval *= 2
elif error_rate < 0.01:      # less than 1% errors: speed up
    sleep_interval *= 0.8
```
## 📈 Performance Optimization | |
### Caching Strategy | |
#### 1. **Reference Data Caching** | |
- Schema projects cached after validation | |
- Data source IDs cached to avoid repeated lookups | |
- Column metadata cached per data source | |
#### 2. **Column Tags Caching** | |
```python | |
# Cache key format: "data_source_id|column_name" | |
cache_key = f"{data_source_id}|{column_name}" | |
existing_tags = column_tags_cache.get(cache_key) | |
``` | |
#### 3. **Batch Processing**
- Records grouped by column for efficient processing (see the sketch below)
- Single API call to get all tags for a column
- Batch validation of reference data
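The grouping step itself is small. The sketch below mirrors how the script batches records so each column's existing tags only need to be fetched once; it uses simplified record dictionaries and a hypothetical helper name for illustration.
```python
from collections import defaultdict
from typing import Dict, List

def group_by_column(records: List[dict]) -> Dict[str, List[dict]]:
    """Group removal records so all tags for one column are handled together."""
    groups: Dict[str, List[dict]] = defaultdict(list)
    for record in records:
        key = f"{record['immuta_datasource']}|{record['column']}"
        groups[key].append(record)
    return groups

# One tag lookup per column instead of one API call per tag:
# for key, column_records in group_by_column(records).items():
#     existing_tags = fetch_existing_tags_once(key)   # hypothetical helper
#     ... remove each requested tag that is actually present ...
```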
### Performance Metrics | |
| Metric | Typical Value | Optimized Value | | |
| ----------------------------- | ----------------- | ------------------ | | |
| **API Calls per Tag Removal** | 2-3 calls | 1 call | | |
| **Processing Rate** | 100-200 tags/hour | 500-1000 tags/hour | | |
| **Memory Usage** | 50-100MB | 20-50MB | | |
| **Cache Hit Rate** | N/A | 80-95% | | |
## 🔍 Monitoring and Analytics | |
### Real-Time Progress Monitoring
```bash
# Monitor progress in real-time
tail -f logs/immuta_tag_removal_*.log | grep "Progress:"

# Example output:
# Progress: 250/1000 column groups (25.0%) - Rate: 450.2 columns/hour - ETA: 1:40:00
```
### Progress Database Queries | |
```sql | |
-- Check current status distribution | |
SELECT status, COUNT(*) as count | |
FROM tag_removal_status | |
GROUP BY status; | |
-- Find failed records | |
SELECT row_id, database_name, table_name, column_name, tag_name, error_message | |
FROM tag_removal_status | |
WHERE status = 'failed' | |
ORDER BY retry_count DESC; | |
-- Calculate success rate | |
SELECT | |
COUNT(*) as total, | |
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successful, | |
ROUND(100.0 * SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) / COUNT(*), 2) as success_rate | |
FROM tag_removal_status; | |
``` | |
### Export and Analysis | |
```bash | |
# Export results for analysis | |
python column_tag_del.py input.csv \ | |
--immuta-url https://immuta.company.com \ | |
--api-key YOUR_API_KEY \ | |
--export-results analysis.csv | |
# The exported CSV contains: | |
# - All processing results | |
# - Error messages | |
# - Retry counts | |
# - Processing timestamps | |
``` | |
## 🚨 Troubleshooting | |
### Common Issues and Solutions | |
#### 1. **"Circuit breaker is open" Error** | |
**Cause**: Too many consecutive API failures | |
**Solution**: | |
```bash | |
# Wait 5 minutes for automatic reset, or | |
# Check Immuta server status | |
# Verify API key permissions | |
# Reduce rate limiting with --sleep 1.0 | |
``` | |
#### 2. **"Tag not found" Warnings** | |
**Cause**: Tag already removed or doesn't exist | |
**Solution**: This is normal and considered success | |
#### 3. **High Memory Usage** | |
**Cause**: Large CSV files or extensive caching | |
**Solution**: | |
```bash | |
# Process in smaller batches | |
# Increase system memory | |
# Monitor with: ps aux | grep python | |
``` | |
#### 4. **Slow Processing Speed** | |
**Cause**: Rate limiting or network latency | |
**Solution**: | |
```bash | |
# Reduce sleep interval (carefully) | |
python column_tag_del.py input.csv --sleep 0.1 | |
# Enable adaptive rate limiting | |
python column_tag_del.py input.csv --adaptive-rate-limiting | |
``` | |
### Debug Mode | |
```bash | |
# Enable detailed debugging | |
python column_tag_del.py input.csv \ | |
--immuta-url https://immuta.company.com \ | |
--api-key YOUR_API_KEY \ | |
--log-level DEBUG | |
# This provides: | |
# - Detailed API request/response logs | |
# - Cache hit/miss statistics | |
# - Internal state information | |
# - Performance timing data | |
``` | |
## 🔐 Security Considerations | |
### API Key Management | |
#### Best Practices | |
```bash | |
# Use environment variables (recommended) | |
export IMMUTA_API_KEY="your-secure-api-key" | |
python column_tag_del.py input.csv --immuta-url https://immuta.company.com | |
# Use secure file permissions | |
chmod 600 .env | |
echo "IMMUTA_API_KEY=your-secure-api-key" > .env | |
source .env | |
``` | |
#### Security Features | |
- **No API Key Logging**: API keys are never written to log files | |
- **Secure Headers**: Proper authentication headers with Bearer tokens | |
- **HTTPS Only**: All API communications use HTTPS | |
- **Session Management**: Proper session handling and cleanup | |
### Data Privacy | |
#### Sensitive Data Handling | |
- **No Data Logging**: Column data is never logged or cached | |
- **Metadata Only**: Only column names and tag names are processed | |
- **Secure Cleanup**: Temporary data is securely cleaned up | |
- **Progress Encryption**: Consider encrypting progress database for sensitive environments | |
## 📋 Best Practices | |
### Pre-Execution Checklist | |
#### 1. **Environment Preparation** | |
- [ ] Verify Python 3.8+ installation | |
- [ ] Install required dependencies | |
- [ ] Test network connectivity to Immuta | |
- [ ] Validate API key permissions | |
#### 2. **Data Preparation** | |
- [ ] Validate CSV format and required columns | |
- [ ] Check for duplicate records | |
- [ ] Verify tag names and data source names | |
- [ ] Backup original CSV file | |
#### 3. **Execution Planning** | |
- [ ] Start with dry-run mode | |
- [ ] Plan for appropriate rate limiting | |
- [ ] Ensure sufficient disk space for logs | |
- [ ] Schedule during low-usage periods | |
### Operational Guidelines | |
#### 1. **Small Scale Testing** | |
```bash | |
# Test with a small subset first | |
head -10 large_input.csv > test_input.csv | |
python column_tag_del.py test_input.csv --dry-run | |
``` | |
#### 2. **Production Execution** | |
```bash | |
# Use conservative settings for production | |
python column_tag_del.py input.csv \ | |
--immuta-url https://immuta.company.com \ | |
--api-key YOUR_API_KEY \ | |
--sleep 0.5 \ | |
--adaptive-rate-limiting \ | |
--export-results results.csv | |
``` | |
#### 3. **Monitoring During Execution** | |
```bash | |
# Monitor in separate terminal | |
watch -n 30 'tail -5 logs/immuta_tag_removal_*.log' | |
# Check progress database | |
sqlite3 tag_removal_progress.db "SELECT status, COUNT(*) FROM tag_removal_status GROUP BY status;" | |
``` | |
## 🔄 Integration Examples | |
### CI/CD Pipeline Integration | |
#### GitHub Actions Example | |
```yaml
name: Immuta Tag Removal

on:
  workflow_dispatch:
    inputs:
      csv_file:
        description: 'CSV file path'
        required: true
      dry_run:
        description: 'Dry run mode'
        type: boolean
        default: true

jobs:
  remove_tags:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: pip install requests
      - name: Remove Immuta tags
        env:
          IMMUTA_API_KEY: ${{ secrets.IMMUTA_API_KEY }}
        run: |
          python column_tag_del.py ${{ github.event.inputs.csv_file }} \
            --immuta-url ${{ vars.IMMUTA_URL }} \
            --api-key $IMMUTA_API_KEY \
            ${{ github.event.inputs.dry_run && '--dry-run' || '' }} \
            --export-results results.csv
      - name: Upload results
        uses: actions/upload-artifact@v3
        with:
          name: tag-removal-results
          path: |
            results.csv
            logs/
```
### Cron Job Example
```bash
# /etc/cron.d/immuta-tag-cleanup
# Run daily tag cleanup at 2 AM.
# Note: /etc/cron.d entries need a user field, must stay on one line, and % must be escaped.
0 2 * * * root /usr/bin/python3 /opt/scripts/column_tag_del.py /data/daily_tag_removals.csv --immuta-url https://immuta.company.com --api-key "$(cat /secure/immuta_api_key)" --log-dir /var/log/immuta --export-results /data/results/$(date +\%Y\%m\%d)_removal_results.csv
```
### Docker Integration | |
```dockerfile | |
FROM python:3.9-slim | |
WORKDIR /app | |
COPY column_tag_del.py . | |
RUN pip install requests | |
ENTRYPOINT ["python", "column_tag_del.py"] | |
``` | |
```bash | |
# Build and run with Docker | |
docker build -t immuta-tag-removal . | |
docker run -v $(pwd)/data:/data immuta-tag-removal \ | |
/data/input.csv \ | |
--immuta-url https://immuta.company.com \ | |
--api-key YOUR_API_KEY \ | |
--export-results /data/results.csv | |
``` | |
## 📊 Performance Tuning | |
### Optimization Strategies | |
#### 1. **Rate Limiting Optimization** | |
```bash | |
# Conservative (safe for production) | |
--sleep 0.5 | |
# Moderate (good balance) | |
--sleep 0.2 | |
# Aggressive (use with caution) | |
--sleep 0.1 | |
``` | |
#### 2. **Memory Optimization**
```bash
# For large datasets, process in chunks (re-attach the CSV header to every chunk)
head -1 large_input.csv > header.csv
tail -n +2 large_input.csv | split -l 1000 - chunk_
for file in chunk_*; do
    cat header.csv "$file" > "${file}.csv"
    python column_tag_del.py "${file}.csv" --resume
done
```
#### 3. **Parallel Processing**
```bash
# Split by data source (field 7 = immuta_datasource), writing each header exactly once
awk -F',' 'NR==1 {header=$0; next} {f=$7".csv"; if (!(f in seen)) {print header > f; seen[f]=1} print > f}' input.csv

# Process each data source in parallel (skip the original input file)
for file in *.csv; do
    [ "$file" = "input.csv" ] && continue
    python column_tag_del.py "$file" &
done
wait
```
### Performance Monitoring | |
#### System Resource Monitoring | |
```bash | |
# Monitor CPU and memory usage | |
top -p $(pgrep -f column_tag_del.py) | |
# Monitor network usage | |
nethogs | |
# Monitor disk I/O | |
iotop | |
``` | |
#### Application Metrics
```bash
# Extract performance metrics from logs
grep "Rate:" logs/immuta_tag_removal_*.log | tail -10
grep "cache_hits\|cache_misses" logs/immuta_tag_removal_*.log
```
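When raw grep output is not enough, the same lines can be reduced to an average throughput figure. This short sketch assumes the progress-line format shown earlier (`Rate: <n> columns/hour`) and the default log directory.
```python
import glob
import re

RATE_PATTERN = re.compile(r"Rate: ([0-9.]+) columns/hour")

def average_rate(log_glob: str = "logs/immuta_tag_removal_*.log") -> float:
    """Average the 'columns/hour' figures found in the progress log lines."""
    rates = []
    for path in glob.glob(log_glob):
        with open(path, encoding="utf-8") as f:
            for line in f:
                match = RATE_PATTERN.search(line)
                if match:
                    rates.append(float(match.group(1)))
    return sum(rates) / len(rates) if rates else 0.0

if __name__ == "__main__":
    print(f"Average processing rate: {average_rate():.1f} columns/hour")
```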
## 🆘 Support and Maintenance | |
### Regular Maintenance Tasks | |
#### 1. **Log Rotation** | |
```bash | |
# Clean old log files (older than 30 days) | |
find logs/ -name "*.log" -mtime +30 -delete | |
# Compress old logs | |
find logs/ -name "*.log" -mtime +7 -exec gzip {} \; | |
``` | |
#### 2. **Progress Database Cleanup** | |
```bash | |
# Archive old progress databases | |
mv tag_removal_progress.db archive/tag_removal_$(date +%Y%m%d).db | |
# Clean up old archives | |
find archive/ -name "tag_removal_*.db" -mtime +90 -delete | |
``` | |
#### 3. **Performance Analysis** | |
```sql | |
-- Analyze processing patterns | |
SELECT | |
DATE(processed_at) as date, | |
COUNT(*) as total_processed, | |
AVG(retry_count) as avg_retries, | |
COUNT(CASE WHEN status = 'success' THEN 1 END) as successful | |
FROM tag_removal_status | |
WHERE processed_at IS NOT NULL | |
GROUP BY DATE(processed_at) | |
ORDER BY date DESC; | |
``` | |
### Getting Help | |
#### 1. **Documentation** | |
- Review this README for comprehensive guidance | |
- Check inline code comments for technical details | |
- Refer to Immuta API documentation for API-specific issues | |
#### 2. **Debugging Steps** | |
1. Enable DEBUG logging: `--log-level DEBUG` | |
2. Run with dry-run mode: `--dry-run` | |
3. Test with small dataset first | |
4. Check API key permissions in Immuta | |
5. Verify network connectivity | |
#### 3. **Common Solutions** | |
- **Slow performance**: Reduce `--sleep` value or enable `--adaptive-rate-limiting` | |
- **Memory issues**: Process smaller batches or increase system memory | |
- **API errors**: Check Immuta server status and API key validity | |
- **File errors**: Verify CSV format and file permissions | |
## 📄 License and Attribution | |
### License | |
This tool is released under the MIT License. | |
### Attribution
- **Version**: 1.0.0
- **Compatible with**: Immuta v1 API version 2024.2
- **Based on**: Enterprise patterns and Python best practices
### Changelog | |
#### Version 1.0.0 (2024-11-28) | |
- Initial release | |
- Enterprise-grade tag removal functionality | |
- Comprehensive error handling and recovery | |
- Progress tracking with SQLite persistence | |
- Adaptive rate limiting and circuit breaker patterns | |
- Multi-level logging and monitoring | |
- Resume functionality for interrupted operations | |
- Export capabilities for analysis and reporting | |
--- | |
## 🎯 Quick Start Summary | |
```bash | |
# 1. Install dependencies | |
pip install requests | |
# 2. Prepare your CSV file with required columns | |
# 3. Test with dry run | |
python column_tag_del.py input.csv \ | |
--immuta-url https://immuta.company.com \ | |
--api-key YOUR_API_KEY \ | |
--dry-run | |
# 4. Execute actual removal | |
python column_tag_del.py input.csv \ | |
--immuta-url https://immuta.company.com \ | |
--api-key YOUR_API_KEY \ | |
--export-results results.csv | |
# 5. Monitor progress | |
tail -f logs/immuta_tag_removal_*.log | |
``` | |
**🎉 You're ready to efficiently remove Immuta column tags at enterprise scale!** |
column_tag_del.py
#!/usr/bin/env python3 | |
""" | |
Enterprise Immuta Column Tag Removal Tool | |
A high-performance, enterprise-grade tool for removing tags from Immuta columns | |
based on CSV input. Features comprehensive error handling, progress tracking, | |
resume functionality, and enterprise patterns like circuit breaker and adaptive | |
rate limiting. | |
Version: 1.0.0 | |
Compatible with: Immuta v1 API version 2024.2 | |
License: MIT | |
""" | |
import csv | |
import logging | |
import sys | |
import os | |
import requests | |
import argparse | |
from sys import exit | |
import traceback | |
from dataclasses import dataclass | |
from typing import Set, Dict, List, Optional | |
from collections import defaultdict | |
import urllib.parse | |
import time | |
import hashlib | |
import sqlite3 | |
from datetime import datetime, timedelta | |
import signal | |
import atexit | |
from contextlib import contextmanager | |
# Script metadata | |
__version__ = "1.0.0" | |
__license__ = "MIT" | |
__immuta_api_version__ = "2024.2" | |
# Global variables | |
logger = None | |
shutdown_requested = False | |
def setup_logging(log_level=logging.INFO, log_dir="./logs"): | |
"""Setup comprehensive logging with rotation for large scale operations""" | |
os.makedirs(log_dir, exist_ok=True) | |
# Create timestamp for log files | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
# Configure multiple log handlers | |
logger = logging.getLogger("immuta_enterprise_tag_removal") | |
logger.setLevel(log_level) | |
# Clear existing handlers | |
for handler in logger.handlers[:]: | |
logger.removeHandler(handler) | |
# Console handler with concise format | |
console_handler = logging.StreamHandler(sys.stdout) | |
console_handler.setLevel(logging.INFO) | |
console_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") | |
console_handler.setFormatter(console_formatter) | |
logger.addHandler(console_handler) | |
# Main log file with detailed format | |
main_log_file = os.path.join(log_dir, f"immuta_tag_removal_{timestamp}.log") | |
file_handler = logging.FileHandler(main_log_file, encoding="utf-8") | |
file_handler.setLevel(log_level) | |
file_formatter = logging.Formatter( | |
"%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s" | |
) | |
file_handler.setFormatter(file_formatter) | |
logger.addHandler(file_handler) | |
# Error-only log file | |
error_log_file = os.path.join(log_dir, f"immuta_tag_removal_errors_{timestamp}.log") | |
error_handler = logging.FileHandler(error_log_file, encoding="utf-8") | |
error_handler.setLevel(logging.ERROR) | |
error_handler.setFormatter(file_formatter) | |
logger.addHandler(error_handler) | |
return logger, main_log_file, error_log_file | |
class ImmutaggingError(Exception): | |
"""Custom exception for Immuta tag removal operations""" | |
pass | |
@dataclass | |
class ColumnTagRemoval: | |
"""Enhanced data class for tag removal operations with enterprise features""" | |
row_id: int | |
database: str | |
schema: str | |
table: str | |
column: str | |
immuta_tag: str | |
immuta_schema_project: str | |
immuta_datasource: str | |
immuta_sql_schema: str | |
immuta_column_key: str | |
# Additional enterprise fields | |
source_file_line: int = 0 | |
checksum: str = "" | |
def __post_init__(self): | |
# Generate checksum for data integrity | |
data_str = f"{self.database}|{self.schema}|{self.table}|{self.column}|{self.immuta_tag}" | |
self.checksum = hashlib.md5(data_str.encode()).hexdigest()[:8] | |
def __str__(self) -> str: | |
return f"Row {self.row_id}: {self.database}.{self.schema}.{self.table}.{self.column} -> REMOVE {self.immuta_tag}" | |
class ProgressTracker: | |
"""Enterprise-grade progress tracking with persistence and recovery for tag removal""" | |
def __init__(self, db_path: str): | |
self.db_path = db_path | |
self.init_database() | |
self.start_time = time.time() | |
def init_database(self): | |
"""Initialize SQLite database for tag removal progress tracking""" | |
with sqlite3.connect(self.db_path) as conn: | |
conn.execute( | |
""" | |
CREATE TABLE IF NOT EXISTS tag_removal_status ( | |
row_id INTEGER PRIMARY KEY, | |
database_name TEXT, | |
schema_name TEXT, | |
table_name TEXT, | |
column_name TEXT, | |
tag_name TEXT, | |
status TEXT, -- 'pending', 'success', 'failed', 'skipped', 'not_found' | |
error_message TEXT, | |
processed_at TEXT, | |
retry_count INTEGER DEFAULT 0, | |
tag_existed BOOLEAN DEFAULT FALSE | |
) | |
""" | |
) | |
conn.execute( | |
""" | |
CREATE TABLE IF NOT EXISTS removal_execution_metadata ( | |
id INTEGER PRIMARY KEY, | |
session_id TEXT, | |
start_time TEXT, | |
end_time TEXT, | |
total_records INTEGER, | |
completed_records INTEGER, | |
failed_records INTEGER, | |
skipped_records INTEGER, | |
not_found_records INTEGER, | |
environment TEXT, | |
script_version TEXT | |
) | |
""" | |
) | |
# Create indexes for better performance | |
conn.execute( | |
"CREATE INDEX IF NOT EXISTS idx_removal_status ON tag_removal_status(status)" | |
) | |
conn.execute( | |
"CREATE INDEX IF NOT EXISTS idx_removal_row_id ON tag_removal_status(row_id)" | |
) | |
conn.commit() | |
def load_existing_progress(self) -> Dict[int, str]: | |
"""Load existing progress to support resume functionality""" | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.execute( | |
""" | |
SELECT row_id, status FROM tag_removal_status | |
WHERE status IN ('success', 'skipped', 'not_found') | |
""" | |
) | |
result = {row_id: status for row_id, status in cursor.fetchall()} | |
logger.info( | |
f"Loaded {len(result)} previously processed tag removal records" | |
) | |
return result | |
def update_record_status( | |
self, | |
row_id: int, | |
database: str, | |
schema: str, | |
table: str, | |
column: str, | |
tag: str, | |
status: str, | |
error_message: str = None, | |
tag_existed: bool = False, | |
): | |
"""Update processing status for a tag removal record""" | |
with sqlite3.connect(self.db_path) as conn: | |
conn.execute( | |
""" | |
INSERT OR REPLACE INTO tag_removal_status | |
(row_id, database_name, schema_name, table_name, column_name, tag_name, | |
status, error_message, processed_at, retry_count, tag_existed) | |
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, | |
COALESCE((SELECT retry_count FROM tag_removal_status WHERE row_id = ?), 0) + | |
CASE WHEN ? = 'failed' THEN 1 ELSE 0 END, ?) | |
""", | |
( | |
row_id, | |
database, | |
schema, | |
table, | |
column, | |
tag, | |
status, | |
error_message, | |
datetime.now().isoformat(), | |
row_id, | |
status, | |
tag_existed, | |
), | |
) | |
conn.commit() | |
def get_progress_summary(self) -> Dict: | |
"""Get current progress summary for tag removal""" | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.execute( | |
""" | |
SELECT status, COUNT(*) FROM tag_removal_status GROUP BY status | |
""" | |
) | |
status_counts = dict(cursor.fetchall()) | |
cursor = conn.execute("SELECT COUNT(*) FROM tag_removal_status") | |
total = cursor.fetchone()[0] | |
return { | |
"total": total, | |
"success": status_counts.get("success", 0), | |
"failed": status_counts.get("failed", 0), | |
"skipped": status_counts.get("skipped", 0), | |
"not_found": status_counts.get("not_found", 0), | |
"pending": status_counts.get("pending", 0), | |
} | |
def get_failed_records(self) -> List[Dict]: | |
"""Get all failed tag removal records for retry or analysis""" | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.execute( | |
""" | |
SELECT row_id, database_name, schema_name, table_name, column_name, | |
tag_name, error_message, retry_count, tag_existed | |
FROM tag_removal_status | |
WHERE status = 'failed' | |
ORDER BY retry_count, row_id | |
""" | |
) | |
columns = [desc[0] for desc in cursor.description] | |
return [dict(zip(columns, row)) for row in cursor.fetchall()] | |
def export_results(self, output_file: str): | |
"""Export all tag removal results to CSV for analysis""" | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.execute( | |
""" | |
SELECT * FROM tag_removal_status ORDER BY row_id | |
""" | |
) | |
with open(output_file, "w", newline="", encoding="utf-8") as csvfile: | |
writer = csv.writer(csvfile) | |
writer.writerow([desc[0] for desc in cursor.description]) | |
writer.writerows(cursor.fetchall()) | |
class EnterpriseRateLimiter: | |
"""Enterprise-grade rate limiter with adaptive behavior for tag removal operations""" | |
def __init__(self, sleep_interval: float = 0.2, adaptive: bool = True): | |
self.base_sleep_interval = sleep_interval | |
self.current_sleep_interval = sleep_interval | |
self.adaptive = adaptive | |
self.last_call_time = 0 | |
self.consecutive_errors = 0 | |
self.success_count = 0 | |
# Statistics tracking | |
self.call_history = [] | |
self.error_history = [] | |
calls_per_second = 1.0 / sleep_interval if sleep_interval > 0 else 5.0 | |
logger.info( | |
f"Enterprise rate limiter initialized for tag removal: {sleep_interval}s base interval, " | |
f"~{calls_per_second:.1f} calls/second, adaptive={adaptive}" | |
) | |
def wait_if_needed(self): | |
"""Wait with adaptive rate limiting based on error patterns""" | |
current_time = time.time() | |
# Clean old history (keep last 5 minutes) | |
cutoff_time = current_time - 300 | |
self.call_history = [t for t in self.call_history if t > cutoff_time] | |
self.error_history = [t for t in self.error_history if t > cutoff_time] | |
# Adaptive rate limiting based on recent errors | |
if self.adaptive: | |
recent_error_rate = len(self.error_history) / max(len(self.call_history), 1) | |
if recent_error_rate > 0.1: # More than 10% error rate | |
self.current_sleep_interval = min(self.base_sleep_interval * 2, 2.0) | |
elif recent_error_rate < 0.01: # Less than 1% error rate | |
self.current_sleep_interval = max(self.base_sleep_interval * 0.8, 0.1) | |
else: | |
self.current_sleep_interval = self.base_sleep_interval | |
# Enforce minimum interval | |
time_since_last_call = current_time - self.last_call_time | |
if time_since_last_call < self.current_sleep_interval: | |
sleep_time = self.current_sleep_interval - time_since_last_call | |
time.sleep(sleep_time) | |
self.last_call_time = time.time() | |
self.call_history.append(self.last_call_time) | |
def record_error(self): | |
"""Record an error for adaptive rate limiting""" | |
self.consecutive_errors += 1 | |
self.error_history.append(time.time()) | |
# Exponential backoff for consecutive errors | |
if self.consecutive_errors > 3: | |
backoff_time = min(2 ** (self.consecutive_errors - 3), 30) | |
logger.warning( | |
f"Consecutive errors detected. Backing off for {backoff_time}s" | |
) | |
time.sleep(backoff_time) | |
def record_success(self): | |
"""Record a success to reset error counters""" | |
self.consecutive_errors = 0 | |
self.success_count += 1 | |
class EnterpriseImmutaggingClient: | |
"""Enterprise-grade client with circuit breaker, retry logic, and health monitoring for tag removal""" | |
def __init__( | |
self, immuta_url: str, api_key: str, rate_limiter: EnterpriseRateLimiter | |
): | |
self.immuta_url = immuta_url.rstrip("/") | |
self.headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json", | |
"x-api-key": api_key, | |
} | |
self.rate_limiter = rate_limiter | |
self.session = requests.Session() | |
self.session.headers.update(self.headers) | |
# Circuit breaker state | |
self.circuit_breaker_failures = 0 | |
self.circuit_breaker_opened_at = None | |
self.circuit_breaker_threshold = 10 | |
self.circuit_breaker_timeout = 300 # 5 minutes | |
# Health monitoring | |
self.health_check_interval = 600 # 10 minutes | |
self.last_health_check = 0 | |
def _circuit_breaker_check(self): | |
"""Check if circuit breaker should prevent API calls""" | |
if self.circuit_breaker_opened_at: | |
if ( | |
time.time() - self.circuit_breaker_opened_at | |
> self.circuit_breaker_timeout | |
): | |
logger.info("Circuit breaker timeout expired, attempting to reset") | |
self.circuit_breaker_opened_at = None | |
self.circuit_breaker_failures = 0 | |
else: | |
raise ImmutaggingError( | |
"Circuit breaker is open - too many consecutive failures" | |
) | |
def _health_check(self): | |
"""Perform periodic health check""" | |
current_time = time.time() | |
if current_time - self.last_health_check > self.health_check_interval: | |
try: | |
# Simple health check - get tag list with minimal parameters | |
health_response = self.session.get( | |
f"{self.immuta_url}/tag", params={"limit": 1}, timeout=10 | |
) | |
health_response.raise_for_status() | |
logger.debug("Health check passed") | |
self.last_health_check = current_time | |
except Exception as e: | |
logger.warning(f"Health check failed: {str(e)}") | |
def make_request( | |
self, | |
method: str, | |
endpoint: str, | |
data: Optional[Dict] = None, | |
params: Optional[Dict] = None, | |
retries: int = 5, | |
) -> requests.Response: | |
"""Make HTTP request with comprehensive error handling and retry logic""" | |
self._circuit_breaker_check() | |
self._health_check() | |
self.rate_limiter.wait_if_needed() | |
url = f"{self.immuta_url}{endpoint}" | |
for attempt in range(retries): | |
try: | |
if method.upper() == "GET": | |
response = self.session.get(url, params=params, timeout=60) | |
elif method.upper() == "POST": | |
response = self.session.post( | |
url, json=data, params=params, timeout=60 | |
) | |
elif method.upper() == "DELETE": | |
response = self.session.delete( | |
url, json=data, params=params, timeout=60 | |
) | |
else: | |
raise ValueError(f"Unsupported HTTP method: {method}") | |
# Handle different HTTP status codes | |
if response.status_code == 429: | |
retry_after = int(response.headers.get("Retry-After", 60)) | |
logger.warning( | |
f"Rate limited (429). Waiting {retry_after} seconds before retry {attempt + 1}" | |
) | |
time.sleep(retry_after) | |
self.rate_limiter.record_error() | |
continue | |
elif response.status_code >= 500: | |
# Server errors - retry with exponential backoff | |
backoff_time = min(2**attempt, 60) | |
logger.warning( | |
f"Server error {response.status_code}. Retrying in {backoff_time}s (attempt {attempt + 1})" | |
) | |
time.sleep(backoff_time) | |
self.rate_limiter.record_error() | |
continue | |
elif response.status_code >= 400: | |
# Client errors - usually don't retry, but 404 is expected for tag removal | |
if response.status_code == 404: | |
# Tag not found - this is expected in removal operations | |
self.rate_limiter.record_success() | |
return response | |
error_msg = ( | |
f"Client error {response.status_code}: {response.text[:200]}" | |
) | |
logger.error(error_msg) | |
self.rate_limiter.record_error() | |
raise ImmutaggingError(error_msg) | |
# Success | |
response.raise_for_status() | |
self.rate_limiter.record_success() | |
self.circuit_breaker_failures = 0 | |
return response | |
except requests.exceptions.Timeout: | |
logger.warning(f"Request timeout (attempt {attempt + 1}/{retries})") | |
self.rate_limiter.record_error() | |
if attempt == retries - 1: | |
self.circuit_breaker_failures += 1 | |
if self.circuit_breaker_failures >= self.circuit_breaker_threshold: | |
self.circuit_breaker_opened_at = time.time() | |
logger.error("Circuit breaker opened due to excessive timeouts") | |
raise ImmutaggingError(f"Request timeout after {retries} attempts") | |
time.sleep(2**attempt) | |
except requests.exceptions.ConnectionError as e: | |
logger.warning( | |
f"Connection error (attempt {attempt + 1}/{retries}): {str(e)}" | |
) | |
self.rate_limiter.record_error() | |
if attempt == retries - 1: | |
self.circuit_breaker_failures += 1 | |
if self.circuit_breaker_failures >= self.circuit_breaker_threshold: | |
self.circuit_breaker_opened_at = time.time() | |
logger.error("Circuit breaker opened due to connection errors") | |
raise ImmutaggingError( | |
f"Connection failed after {retries} attempts: {str(e)}" | |
) | |
time.sleep(2**attempt) | |
except Exception as e: | |
logger.error( | |
f"Unexpected error (attempt {attempt + 1}/{retries}): {str(e)}" | |
) | |
self.rate_limiter.record_error() | |
if attempt == retries - 1: | |
raise ImmutaggingError( | |
f"Unexpected error after {retries} attempts: {str(e)}" | |
) | |
time.sleep(2**attempt) | |
raise ImmutaggingError("Exhausted all retry attempts") | |
class EnterpriseTagRemovalOrchestrator: | |
"""Main orchestrator for enterprise-scale tag removal operations""" | |
def __init__(self, args, progress_tracker): | |
self.args = args | |
self.progress_tracker = progress_tracker | |
# Initialize API client | |
rate_limiter = EnterpriseRateLimiter( | |
sleep_interval=args.sleep, adaptive=args.adaptive_rate_limiting | |
) | |
self.api_client = EnterpriseImmutaggingClient( | |
args.immuta_url, args.api_key, rate_limiter | |
) | |
# State management | |
self.validation_cache = {} | |
self.column_tags_cache = {} | |
# Statistics | |
self.stats = { | |
"start_time": time.time(), | |
"validation_time": 0, | |
"processing_time": 0, | |
"total_api_calls": 0, | |
"cache_hits": 0, | |
"cache_misses": 0, | |
} | |
# Graceful shutdown handling | |
signal.signal(signal.SIGINT, self._signal_handler) | |
signal.signal(signal.SIGTERM, self._signal_handler) | |
atexit.register(self._cleanup) | |
self.shutdown_requested = False | |
global shutdown_requested | |
def _signal_handler(self, signum, frame): | |
"""Handle graceful shutdown""" | |
logger.info(f"Received signal {signum}. Initiating graceful shutdown...") | |
self.shutdown_requested = True | |
global shutdown_requested | |
shutdown_requested = True | |
def _cleanup(self): | |
"""Cleanup function called on exit""" | |
if hasattr(self, "progress_tracker"): | |
summary = self.progress_tracker.get_progress_summary() | |
logger.info(f"Final tag removal progress summary: {summary}") | |
@contextmanager | |
def timer(self, operation_name: str): | |
"""Context manager for timing operations""" | |
start_time = time.time() | |
try: | |
yield | |
finally: | |
elapsed = time.time() - start_time | |
logger.info(f"{operation_name} completed in {elapsed:.2f} seconds") | |
def validate_and_cache_references(self, records: List[ColumnTagRemoval]) -> bool: | |
"""Validate and cache all reference data for tag removal operations""" | |
logger.info( | |
"Starting comprehensive validation of reference data for tag removal..." | |
) | |
with self.timer("Reference validation"): | |
# Extract unique sets | |
schema_projects = {r.immuta_schema_project for r in records} | |
data_sources = {r.immuta_datasource for r in records} | |
tags = {r.immuta_tag for r in records} | |
logger.info( | |
f"Validating {len(schema_projects)} schema projects, " | |
f"{len(data_sources)} data sources, {len(tags)} tags for removal" | |
) | |
# Validate each category with progress tracking | |
validation_results = {} | |
try: | |
# Schema projects | |
logger.info("Validating schema projects...") | |
validation_results["schema_projects"] = self._validate_schema_projects( | |
schema_projects | |
) | |
# Data sources | |
logger.info("Validating data sources...") | |
validation_results["data_sources"] = self._validate_data_sources( | |
data_sources | |
) | |
# Columns (only if we have valid data sources) | |
if validation_results["data_sources"]: | |
logger.info("Validating columns...") | |
validation_results["columns"] = self._validate_columns( | |
validation_results["data_sources"] | |
) | |
else: | |
validation_results["columns"] = set() | |
# Tags - for removal, we need to check if they exist | |
logger.info("Validating tags for removal...") | |
validation_results["tags"] = self._validate_tags_for_removal(tags) | |
# Cache results | |
self.validation_cache = validation_results | |
# Validate individual records | |
return self._validate_records(records) | |
except Exception as e: | |
logger.error(f"Validation failed: {str(e)}") | |
return False | |
def _validate_schema_projects(self, schema_projects: Set[str]) -> Set[str]: | |
"""Validate schema projects with detailed progress tracking""" | |
valid_projects = set() | |
for i, project in enumerate(schema_projects, 1): | |
if self.shutdown_requested: | |
break | |
try: | |
logger.debug( | |
f"Validating schema project {i}/{len(schema_projects)}: {project}" | |
) | |
params = { | |
"size": 999999999, | |
"sortField": "name", | |
"sortOrder": "asc", | |
"nameOnly": "true", | |
"searchText": project, | |
} | |
response = self.api_client.make_request( | |
"GET", "/project", params=params | |
) | |
data = response.json() | |
if data.get("count", 0) > 0: | |
for proj in data.get("hits", []): | |
if ( | |
project == proj.get("name") | |
and proj.get("type", "").lower() == "schema" | |
and not proj.get("deleted", True) | |
): | |
valid_projects.add(project) | |
break | |
if i % 10 == 0: | |
logger.info( | |
f"Schema project validation progress: {i}/{len(schema_projects)}" | |
) | |
except Exception as e: | |
logger.error(f"Error validating schema project {project}: {str(e)}") | |
continue | |
logger.info( | |
f"Schema project validation complete: {len(valid_projects)}/{len(schema_projects)} valid" | |
) | |
return valid_projects | |
def _validate_data_sources(self, data_sources: Set[str]) -> Dict[str, int]: | |
"""Validate data sources and return mapping to IDs""" | |
valid_sources = {} | |
for i, data_source in enumerate(data_sources, 1): | |
if self.shutdown_requested: | |
break | |
try: | |
logger.debug( | |
f"Validating data source {i}/{len(data_sources)}: {data_source}" | |
) | |
encoded_name = urllib.parse.quote(data_source, safe="") | |
response = self.api_client.make_request( | |
"GET", f"/dataSource/name/{encoded_name}" | |
) | |
data = response.json() | |
if not data.get("deleted", True): | |
valid_sources[data["name"]] = data["id"] | |
if i % 10 == 0: | |
logger.info( | |
f"Data source validation progress: {i}/{len(data_sources)}" | |
) | |
except requests.exceptions.HTTPError as e: | |
if e.response.status_code != 404: | |
logger.error( | |
f"Error validating data source {data_source}: {str(e)}" | |
) | |
continue | |
except Exception as e: | |
logger.error(f"Error validating data source {data_source}: {str(e)}") | |
continue | |
logger.info( | |
f"Data source validation complete: {len(valid_sources)}/{len(data_sources)} valid" | |
) | |
return valid_sources | |
def _validate_columns(self, data_source_map: Dict[str, int]) -> Set[str]: | |
"""Validate columns for all data sources""" | |
valid_columns = set() | |
for i, (data_source_name, data_source_id) in enumerate( | |
data_source_map.items(), 1 | |
): | |
if self.shutdown_requested: | |
break | |
try: | |
logger.debug( | |
f"Fetching columns for data source {i}/{len(data_source_map)}: {data_source_name}" | |
) | |
response = self.api_client.make_request( | |
"GET", f"/dictionary/{data_source_id}" | |
) | |
data = response.json() | |
for column in data.get("metadata", []): | |
column_name = column.get("name", "") | |
if column_name: | |
valid_columns.add(f"{data_source_name}|{column_name}") | |
if i % 10 == 0: | |
logger.info( | |
f"Column validation progress: {i}/{len(data_source_map)}" | |
) | |
except Exception as e: | |
logger.error(f"Error fetching columns for {data_source_name}: {str(e)}") | |
continue | |
logger.info( | |
f"Column validation complete: {len(valid_columns)} valid columns found" | |
) | |
return valid_columns | |
def _validate_tags_for_removal(self, tags: Set[str]) -> Set[str]: | |
"""Validate tags exist in the system (for removal operations)""" | |
valid_tags = set() | |
for i, tag in enumerate(tags, 1): | |
if self.shutdown_requested: | |
break | |
try: | |
logger.debug(f"Validating tag for removal {i}/{len(tags)}: {tag}") | |
if tag.startswith("XXX-Classification."): | |
params = { | |
"source": "curated", | |
"searchText": tag, | |
"excludedHierarchies": '["Discovered","New","Skip Stats Job","XXX-DataSource","DataProperties"]', | |
"includeAllSystemTags": "false", | |
"limit": 999999999, | |
} | |
response = self.api_client.make_request( | |
"GET", "/tag", params=params | |
) | |
data = response.json() | |
if data and isinstance(data, list): | |
for tag_info in data: | |
if tag == tag_info.get("name") and ( | |
( | |
not tag_info.get("hasLeafNodes", True) | |
and not tag_info.get("deleted", True) | |
) | |
or tag_info.get("hasLeafNodes", True) | |
): | |
valid_tags.add(tag) | |
break | |
if i % 10 == 0: | |
logger.info(f"Tag validation progress: {i}/{len(tags)}") | |
except Exception as e: | |
logger.error(f"Error validating tag {tag}: {str(e)}") | |
continue | |
logger.info( | |
f"Tag validation complete: {len(valid_tags)}/{len(tags)} valid for removal" | |
) | |
return valid_tags | |
def _validate_records(self, records: List[ColumnTagRemoval]) -> bool: | |
"""Validate individual records against cached reference data""" | |
valid_schema_projects = self.validation_cache.get("schema_projects", set()) | |
valid_data_sources = self.validation_cache.get("data_sources", {}) | |
valid_columns = self.validation_cache.get("columns", set()) | |
valid_tags = self.validation_cache.get("tags", set()) | |
failed_records = [] | |
for record in records: | |
errors = [] | |
if record.immuta_schema_project not in valid_schema_projects: | |
errors.append(f"Invalid schema project: {record.immuta_schema_project}") | |
if record.immuta_datasource not in valid_data_sources: | |
errors.append(f"Invalid data source: {record.immuta_datasource}") | |
if record.immuta_column_key not in valid_columns: | |
errors.append(f"Invalid column: {record.immuta_column_key}") | |
# For tag removal, we warn but don't fail if tag doesn't exist | |
if record.immuta_tag not in valid_tags: | |
logger.warning( | |
f"Row {record.row_id}: Tag {record.immuta_tag} not found in system - may already be removed" | |
) | |
if errors: | |
failed_records.append((record, errors)) | |
self.progress_tracker.update_record_status( | |
record.row_id, | |
record.database, | |
record.schema, | |
record.table, | |
record.column, | |
record.immuta_tag, | |
"validation_failed", | |
"; ".join(errors), | |
) | |
if failed_records: | |
logger.error(f"Validation failed for {len(failed_records)} records") | |
for record, errors in failed_records[:10]: # Log first 10 failures | |
logger.error(f"Row {record.row_id}: {'; '.join(errors)}") | |
if len(failed_records) > 10: | |
logger.error( | |
f"... and {len(failed_records) - 10} more validation failures" | |
) | |
return False | |
logger.info(f"All {len(records)} records passed validation for tag removal") | |
return True | |
def process_records(self, records: List[ColumnTagRemoval]) -> Dict[str, int]: | |
"""Process tag removal records with enterprise-grade error handling and recovery""" | |
logger.info(f"Starting tag removal processing of {len(records)} records...") | |
# Load existing progress for resume functionality | |
existing_progress = self.progress_tracker.load_existing_progress() | |
# Filter out already processed records | |
pending_records = [r for r in records if r.row_id not in existing_progress] | |
if len(pending_records) < len(records): | |
logger.info( | |
f"Resuming from previous run. Processing {len(pending_records)} remaining records " | |
f"(skipping {len(records) - len(pending_records)} already processed)" | |
) | |
# Group records by column for efficient processing | |
column_groups = defaultdict(list) | |
for record in pending_records: | |
column_key = f"{record.immuta_datasource}|{record.column}" | |
column_groups[column_key].append(record) | |
logger.info( | |
f"Grouped records into {len(column_groups)} unique columns for tag removal" | |
) | |
# Statistics tracking | |
stats = {"success": 0, "failed": 0, "skipped": 0, "not_found": 0, "retried": 0} | |
processed_columns = 0 | |
total_columns = len(column_groups) | |
# Process each column group | |
for column_key, column_records in column_groups.items(): | |
if self.shutdown_requested: | |
logger.info( | |
"Shutdown requested. Stopping tag removal processing gracefully." | |
) | |
break | |
processed_columns += 1 | |
try: | |
logger.debug( | |
f"Processing column group {processed_columns}/{total_columns}: {column_key}" | |
) | |
# Process all tag removals for this column | |
column_stats = self._process_column_group_removal(column_records) | |
# Update statistics | |
for key in stats: | |
stats[key] += column_stats.get(key, 0) | |
# Progress reporting | |
if processed_columns % 100 == 0: | |
elapsed = time.time() - self.stats["start_time"] | |
rate = processed_columns / elapsed * 3600 # columns per hour | |
eta = (total_columns - processed_columns) / max(rate / 3600, 0.1) | |
logger.info( | |
f"Progress: {processed_columns}/{total_columns} column groups " | |
f"({processed_columns/total_columns*100:.1f}%) - " | |
f"Rate: {rate:.1f} columns/hour - " | |
f"ETA: {timedelta(seconds=int(eta))}" | |
) | |
# Progress summary | |
logger.info( | |
f"Current stats: Success={stats['success']}, " | |
f"Failed={stats['failed']}, Skipped={stats['skipped']}, " | |
f"Not Found={stats['not_found']}" | |
) | |
except Exception as e: | |
logger.error(f"Error processing column group {column_key}: {str(e)}") | |
# Mark all records in this group as failed | |
for record in column_records: | |
self.progress_tracker.update_record_status( | |
record.row_id, | |
record.database, | |
record.schema, | |
record.table, | |
record.column, | |
record.immuta_tag, | |
"failed", | |
str(e), | |
) | |
stats["failed"] += len(column_records) | |
# Final statistics | |
total_processed = sum(stats.values()) | |
logger.info( | |
f"Tag removal processing complete. Total processed: {total_processed}, " | |
f"Success: {stats['success']}, Failed: {stats['failed']}, " | |
f"Skipped: {stats['skipped']}, Not Found: {stats['not_found']}" | |
) | |
return stats | |
def _process_column_group_removal( | |
self, column_records: List[ColumnTagRemoval] | |
) -> Dict[str, int]: | |
"""Process all tag removals for a single column with optimized API usage""" | |
if not column_records: | |
return {"success": 0, "failed": 0, "skipped": 0, "not_found": 0} | |
first_record = column_records[0] | |
data_source_id = self.validation_cache["data_sources"][ | |
first_record.immuta_datasource | |
] | |
stats = {"success": 0, "failed": 0, "skipped": 0, "not_found": 0} | |
try: | |
# Get existing tags for this column (with caching) | |
existing_tags = self._get_existing_column_tags_cached( | |
data_source_id, first_record.column | |
) | |
# Process each tag removal for this column | |
for record in column_records: | |
try: | |
# Check if tag exists to be removed | |
if record.immuta_tag not in existing_tags: | |
stats["not_found"] += 1 | |
self.progress_tracker.update_record_status( | |
record.row_id, | |
record.database, | |
record.schema, | |
record.table, | |
record.column, | |
record.immuta_tag, | |
"not_found", | |
"Tag not found on column - may already be removed", | |
False, | |
) | |
logger.debug( | |
f"Row {record.row_id}: Tag {record.immuta_tag} not found on column" | |
) | |
continue | |
# Skip actual tag removal in dry-run mode | |
if self.args.dry_run: | |
stats["success"] += 1 | |
self.progress_tracker.update_record_status( | |
record.row_id, | |
record.database, | |
record.schema, | |
record.table, | |
record.column, | |
record.immuta_tag, | |
"success", | |
"DRY RUN - would have removed tag", | |
True, | |
) | |
logger.debug( | |
f"DRY RUN: Would remove tag {record.immuta_tag} from column {record.column}" | |
) | |
continue | |
# Remove the tag | |
success = self._remove_single_tag(record, data_source_id) | |
if success: | |
stats["success"] += 1 | |
existing_tags.discard(record.immuta_tag) # Update cache | |
self.progress_tracker.update_record_status( | |
record.row_id, | |
record.database, | |
record.schema, | |
record.table, | |
record.column, | |
record.immuta_tag, | |
"success", | |
None, | |
True, | |
) | |
logger.debug( | |
f"Row {record.row_id}: Successfully removed tag {record.immuta_tag}" | |
) | |
else: | |
stats["failed"] += 1 | |
self.progress_tracker.update_record_status( | |
record.row_id, | |
record.database, | |
record.schema, | |
record.table, | |
record.column, | |
record.immuta_tag, | |
"failed", | |
"Tag removal failed - see logs", | |
True, | |
) | |
except Exception as e: | |
stats["failed"] += 1 | |
error_msg = f"Error processing tag removal record: {str(e)}" | |
logger.error(f"Row {record.row_id}: {error_msg}") | |
self.progress_tracker.update_record_status( | |
record.row_id, | |
record.database, | |
record.schema, | |
record.table, | |
record.column, | |
record.immuta_tag, | |
"failed", | |
error_msg, | |
) | |
except Exception as e: | |
# Catastrophic failure for entire column group | |
logger.error(f"Catastrophic failure for column group: {str(e)}") | |
for record in column_records: | |
self.progress_tracker.update_record_status( | |
record.row_id, | |
record.database, | |
record.schema, | |
record.table, | |
record.column, | |
record.immuta_tag, | |
"failed", | |
f"Column group processing failed: {str(e)}", | |
) | |
stats["failed"] = len(column_records) | |
return stats | |
def _get_existing_column_tags_cached( | |
self, data_source_id: int, column_name: str | |
) -> Set[str]: | |
"""Get existing tags for a column with caching for performance""" | |
cache_key = f"{data_source_id}|{column_name}" | |
if cache_key in self.column_tags_cache: | |
self.stats["cache_hits"] += 1 | |
return self.column_tags_cache[cache_key] | |
self.stats["cache_misses"] += 1 | |
try: | |
response = self.api_client.make_request( | |
"GET", f"/dictionary/{data_source_id}" | |
) | |
data = response.json() | |
existing_tags = set() | |
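            # Assumed response shape (illustrative):
            #   {"metadata": [{"name": "<column>", "tags": [{"name": "<tag>"}, ...]}, ...]}
            # Only the tag "name" values for the matching column are collected.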
for column in data.get("metadata", []): | |
if column.get("name") == column_name: | |
for tag in column.get("tags", []): | |
tag_name = tag.get("name", "") | |
if tag_name: | |
existing_tags.add(tag_name) | |
break | |
# Cache the result | |
self.column_tags_cache[cache_key] = existing_tags | |
return existing_tags | |
except Exception as e: | |
logger.error( | |
f"Error fetching existing tags for column {column_name}: {str(e)}" | |
) | |
return set() | |
def _remove_single_tag(self, record: ColumnTagRemoval, data_source_id: int) -> bool: | |
"""Remove a single tag from a column using the correct Immuta v1 API""" | |
max_retries = 3 | |
for attempt in range(max_retries): | |
try: | |
# Remove the tag using the correct endpoint format | |
# Based on Immuta v1 API: DELETE /tag/column/{column_identifier} with tag name in body | |
column_identifier = f"{data_source_id}.{record.column}" | |
endpoint = f"/tag/column/{column_identifier}" | |
# Request body with tag name for removal | |
request_data = {"name": record.immuta_tag} | |
response = self.api_client.make_request( | |
"DELETE", endpoint, data=request_data | |
) | |
if response.status_code in [200, 204]: | |
logger.debug( | |
f"Successfully removed tag {record.immuta_tag} from column {record.column}" | |
) | |
return True | |
elif response.status_code == 404: | |
logger.warning( | |
f"Tag {record.immuta_tag} not found on column {record.column} - may already be removed" | |
) | |
return True # Consider this success | |
else: | |
logger.error( | |
f"Failed to remove tag {record.immuta_tag} from column {record.column}: " | |
f"HTTP {response.status_code} - {response.text[:200]}" | |
) | |
if attempt < max_retries - 1: | |
time.sleep(2**attempt) # Exponential backoff | |
continue | |
return False | |
except Exception as e: | |
logger.warning( | |
f"Row {record.row_id}: Attempt {attempt + 1} failed for tag removal {record.immuta_tag}: {str(e)}" | |
) | |
if attempt < max_retries - 1: | |
time.sleep(2**attempt) # Exponential backoff | |
else: | |
logger.error( | |
f"Row {record.row_id}: All {max_retries} attempts failed for tag removal {record.immuta_tag}" | |
) | |
return False | |
return False | |
def load_csv_data(file_path: str) -> List[ColumnTagRemoval]: | |
"""Load and validate CSV data for tag removal operations""" | |
if logger: | |
logger.info(f"Loading tag removal data from CSV file: {file_path}") | |
if not os.path.exists(file_path): | |
raise ImmutaggingError(f"CSV file not found: {file_path}") | |
records = [] | |
required_columns = [ | |
"database", | |
"schema", | |
"table", | |
"column", | |
"immuta_tag", | |
"immuta_schema_project", | |
"immuta_datasource", | |
"immuta_sql_schema", | |
] | |
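    # Illustrative CSV row matching the columns above (the last three values are hypothetical):
    # database,schema,table,column,immuta_tag,immuta_schema_project,immuta_datasource,immuta_sql_schema
    # PROD_DB,SALES,CUSTOMERS,EMAIL,XXX-Classification.PII,prod_project,PROD_DB.SALES.CUSTOMERS,prod_db_sales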
try: | |
with open(file_path, "r", encoding="utf-8") as csvfile: | |
# Detect delimiter | |
sample = csvfile.read(1024) | |
csvfile.seek(0) | |
sniffer = csv.Sniffer() | |
delimiter = sniffer.sniff(sample).delimiter | |
reader = csv.DictReader(csvfile, delimiter=delimiter) | |
# Validate headers | |
missing_columns = [ | |
col for col in required_columns if col not in reader.fieldnames | |
] | |
if missing_columns: | |
raise ImmutaggingError( | |
f"Missing required columns in CSV: {missing_columns}" | |
) | |
# Load records | |
            for row_num, row in enumerate(reader, start=2):  # data rows start at line 2; line 1 is the header
try: | |
# Validate required fields | |
for col in required_columns: | |
if not row.get(col, "").strip(): | |
raise ValueError( | |
f"Empty required field '{col}' in row {row_num}" | |
) | |
# Create immuta_column_key | |
immuta_column_key = f"{row['immuta_datasource']}|{row['column']}" | |
record = ColumnTagRemoval( | |
row_id=row_num - 1, # Adjust for header | |
database=row["database"].strip(), | |
schema=row["schema"].strip(), | |
table=row["table"].strip(), | |
column=row["column"].strip(), | |
immuta_tag=row["immuta_tag"].strip(), | |
immuta_schema_project=row["immuta_schema_project"].strip(), | |
immuta_datasource=row["immuta_datasource"].strip(), | |
immuta_sql_schema=row["immuta_sql_schema"].strip(), | |
immuta_column_key=immuta_column_key, | |
source_file_line=row_num, | |
) | |
records.append(record) | |
except ValueError as e: | |
if logger: | |
logger.error(f"Invalid data in row {row_num}: {str(e)}") | |
continue | |
except Exception as e: | |
if logger: | |
logger.error(f"Error processing row {row_num}: {str(e)}") | |
continue | |
    except ImmutaggingError:
        raise
    except Exception as e:
        raise ImmutaggingError(f"Error reading CSV file: {str(e)}") from e
if logger: | |
logger.info(f"Successfully loaded {len(records)} tag removal records from CSV") | |
return records | |
def parse_arguments(): | |
"""Parse command line arguments for tag removal script""" | |
parser = argparse.ArgumentParser( | |
description="Enterprise Immuta Column Tag Removal Tool - Remove tags from columns based on CSV input", | |
formatter_class=argparse.RawDescriptionHelpFormatter, | |
epilog=""" | |
Examples: | |
# Remove tags with default settings | |
python column_tag_del.py input.csv --immuta-url https://immuta.company.com --api-key YOUR_KEY | |
# Dry run to preview what would be removed | |
python column_tag_del.py input.csv --immuta-url https://immuta.company.com --api-key YOUR_KEY --dry-run | |
# Custom rate limiting and progress tracking | |
python column_tag_del.py input.csv --immuta-url https://immuta.company.com --api-key YOUR_KEY --sleep 0.5 --progress-db removal_progress.db | |
# Resume from previous run | |
python column_tag_del.py input.csv --immuta-url https://immuta.company.com --api-key YOUR_KEY --resume | |
""", | |
) | |
# Required arguments | |
parser.add_argument("csv_file", help="CSV file containing tag removal data") | |
parser.add_argument("--immuta-url", required=True, help="Immuta instance URL") | |
parser.add_argument("--api-key", required=True, help="Immuta API key") | |
# Optional arguments | |
parser.add_argument( | |
"--dry-run", | |
action="store_true", | |
help="Preview operations without making changes", | |
) | |
parser.add_argument( | |
"--sleep", | |
type=float, | |
default=0.2, | |
help="Sleep interval between API calls (default: 0.2s)", | |
) | |
    parser.add_argument(
        "--adaptive-rate-limiting",
        action="store_true",
        default=True,
        help="Enable adaptive rate limiting based on error patterns (default: enabled)",
    )
    parser.add_argument(
        "--no-adaptive-rate-limiting", dest="adaptive_rate_limiting",
        action="store_false", help="Disable adaptive rate limiting",
    )
parser.add_argument( | |
"--progress-db", | |
default="tag_removal_progress.db", | |
help="SQLite database for progress tracking (default: tag_removal_progress.db)", | |
) | |
parser.add_argument( | |
"--resume", | |
action="store_true", | |
help="Resume from previous run using progress database", | |
) | |
parser.add_argument( | |
"--log-level", | |
choices=["DEBUG", "INFO", "WARNING", "ERROR"], | |
default="INFO", | |
help="Logging level (default: INFO)", | |
) | |
parser.add_argument( | |
"--log-dir", default="./logs", help="Directory for log files (default: ./logs)" | |
) | |
parser.add_argument("--export-results", help="Export final results to CSV file") | |
return parser.parse_args() | |
def main(): | |
"""Main function for enterprise tag removal operations""" | |
global logger | |
try: | |
# Parse arguments | |
args = parse_arguments() | |
# Setup logging | |
log_level = getattr(logging, args.log_level.upper()) | |
logger, main_log_file, error_log_file = setup_logging(log_level, args.log_dir) | |
logger.info("=" * 80) | |
logger.info("ENTERPRISE IMMUTA COLUMN TAG REMOVAL TOOL") | |
logger.info("=" * 80) | |
logger.info(f"CSV File: {args.csv_file}") | |
logger.info(f"Immuta URL: {args.immuta_url}") | |
logger.info(f"Dry Run: {args.dry_run}") | |
logger.info(f"Sleep Interval: {args.sleep}s") | |
logger.info(f"Progress Database: {args.progress_db}") | |
logger.info(f"Log Files: {main_log_file}, {error_log_file}") | |
logger.info("=" * 80) | |
# Load CSV data | |
records = load_csv_data(args.csv_file) | |
# Initialize progress tracker | |
progress_tracker = ProgressTracker(args.progress_db) | |
# Initialize orchestrator | |
orchestrator = EnterpriseTagRemovalOrchestrator(args, progress_tracker) | |
# Validate references | |
logger.info("Starting reference validation...") | |
if not orchestrator.validate_and_cache_references(records): | |
logger.error("Reference validation failed. Aborting tag removal operation.") | |
return 1 | |
# Process records | |
logger.info("Starting tag removal processing...") | |
final_stats = orchestrator.process_records(records) | |
# Export results if requested | |
if args.export_results: | |
progress_tracker.export_results(args.export_results) | |
logger.info(f"Results exported to: {args.export_results}") | |
# Final summary | |
logger.info("=" * 80) | |
logger.info("TAG REMOVAL OPERATION COMPLETE") | |
logger.info("=" * 80) | |
logger.info(f"Total Records Processed: {sum(final_stats.values())}") | |
logger.info(f"Successful Removals: {final_stats['success']}") | |
logger.info(f"Failed Removals: {final_stats['failed']}") | |
logger.info(f"Tags Not Found: {final_stats['not_found']}") | |
logger.info(f"Skipped Records: {final_stats['skipped']}") | |
# Check for failures | |
if final_stats["failed"] > 0: | |
logger.warning( | |
f"⚠️ {final_stats['failed']} tag removals failed. Check error logs for details." | |
) | |
failed_records = progress_tracker.get_failed_records() | |
if failed_records: | |
logger.info("Failed records summary:") | |
for record in failed_records[:10]: # Show first 10 | |
logger.info(f" Row {record['row_id']}: {record['error_message']}") | |
if len(failed_records) > 10: | |
logger.info(f" ... and {len(failed_records) - 10} more failures") | |
if final_stats["success"] > 0: | |
logger.info( | |
f"✅ Successfully removed {final_stats['success']} tags from columns" | |
) | |
logger.info("=" * 80) | |
return 0 if final_stats["failed"] == 0 else 1 | |
    except KeyboardInterrupt:
        if logger:
            logger.info("Operation interrupted by user")
        else:
            print("Operation interrupted by user")
        return 130
except Exception as e: | |
if logger: | |
logger.error(f"Fatal error: {str(e)}") | |
logger.error(traceback.format_exc()) | |
else: | |
print(f"Fatal error: {str(e)}") | |
traceback.print_exc() | |
return 1 | |
if __name__ == "__main__": | |
exit(main()) |