import argparse
import requests
import re
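# Example usage (illustrative; the filename "influxdb_dump.py", host, port, and the
# database/measurement names below are placeholders, not part of the original gist):
#   python influxdb_dump.py -u http://127.0.0.1:8086 --list-version
#   python influxdb_dump.py -u http://127.0.0.1:8086 --list-databases
#   python influxdb_dump.py -u http://127.0.0.1:8086 --list-tables -d telegraf
#   python influxdb_dump.py -u http://127.0.0.1:8086 -d telegraf -t cpu -n 10
#   python influxdb_dump.py -u http://127.0.0.1:8086 -s password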

def main():
    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Dump data from a specific measurement/table in an InfluxDB instance via HTTP API.')
    parser.add_argument('-u', '--url', type=str, required=True, help='Base URL of the InfluxDB HTTP API (e.g. http://host:8086)')
    parser.add_argument('-d', '--database', type=str, help='Name of the database')
    parser.add_argument('-t', '--table', type=str, help='Name of the measurement/table')
    parser.add_argument('-n', '--num_rows', type=int, default=50, help='Number of rows to return (default: 50)')
    parser.add_argument('-s', '--search', type=str, help='Search for columns containing this text')
    parser.add_argument('--list-databases', action='store_true', help='List all databases')
    parser.add_argument('--list-tables', action='store_true', help='List all tables (measurements) under a specified database')
    parser.add_argument('--list-version', action='store_true', help='Show InfluxDB version')
    args = parser.parse_args()

    if args.list_databases:
        list_databases(args)
    elif args.list_tables:
        list_tables(args)
    elif args.list_version:
        list_version(args)
    elif args.database and args.table:
        fetch_data(args)
    else:
        # Default behavior: dump all data from all databases
        dump_all_data(args)

def list_databases(args):
    # Construct URL for the SHOW DATABASES query
    url = f"{args.url}/query?db=&q=SHOW DATABASES"
    try:
        # Perform GET request to fetch databases
        response = requests.get(url)
        response.raise_for_status()  # Raise exception for bad status codes
        # Parse JSON response
        data = response.json()
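        # For reference, a successful SHOW DATABASES response is typically shaped like
        # the following (illustrative values); the loops below walk this structure defensively:
        #   {"results": [{"statement_id": 0,
        #                 "series": [{"name": "databases",
        #                             "columns": ["name"],
        #                             "values": [["_internal"], ["telegraf"]]}]}]}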
        # Check if there are results
        if 'results' in data and len(data['results']) > 0:
            for result in data['results']:
                if 'series' in result:
                    for series in result['series']:
                        if 'values' in series:
                            # Extract database names from the response
                            databases = [db[0] for db in series['values']]
                            for database_name in databases:
                                print(f"Database: {database_name}")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching databases: {e}")

def list_tables(args):
    if args.database:
        # Construct URL for the SHOW MEASUREMENTS query against the given database
        url = f"{args.url}/query?db={args.database}&q=SHOW MEASUREMENTS"
    else:
        # Construct URL for the SHOW MEASUREMENTS query without specifying a database
        url = f"{args.url}/query?db=&q=SHOW MEASUREMENTS"
    try:
        # Perform GET request to fetch measurements
        response = requests.get(url)
        response.raise_for_status()  # Raise exception for bad status codes
        # Parse JSON response
        data = response.json()
        # Check if there are results
        if 'results' in data and len(data['results']) > 0:
            for result in data['results']:
                if 'series' in result:
                    for series in result['series']:
                        if 'values' in series:
                            # Extract measurement names from the response
                            measurements = [measurement[0] for measurement in series['values']]
                            # Keep only measurements whose names are alphanumeric/underscore
                            measurements = [m for m in measurements if is_valid_measurement_name(m)]
                            for measurement_name in measurements:
                                print(f"Database: {args.database}, Table: {measurement_name}")
    except requests.exceptions.RequestException as e:
        if args.database:
            print(f"Error fetching measurements for database '{args.database}': {e}")
        else:
            print(f"Error fetching measurements across databases: {e}")

def list_version(args):
    # The /ping endpoint reports the server version
    url = f"{args.url}/ping"
    try:
        # Perform GET request to fetch version
        response = requests.get(url)
        response.raise_for_status()  # Raise exception for bad status codes
        # /ping normally returns 204 No Content with no JSON body; the version
        # is reported in the X-Influxdb-Version response header
        version = response.headers.get('X-Influxdb-Version', 'unknown')
        print(f"InfluxDB Version: {version}")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching InfluxDB version: {e}")

def fetch_data(args):
    # Fetch data for a specific measurement/table
    # Construct URL for a SELECT * query with LIMIT
    url = f"{args.url}/query?db={args.database}&q=SELECT * FROM \"{args.table}\" LIMIT {args.num_rows}"
    try:
        # Perform GET request to fetch data
        response = requests.get(url)
        response.raise_for_status()  # Raise exception for bad status codes
        # Parse JSON response for data
        data = response.json()
        # Check if there are results
        if 'results' in data and len(data['results']) > 0:
            for result in data['results']:
                if 'series' in result:
                    for series in result['series']:
                        if 'columns' in series and 'values' in series:
                            # Filter columns (and their values) based on the search text
                            if args.search:
                                indices = [i for i, col in enumerate(series['columns']) if args.search.lower() in col.lower()]
                            else:
                                indices = list(range(len(series['columns'])))
                            filtered_columns = [series['columns'][i] for i in indices]
                            if filtered_columns:
                                # Print column names as headers
                                print(",".join(filtered_columns))
                                # Print retrieved data points, restricted to the selected columns
                                for record in series['values']:
                                    print(",".join(str(record[i]) for i in indices))
    except requests.exceptions.RequestException as e:
        print(f"Error fetching data for measurement '{args.table}' in database '{args.database}': {e}")

def dump_all_data(args):
    # Fetch and dump all data from all measurements in all databases
    # Construct URL for the SHOW DATABASES query
    url_databases = f"{args.url}/query?db=&q=SHOW DATABASES"
    try:
        # Perform GET request to fetch databases
        response_databases = requests.get(url_databases)
        response_databases.raise_for_status()  # Raise exception for bad status codes
        # Parse JSON response for databases
        data_databases = response_databases.json()
        # Check if there are results
        if 'results' in data_databases and len(data_databases['results']) > 0:
            for result_db in data_databases['results']:
                if 'series' in result_db:
                    for series_db in result_db['series']:
                        if 'values' in series_db:
                            # Extract database names from the response
                            databases = [db[0] for db in series_db['values']]
                            for database_name in databases:
                                # Construct URL for the SHOW MEASUREMENTS query
                                url_measurements = f"{args.url}/query?db={database_name}&q=SHOW MEASUREMENTS"
                                try:
                                    # Perform GET request to fetch measurements
                                    response_measurements = requests.get(url_measurements)
                                    response_measurements.raise_for_status()  # Raise exception for bad status codes
                                    # Parse JSON response for measurements
                                    data_measurements = response_measurements.json()
                                    # Check if there are results
                                    if 'results' in data_measurements and len(data_measurements['results']) > 0:
                                        for result_meas in data_measurements['results']:
                                            if 'series' in result_meas:
                                                for series_meas in result_meas['series']:
                                                    if 'values' in series_meas:
                                                        # Extract measurement names from the response
                                                        measurements = [meas[0] for meas in series_meas['values']]
                                                        # Keep only measurements whose names are alphanumeric/underscore
                                                        measurements = [m for m in measurements if is_valid_measurement_name(m)]
                                                        for measurement_name in measurements:
                                                            print(f"Database: {database_name} Table: {measurement_name}")
                                                            # Construct URL for a SELECT * query with LIMIT
                                                            url_data = f"{args.url}/query?db={database_name}&q=SELECT * FROM \"{measurement_name}\" LIMIT {args.num_rows}"
                                                            try:
                                                                # Perform GET request to fetch data
                                                                response_data = requests.get(url_data)
                                                                response_data.raise_for_status()  # Raise exception for bad status codes
                                                                # Parse JSON response for data
                                                                data_data = response_data.json()
                                                                # Check if there are results
                                                                if 'results' in data_data and len(data_data['results']) > 0:
                                                                    for result_data in data_data['results']:
                                                                        if 'series' in result_data:
                                                                            for series_data in result_data['series']:
                                                                                if 'columns' in series_data and 'values' in series_data:
                                                                                    # Filter columns (and their values) based on the search text
                                                                                    if args.search:
                                                                                        indices = [i for i, col in enumerate(series_data['columns']) if args.search.lower() in col.lower()]
                                                                                    else:
                                                                                        indices = list(range(len(series_data['columns'])))
                                                                                    filtered_columns = [series_data['columns'][i] for i in indices]
                                                                                    if len(filtered_columns) > 0:
                                                                                        # Print column names as headers
                                                                                        print(",".join(filtered_columns))
                                                                                        # Print retrieved data points, restricted to the selected columns
                                                                                        for record in series_data['values']:
                                                                                            print(",".join(str(record[i]) for i in indices))
                                                                                    print()
                                                            except requests.exceptions.RequestException as e:
                                                                print(f"Error fetching data for measurement '{measurement_name}' in database '{database_name}': {e}")
                                except requests.exceptions.RequestException as e:
                                    print(f"Error fetching measurements for database '{database_name}': {e}")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching databases: {e}")

def is_valid_measurement_name(name):
    # Regular expression to check if the name consists only of alphanumeric characters and underscores
    return re.match(r'^[a-zA-Z0-9_]+$', name) is not None
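# Examples (illustrative): a name like "cpu_load_short" passes the check above, while
# names containing dots, dashes, or spaces (e.g. "disk.io") are filtered out of listings.
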
if __name__ == '__main__':
    main()