Skip to content

Instantly share code, notes, and snippets.

@BugenZhao
Created May 17, 2023 08:27
Show Gist options
  • Save BugenZhao/d0746332edac545c093d64bb1f629b7b to your computer and use it in GitHub Desktop.
Save BugenZhao/d0746332edac545c093d64bb1f629b7b to your computer and use it in GitHub Desktop.
Flink Table Store Nexmark
-- Unified Nexmark event table: every row carries an event_type discriminator
-- plus three nested ROW payloads (person, auction, bid).
-- NOTE(review): the text fields use bare VARCHAR with no explicit length;
-- confirm how the engine treats the default length for these columns.
CREATE TABLE datagen (
    event_type INT,

    -- Payload for person events.
    person ROW<
        id           BIGINT,
        name         VARCHAR,
        emailAddress VARCHAR,
        creditCard   VARCHAR,
        city         VARCHAR,
        state        VARCHAR,
        dateTime     TIMESTAMP(3),
        extra        VARCHAR>,

    -- Payload for auction events.
    auction ROW<
        id          BIGINT,
        itemName    VARCHAR,
        description VARCHAR,
        initialBid  BIGINT,
        reserve     BIGINT,
        dateTime    TIMESTAMP(3),
        expires     TIMESTAMP(3),
        seller      BIGINT,
        category    BIGINT,
        extra       VARCHAR>,

    -- Payload for bid events.
    bid ROW<
        auction  BIGINT,
        bidder   BIGINT,
        price    BIGINT,
        channel  VARCHAR,
        url      VARCHAR,
        dateTime TIMESTAMP(3),
        extra    VARCHAR>,

    -- Computed event time: taken from the payload selected by event_type
    -- (0 -> person, 1 -> auction, anything else -> bid).
    dateTime AS
        CASE event_type
            WHEN 0 THEN person.dateTime
            WHEN 1 THEN auction.dateTime
            ELSE bid.dateTime
        END,

    -- The watermark trails the computed event time by 4 seconds.
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) WITH (
    -- NOTE(review): presumably the Flink Table Store append-only write mode
    -- (per the gist title); confirm against the deployed version.
    'write-mode' = 'append-only'
);
-- Flattened view over the person payload of datagen, restricted to rows
-- with event_type = 0. dateTime is the table-level computed column, not
-- person.dateTime.
CREATE TEMPORARY VIEW person AS
SELECT
    person.id,
    person.name,
    person.emailAddress,
    person.creditCard,
    person.city,
    person.state,
    dateTime,
    person.extra
FROM datagen
WHERE event_type = 0;
-- Flattened view over the auction payload of datagen, restricted to rows
-- with event_type = 1. dateTime is the table-level computed column, not
-- auction.dateTime.
CREATE TEMPORARY VIEW auction AS
SELECT
    auction.id,
    auction.itemName,
    auction.description,
    auction.initialBid,
    auction.reserve,
    dateTime,
    auction.expires,
    auction.seller,
    auction.category,
    auction.extra
FROM datagen
WHERE event_type = 1;
-- Flattened view over the bid payload of datagen, restricted to rows
-- with event_type = 2. dateTime is the table-level computed column, not
-- bid.dateTime.
CREATE TEMPORARY VIEW bid AS
SELECT
    bid.auction,
    bid.bidder,
    bid.price,
    bid.channel,
    bid.url,
    dateTime,
    bid.extra
FROM datagen
WHERE event_type = 2;
-- Bid statistics per (channel, calendar day): bid counts, distinct bidders
-- and distinct auctions, each reported in total and split into three price
-- ranks:
--   rank1: price < 10000
--   rank2: 10000 <= price < 1000000
--   rank3: price >= 1000000
-- `minute` is the greatest 'HH:mm' string seen in the group.
-- Only the plan is printed; the sink line below is left commented out as in
-- the original script.
EXPLAIN
-- INSERT INTO discard_sink
SELECT
    channel,
    DATE_FORMAT(dateTime, 'yyyy-MM-dd') AS `day`,
    MAX(DATE_FORMAT(dateTime, 'HH:mm')) AS `minute`,

    -- Bid counts.
    COUNT(*) AS total_bids,
    COUNT(*) FILTER (WHERE price < 10000) AS rank1_bids,
    COUNT(*) FILTER (WHERE price >= 10000 AND price < 1000000) AS rank2_bids,
    COUNT(*) FILTER (WHERE price >= 1000000) AS rank3_bids,

    -- Distinct bidders.
    COUNT(DISTINCT bidder) AS total_bidders,
    COUNT(DISTINCT bidder) FILTER (WHERE price < 10000) AS rank1_bidders,
    COUNT(DISTINCT bidder) FILTER (WHERE price >= 10000 AND price < 1000000) AS rank2_bidders,
    COUNT(DISTINCT bidder) FILTER (WHERE price >= 1000000) AS rank3_bidders,

    -- Distinct auctions.
    COUNT(DISTINCT auction) AS total_auctions,
    COUNT(DISTINCT auction) FILTER (WHERE price < 10000) AS rank1_auctions,
    COUNT(DISTINCT auction) FILTER (WHERE price >= 10000 AND price < 1000000) AS rank2_auctions,
    COUNT(DISTINCT auction) FILTER (WHERE price >= 1000000) AS rank3_auctions
FROM bid
GROUP BY
    channel,
    DATE_FORMAT(dateTime, 'yyyy-MM-dd');
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment