Skip to content

Instantly share code, notes, and snippets.

@benesch
Created April 20, 2020 03:18
Show Gist options
  • Select an option

  • Save benesch/ce41b5f57f751972314830aef2486a79 to your computer and use it in GitHub Desktop.
diff --git a/src/coord/timestamp.rs b/src/coord/timestamp.rs
index e598528b..90a74aaf 100644
--- a/src/coord/timestamp.rs
+++ b/src/coord/timestamp.rs
@@ -20,7 +20,6 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use avro::schema::Schema;
use avro::types::Value;
-
use lazy_static::lazy_static;
use log::{error, info};
use rdkafka::consumer::{BaseConsumer, Consumer};
@@ -191,7 +190,7 @@ struct ByoTimestampConsumer {
// Source Connector
connector: ByoTimestampConnector,
// The name of the source with which this connector is associated
- source_name: PathBuf,
+ source_name: String,
// The format of the connector
envelope: ConsistencyFormatting,
// The last timestamp assigned per partition
@@ -371,7 +370,7 @@ fn byo_extract_update_from_bytes(
continue;
}
};
- if PathBuf::from(topic_name.trim()) == consumer.source_name {
+ if topic_name.trim() == consumer.source_name {
updates.push((partition_count, partition, ts, offset))
}
}
@@ -726,7 +725,7 @@ impl Timestamper {
}
}
Consistency::BringYourOwn(consistency_topic) => {
- info!("Timestamping Source {} with BYO Consistency.", id);
+ info!("Timestamping Source {} with BYO Consistency. Consistency Source: {}", id, consistency_topic);
let consumer =
self.create_byo_connector(id, sc, enc, env, consistency_topic);
if let Some(consumer) = consumer {
@@ -852,7 +851,7 @@ impl Timestamper {
}
};
for (topic, count) in results {
- if byo_consumer.source_name == PathBuf::from(topic.trim()) {
+ if byo_consumer.source_name == topic.trim() {
// TODO(natacha): consistency topic for Debezium currently supports only one partition
byo_consumer.last_offset += count;
byo_consumer.last_ts += 1;
@@ -893,7 +892,7 @@ impl Timestamper {
panic!("Incorrect Avro Format. This should never happen");
};
for (topic, count) in results {
- if byo_consumer.source_name == PathBuf::from(topic.trim()) {
+ if byo_consumer.source_name == topic.trim() {
// TODO(natacha): consistency topic for Debezium currently supports only one partition
byo_consumer.last_offset += count;
byo_consumer.last_ts += 1;
@@ -945,7 +944,7 @@ impl Timestamper {
continue;
}
};
- if PathBuf::from(topic) == byo_consumer.source_name {
+ if topic == byo_consumer.source_name {
if is_ts_valid(byo_consumer, partition_count, &partition, timestamp) {
match byo_consumer.connector {
ByoTimestampConnector::Kafka(_)
@@ -1019,7 +1018,7 @@ impl Timestamper {
error!("Incorrect Avro format. Expected Record");
continue;
};
- if PathBuf::from(topic) == byo_consumer.source_name {
+ if topic == byo_consumer.source_name {
if is_ts_valid(byo_consumer, partition_count, &partition, timestamp) {
match byo_consumer.connector {
ByoTimestampConnector::Ocf(_) => {
@@ -1151,7 +1150,7 @@ impl Timestamper {
&self,
_id: SourceInstanceId,
fc: &FileSourceConnector,
- timestamp_topic: PathBuf,
+ timestamp_topic: String,
) -> Option<ByoFileConnector<std::vec::Vec<u8>>> {
let ctor = |fi| Ok(std::io::BufReader::new(fi).split(b'\n'));
let (tx, rx) = std::sync::mpsc::sync_channel(self.max_increment_size as usize);
@@ -1161,7 +1160,7 @@ impl Timestamper {
FileReadStyle::ReadOnce
};
std::thread::spawn(move || {
- read_file_task(timestamp_topic, tx, None, tail, ctor);
+ read_file_task(PathBuf::from(timestamp_topic), tx, None, tail, ctor);
});
Some(ByoFileConnector { stream: rx })
@@ -1267,7 +1266,7 @@ impl Timestamper {
&self,
_id: SourceInstanceId,
fc: &FileSourceConnector,
- timestamp_topic: PathBuf,
+ timestamp_topic: String,
) -> Option<ByoFileConnector<avro::types::Value>> {
let ctor = move |file| avro::Reader::new(file);
let tail = if fc.tail {
@@ -1277,7 +1276,7 @@ impl Timestamper {
};
let (tx, rx) = std::sync::mpsc::sync_channel(self.max_increment_size as usize);
std::thread::spawn(move || {
- read_file_task(timestamp_topic, tx, None, tail, ctor);
+ read_file_task(PathBuf::from(timestamp_topic), tx, None, tail, ctor);
});
Some(ByoFileConnector { stream: rx })
@@ -1290,14 +1289,14 @@ impl Timestamper {
sc: ExternalSourceConnector,
enc: DataEncoding,
env: Envelope,
- timestamp_topic: PathBuf,
+ timestamp_topic: String,
) -> Option<ByoTimestampConsumer> {
match sc {
ExternalSourceConnector::Kafka(kc) => {
let topic = kc.topic.clone();
match self.create_byo_kafka_connector(id, &kc, timestamp_topic) {
Some(connector) => Some(ByoTimestampConsumer {
- source_name: PathBuf::from(topic),
+ source_name: topic,
connector: ByoTimestampConnector::Kafka(connector),
envelope: identify_consistency_format(enc, env),
last_partition_ts: HashMap::new(),
@@ -1311,7 +1310,7 @@ impl Timestamper {
ExternalSourceConnector::File(fc) => {
match self.create_byo_file_connector(id, &fc, timestamp_topic) {
Some(consumer) => Some(ByoTimestampConsumer {
- source_name: fc.path,
+ source_name: fc.path.to_string_lossy().into_owned(),
connector: ByoTimestampConnector::File(consumer),
envelope: identify_consistency_format(enc, env),
last_partition_ts: HashMap::new(),
@@ -1325,7 +1324,7 @@ impl Timestamper {
ExternalSourceConnector::AvroOcf(fc) => {
match self.create_byo_ocf_connector(id, &fc, timestamp_topic) {
Some(consumer) => Some(ByoTimestampConsumer {
- source_name: fc.path,
+ source_name: fc.path.to_string_lossy().into_owned(),
connector: ByoTimestampConnector::Ocf(consumer),
envelope: identify_consistency_format(enc, env),
last_partition_ts: HashMap::new(),
@@ -1339,7 +1338,7 @@ impl Timestamper {
ExternalSourceConnector::Kinesis(kinc) => {
match self.create_byo_kinesis_connector(id, &kinc, timestamp_topic) {
Some(consumer) => Some(ByoTimestampConsumer {
- source_name: PathBuf::from(kinc.stream_name),
+ source_name: kinc.stream_name,
connector: ByoTimestampConnector::Kinesis(consumer),
envelope: identify_consistency_format(enc, env),
last_partition_ts: HashMap::new(),
@@ -1357,7 +1356,7 @@ impl Timestamper {
&self,
_id: SourceInstanceId,
_kinc: &KinesisSourceConnector,
- _timestamp_topic: PathBuf,
+ _timestamp_topic: String,
) -> Option<ByoKinesisConnector> {
unimplemented!();
}
@@ -1366,18 +1365,13 @@ impl Timestamper {
&self,
id: SourceInstanceId,
kc: &KafkaSourceConnector,
- timestamp_topic: PathBuf,
+ timestamp_topic: String,
) -> Option<ByoKafkaConnector> {
- let timestamp = String::from(
- timestamp_topic
- .to_str()
- .expect("Converting the consistency topic to a string failed"),
- );
let mut config = ClientConfig::new();
config
.set(
"group.id",
- &format!("materialize-byo-{}-{}", &timestamp, id),
+ &format!("materialize-byo-{}-{}", &timestamp_topic, id),
)
.set("enable.auto.commit", "false")
.set("enable.partition.eof", "false")
@@ -1394,9 +1388,9 @@ impl Timestamper {
match config.create() {
Ok(consumer) => {
let consumer = ByoKafkaConnector { consumer };
- consumer.consumer.subscribe(&[&timestamp]).unwrap();
+ consumer.consumer.subscribe(&[&timestamp_topic]).unwrap();
- let partitions = get_kafka_partitions(&consumer.consumer, &timestamp);
+ let partitions = get_kafka_partitions(&consumer.consumer, &timestamp_topic);
if partitions.len() != 1 {
error!(
"Consistency topic should contain a single partition. Contains {}",
diff --git a/src/dataflow-types/types.rs b/src/dataflow-types/types.rs
index 1555e69e..d8b7c80c 100644
--- a/src/dataflow-types/types.rs
+++ b/src/dataflow-types/types.rs
@@ -458,7 +458,7 @@ impl ExternalSourceConnector {
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Consistency {
- BringYourOwn(PathBuf),
+ BringYourOwn(String),
RealTime,
}
diff --git a/src/sql/statement.rs b/src/sql/statement.rs
index d4be9592..24b82f5d 100644
--- a/src/sql/statement.rs
+++ b/src/sql/statement.rs
@@ -1136,9 +1136,7 @@ fn handle_create_source(scx: &StatementContext, stmt: Statement) -> Result<Plan,
consistency = match with_options.remove("consistency") {
None => Consistency::RealTime,
- Some(Value::SingleQuotedString(topic)) => {
- Consistency::BringYourOwn(topic.into())
- }
+ Some(Value::SingleQuotedString(topic)) => Consistency::BringYourOwn(topic),
Some(_) => bail!("consistency must be a string"),
};
@@ -1245,15 +1243,6 @@ fn handle_create_source(scx: &StatementContext, stmt: Statement) -> Result<Plan,
Some(Value::Boolean(b)) => b,
Some(_) => bail!("tail must be a boolean"),
};
-
- consistency = match with_options.remove("consistency") {
- None => Consistency::RealTime,
- Some(Value::SingleQuotedString(topic)) => {
- Consistency::BringYourOwn(topic.into())
- }
- Some(_) => bail!("consistency must be a string"),
- };
-
let connector = ExternalSourceConnector::File(FileSourceConnector {
path: path.clone().into(),
tail,
@@ -1267,15 +1256,6 @@ fn handle_create_source(scx: &StatementContext, stmt: Statement) -> Result<Plan,
Some(Value::Boolean(b)) => b,
Some(_) => bail!("tail must be a boolean"),
};
-
- consistency = match with_options.remove("consistency") {
- None => Consistency::RealTime,
- Some(Value::SingleQuotedString(topic)) => {
- Consistency::BringYourOwn(topic.into())
- }
- Some(_) => bail!("consistency must be a string"),
- };
-
let connector = ExternalSourceConnector::AvroOcf(FileSourceConnector {
path: path.clone().into(),
tail,
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment