Created
April 20, 2020 03:18
-
-
Save benesch/ce41b5f57f751972314830aef2486a79 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/src/coord/timestamp.rs b/src/coord/timestamp.rs | |
| index e598528b..90a74aaf 100644 | |
| --- a/src/coord/timestamp.rs | |
| +++ b/src/coord/timestamp.rs | |
| @@ -20,7 +20,6 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; | |
| use avro::schema::Schema; | |
| use avro::types::Value; | |
| - | |
| use lazy_static::lazy_static; | |
| use log::{error, info}; | |
| use rdkafka::consumer::{BaseConsumer, Consumer}; | |
| @@ -191,7 +190,7 @@ struct ByoTimestampConsumer { | |
| // Source Connector | |
| connector: ByoTimestampConnector, | |
| // The name of the source with which this connector is associated | |
| - source_name: PathBuf, | |
| + source_name: String, | |
| // The format of the connector | |
| envelope: ConsistencyFormatting, | |
| // The last timestamp assigned per partition | |
| @@ -371,7 +370,7 @@ fn byo_extract_update_from_bytes( | |
| continue; | |
| } | |
| }; | |
| - if PathBuf::from(topic_name.trim()) == consumer.source_name { | |
| + if topic_name.trim() == consumer.source_name { | |
| updates.push((partition_count, partition, ts, offset)) | |
| } | |
| } | |
| @@ -726,7 +725,7 @@ impl Timestamper { | |
| } | |
| } | |
| Consistency::BringYourOwn(consistency_topic) => { | |
| - info!("Timestamping Source {} with BYO Consistency.", id); | |
| + info!("Timestamping Source {} with BYO Consistency. Consistency Source: {}", id, consistency_topic); | |
| let consumer = | |
| self.create_byo_connector(id, sc, enc, env, consistency_topic); | |
| if let Some(consumer) = consumer { | |
| @@ -852,7 +851,7 @@ impl Timestamper { | |
| } | |
| }; | |
| for (topic, count) in results { | |
| - if byo_consumer.source_name == PathBuf::from(topic.trim()) { | |
| + if byo_consumer.source_name == topic.trim() { | |
| // TODO(natacha): consistency topic for Debezium currently supports only one partition | |
| byo_consumer.last_offset += count; | |
| byo_consumer.last_ts += 1; | |
| @@ -893,7 +892,7 @@ impl Timestamper { | |
| panic!("Incorrect Avro Format. This should never happen"); | |
| }; | |
| for (topic, count) in results { | |
| - if byo_consumer.source_name == PathBuf::from(topic.trim()) { | |
| + if byo_consumer.source_name == topic.trim() { | |
| // TODO(natacha): consistency topic for Debezium currently supports only one partition | |
| byo_consumer.last_offset += count; | |
| byo_consumer.last_ts += 1; | |
| @@ -945,7 +944,7 @@ impl Timestamper { | |
| continue; | |
| } | |
| }; | |
| - if PathBuf::from(topic) == byo_consumer.source_name { | |
| + if topic == byo_consumer.source_name { | |
| if is_ts_valid(byo_consumer, partition_count, &partition, timestamp) { | |
| match byo_consumer.connector { | |
| ByoTimestampConnector::Kafka(_) | |
| @@ -1019,7 +1018,7 @@ impl Timestamper { | |
| error!("Incorrect Avro format. Expected Record"); | |
| continue; | |
| }; | |
| - if PathBuf::from(topic) == byo_consumer.source_name { | |
| + if topic == byo_consumer.source_name { | |
| if is_ts_valid(byo_consumer, partition_count, &partition, timestamp) { | |
| match byo_consumer.connector { | |
| ByoTimestampConnector::Ocf(_) => { | |
| @@ -1151,7 +1150,7 @@ impl Timestamper { | |
| &self, | |
| _id: SourceInstanceId, | |
| fc: &FileSourceConnector, | |
| - timestamp_topic: PathBuf, | |
| + timestamp_topic: String, | |
| ) -> Option<ByoFileConnector<std::vec::Vec<u8>>> { | |
| let ctor = |fi| Ok(std::io::BufReader::new(fi).split(b'\n')); | |
| let (tx, rx) = std::sync::mpsc::sync_channel(self.max_increment_size as usize); | |
| @@ -1161,7 +1160,7 @@ impl Timestamper { | |
| FileReadStyle::ReadOnce | |
| }; | |
| std::thread::spawn(move || { | |
| - read_file_task(timestamp_topic, tx, None, tail, ctor); | |
| + read_file_task(PathBuf::from(timestamp_topic), tx, None, tail, ctor); | |
| }); | |
| Some(ByoFileConnector { stream: rx }) | |
| @@ -1267,7 +1266,7 @@ impl Timestamper { | |
| &self, | |
| _id: SourceInstanceId, | |
| fc: &FileSourceConnector, | |
| - timestamp_topic: PathBuf, | |
| + timestamp_topic: String, | |
| ) -> Option<ByoFileConnector<avro::types::Value>> { | |
| let ctor = move |file| avro::Reader::new(file); | |
| let tail = if fc.tail { | |
| @@ -1277,7 +1276,7 @@ impl Timestamper { | |
| }; | |
| let (tx, rx) = std::sync::mpsc::sync_channel(self.max_increment_size as usize); | |
| std::thread::spawn(move || { | |
| - read_file_task(timestamp_topic, tx, None, tail, ctor); | |
| + read_file_task(PathBuf::from(timestamp_topic), tx, None, tail, ctor); | |
| }); | |
| Some(ByoFileConnector { stream: rx }) | |
| @@ -1290,14 +1289,14 @@ impl Timestamper { | |
| sc: ExternalSourceConnector, | |
| enc: DataEncoding, | |
| env: Envelope, | |
| - timestamp_topic: PathBuf, | |
| + timestamp_topic: String, | |
| ) -> Option<ByoTimestampConsumer> { | |
| match sc { | |
| ExternalSourceConnector::Kafka(kc) => { | |
| let topic = kc.topic.clone(); | |
| match self.create_byo_kafka_connector(id, &kc, timestamp_topic) { | |
| Some(connector) => Some(ByoTimestampConsumer { | |
| - source_name: PathBuf::from(topic), | |
| + source_name: topic, | |
| connector: ByoTimestampConnector::Kafka(connector), | |
| envelope: identify_consistency_format(enc, env), | |
| last_partition_ts: HashMap::new(), | |
| @@ -1311,7 +1310,7 @@ impl Timestamper { | |
| ExternalSourceConnector::File(fc) => { | |
| match self.create_byo_file_connector(id, &fc, timestamp_topic) { | |
| Some(consumer) => Some(ByoTimestampConsumer { | |
| - source_name: fc.path, | |
| + source_name: fc.path.to_string_lossy().into_owned(), | |
| connector: ByoTimestampConnector::File(consumer), | |
| envelope: identify_consistency_format(enc, env), | |
| last_partition_ts: HashMap::new(), | |
| @@ -1325,7 +1324,7 @@ impl Timestamper { | |
| ExternalSourceConnector::AvroOcf(fc) => { | |
| match self.create_byo_ocf_connector(id, &fc, timestamp_topic) { | |
| Some(consumer) => Some(ByoTimestampConsumer { | |
| - source_name: fc.path, | |
| + source_name: fc.path.to_string_lossy().into_owned(), | |
| connector: ByoTimestampConnector::Ocf(consumer), | |
| envelope: identify_consistency_format(enc, env), | |
| last_partition_ts: HashMap::new(), | |
| @@ -1339,7 +1338,7 @@ impl Timestamper { | |
| ExternalSourceConnector::Kinesis(kinc) => { | |
| match self.create_byo_kinesis_connector(id, &kinc, timestamp_topic) { | |
| Some(consumer) => Some(ByoTimestampConsumer { | |
| - source_name: PathBuf::from(kinc.stream_name), | |
| + source_name: kinc.stream_name, | |
| connector: ByoTimestampConnector::Kinesis(consumer), | |
| envelope: identify_consistency_format(enc, env), | |
| last_partition_ts: HashMap::new(), | |
| @@ -1357,7 +1356,7 @@ impl Timestamper { | |
| &self, | |
| _id: SourceInstanceId, | |
| _kinc: &KinesisSourceConnector, | |
| - _timestamp_topic: PathBuf, | |
| + _timestamp_topic: String, | |
| ) -> Option<ByoKinesisConnector> { | |
| unimplemented!(); | |
| } | |
| @@ -1366,18 +1365,13 @@ impl Timestamper { | |
| &self, | |
| id: SourceInstanceId, | |
| kc: &KafkaSourceConnector, | |
| - timestamp_topic: PathBuf, | |
| + timestamp_topic: String, | |
| ) -> Option<ByoKafkaConnector> { | |
| - let timestamp = String::from( | |
| - timestamp_topic | |
| - .to_str() | |
| - .expect("Converting the consistency topic to a string failed"), | |
| - ); | |
| let mut config = ClientConfig::new(); | |
| config | |
| .set( | |
| "group.id", | |
| - &format!("materialize-byo-{}-{}", &timestamp, id), | |
| + &format!("materialize-byo-{}-{}", &timestamp_topic, id), | |
| ) | |
| .set("enable.auto.commit", "false") | |
| .set("enable.partition.eof", "false") | |
| @@ -1394,9 +1388,9 @@ impl Timestamper { | |
| match config.create() { | |
| Ok(consumer) => { | |
| let consumer = ByoKafkaConnector { consumer }; | |
| - consumer.consumer.subscribe(&[&timestamp]).unwrap(); | |
| + consumer.consumer.subscribe(&[&timestamp_topic]).unwrap(); | |
| - let partitions = get_kafka_partitions(&consumer.consumer, &timestamp); | |
| + let partitions = get_kafka_partitions(&consumer.consumer, &timestamp_topic); | |
| if partitions.len() != 1 { | |
| error!( | |
| "Consistency topic should contain a single partition. Contains {}", | |
| diff --git a/src/dataflow-types/types.rs b/src/dataflow-types/types.rs | |
| index 1555e69e..d8b7c80c 100644 | |
| --- a/src/dataflow-types/types.rs | |
| +++ b/src/dataflow-types/types.rs | |
| @@ -458,7 +458,7 @@ impl ExternalSourceConnector { | |
| #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] | |
| pub enum Consistency { | |
| - BringYourOwn(PathBuf), | |
| + BringYourOwn(String), | |
| RealTime, | |
| } | |
| diff --git a/src/sql/statement.rs b/src/sql/statement.rs | |
| index d4be9592..24b82f5d 100644 | |
| --- a/src/sql/statement.rs | |
| +++ b/src/sql/statement.rs | |
| @@ -1136,9 +1136,7 @@ fn handle_create_source(scx: &StatementContext, stmt: Statement) -> Result<Plan, | |
| consistency = match with_options.remove("consistency") { | |
| None => Consistency::RealTime, | |
| - Some(Value::SingleQuotedString(topic)) => { | |
| - Consistency::BringYourOwn(topic.into()) | |
| - } | |
| + Some(Value::SingleQuotedString(topic)) => Consistency::BringYourOwn(topic), | |
| Some(_) => bail!("consistency must be a string"), | |
| }; | |
| @@ -1245,15 +1243,6 @@ fn handle_create_source(scx: &StatementContext, stmt: Statement) -> Result<Plan, | |
| Some(Value::Boolean(b)) => b, | |
| Some(_) => bail!("tail must be a boolean"), | |
| }; | |
| - | |
| - consistency = match with_options.remove("consistency") { | |
| - None => Consistency::RealTime, | |
| - Some(Value::SingleQuotedString(topic)) => { | |
| - Consistency::BringYourOwn(topic.into()) | |
| - } | |
| - Some(_) => bail!("consistency must be a string"), | |
| - }; | |
| - | |
| let connector = ExternalSourceConnector::File(FileSourceConnector { | |
| path: path.clone().into(), | |
| tail, | |
| @@ -1267,15 +1256,6 @@ fn handle_create_source(scx: &StatementContext, stmt: Statement) -> Result<Plan, | |
| Some(Value::Boolean(b)) => b, | |
| Some(_) => bail!("tail must be a boolean"), | |
| }; | |
| - | |
| - consistency = match with_options.remove("consistency") { | |
| - None => Consistency::RealTime, | |
| - Some(Value::SingleQuotedString(topic)) => { | |
| - Consistency::BringYourOwn(topic.into()) | |
| - } | |
| - Some(_) => bail!("consistency must be a string"), | |
| - }; | |
| - | |
| let connector = ExternalSourceConnector::AvroOcf(FileSourceConnector { | |
| path: path.clone().into(), | |
| tail, |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment