Last active
September 8, 2022 08:41
-
-
Save OlegSchwann/07d4b136fd776e725dbe6a3bb06e30fa to your computer and use it in GitHub Desktop.
ClickHouse: Can a window view read from a Kafka engine table?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Protobuf (proto3) schema for the traffic_distribution package.
// The Kafka tables in the SQL below reference it through their format_schema
// settings as 'traffic_distribution.proto:<MessageName>'.
syntax = "proto3"; | |
package traffic_distribution; | |
// Message used for the traffic_distribution.campaign_traffic table
// (format_schema = 'traffic_distribution.proto:CampaignTraffic').
// Field names mirror that table's columns: CampaignID, RotatorName,
// LastActionAt, Count.
message CampaignTraffic { | |
uint32 CampaignID = 1; | |
string RotatorName = 2; | |
// Maps to a DateTime column on the ClickHouse side.
// NOTE(review): presumably Unix time in seconds - confirm with the producer.
int64 LastActionAt = 3; // timestamp | |
uint64 Count = 4; | |
} | |
// Message used for the traffic_distribution.campaign_traffic_aggregated table
// (format_schema = 'traffic_distribution.proto:CampaignTrafficAggregated').
// Field names mirror that table's columns: CampaignID, RotatorName,
// LastActionAt, Part.
message CampaignTrafficAggregated { | |
uint32 CampaignID = 1; | |
string RotatorName = 2; | |
// Maps to a DateTime column on the ClickHouse side.
// NOTE(review): presumably Unix time in seconds - confirm with the consumer.
int64 LastActionAt = 3; // timestamp | |
// Maps to the Float32 column Part.
// NOTE(review): field number 4 is skipped in this message (Part uses 5);
// confirm whether that gap is intentional.
float Part = 5; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Create the database that holds every table and view defined below
-- (no-op if it already exists).
CREATE DATABASE IF NOT EXISTS traffic_distribution; | |
-- Source table: the window view below selects FROM this table.
-- It is a Kafka-engine table bound to topic 'campaigns_traffic' on broker
-- localhost:9092; messages use the Protobuf format with the CampaignTraffic
-- message from traffic_distribution.proto.
CREATE TABLE traffic_distribution.campaign_traffic( | |
CampaignID UInt32, | |
RotatorName String, | |
LastActionAt DateTime, | |
Count UInt64 | |
) | |
-- Alternative engine kept for comparison: per the author's note, the setup
-- works when a regular MergeTree table is used instead of Kafka.
-- ENGINE = MergeTree ORDER BY tuple(); -- for regular tables everything works | |
ENGINE = Kafka | |
SETTINGS | |
kafka_broker_list = 'localhost:9092', | |
kafka_topic_list = 'campaigns_traffic', | |
kafka_group_name = 'campaigns_traffic', -- TODO: read all from Kafka every time | |
format_schema='traffic_distribution.proto:CampaignTraffic', | |
kafka_format = 'Protobuf'; | |
-- Destination table: the window view below writes TO this table.
-- It is a Kafka-engine table bound to topic 'campaigns_traffic_aggregated' on
-- broker localhost:9092; messages use the Protobuf format with the
-- CampaignTrafficAggregated message from traffic_distribution.proto.
CREATE TABLE traffic_distribution.campaign_traffic_aggregated( | |
CampaignID UInt32, | |
RotatorName String, | |
LastActionAt DateTime, | |
Part Float32 | |
) | |
-- Alternative engine kept for comparison: per the author's note, the setup
-- works when a regular MergeTree table is used instead of Kafka.
-- ENGINE=MergeTree ORDER BY tuple(); -- for regular tables everything works | |
ENGINE = Kafka | |
SETTINGS | |
kafka_broker_list = 'localhost:9092', | |
kafka_topic_list = 'campaigns_traffic_aggregated', | |
-- NOTE(review): this is the same consumer group name as the one used by
-- traffic_distribution.campaign_traffic, although the topic differs -
-- confirm that sharing the group is intended.
kafka_group_name = 'campaigns_traffic', | |
format_schema='traffic_distribution.proto:CampaignTrafficAggregated', | |
kafka_format = 'Protobuf'; | |
-- Window view that aggregates traffic_distribution.campaign_traffic and writes
-- the result TO traffic_distribution.campaign_traffic_aggregated.
--
-- Rows are grouped into 5-second tumbling windows over LastActionAt, and
-- within each window by CampaignID and RotatorName. For every group it emits:
--   * LastActionAt - the end of the tumbling window (tumbleEnd(wid));
--   * Part         - sum(Count) of the group divided by the total of those
--                    sums over all groups with the same CampaignID.
--
-- NOTE: the server log that accompanies this script (version 22.8.4.7) shows
-- StorageKafka::streamToViews failing with Code 49 "Block structure mismatch
-- ... between CopyingDataToViewsTransform and PushingToWindowViewSink": one
-- header carries the Kafka virtual columns (_topic, _key, _offset, ...), the
-- other only the four declared columns.
CREATE WINDOW VIEW traffic_distribution.window_view | |
TO traffic_distribution.campaign_traffic_aggregated | |
WATERMARK=ASCENDING | |
AS | |
SELECT | |
CampaignID, | |
RotatorName, | |
tumbleEnd(wid) AS LastActionAt, | |
-- InOneRotator: per-group sum of Count; InAllRotators: window-function sum
-- of InOneRotator across all rows sharing the same CampaignID.
(sum(Count) as InOneRotator) / (sum(InOneRotator) over (partition by CampaignID) as InAllRotators) as Part | |
FROM traffic_distribution.campaign_traffic | |
GROUP BY | |
tumble(campaign_traffic.LastActionAt, INTERVAL '5' SECOND) AS wid, | |
CampaignID, | |
RotatorName | |
-- The statement sets allow_experimental_window_view = 1 for itself.
SETTINGS allow_experimental_window_view = 1; | |
-- Sample data: 12 rows, six for CampaignID 1 and six for CampaignID 2.
-- Each campaign gets rotator 'us' twice plus 'gb', 'au', 'nz' and 'ca' once;
-- every row is stamped with now() and Count runs from 1 to 10.
-- NOTE(review): the target table uses ENGINE = Kafka, so this INSERT
-- presumably publishes the rows to the 'campaigns_traffic' topic - verify
-- against the ClickHouse Kafka engine documentation.
INSERT INTO traffic_distribution.campaign_traffic VALUES | |
(1, 'us', now(), 1), | |
(1, 'us', now(), 2), | |
(1, 'gb', now(), 2), | |
(1, 'au', now(), 3), | |
(1, 'nz', now(), 4), | |
(1, 'ca', now(), 5), | |
(2, 'us', now(), 6), | |
(2, 'us', now(), 7), | |
(2, 'gb', now(), 7), | |
(2, 'au', now(), 8), | |
(2, 'nz', now(), 9), | |
(2, 'ca', now(), 10); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2022.09.08 08:39:33.820921 [ 261 ] {} <Error> void DB::StorageKafka::threadFunc(size_t): Code: 49. DB::Exception: Block structure mismatch in function connect between CopyingDataToViewsTransform and PushingToWindowViewSink stream: different number of columns: | |
CampaignID UInt32 UInt32(size = 0), RotatorName String String(size = 0), LastActionAt DateTime UInt32(size = 0), Count UInt64 UInt64(size = 0), _topic LowCardinality(String) ColumnLowCardinality(size = 0, UInt8(size = 0), ColumnUnique(size = 1, String(size = 1))), _key String String(size = 0), _offset UInt64 UInt64(size = 0), _partition UInt64 UInt64(size = 0), _timestamp Nullable(DateTime) Nullable(size = 0, UInt32(size = 0), UInt8(size = 0)), _timestamp_ms Nullable(DateTime64(3)) Nullable(size = 0, DateTime64(size = 0), UInt8(size = 0)), _headers.name Array(String) Array(size = 0, UInt64(size = 0), String(size = 0)), _headers.value Array(String) Array(size = 0, UInt64(size = 0), String(size = 0)) | |
CampaignID UInt32 UInt32(size = 0), RotatorName String String(size = 0), LastActionAt DateTime UInt32(size = 0), Count UInt64 UInt64(size = 0). (LOGICAL_ERROR), Stack trace (when copying this message, always include the lines below): | |
0. DB::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, bool) in /usr/bin/clickhouse | |
1. ? in /usr/bin/clickhouse | |
2. DB::connect(DB::OutputPort&, DB::InputPort&) in /usr/bin/clickhouse | |
3. DB::buildPushingToViewsChain(std::__1::shared_ptr<DB::IStorage> const&, std::__1::shared_ptr<DB::StorageInMemoryMetadata const> const&, std::__1::shared_ptr<DB::Context const>, std::__1::shared_ptr<DB::IAST> const&, bool, DB::ThreadStatus*, std::__1::atomic<unsigned long>*, DB::Block const&) in /usr/bin/clickhouse | |
4. DB::InterpreterInsertQuery::buildChainImpl(std::__1::shared_ptr<DB::IStorage> const&, std::__1::shared_ptr<DB::StorageInMemoryMetadata const> const&, DB::Block const&, DB::ThreadStatus*, std::__1::atomic<unsigned long>*) in /usr/bin/clickhouse | |
5. DB::InterpreterInsertQuery::execute() in /usr/bin/clickhouse | |
6. DB::StorageKafka::streamToViews() in /usr/bin/clickhouse | |
7. DB::StorageKafka::threadFunc(unsigned long) in /usr/bin/clickhouse | |
8. DB::BackgroundSchedulePoolTaskInfo::execute() in /usr/bin/clickhouse | |
9. DB::BackgroundSchedulePool::threadFunction() in /usr/bin/clickhouse | |
10. ? in /usr/bin/clickhouse | |
11. ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) in /usr/bin/clickhouse | |
12. ? in /usr/bin/clickhouse | |
13. ? in ? | |
14. __clone in ? | |
(version 22.8.4.7 (official build)) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment