Kafka Replay
# Kafka connection settings
BOOTSTRAP_SERVERS=localhost:9092
# Topic to replay messages from (required)
TOPIC=your-kafka-topic
# Time range settings (timestamp format - default approach)
START_TIME=2025-05-16T00:00:01
END_TIME=2025-05-16T23:59:59
# Alternative time range settings (date format - requires --use-date-range flag)
START_DATE=2025-05-16
END_DATE=2025-05-16
# Optional filtering
SYMBOLS=AAPL,MSFT,GOOG
# Logging settings
LOG_LEVEL=INFO
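# These values are loaded via python-dotenv, so command-line flags are optional overrides.
# Example invocation mirroring the settings above (script filename is hypothetical):
#   python kafka_replay.py --topic your-kafka-topic \
#     --start-time 2025-05-16T00:00:01 --end-time 2025-05-16T23:59:59 \
#     --symbols AAPL,MSFT,GOOG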
.venv/
.env
.envrc
[project]
name = "kafka-replay"
version = "0.1.0"
description = "Replay Kafka historical data"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
    "confluent-kafka>=2.10.0",
    "python-dotenv>=1.1.0",
]
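# Assuming the project is managed with uv (a uv.lock is included further below),
# the pinned dependencies can be installed with `uv sync` and the script run with
# `uv run python <script>.py --help` (the script filename is not shown here).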
#!/usr/bin/env python
# coding: utf8
import os
import argparse
import json
import logging
import time
import uuid
from datetime import datetime
from typing import Optional
import dotenv
from confluent_kafka import Consumer, KafkaError, KafkaException, TopicPartition
dotenv.load_dotenv()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(message)s')
log = logging.getLogger('replay')
kafka_conn_logger = logging.getLogger('confluent_kafka')
kafka_conn_logger.propagate = False
kafka_conn_logger.setLevel(logging.WARNING)


class KafkaReplayer:
    """
    A utility class for replaying Kafka messages within a specific time range.

    This class allows retrieving historical data from Kafka topics by searching
    for messages within a given time range. It uses binary search to efficiently
    locate the starting offsets for each partition.
    """

    def __init__(self, topic_name: str, partitions=None, **configs):
        """
        Initialize a KafkaReplayer instance.

        Args:
            topic_name: The Kafka topic to replay messages from
            partitions: Optional list of specific partitions to replay from
            **configs: Configuration options for the Kafka consumer

        Raises:
            ValueError: If topic_name is not provided
        """
        if not topic_name:
            raise ValueError('topic_name is required')
        self._topic_name = topic_name
        self._partitions = partitions
        self._configs = self._configs_with_defaults(configs)
        self._log_interval = 10000
        self._logger = self._create_logger()

    def _create_logger(self):
        """
        Create and configure a logger instance for this class.

        Returns:
            A configured logger instance
        """
        self._logger = logging.getLogger(__name__)
        null_handler = logging.NullHandler()
        null_handler.setLevel(logging.DEBUG)
        self._logger.addHandler(null_handler)
        return self._logger

    @staticmethod
    def _configs_with_defaults(configs):
        """
        Convert and normalize Kafka configuration parameters.

        This method converts configuration parameters from kafka-python style
        to confluent_kafka style and adds sensible defaults.

        Args:
            configs: Dictionary of configuration parameters

        Returns:
            Dictionary of normalized confluent_kafka configuration parameters
        """
        conf = {}
        # Handle bootstrap_servers specially
        if 'bootstrap_servers' in configs:
            if isinstance(configs['bootstrap_servers'], list):
                conf['bootstrap.servers'] = ','.join(configs['bootstrap_servers'])
            else:
                conf['bootstrap.servers'] = configs['bootstrap_servers']
        # Add unique group id if not specified
        if 'group.id' not in configs and 'group_id' not in configs:
            conf['group.id'] = str(uuid.uuid4())
        elif 'group_id' in configs:
            conf['group.id'] = configs['group_id']
        # Add consumer timeout
        if 'consumer_timeout_ms' in configs:
            conf['session.timeout.ms'] = configs['consumer_timeout_ms']
        else:
            conf['session.timeout.ms'] = 10000
        # Emit partition EOF events so replay() can detect exhausted partitions
        # (librdkafka disables them by default); callers can still override this
        conf.setdefault('enable.partition.eof', True)
        # Add any other configs as-is with appropriate format conversion
        for key, value in configs.items():
            if key not in ('bootstrap_servers', 'group_id', 'consumer_timeout_ms'):
                # Convert snake_case to dot.case
                conf_key = key.replace('_', '.')
                conf[conf_key] = value
        return conf
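
    # For example (illustrative values only), kafka-python style kwargs such as
    #   bootstrap_servers=['h1:9092', 'h2:9092'], group_id='replay', enable_auto_commit=False
    # are normalized above to confluent_kafka style:
    #   {'bootstrap.servers': 'h1:9092,h2:9092', 'group.id': 'replay',
    #    'session.timeout.ms': 10000, 'enable.partition.eof': True, 'enable.auto.commit': False}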

    @staticmethod
    def _get_time_millis():
        return int(round(time.time() * 1000))

    def _create_consumer(self) -> Consumer:
        return Consumer(self._configs)

    def _find_seek_points(self, start_time: int) -> dict:
        """
        Find the initial seek points for each partition based on the start time.

        This method performs a binary search on each partition to find the offset
        of the message that is closest to the given start time but not earlier.

        Args:
            start_time: The start timestamp in milliseconds

        Returns:
            A dictionary mapping each TopicPartition to its corresponding seek offset
        """
        seek_points = {}
        consumer: Consumer = self._create_consumer()
        try:
            # Get metadata for the topic
            cluster_metadata = consumer.list_topics(topic=self._topic_name)
            topic_metadata = cluster_metadata.topics[self._topic_name]
            all_partitions = [p.id for p in topic_metadata.partitions.values()]
            # Filter partitions if needed
            partitions = self._partitions if self._partitions else all_partitions
            topic_partitions = [TopicPartition(self._topic_name, p) for p in partitions]
            for topic_partition in topic_partitions:
                # Binary search to find the appropriate offset for each partition
                consumer.assign([topic_partition])
                # Get beginning and end (watermark) offsets
                start_offset, end_offset = consumer.get_watermark_offsets(topic_partition)
                target_offset = self._binary_search(consumer, topic_partition, start_offset, end_offset, start_time)
                seek_points[topic_partition] = target_offset
                self._logger.debug(f'Start offset for {topic_partition} is {target_offset}')
            self._logger.info(f'Start offsets: {seek_points}')
            return seek_points
        finally:
            consumer.close()

    def _get_next_if_available(self, consumer):
        return consumer.poll(1.0)

    def _binary_search(self, consumer: Consumer, tp: TopicPartition, start: int, end: int, target_time: int) -> int:
        """
        Perform a binary search to find the offset of the message that is closest to the target time but not earlier.

        Args:
            consumer: The Kafka consumer instance
            tp: The TopicPartition to search within
            start: The starting offset for the search
            end: The ending offset for the search
            target_time: The target timestamp in milliseconds

        Returns:
            The offset of the found message, or the start offset if no suitable message is found
        """
        # Performance optimization: if the range is too large, use a faster approximation first
        if end - start > 100000:
            # Do a faster preliminary search with larger steps to narrow down the range
            return self._accelerated_binary_search(consumer, tp, start, end, target_time)
        # Standard binary search for smaller ranges
        if start >= end - 1:
            # confluent_kafka's seek() takes a single TopicPartition carrying the target offset
            consumer.seek(TopicPartition(tp.topic, tp.partition, start))
            msg = self._get_next_if_available(consumer)
            if msg and msg.timestamp()[1] < target_time:
                return start + 1
            return start
        insertion_point = int(start + ((end - start) / 2))
        consumer.seek(TopicPartition(tp.topic, tp.partition, insertion_point))
        msg = self._get_next_if_available(consumer)
        if msg:
            ts = msg.timestamp()[1]  # timestamp() returns (timestamp_type, timestamp)
            if ts < target_time:
                return self._binary_search(consumer, tp, insertion_point + 1, end, target_time)
            else:
                return self._binary_search(consumer, tp, start, insertion_point, target_time)
        # If we couldn't get a message, try the beginning
        return start

    def _accelerated_binary_search(self, consumer, tp, start: int, end: int, target_time: int) -> int:
        """
        A faster binary search approximation for large offset ranges.

        This method samples points in the range to quickly narrow down where
        the target timestamp might be, then uses standard binary search
        for more precision.

        Args:
            consumer: The Kafka consumer instance
            tp: The TopicPartition to search within
            start: The starting offset for the search
            end: The ending offset for the search
            target_time: The target timestamp in milliseconds

        Returns:
            The offset approximately matching the target time
        """
        self._logger.debug(f"Using accelerated search for large range: {start} to {end}")
        # Sample several points in the range to build a time-to-offset mapping
        sample_points = 5
        offsets = []
        timestamps = []
        # Include the start point
        check_offsets = [start]
        # Add evenly spaced sample points
        step = (end - start) // (sample_points + 1)
        for i in range(1, sample_points + 1):
            check_offsets.append(start + i * step)
        # Check each offset and collect timestamps
        for offset in check_offsets:
            consumer.seek(TopicPartition(tp.topic, tp.partition, offset))
            msg = self._get_next_if_available(consumer)
            if msg:
                ts = msg.timestamp()[1]
                offsets.append(offset)
                timestamps.append(ts)
        if not timestamps:
            # If we couldn't get any timestamps, fall back to standard search
            return self._binary_search(consumer, tp, start, end, target_time)
        # Find the closest points that bracket our target time
        for i in range(len(timestamps) - 1):
            if timestamps[i] <= target_time <= timestamps[i + 1]:
                # Found bracketing timestamps, use standard binary search in this smaller range
                return self._binary_search(consumer, tp, offsets[i], offsets[i + 1], target_time)
        # If we didn't find bracketing points, check endpoints
        if target_time <= timestamps[0]:
            return start
        elif target_time >= timestamps[-1]:
            # Do a standard binary search from the last sample to the end
            return self._binary_search(consumer, tp, offsets[-1], end, target_time)
        # Fallback to standard binary search
        return self._binary_search(consumer, tp, start, end, target_time)
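
    # Illustrative walk-through (made-up numbers): for a partition spanning offsets
    # 0..1,200,000, the accelerated search samples offsets 0, 200k, 400k, ..., 1,000k,
    # reads one message timestamp at each, finds the pair of samples whose timestamps
    # bracket the target time, and then runs the standard binary search only inside
    # that ~200k-offset window instead of the full partition.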

    def replay(self, start_time: int, end_time: int):
        """
        Replay Kafka messages over the specified time range.

        This method yields messages from the specified partitions that fall within
        the given time range (inclusive).

        Args:
            start_time: The start timestamp in milliseconds
            end_time: The end timestamp in milliseconds

        Yields:
            Kafka messages found within the given time range

        Raises:
            ValueError: If the specified start or end time is invalid
            KafkaException: If there's an error communicating with Kafka
        """
        # Validate input parameters
        if start_time < 0:
            raise ValueError('start_time must be non-negative')
        if end_time < 0:
            raise ValueError('end_time must be non-negative')
        if start_time > self._get_time_millis():
            raise ValueError('start_time must not be in the future')
        if start_time > end_time:
            raise ValueError('end_time must be at least start_time')
        count = 0
        last_timestamp = 0
        start_time_iso = datetime.fromtimestamp(start_time / 1000).isoformat()
        end_time_iso = datetime.fromtimestamp(end_time / 1000).isoformat()
        self._logger.info(f'Starting replay from {start_time_iso} to {end_time_iso}')
        # Find the starting offsets for each partition
        seek_points = self._find_seek_points(start_time)
        consumer = self._create_consumer()
        replay_started = time.time()  # wall-clock start, used for throughput stats
        try:
            # Assign each selected partition, starting at its seek offset
            topic_partitions = [TopicPartition(tp.topic, tp.partition, offset)
                                for tp, offset in seek_points.items()]
            consumer.assign(topic_partitions)
            # Track partitions still active
            active_partitions = set(tp.partition for tp in topic_partitions)
            message_counts_by_partition = {p: 0 for p in active_partitions}
            # Set poll timeout based on active partitions
            poll_timeout = 0.1 if len(active_partitions) > 10 else 1.0
            last_log_time = time.time()
            while active_partitions:
                # Adjust poll timeout dynamically based on message flow
                msg = consumer.poll(poll_timeout)
                if not msg:
                    continue
                if msg.error():
                    error_code = msg.error().code()
                    # Handle specific error codes if needed
                    if error_code == KafkaError._PARTITION_EOF:
                        # End of partition, mark as complete
                        active_partitions.discard(msg.partition())
                        self._logger.debug(f'Reached end of partition {msg.partition()}')
                    elif error_code in (KafkaError._TRANSPORT, KafkaError._ALL_BROKERS_DOWN):
                        # Fatal error, re-raise
                        self._logger.error(f"Consumer error: {msg.error()}")
                        raise KafkaException(msg.error())
                    else:
                        self._logger.error(f"Consumer error: {msg.error()}")
                    continue
                timestamp_type, timestamp = msg.timestamp()
                last_timestamp = timestamp
                partition = msg.partition()
                # Update message count for this partition
                if partition in message_counts_by_partition:
                    message_counts_by_partition[partition] += 1
                if timestamp > end_time:
                    # We've gone past the end time for this partition
                    active_partitions.discard(partition)
                    self._logger.debug(f'Completed partition {partition} with {message_counts_by_partition[partition]} messages')
                elif start_time <= timestamp <= end_time:
                    # Message is within our time range
                    yield msg
                    count += 1
                # Log progress periodically
                current_time = time.time()
                if count % self._log_interval == 0 or (current_time - last_log_time > 10):
                    last_log_time = current_time
                    progress_pct = 100.0 * (timestamp - start_time) / (end_time - start_time) if end_time > start_time else 100
                    self._logger.info(
                        f'Processed {count:,} messages, last timestamp: {datetime.fromtimestamp(timestamp / 1000).isoformat()}, '
                        f'progress: {min(100, max(0, progress_pct)):.1f}%, active partitions: {len(active_partitions)}'
                    )
                # If all partitions are done, exit
                if not active_partitions:
                    self._logger.info('No more active partitions. Terminating.')
                    break
        except Exception as e:
            self._logger.error(f'Unexpected exception during replay: {str(e)}', exc_info=True)
            raise
        finally:
            # Log summary statistics
            elapsed = time.time() - replay_started
            message_rate = count / elapsed if elapsed > 0 else 0
            self._logger.info(
                f'Replay complete. Processed {count:,} messages in {elapsed:.1f} seconds '
                f'({message_rate:.1f} msgs/sec), last timestamp: {datetime.fromtimestamp(last_timestamp / 1000).isoformat()}'
            )
            consumer.close()
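

# A minimal programmatic usage sketch (topic and broker addresses are placeholders):
#
#   replayer = KafkaReplayer('your-kafka-topic', bootstrap_servers='localhost:9092')
#   start_ms = int(datetime(2025, 5, 16).timestamp() * 1000)
#   end_ms = int(datetime(2025, 5, 16, 23, 59, 59).timestamp() * 1000)
#   for msg in replayer.replay(start_ms, end_ms):
#       print(msg.partition(), msg.offset(), deserializer(msg.value()))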

def deserializer(text: Optional[bytes]):
    return json.loads(text.decode('utf-8')) if text else None


def parse_date(s):
    return datetime.strptime(s, '%Y-%m-%d')


def parse_datetime(s):
    """
    Parse a datetime string in ISO format.

    Accepts formats:
    - YYYY-MM-DD (interpreted as YYYY-MM-DD 00:00:00)
    - YYYY-MM-DDThh:mm:ss
    - YYYY-MM-DD hh:mm:ss
    """
    if 'T' in s:
        return datetime.fromisoformat(s)
    elif ' ' in s and len(s) > 10:
        # Format with space as separator
        return datetime.strptime(s, '%Y-%m-%d %H:%M:%S')
    else:
        # Just a date
        return datetime.strptime(s, '%Y-%m-%d')
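
# A few worked examples of the accepted formats:
#   parse_datetime('2025-05-16')          -> datetime(2025, 5, 16, 0, 0)
#   parse_datetime('2025-05-16T09:30:00') -> datetime(2025, 5, 16, 9, 30)
#   parse_datetime('2025-05-16 09:30:00') -> datetime(2025, 5, 16, 9, 30)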

def replay_raw_to_sqlite(replayer, start_dt: datetime, end_dt: datetime, soi=None, db_path: Optional[str] = None,
                         dt_fmt='%Y-%m-%d %H:%M:%S', filename_fmt='%Y%m%d', batch_size=10000):
    """
    Replay raw data and store it in an SQLite database.

    Args:
        replayer: The KafkaReplayer instance
        start_dt: Start datetime
        end_dt: End datetime
        soi: Set of symbols of interest (in bytes format)
        db_path: Optional path to SQLite database file
        dt_fmt: Datetime format
        filename_fmt: Filename format for default database name
        batch_size: Number of records to insert in a single database transaction

    Returns:
        Path to the created SQLite database
    """
    import sqlite3
    from pathlib import Path

    start, stop = int(start_dt.timestamp() * 1000), int(end_dt.timestamp() * 1000)
    db_path = db_path or str(Path.cwd() / f"{start_dt.strftime(filename_fmt)}.db")
    db_dir = Path(db_path).parent
    if not db_dir.exists():
        db_dir.mkdir(parents=True, exist_ok=True)
    log.info(f"Replaying data from {start_dt.isoformat()} to {end_dt.isoformat()}")
    log.info(f"Storing results in {db_path}")
    if soi:
        log.info(f"Filtering for {len(soi)} symbols")
    # Create connection and table
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    # Enable WAL mode for better performance
    cursor.execute("PRAGMA journal_mode=WAL")
    cursor.execute("PRAGMA synchronous=NORMAL")
    cursor.execute("PRAGMA temp_store=MEMORY")
    # Create table with appropriate columns
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS price_data (
            record_offset INTEGER,
            record_timestamp REAL,
            payload_timestamp TEXT,
            price_date TEXT,
            price_time TEXT,
            sequence_number INTEGER,
            market_status TEXT,
            OSID INTEGER,
            symbol TEXT,
            high REAL,
            low REAL,
            last REAL,
            open REAL,
            volume INTEGER,
            price_time_original TEXT,
            sec INTEGER,
            nano_sec INTEGER
        )
    ''')
    # Create index on symbol for faster queries
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_symbol ON price_data (symbol)')
    # Create index on timestamp for faster time-based queries
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_timestamp ON price_data (record_timestamp)')
    try:
        batch = []
        total_records = 0
        start_time = time.time()
        last_log_time = start_time
        for i, record in enumerate(replayer.replay(start, stop)):
            value = record.value()
            if value is None:
                continue
            try:
                payload, price_dt, ts_second, ts_nano = value.rsplit(b',', 3)
                # Parse payload data; the payload is expected to carry the remaining
                # ten columns of the schema (sequence_number .. price_time_original)
                payload_parts = payload.split(b',')
                if len(payload_parts) < 4:  # Ensure we have at least enough parts to get the symbol
                    continue
                symbol = payload_parts[3].decode()
                # Check if symbol is in our set of interest
                if soi and symbol.encode() not in soi:
                    continue
                # With confluent_kafka, record.timestamp() returns (timestamp_type, timestamp)
                _, record_timestamp = record.timestamp()
                record_timestamp = record_timestamp / 1000  # milliseconds to seconds
                payload_timestamp = f"{ts_second.decode()}.{ts_nano.decode():0>9}"
                price_date = price_dt[:8].decode()
                price_time = price_dt[8:].decode()
                # Prepare data for insertion
                row = (
                    record.offset(),
                    record_timestamp,
                    payload_timestamp,
                    price_date,
                    price_time,
                    *[p.decode() if isinstance(p, bytes) else p for p in payload_parts],
                    ts_second.decode(),
                    ts_nano.decode()
                )
                batch.append(row)
                # Execute batch insert when batch size is reached
                if len(batch) >= batch_size:
                    cursor.executemany('''
                        INSERT INTO price_data VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ''', batch)
                    conn.commit()
                    total_records += len(batch)
                    batch = []
                    current_time = time.time()
                    if current_time - last_log_time >= 5:  # Log every 5 seconds
                        last_log_time = current_time
                        elapsed = current_time - start_time
                        rate = total_records / elapsed if elapsed > 0 else 0
                        log.info(
                            f'Progress: {i} msgs processed, {total_records:,} records stored, '
                            f'{rate:.1f} records/sec, last timestamp: {datetime.fromtimestamp(record_timestamp).isoformat()}'
                        )
            except Exception as e:
                log.error(f"Error processing record: {e}", exc_info=True)
                continue
        # Insert any remaining records
        if batch:
            cursor.executemany('''
                INSERT INTO price_data VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', batch)
            conn.commit()
            total_records += len(batch)
        # Create some useful views
        cursor.execute('''
            CREATE VIEW IF NOT EXISTS latest_prices AS
            SELECT symbol, last, high, low, open, volume, price_date, price_time
            FROM price_data
            WHERE (symbol, record_timestamp) IN (
                SELECT symbol, MAX(record_timestamp)
                FROM price_data
                GROUP BY symbol
            )
        ''')
        # Add daily summary view
        cursor.execute('''
            CREATE VIEW IF NOT EXISTS daily_summary AS
            SELECT
                symbol,
                price_date,
                MIN(low) as daily_low,
                MAX(high) as daily_high,
                SUM(volume) as daily_volume,
                (SELECT open FROM price_data p2
                 WHERE p2.symbol = p1.symbol AND p2.price_date = p1.price_date
                 ORDER BY record_timestamp ASC LIMIT 1) as daily_open,
                (SELECT last FROM price_data p2
                 WHERE p2.symbol = p1.symbol AND p2.price_date = p1.price_date
                 ORDER BY record_timestamp DESC LIMIT 1) as daily_close
            FROM price_data p1
            GROUP BY symbol, price_date
        ''')
        # Create statistics table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS replay_stats (
                start_time TEXT,
                end_time TEXT,
                total_records INTEGER,
                symbols_count INTEGER,
                processing_duration_seconds REAL
            )
        ''')
        # Record statistics
        elapsed_time = time.time() - start_time
        cursor.execute(
            "INSERT INTO replay_stats VALUES (?, ?, ?, ?, ?)",
            (
                start_dt.isoformat(),
                end_dt.isoformat(),
                total_records,
                len(soi) if soi else 0,
                elapsed_time
            )
        )
        conn.commit()
        # Log final statistics
        log.info(f"Total records saved to database: {total_records:,}")
        log.info(f"Processing time: {elapsed_time:.2f} seconds")
        log.info(f"Processing rate: {total_records / elapsed_time:.1f} records/sec")
        log.info(f"Database location: {db_path}")
        return db_path
    finally:
        # Switch back to delete mode which is better for reading
        cursor.execute("PRAGMA journal_mode=DELETE")
        conn.close()
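
# Example queries against the resulting database (sqlite3 CLI; table and view names
# as created above, the database filename depends on --db-path / the replayed date):
#
#   sqlite3 20250516.db "SELECT * FROM latest_prices WHERE symbol = 'AAPL';"
#   sqlite3 20250516.db "SELECT * FROM daily_summary ORDER BY symbol, price_date;"
#   sqlite3 20250516.db "SELECT * FROM replay_stats;"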

def main():
    parser = argparse.ArgumentParser(description='Replay messages from Kafka topic by time range')
    parser.add_argument('-b', '--bootstrap-servers', dest='bootstrap_servers',
                        type=str, default=os.getenv('BOOTSTRAP_SERVERS', 'localhost:9092'),
                        help='Kafka bootstrap servers (comma-separated)')
    parser.add_argument('-t', '--topic', dest='topic', required=not os.getenv('TOPIC'),
                        default=os.getenv('TOPIC'), help='Kafka topic to replay')
    parser.add_argument('--symbols', type=str, default=os.getenv('SYMBOLS'),
                        help='Comma-separated list of symbols to filter')
    # Time selection arguments
    time_group = parser.add_argument_group('time range selection')
    time_group.add_argument('--start-time', type=str, default=os.getenv('START_TIME'),
                            help='Start timestamp in ISO format: YYYY-MM-DDThh:mm:ss (default)')
    time_group.add_argument('--end-time', type=str, default=os.getenv('END_TIME'),
                            help='End timestamp in ISO format: YYYY-MM-DDThh:mm:ss (default)')
    time_group.add_argument('--use-date-range', dest='use_date_range', action='store_true',
                            help='Use date range (YYYY-MM-DD) instead of precise timestamps')
    time_group.add_argument('--start-date', type=parse_date, default=os.getenv('START_DATE'),
                            help='Start date in YYYY-MM-DD format (when using --use-date-range)')
    time_group.add_argument('--end-date', type=parse_date, default=os.getenv('END_DATE'),
                            help='End date in YYYY-MM-DD format (when using --use-date-range)')
    parser.add_argument('--db-path', type=str, default=None,
                        help='Path to output SQLite database. Default: current directory with date as filename')
    parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
                        default=os.getenv('LOG_LEVEL', 'INFO'), help='Set logging level')
    parser.add_argument('--batch-size', type=int, default=10000,
                        help='Batch size for database inserts')
    parser.add_argument('--partitions', type=str,
                        help='Comma-separated list of partitions to replay. Default: all partitions')
    args = parser.parse_args()
    # Set log level
    log.setLevel(getattr(logging, args.log_level))
    # Validate arguments
    if not args.topic:
        parser.error("Topic is required. Specify with -t/--topic or set 'TOPIC' environment variable.")
    # Initialize datetime range
    start_date_time = None
    end_date_time = None
    # Process time range arguments
    if args.use_date_range:
        if not args.start_date:
            parser.error("Start date is required with --use-date-range. Specify with --start-date or set 'START_DATE' environment variable.")
        if not args.end_date:
            parser.error("End date is required with --use-date-range. Specify with --end-date or set 'END_DATE' environment variable.")
        # Ensure end date is not before start date
        if args.end_date < args.start_date:
            parser.error(f"End date {args.end_date} is before start date {args.start_date}")
        # Adjust datetime range to include full days
        start_date_time = datetime.combine(args.start_date, datetime.min.time())
        end_date_time = datetime.combine(args.end_date, datetime.max.time())
    else:
        # Default: Use timestamp range
        start_time = args.start_time or os.getenv('START_TIME')
        end_time = args.end_time or os.getenv('END_TIME')
        if not start_time:
            parser.error("Start timestamp is required. Specify with --start-time or set 'START_TIME' environment variable.")
        if not end_time:
            parser.error("End timestamp is required. Specify with --end-time or set 'END_TIME' environment variable.")
        try:
            start_date_time = parse_datetime(start_time)
        except ValueError as e:
            parser.error(f"Invalid start timestamp format: {e}. Use ISO format YYYY-MM-DDThh:mm:ss")
        try:
            end_date_time = parse_datetime(end_time)
        except ValueError as e:
            parser.error(f"Invalid end timestamp format: {e}. Use ISO format YYYY-MM-DDThh:mm:ss")
        # Ensure end time is not before start time
        if end_date_time < start_date_time:
            parser.error(f"End time {end_time} is before start time {start_time}")
    # Process other arguments
    topic = args.topic
    bootstrap_servers = args.bootstrap_servers
    # Handle partitions
    partitions = None
    if args.partitions:
        try:
            partitions = [int(p.strip()) for p in args.partitions.split(',')]
            log.info(f"Replaying from partitions: {partitions}")
        except ValueError:
            parser.error("Partitions must be comma-separated integers")
    # Create replayer
    try:
        replayer = KafkaReplayer(topic, partitions=partitions, **{'bootstrap.servers': bootstrap_servers})
        # Process symbols
        soi = None
        if args.symbols:
            symbols = [s.strip() for s in args.symbols.split(',')]
            soi = set(s.upper().encode() for s in symbols)
            log.info(f"Filtering for symbols: {', '.join(symbols)}")
        # Run replay
        db_path = replay_raw_to_sqlite(
            replayer,
            start_date_time,
            end_date_time,
            soi=soi,
            db_path=args.db_path,
            batch_size=args.batch_size
        )
        log.info(f"Data saved to SQLite database: {db_path}")
    except KeyboardInterrupt:
        log.info("Process interrupted by user. Exiting.")
        return 1
    except Exception as e:
        log.error(f"Error during replay: {str(e)}", exc_info=True)
        return 1
    return 0


if __name__ == '__main__':
    raise SystemExit(main())
version = 1
revision = 2
requires-python = ">=3.12"
[[package]]
name = "confluent-kafka"
version = "2.10.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/a0/c5/22087627478d2cc97b864dd1774c1e2d4007acc22b8f78aec5a7a41f6436/confluent_kafka-2.10.0.tar.gz", hash = "sha256:30a346908f3ad49c4bc1cb5557e7a8ce484190f8633aa18f9b87b2620809ac13", size = 193775, upload-time = "2025-04-17T21:18:43.965Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c1/fa/00398f8906da8fbefd8732501cffc5910b1a55af03a292b6720c19258c97/confluent_kafka-2.10.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:2268a607d374325e3a8fc779d8639de4344ce4d1ad2b88e8f9d13d97baae4e71", size = 3537479, upload-time = "2025-04-17T21:17:02.417Z" },
{ url = "https://files.pythonhosted.org/packages/03/7b/f940f1cd8c25d61cecc5f6ca044871f90370a80bd0d0a6595852151e4669/confluent_kafka-2.10.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:a89e9ba5c8e18891b3bbf333001a9e12af43bd5bc3c3353065109582f3f078fc", size = 3060467, upload-time = "2025-04-17T21:17:05.342Z" },
{ url = "https://files.pythonhosted.org/packages/8a/f9/a30b62c78178be818a30fab23e826d3c5cdf60f53e2ebb31fa3e5fdded00/confluent_kafka-2.10.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:7252fee8d7287bae71f1a506967b3782e67ba5b699c4d095d16fabce92a63cfd", size = 15260056, upload-time = "2025-04-17T21:17:10.918Z" },
{ url = "https://files.pythonhosted.org/packages/6e/00/b334804360b6490f5dfa910432ad9dbba7f91baa76168299458d03c50e40/confluent_kafka-2.10.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:ddb9400786eedd8a6c0cbcceaddb8329a2597f4c02dc72389e83859da47d22d6", size = 3852443, upload-time = "2025-04-17T21:17:16.996Z" },
{ url = "https://files.pythonhosted.org/packages/c1/21/8c2b37cdc5ef60dee2ed9ac05dd89eaef822d4dd284b7d7e4678aa18aba6/confluent_kafka-2.10.0-cp312-cp312-win_amd64.whl", hash = "sha256:ef5aad8c04c54fe6ef89dc08a4ba24a80be90b05bbbc3348370c1c51222e0a5e", size = 3977821, upload-time = "2025-04-17T21:17:21.253Z" },
{ url = "https://files.pythonhosted.org/packages/e2/3c/db66f729b129e8e35af92571602f522091f753da8f42a7e217d5bed23ad6/confluent_kafka-2.10.0-cp313-cp313-macosx_13_0_arm64.whl", hash = "sha256:10230c3da4cd047cf488cef405f6e209da4e8403c0e52e45b89b1d878420082b", size = 3064500, upload-time = "2025-04-17T21:17:24.272Z" },
{ url = "https://files.pythonhosted.org/packages/ee/9f/569976c299ec40347588b40e61e8bcd065d958741a63159a4d77ca7859df/confluent_kafka-2.10.0-cp313-cp313-macosx_13_0_x86_64.whl", hash = "sha256:514c4a4dcb522aacdc17fa3e2751ae8670553850c9e22844ac997ec6459b3a48", size = 3540404, upload-time = "2025-04-17T21:17:32.051Z" },
{ url = "https://files.pythonhosted.org/packages/48/ce/b44b68ac417cf8602c5dee1115fb27ac174585039f58527ab07584ae1ce9/confluent_kafka-2.10.0-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:bf706b1ebb540276791b5c42ac624c93195c34782848b24b2da1467caeef479b", size = 15260481, upload-time = "2025-04-17T21:17:38.697Z" },
{ url = "https://files.pythonhosted.org/packages/ba/2e/bd4edabda58dbe94c235b587205ac222633abd6da9c6677832071f85b66c/confluent_kafka-2.10.0-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:6b8b057d436d82c1b2d819915d03e2b446bfa55403acd440f9222509ba2147de", size = 3852813, upload-time = "2025-04-17T21:17:43.045Z" },
{ url = "https://files.pythonhosted.org/packages/84/5f/5d68af39ed6f1cdbcc49dd412436894303cc03bb851e3f540a9c23d384ac/confluent_kafka-2.10.0-cp313-cp313-win_amd64.whl", hash = "sha256:649ccde18b4f2ff509a16093a06ca13629e4f4b3146d4ef5a82805c869cf8cbd", size = 4037452, upload-time = "2025-04-17T21:17:45.545Z" },
]
[[package]]
name = "kafka-replay"
version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "confluent-kafka" },
{ name = "python-dotenv" },
]
[package.metadata]
requires-dist = [
{ name = "confluent-kafka", specifier = ">=2.10.0" },
{ name = "python-dotenv", specifier = ">=1.1.0" },
]
[[package]]
name = "python-dotenv"
version = "1.1.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/88/2c/7bb1416c5620485aa793f2de31d3df393d3686aa8a8506d11e10e13c5baf/python_dotenv-1.1.0.tar.gz", hash = "sha256:41f90bc6f5f177fb41f53e87666db362025010eb28f60a01c9143bfa33a2b2d5", size = 39920, upload-time = "2025-03-25T10:14:56.835Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/1e/18/98a99ad95133c6a6e2005fe89faedf294a748bd5dc803008059409ac9b1e/python_dotenv-1.1.0-py3-none-any.whl", hash = "sha256:d7c01d9e2293916c18baf562d95698754b0dbbb5e74d457c45d4f6561fb9d55d", size = 20256, upload-time = "2025-03-25T10:14:55.034Z" },
]