Kafka Replay
Replay historical messages from a Kafka topic over a chosen time range and store them in a SQLite database. The replayer binary-searches each partition for the starting offset, streams messages until the end of the range is reached, and writes the decoded price records in batches.

Environment configuration (loaded via python-dotenv):
| # Kafka connection settings | |
| BOOTSTRAP_SERVERS=localhost:9092 | |
| # Topic to replay messages from (required) | |
| TOPIC=your-kafka-topic | |
| # Time range settings (timestamp format - default approach) | |
| START_TIME=2025-05-16T00:00:01 | |
| END_TIME=2025-05-16T23:59:59 | |
| # Alternative time range settings (date format - requires --use-date-range flag) | |
| START_DATE=2025-05-16 | |
| END_DATE=2025-05-16 | |
| # Optional filtering | |
| SYMBOLS=AAPL,MSFT,GOOG | |
| # Logging settings | |
| LOG_LEVEL=INFO |
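For orientation, here is a minimal sketch (not part of the gist) of how these settings feed the replay API defined further down, assuming START_TIME, END_TIME and SYMBOLS are set as above: KafkaReplayer.replay() takes epoch-millisecond timestamps, and the script normalises SYMBOLS into a set of upper-cased byte strings.

import os
from datetime import datetime

import dotenv

dotenv.load_dotenv()  # picks up the settings shown above

# KafkaReplayer.replay(start_time, end_time) expects epoch milliseconds
start_ms = int(datetime.fromisoformat(os.environ["START_TIME"]).timestamp() * 1000)
end_ms = int(datetime.fromisoformat(os.environ["END_TIME"]).timestamp() * 1000)

# Symbols of interest, upper-cased byte strings as the script expects
soi = {s.strip().upper().encode() for s in os.getenv("SYMBOLS", "").split(",") if s.strip()}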
Ignore patterns:
| .venv/ | |
| .env | |
| .envrc |
Pinned Python version:
| 3.12 |
Project metadata (pyproject.toml):
| [project] | |
| name = "kafka-replay" | |
| version = "0.1.0" | |
| description = "Replay Kafka historical data" | |
| readme = "README.md" | |
| requires-python = ">=3.12" | |
| dependencies = [ | |
| "confluent-kafka>=2.10.0", | |
| "python-dotenv>=1.1.0", | |
| ] |
The replay script:
| #!/usr/bin/env python | |
| # coding: utf8 | |
| import os | |
| import argparse | |
| import json | |
| import logging | |
| import time | |
| import uuid | |
| from datetime import datetime | |
| from typing import Optional | |
| import dotenv | |
| from confluent_kafka import Consumer, KafkaError, KafkaException, TopicPartition | |
| dotenv.load_dotenv() | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(message)s') | |
| log = logging.getLogger('replay') | |
| kafka_conn_logger = logging.getLogger('confluent_kafka') | |
| kafka_conn_logger.propagate = False | |
| kafka_conn_logger.setLevel(logging.WARNING) | |
| class KafkaReplayer: | |
| """ | |
| A utility class for replaying Kafka messages within a specific time range. | |
| This class allows retrieving historical data from Kafka topics by searching | |
| for messages within a given time range. It uses binary search to efficiently | |
| locate the starting offsets for each partition. | |
| """ | |
| def __init__(self, topic_name: str, partitions=None, **configs): | |
| """ | |
| Initialize a KafkaReplayer instance. | |
| Args: | |
| topic_name: The Kafka topic to replay messages from | |
| partitions: Optional list of specific partitions to replay from | |
| **configs: Configuration options for the Kafka consumer | |
| Raises: | |
| ValueError: If topic_name is not provided | |
| """ | |
| if not topic_name: | |
| raise ValueError('topic_name is required') | |
| self._topic_name = topic_name | |
| self._partitions = partitions | |
| self._configs = self._configs_with_defaults(configs) | |
| self._log_interval = 10000 | |
| self._logger = self._create_logger() | |
| def _create_logger(self): | |
| """ | |
| Create and configure a logger instance for this class. | |
| Returns: | |
| A configured logger instance | |
| """ | |
| self._logger = logging.getLogger(__name__) | |
| null_handler = logging.NullHandler() | |
| null_handler.setLevel(logging.DEBUG) | |
| self._logger.addHandler(null_handler) | |
| return self._logger | |
| @staticmethod | |
| def _configs_with_defaults(configs): | |
| """ | |
| Convert and normalize Kafka configuration parameters. | |
| This method converts configuration parameters from kafka-python style | |
| to confluent_kafka style and adds sensible defaults. | |
| Args: | |
| configs: Dictionary of configuration parameters | |
| Returns: | |
| Dictionary of normalized confluent_kafka configuration parameters | |
| """ | |
| conf = {} | |
| # Handle bootstrap_servers specially | |
| if 'bootstrap_servers' in configs: | |
| if isinstance(configs['bootstrap_servers'], list): | |
| conf['bootstrap.servers'] = ','.join(configs['bootstrap_servers']) | |
| else: | |
| conf['bootstrap.servers'] = configs['bootstrap_servers'] | |
| # Add unique group id if not specified | |
| if 'group.id' not in configs and 'group_id' not in configs: | |
| conf['group.id'] = str(uuid.uuid4()) | |
| elif 'group_id' in configs: | |
| conf['group.id'] = configs['group_id'] | |
| # Add consumer timeout | |
| if 'consumer_timeout_ms' in configs: | |
| conf['session.timeout.ms'] = configs['consumer_timeout_ms'] | |
| else: | |
| conf['session.timeout.ms'] = 10000 | |
| # Emit partition-EOF events so the replay loop below can tell when a partition is exhausted | |
| conf.setdefault('enable.partition.eof', True) | |
| # Add any other configs as-is with appropriate format conversion | |
| for key, value in configs.items(): | |
| if key not in ('bootstrap_servers', 'group_id', 'consumer_timeout_ms'): | |
| # Convert snake_case to dot.case | |
| conf_key = key.replace('_', '.') | |
| conf[conf_key] = value | |
| return conf | |
| @staticmethod | |
| def _get_time_millis(): | |
| return int(round(time.time() * 1000)) | |
| def _create_consumer(self) -> Consumer: | |
| return Consumer(self._configs) | |
| def _find_seek_points(self, start_time: int) -> dict: | |
| """ | |
| Find the initial seek points for each partition based on the start time. | |
| This method performs a binary search on each partition to find the offset | |
| of the message that is closest to the given start time but not earlier. | |
| Args: | |
| start_time: The start timestamp in milliseconds | |
| Returns: | |
| A dictionary mapping each TopicPartition to its corresponding seek offset | |
| """ | |
| seek_points = {} | |
| consumer: Consumer = self._create_consumer() | |
| try: | |
| # Get metadata for the topic | |
| cluster_metadata = consumer.list_topics(topic=self._topic_name) | |
| topic_metadata = cluster_metadata.topics[self._topic_name] | |
| all_partitions = [p.id for p in topic_metadata.partitions.values()] | |
| # Filter partitions if needed | |
| partitions = self._partitions if self._partitions else all_partitions | |
| topic_partitions = [TopicPartition(self._topic_name, p) for p in partitions] | |
| for topic_partition in topic_partitions: | |
| # Binary search to find the appropriate offset for each partition | |
| consumer.assign([topic_partition]) | |
| # Get beginning and end offsets | |
| low_offsets, high_offsets = consumer.get_watermark_offsets(topic_partition) | |
| start_offset = low_offsets | |
| end_offset = high_offsets | |
| target_offset = self._binary_search(consumer, topic_partition, start_offset, end_offset, start_time) | |
| seek_points[topic_partition] = target_offset | |
| self._logger.debug(f'Start offset for {topic_partition} is {target_offset}') | |
| self._logger.info(f'Start offsets: {seek_points}') | |
| return seek_points | |
| finally: | |
| consumer.close() | |
| def _get_next_if_available(self, consumer): | |
| msg = consumer.poll(1.0) | |
| # Treat error events (e.g. partition EOF) as "no message" so callers only see real records | |
| return None if msg is not None and msg.error() else msg | |
| def _binary_search(self, consumer: Consumer, tp: TopicPartition, start: int, end: int, target_time: int) -> int: | |
| """ | |
| Perform a binary search to find the offset of the message that is closest to the target time but not earlier. | |
| Args: | |
| consumer: The Kafka consumer instance | |
| tp: The TopicPartition to search within | |
| start: The starting offset for the search | |
| end: The ending offset for the search | |
| target_time: The target timestamp in milliseconds | |
| Returns: | |
| The offset of the found message, or the start offset if no suitable message is found | |
| """ | |
| # Performance optimization: if the range is too large, use a faster approximation first | |
| if end - start > 100000: | |
| # Do a faster preliminary search with larger steps to narrow down the range | |
| return self._accelerated_binary_search(consumer, tp, start, end, target_time) | |
| # Standard binary search for smaller ranges | |
| if start >= end - 1: | |
| consumer.seek(tp, start) | |
| msg = self._get_next_if_available(consumer) | |
| if msg and msg.timestamp()[1] < target_time: | |
| return start + 1 | |
| return start | |
| insertion_point = int(start + ((end - start) / 2)) | |
| consumer.seek(tp, insertion_point) | |
| msg = self._get_next_if_available(consumer) | |
| if msg: | |
| ts = msg.timestamp()[1] # timestamp() returns (timestamp_type, timestamp) | |
| if ts < target_time: | |
| return self._binary_search(consumer, tp, insertion_point + 1, end, target_time) | |
| else: | |
| return self._binary_search(consumer, tp, start, insertion_point, target_time) | |
| # If we couldn't get a message, try the beginning | |
| return start | |
| def _accelerated_binary_search(self, consumer, tp, start: int, end: int, target_time: int) -> int: | |
| """ | |
| A faster binary search approximation for large offset ranges. | |
| This method samples points in the range to quickly narrow down where | |
| the target timestamp might be, then uses standard binary search | |
| for more precision. | |
| Args: | |
| consumer: The Kafka consumer instance | |
| tp: The TopicPartition to search within | |
| start: The starting offset for the search | |
| end: The ending offset for the search | |
| target_time: The target timestamp in milliseconds | |
| Returns: | |
| The offset approximately matching the target time | |
| """ | |
| self._logger.debug(f"Using accelerated search for large range: {start} to {end}") | |
| # Sample several points in the range to build a time-to-offset mapping | |
| sample_points = 5 | |
| offsets = [] | |
| timestamps = [] | |
| # Include start and end points | |
| check_offsets = [start] | |
| # Add evenly spaced sample points | |
| step = (end - start) // (sample_points + 1) | |
| for i in range(1, sample_points + 1): | |
| check_offsets.append(start + i * step) | |
| # Check each offset and collect timestamps | |
| for offset in check_offsets: | |
| consumer.seek(tp, offset) | |
| msg = self._get_next_if_available(consumer) | |
| if msg: | |
| ts = msg.timestamp()[1] | |
| offsets.append(offset) | |
| timestamps.append(ts) | |
| if not timestamps: | |
| # If we couldn't get any timestamps, fall back to standard search | |
| return self._binary_search(consumer, tp, start, end, target_time) | |
| # Find the closest points that bracket our target time | |
| for i in range(len(timestamps) - 1): | |
| if timestamps[i] <= target_time <= timestamps[i + 1]: | |
| # Found bracketing timestamps, use standard binary search in this smaller range | |
| return self._binary_search(consumer, tp, offsets[i], offsets[i + 1], target_time) | |
| # If we didn't find bracketing points, check endpoints | |
| if target_time <= timestamps[0]: | |
| return start | |
| elif target_time >= timestamps[-1]: | |
| # Do a standard binary search from the last sample to the end | |
| return self._binary_search(consumer, tp, offsets[-1], end, target_time) | |
| # Fallback to standard binary search | |
| return self._binary_search(consumer, tp, start, end, target_time) | |
| def replay(self, start_time: int, end_time: int): | |
| """ | |
| Replay Kafka messages over the specified time range. | |
| This method yields messages from the specified partitions that fall within | |
| the given time range (inclusive). | |
| Args: | |
| start_time: The start timestamp in milliseconds | |
| end_time: The end timestamp in milliseconds | |
| Yields: | |
| Kafka messages found within the given time range | |
| Raises: | |
| ValueError: If the specified start or end time is invalid | |
| KafkaException: If there's an error communicating with Kafka | |
| """ | |
| # Validate input parameters | |
| if start_time < 0: | |
| raise ValueError('start_time must be non-negative') | |
| if end_time < 0: | |
| raise ValueError('end_time must be non-negative') | |
| if start_time > self._get_time_millis(): | |
| raise ValueError('start_time must not be in the future') | |
| if start_time > end_time: | |
| raise ValueError('end_time must be at least start_time') | |
| count = 0 | |
| last_timestamp = 0 | |
| replay_started = time.time()  # wall-clock start, used for the summary statistics below | |
| start_time_iso = datetime.fromtimestamp(start_time/1000).isoformat() | |
| end_time_iso = datetime.fromtimestamp(end_time/1000).isoformat() | |
| self._logger.info(f'Starting replay from {start_time_iso} to {end_time_iso}') | |
| # Find the starting offsets for each partition | |
| seek_points = self._find_seek_points(start_time) | |
| consumer = self._create_consumer() | |
| try: | |
| # Get the topic partitions | |
| cluster_metadata = consumer.list_topics(topic=self._topic_name) | |
| topic_metadata = cluster_metadata.topics[self._topic_name] | |
| all_partitions = [p.id for p in topic_metadata.partitions.values()] | |
| partitions = self._partitions if self._partitions else all_partitions | |
| # Assign and seek to each partition | |
| topic_partitions = [TopicPartition(self._topic_name, p) for p in partitions] | |
| consumer.assign(topic_partitions) | |
| for tp, offset in seek_points.items(): | |
| consumer.seek(tp, offset) | |
| # Track partitions still active | |
| active_partitions = set(tp.partition for tp in topic_partitions)  # TopicPartition exposes .partition, not .id | |
| message_counts_by_partition = {p: 0 for p in active_partitions} | |
| # Set poll timeout based on active partitions | |
| poll_timeout = 0.1 if len(active_partitions) > 10 else 1.0 | |
| last_log_time = time.time() | |
| while active_partitions: | |
| # Poll with the timeout chosen above (shorter when many partitions are assigned) | |
| msg = consumer.poll(poll_timeout) | |
| if not msg: | |
| continue | |
| if msg.error(): | |
| error_code = msg.error().code() | |
| self._logger.error(f"Consumer error: {msg.error()}") | |
| # Handle specific error codes if needed | |
| if error_code == KafkaError._PARTITION_EOF: | |
| # End of partition, mark as complete | |
| active_partitions.discard(msg.partition()) | |
| self._logger.debug(f'Reached end of partition {msg.partition()}') | |
| elif error_code in (KafkaError._TRANSPORT, KafkaError._ALL_BROKERS_DOWN): | |
| # Fatal error, re-raise | |
| raise KafkaException(msg.error()) | |
| continue | |
| timestamp_type, timestamp = msg.timestamp() | |
| last_timestamp = timestamp | |
| partition = msg.partition() | |
| # Update message count for this partition | |
| if partition in message_counts_by_partition: | |
| message_counts_by_partition[partition] += 1 | |
| if timestamp > end_time: | |
| # We've gone past the end time for this partition | |
| active_partitions.discard(partition) | |
| self._logger.debug(f'Completed partition {partition} with {message_counts_by_partition[partition]} messages') | |
| elif start_time <= timestamp <= end_time: | |
| # Message is within our time range | |
| yield msg | |
| count += 1 | |
| # Log progress periodically | |
| current_time = time.time() | |
| if count % self._log_interval == 0 or (current_time - last_log_time > 10): | |
| last_log_time = current_time | |
| progress_pct = 100.0 * (timestamp - start_time) / (end_time - start_time) if end_time > start_time else 100 | |
| self._logger.info( | |
| f'Processed {count:,} messages, last timestamp: {datetime.fromtimestamp(timestamp/1000).isoformat()}, ' | |
| f'progress: {min(100, max(0, progress_pct)):.1f}%, active partitions: {len(active_partitions)}' | |
| ) | |
| # If all partitions are done, exit | |
| if not active_partitions: | |
| self._logger.info('No more active partitions. Terminating.') | |
| break | |
| except Exception as e: | |
| self._logger.error(f'Unexpected exception during replay: {str(e)}', exc_info=True) | |
| raise | |
| finally: | |
| # Log summary statistics | |
| elapsed = time.time() - replay_started  # processing wall-clock time, not the replayed data's span | |
| message_rate = count / elapsed if elapsed > 0 else 0 | |
| self._logger.info( | |
| f'Replay complete. Processed {count:,} messages in {elapsed:.1f} seconds ' | |
| f'({message_rate:.1f} msgs/sec), last timestamp: {datetime.fromtimestamp(last_timestamp/1000).isoformat()}' | |
| ) | |
| consumer.close() | |
| def deserializer(text: Optional[bytes]): | |
| return json.loads(text.decode('utf-8')) if text else None | |
| def parse_date(s): | |
| return datetime.strptime(s, '%Y-%m-%d') | |
| def parse_datetime(s): | |
| """ | |
| Parse a datetime string in ISO format. | |
| Accepts formats: | |
| - YYYY-MM-DD (interpreted as YYYY-MM-DD 00:00:00) | |
| - YYYY-MM-DDThh:mm:ss | |
| - YYYY-MM-DD hh:mm:ss | |
| """ | |
| if 'T' in s: | |
| return datetime.fromisoformat(s) | |
| elif ' ' in s and len(s) > 10: | |
| # Format with space as separator | |
| return datetime.strptime(s, '%Y-%m-%d %H:%M:%S') | |
| else: | |
| # Just a date | |
| return datetime.strptime(s, '%Y-%m-%d') | |
| def replay_raw_to_sqlite(replayer, start_dt: datetime, end_dt: datetime, soi=None, db_path: Optional[str] = None, | |
| dt_fmt='%Y-%m-%d %H:%M:%S', filename_fmt='%Y%m%d', batch_size=10000): | |
| """ | |
| Replay raw data and store it in SQLite database. | |
| Args: | |
| replayer: The KafkaReplayer instance | |
| start_dt: Start datetime | |
| end_dt: End datetime | |
| soi: Set of symbols of interest (in bytes format) | |
| db_path: Optional path to SQLite database file | |
| dt_fmt: Datetime format | |
| filename_fmt: Filename format for default database name | |
| batch_size: Number of records to insert in a single database transaction | |
| Returns: | |
| Path to the created SQLite database | |
| """ | |
| import sqlite3 | |
| from pathlib import Path | |
| start, stop = int(start_dt.timestamp() * 1000), int(end_dt.timestamp() * 1000) | |
| db_path = db_path or str(Path.cwd() / f"{start_dt.strftime(filename_fmt)}.db") | |
| db_dir = Path(db_path).parent | |
| if not db_dir.exists(): | |
| db_dir.mkdir(parents=True, exist_ok=True) | |
| log.info(f"Replaying data from {start_dt.isoformat()} to {end_dt.isoformat()}") | |
| log.info(f"Storing results in {db_path}") | |
| if soi: | |
| log.info(f"Filtering for {len(soi)} symbols") | |
| # Create connection and table | |
| conn = sqlite3.connect(db_path) | |
| cursor = conn.cursor() | |
| # Enable WAL mode for better performance | |
| cursor.execute("PRAGMA journal_mode=WAL") | |
| cursor.execute("PRAGMA synchronous=NORMAL") | |
| cursor.execute("PRAGMA temp_store=MEMORY") | |
| # Create table with appropriate columns | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS price_data ( | |
| record_offset INTEGER, | |
| record_timestamp REAL, | |
| payload_timestamp TEXT, | |
| price_date TEXT, | |
| price_time TEXT, | |
| sequence_number INTEGER, | |
| market_status TEXT, | |
| OSID INTEGER, | |
| symbol TEXT, | |
| high REAL, | |
| low REAL, | |
| last REAL, | |
| open REAL, | |
| volume INTEGER, | |
| price_time_original TEXT, | |
| sec INTEGER, | |
| nano_sec INTEGER | |
| ) | |
| ''') | |
| # Create index on symbol for faster queries | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_symbol ON price_data (symbol)') | |
| # Create index on timestamp for faster time-based queries | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_timestamp ON price_data (record_timestamp)') | |
| try: | |
| batch = [] | |
| total_records = 0 | |
| start_time = time.time() | |
| last_log_time = start_time | |
| for i, record in enumerate(replayer.replay(start, stop)): | |
| value = record.value() | |
| if value is None: | |
| continue | |
| try: | |
| payload, price_dt, ts_second, ts_nano = value.rsplit(b',', 3) | |
| # Parse payload data | |
| payload_parts = payload.split(b',') | |
| if len(payload_parts) < 4: # Ensure we have at least enough parts to get the symbol | |
| continue | |
| symbol = payload_parts[3].decode() | |
| # Check if symbol is in our set of interest | |
| if soi and symbol.encode() not in soi: | |
| continue | |
| # With confluent_kafka, record.timestamp() returns (timestamp_type, timestamp) | |
| _, record_timestamp = record.timestamp() | |
| record_timestamp = record_timestamp / 1000 # milliseconds to seconds | |
| payload_timestamp = f"{ts_second.decode()}.{ts_nano.decode():0>9}" | |
| price_date = price_dt[:8].decode() | |
| price_time = price_dt[8:].decode() | |
| # Prepare data for insertion | |
| row = ( | |
| record.offset(), | |
| record_timestamp, | |
| payload_timestamp, | |
| price_date, | |
| price_time, | |
| *[p.decode() if isinstance(p, bytes) else p for p in payload_parts], | |
| ts_second.decode(), | |
| ts_nano.decode() | |
| ) | |
| batch.append(row) | |
| # Execute batch insert when batch size is reached | |
| if len(batch) >= batch_size: | |
| cursor.executemany(''' | |
| INSERT INTO price_data VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| ''', batch) | |
| conn.commit() | |
| total_records += len(batch) | |
| batch = [] | |
| current_time = time.time() | |
| if current_time - last_log_time >= 5: # Log every 5 seconds | |
| last_log_time = current_time | |
| elapsed = current_time - start_time | |
| rate = total_records / elapsed if elapsed > 0 else 0 | |
| log.info( | |
| f'Progress: {i} msgs processed, {total_records:,} records stored, ' | |
| f'{rate:.1f} records/sec, last timestamp: {datetime.fromtimestamp(record_timestamp).isoformat()}' | |
| ) | |
| except Exception as e: | |
| log.error(f"Error processing record: {e}", exc_info=True) | |
| continue | |
| # Insert any remaining records | |
| if batch: | |
| cursor.executemany(''' | |
| INSERT INTO price_data VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| ''', batch) | |
| conn.commit() | |
| total_records += len(batch) | |
| # Create some useful views | |
| cursor.execute(''' | |
| CREATE VIEW IF NOT EXISTS latest_prices AS | |
| SELECT symbol, last, high, low, open, volume, price_date, price_time | |
| FROM price_data | |
| WHERE (symbol, record_timestamp) IN ( | |
| SELECT symbol, MAX(record_timestamp) | |
| FROM price_data | |
| GROUP BY symbol | |
| ) | |
| ''') | |
| # Add daily summary view | |
| cursor.execute(''' | |
| CREATE VIEW IF NOT EXISTS daily_summary AS | |
| SELECT | |
| symbol, | |
| price_date, | |
| MIN(low) as daily_low, | |
| MAX(high) as daily_high, | |
| SUM(volume) as daily_volume, | |
| (SELECT open FROM price_data p2 | |
| WHERE p2.symbol = p1.symbol AND p2.price_date = p1.price_date | |
| ORDER BY record_timestamp ASC LIMIT 1) as daily_open, | |
| (SELECT last FROM price_data p2 | |
| WHERE p2.symbol = p1.symbol AND p2.price_date = p1.price_date | |
| ORDER BY record_timestamp DESC LIMIT 1) as daily_close | |
| FROM price_data p1 | |
| GROUP BY symbol, price_date | |
| ''') | |
| # Create statistics table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS replay_stats ( | |
| start_time TEXT, | |
| end_time TEXT, | |
| total_records INTEGER, | |
| symbols_count INTEGER, | |
| processing_duration_seconds REAL | |
| ) | |
| ''') | |
| # Record statistics | |
| elapsed_time = time.time() - start_time | |
| cursor.execute( | |
| "INSERT INTO replay_stats VALUES (?, ?, ?, ?, ?)", | |
| ( | |
| start_dt.isoformat(), | |
| end_dt.isoformat(), | |
| total_records, | |
| len(soi) if soi else 0, | |
| elapsed_time | |
| ) | |
| ) | |
| conn.commit() | |
| # Log final statistics | |
| log.info(f"Total records saved to database: {total_records:,}") | |
| log.info(f"Processing time: {elapsed_time:.2f} seconds") | |
| log.info(f"Processing rate: {total_records / elapsed_time:.1f} records/sec") | |
| log.info(f"Database location: {db_path}") | |
| return db_path | |
| finally: | |
| # Switch back to delete mode which is better for reading | |
| cursor.execute("PRAGMA journal_mode=DELETE") | |
| conn.close() | |
| def main(): | |
| parser = argparse.ArgumentParser(description='Replay messages from Kafka topic by time range') | |
| parser.add_argument('-b', '--bootstrap-servers', dest='bootstrap_servers', | |
| type=str, default=os.getenv('BOOTSTRAP_SERVERS', 'localhost:9092'), | |
| help='Kafka bootstrap servers (comma-separated)') | |
| parser.add_argument('-t', '--topic', dest='topic', required=True if not os.getenv('TOPIC') else False, | |
| default=os.getenv('TOPIC'), help='Kafka topic to replay') | |
| parser.add_argument('--symbols', type=str, default=os.getenv('SYMBOLS'), | |
| help='Comma-separated list of symbols to filter') | |
| # Time selection arguments | |
| time_group = parser.add_argument_group('time range selection') | |
| time_group.add_argument('--start-time', type=str, default=os.getenv('START_TIME'), | |
| help='Start timestamp in ISO format: YYYY-MM-DDThh:mm:ss (default)') | |
| time_group.add_argument('--end-time', type=str, default=os.getenv('END_TIME'), | |
| help='End timestamp in ISO format: YYYY-MM-DDThh:mm:ss (default)') | |
| time_group.add_argument('--use-date-range', dest='use_date_range', action='store_true', | |
| help='Use date range (YYYY-MM-DD) instead of precise timestamps') | |
| time_group.add_argument('--start-date', type=parse_date, default=os.getenv('START_DATE'), | |
| help='Start date in YYYY-MM-DD format (when using --use-date-range)') | |
| time_group.add_argument('--end-date', type=parse_date, default=os.getenv('END_DATE'), | |
| help='End date in YYYY-MM-DD format (when using --use-date-range)') | |
| parser.add_argument('--db-path', type=str, default=None, | |
| help='Path to output SQLite database. Default: current directory with date as filename') | |
| parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], | |
| default='INFO', help='Set logging level') | |
| parser.add_argument('--batch-size', type=int, default=10000, | |
| help='Batch size for database inserts') | |
| parser.add_argument('--partitions', type=str, | |
| help='Comma-separated list of partitions to replay. Default: all partitions') | |
| args = parser.parse_args() | |
| # Set log level | |
| log.setLevel(getattr(logging, args.log_level)) | |
| # Validate arguments | |
| if not args.topic: | |
| parser.error("Topic is required. Specify with -t/--topic or set 'TOPIC' environment variable.") | |
| # Initialize datetime range | |
| start_date_time = None | |
| end_date_time = None | |
| # Process time range arguments | |
| if args.use_date_range: | |
| if not args.start_date: | |
| parser.error("Start date is required with --use-date-range. Specify with --start-date or set 'START_DATE' environment variable.") | |
| if not args.end_date: | |
| parser.error("End date is required with --use-date-range. Specify with --end-date or set 'END_DATE' environment variable.") | |
| # Ensure end date is not before start date | |
| if args.end_date < args.start_date: | |
| parser.error(f"End date {args.end_date} is before start date {args.start_date}") | |
| # Adjust datetime range to include full days | |
| start_date_time = datetime.combine(args.start_date, datetime.min.time()) | |
| end_date_time = datetime.combine(args.end_date, datetime.max.time()) | |
| else: | |
| # Default: Use timestamp range | |
| start_time = args.start_time or os.getenv('START_TIME') | |
| end_time = args.end_time or os.getenv('END_TIME') | |
| if not start_time: | |
| parser.error("Start timestamp is required. Specify with --start-time or set 'START_TIME' environment variable.") | |
| if not end_time: | |
| parser.error("End timestamp is required. Specify with --end-time or set 'END_TIME' environment variable.") | |
| try: | |
| start_date_time = parse_datetime(start_time) | |
| except ValueError as e: | |
| parser.error(f"Invalid start timestamp format: {e}. Use ISO format YYYY-MM-DDThh:mm:ss") | |
| try: | |
| end_date_time = parse_datetime(end_time) | |
| except ValueError as e: | |
| parser.error(f"Invalid end timestamp format: {e}. Use ISO format YYYY-MM-DDThh:mm:ss") | |
| # Ensure end time is not before start time | |
| if end_date_time < start_date_time: | |
| parser.error(f"End time {end_time} is before start time {start_time}") | |
| # Process other arguments | |
| topic = args.topic | |
| bootstrap_servers = args.bootstrap_servers | |
| # Handle partitions | |
| partitions = None | |
| if args.partitions: | |
| try: | |
| partitions = [int(p.strip()) for p in args.partitions.split(',')] | |
| log.info(f"Replaying from partitions: {partitions}") | |
| except ValueError: | |
| parser.error("Partitions must be comma-separated integers") | |
| # Create replayer | |
| try: | |
| replayer = KafkaReplayer(topic, partitions=partitions, bootstrap_servers=bootstrap_servers) | |
| # Process symbols | |
| soi = None | |
| if args.symbols: | |
| symbols = [s.strip() for s in args.symbols.split(',')] | |
| soi = set(s.upper().encode() for s in symbols) | |
| log.info(f"Filtering for symbols: {', '.join(symbols)}") | |
| # Run replay | |
| db_path = replay_raw_to_sqlite( | |
| replayer, | |
| start_date_time, | |
| end_date_time, | |
| soi=soi, | |
| db_path=args.db_path, | |
| batch_size=args.batch_size | |
| ) | |
| log.info(f"Data saved to SQLite database: {db_path}") | |
| except KeyboardInterrupt: | |
| log.info("Process interrupted by user. Exiting.") | |
| return 1 | |
| except Exception as e: | |
| log.error(f"Error during replay: {str(e)}", exc_info=True) | |
| return 1 | |
| return 0 | |
| if __name__ == '__main__': | |
| raise SystemExit(main()) |
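Once a replay has written a database, the views created by replay_raw_to_sqlite can be queried directly. A minimal sketch, assuming the default output name produced by filename_fmt (for example 20250516.db for the dates in the configuration above) and the example symbol AAPL:

import sqlite3

# Default filename comes from filename_fmt ('%Y%m%d'); adjust to your actual output path
conn = sqlite3.connect("20250516.db")

# Latest quote per symbol, from the latest_prices view created during the replay
for symbol, last, volume in conn.execute(
    "SELECT symbol, last, volume FROM latest_prices ORDER BY symbol"
):
    print(symbol, last, volume)

# Per-day OHLC summary, from the daily_summary view
for row in conn.execute(
    "SELECT price_date, daily_open, daily_high, daily_low, daily_close, daily_volume "
    "FROM daily_summary WHERE symbol = ?",
    ("AAPL",),
):
    print(row)

conn.close()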
Dependency lockfile:
| version = 1 | |
| revision = 2 | |
| requires-python = ">=3.12" | |
| [[package]] | |
| name = "confluent-kafka" | |
| version = "2.10.0" | |
| source = { registry = "https://pypi.org/simple" } | |
| sdist = { url = "https://files.pythonhosted.org/packages/a0/c5/22087627478d2cc97b864dd1774c1e2d4007acc22b8f78aec5a7a41f6436/confluent_kafka-2.10.0.tar.gz", hash = "sha256:30a346908f3ad49c4bc1cb5557e7a8ce484190f8633aa18f9b87b2620809ac13", size = 193775, upload-time = "2025-04-17T21:18:43.965Z" } | |
| wheels = [ | |
| { url = "https://files.pythonhosted.org/packages/c1/fa/00398f8906da8fbefd8732501cffc5910b1a55af03a292b6720c19258c97/confluent_kafka-2.10.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:2268a607d374325e3a8fc779d8639de4344ce4d1ad2b88e8f9d13d97baae4e71", size = 3537479, upload-time = "2025-04-17T21:17:02.417Z" }, | |
| { url = "https://files.pythonhosted.org/packages/03/7b/f940f1cd8c25d61cecc5f6ca044871f90370a80bd0d0a6595852151e4669/confluent_kafka-2.10.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:a89e9ba5c8e18891b3bbf333001a9e12af43bd5bc3c3353065109582f3f078fc", size = 3060467, upload-time = "2025-04-17T21:17:05.342Z" }, | |
| { url = "https://files.pythonhosted.org/packages/8a/f9/a30b62c78178be818a30fab23e826d3c5cdf60f53e2ebb31fa3e5fdded00/confluent_kafka-2.10.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:7252fee8d7287bae71f1a506967b3782e67ba5b699c4d095d16fabce92a63cfd", size = 15260056, upload-time = "2025-04-17T21:17:10.918Z" }, | |
| { url = "https://files.pythonhosted.org/packages/6e/00/b334804360b6490f5dfa910432ad9dbba7f91baa76168299458d03c50e40/confluent_kafka-2.10.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:ddb9400786eedd8a6c0cbcceaddb8329a2597f4c02dc72389e83859da47d22d6", size = 3852443, upload-time = "2025-04-17T21:17:16.996Z" }, | |
| { url = "https://files.pythonhosted.org/packages/c1/21/8c2b37cdc5ef60dee2ed9ac05dd89eaef822d4dd284b7d7e4678aa18aba6/confluent_kafka-2.10.0-cp312-cp312-win_amd64.whl", hash = "sha256:ef5aad8c04c54fe6ef89dc08a4ba24a80be90b05bbbc3348370c1c51222e0a5e", size = 3977821, upload-time = "2025-04-17T21:17:21.253Z" }, | |
| { url = "https://files.pythonhosted.org/packages/e2/3c/db66f729b129e8e35af92571602f522091f753da8f42a7e217d5bed23ad6/confluent_kafka-2.10.0-cp313-cp313-macosx_13_0_arm64.whl", hash = "sha256:10230c3da4cd047cf488cef405f6e209da4e8403c0e52e45b89b1d878420082b", size = 3064500, upload-time = "2025-04-17T21:17:24.272Z" }, | |
| { url = "https://files.pythonhosted.org/packages/ee/9f/569976c299ec40347588b40e61e8bcd065d958741a63159a4d77ca7859df/confluent_kafka-2.10.0-cp313-cp313-macosx_13_0_x86_64.whl", hash = "sha256:514c4a4dcb522aacdc17fa3e2751ae8670553850c9e22844ac997ec6459b3a48", size = 3540404, upload-time = "2025-04-17T21:17:32.051Z" }, | |
| { url = "https://files.pythonhosted.org/packages/48/ce/b44b68ac417cf8602c5dee1115fb27ac174585039f58527ab07584ae1ce9/confluent_kafka-2.10.0-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:bf706b1ebb540276791b5c42ac624c93195c34782848b24b2da1467caeef479b", size = 15260481, upload-time = "2025-04-17T21:17:38.697Z" }, | |
| { url = "https://files.pythonhosted.org/packages/ba/2e/bd4edabda58dbe94c235b587205ac222633abd6da9c6677832071f85b66c/confluent_kafka-2.10.0-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:6b8b057d436d82c1b2d819915d03e2b446bfa55403acd440f9222509ba2147de", size = 3852813, upload-time = "2025-04-17T21:17:43.045Z" }, | |
| { url = "https://files.pythonhosted.org/packages/84/5f/5d68af39ed6f1cdbcc49dd412436894303cc03bb851e3f540a9c23d384ac/confluent_kafka-2.10.0-cp313-cp313-win_amd64.whl", hash = "sha256:649ccde18b4f2ff509a16093a06ca13629e4f4b3146d4ef5a82805c869cf8cbd", size = 4037452, upload-time = "2025-04-17T21:17:45.545Z" }, | |
| ] | |
| [[package]] | |
| name = "kafka-replay" | |
| version = "0.1.0" | |
| source = { virtual = "." } | |
| dependencies = [ | |
| { name = "confluent-kafka" }, | |
| { name = "python-dotenv" }, | |
| ] | |
| [package.metadata] | |
| requires-dist = [ | |
| { name = "confluent-kafka", specifier = ">=2.10.0" }, | |
| { name = "python-dotenv", specifier = ">=1.1.0" }, | |
| ] | |
| [[package]] | |
| name = "python-dotenv" | |
| version = "1.1.0" | |
| source = { registry = "https://pypi.org/simple" } | |
| sdist = { url = "https://files.pythonhosted.org/packages/88/2c/7bb1416c5620485aa793f2de31d3df393d3686aa8a8506d11e10e13c5baf/python_dotenv-1.1.0.tar.gz", hash = "sha256:41f90bc6f5f177fb41f53e87666db362025010eb28f60a01c9143bfa33a2b2d5", size = 39920, upload-time = "2025-03-25T10:14:56.835Z" } | |
| wheels = [ | |
| { url = "https://files.pythonhosted.org/packages/1e/18/98a99ad95133c6a6e2005fe89faedf294a748bd5dc803008059409ac9b1e/python_dotenv-1.1.0-py3-none-any.whl", hash = "sha256:d7c01d9e2293916c18baf562d95698754b0dbbb5e74d457c45d4f6561fb9d55d", size = 20256, upload-time = "2025-03-25T10:14:55.034Z" }, | |
| ] |