Last active
March 19, 2022 16:32
-
-
Save mustafaakin/457859b8bf703c64029071c1139b593d to your computer and use it in GitHub Desktop.
Flink Streaming SQL Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
objc[3232]: Class JavaLaunchHelper is implemented in both /Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/bin/java (0x1008de4c0) and /Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/jre/lib/libinstrument.dylib (0x1009a64e0). One of the two will be used. Which one is undefined. | |
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". | |
SLF4J: Defaulting to no-operation (NOP) logger implementation | |
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. | |
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1862072598] with leader session id 10afca54-e5f2-4b30-931f-b1df0275adfe. | |
07/02/2017 11:51:33 Job execution switched to status RUNNING. | |
07/02/2017 11:51:33 Source: Socket Stream -> Map -> Timestamps/Watermarks -> from: (room, temperature, creationDate, rowtime) -> select: (rowtime, room, temperature)(1/1) switched to SCHEDULED | |
07/02/2017 11:51:33 groupBy: (room), window: (TumblingGroupWindow('w$, 'rowtime, 10000.millis)), select: (room, AVG(temperature) AS avgTemp, start('w$) AS w$start, end('w$) AS w$end) -> select: (room, w$end, avgTemp) -> to: Row -> Sink: Unnamed(1/1) switched to SCHEDULED | |
07/02/2017 11:51:33 Source: Socket Stream -> Map -> Timestamps/Watermarks -> from: (room, temperature, creationDate, rowtime) -> select: (rowtime, room, temperature)(1/1) switched to DEPLOYING | |
07/02/2017 11:51:33 groupBy: (room), window: (TumblingGroupWindow('w$, 'rowtime, 10000.millis)), select: (room, AVG(temperature) AS avgTemp, start('w$) AS w$start, end('w$) AS w$end) -> select: (room, w$end, avgTemp) -> to: Row -> Sink: Unnamed(1/1) switched to DEPLOYING | |
07/02/2017 11:51:33 Source: Socket Stream -> Map -> Timestamps/Watermarks -> from: (room, temperature, creationDate, rowtime) -> select: (rowtime, room, temperature)(1/1) switched to RUNNING | |
07/02/2017 11:51:33 groupBy: (room), window: (TumblingGroupWindow('w$, 'rowtime, 10000.millis)), select: (room, AVG(temperature) AS avgTemp, start('w$) AS w$start, end('w$) AS w$end) -> select: (room, w$end, avgTemp) -> to: Row -> Sink: Unnamed(1/1) switched to RUNNING | |
Accepted connection | |
living room,2017-07-02 08:51:40.0,36.7043450149447 | |
kitchen,2017-07-02 08:51:40.0,33.876180051046205 | |
attic,2017-07-02 08:51:40.0,36.16675359462062 | |
outside,2017-07-02 08:51:40.0,31.82492651162825 | |
bedroom,2017-07-02 08:51:40.0,35.57564839912154 | |
kitchen,2017-07-02 08:51:50.0,35.17374622822952 | |
attic,2017-07-02 08:51:50.0,34.70059246571888 | |
bedroom,2017-07-02 08:51:50.0,33.42090378358368 | |
outside,2017-07-02 08:51:50.0,36.28734383828417 | |
living room,2017-07-02 08:51:50.0,35.69019700021718 | |
kitchen,2017-07-02 08:52:00.0,36.73052318978358 | |
living room,2017-07-02 08:52:00.0,37.6541246537134 | |
attic,2017-07-02 08:52:00.0,33.20183220572221 | |
outside,2017-07-02 08:52:00.0,35.79210457006139 | |
bedroom,2017-07-02 08:52:00.0,32.869989177833276 | |
.... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Build descriptor for the Flink streaming SQL example (SQLTester). -->
    <modelVersion>4.0.0</modelVersion>

    <groupId>mustafaakin</groupId>
    <artifactId>streaming-sql-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Make the build independent of the platform default charset. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Single place to bump Flink; all Flink artifacts must share one version. -->
        <flink.version>1.3.1</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <!-- Pinned so the build does not depend on whichever default the local Maven ships. -->
                <version>3.6.1</version>
                <!--
                  No explicit <executions>: for jar packaging the default lifecycle already
                  binds compiler:compile and compiler:testCompile to their phases, so
                  declaring them again only made each goal run twice.
                -->
                <configuration>
                    <!-- SQLTester uses lambdas, so Java 8 is the minimum language level. -->
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.flink.api.common.functions.MapFunction; | |
import org.apache.flink.api.java.tuple.Tuple3; | |
import org.apache.flink.streaming.api.TimeCharacteristic; | |
import org.apache.flink.streaming.api.datastream.DataStream; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; | |
import org.apache.flink.table.api.Table; | |
import org.apache.flink.table.api.TableEnvironment; | |
import org.apache.flink.table.api.java.StreamTableEnvironment; | |
import org.apache.flink.types.Row; | |
import java.io.PrintWriter; | |
import java.net.ServerSocket; | |
import java.net.Socket; | |
import java.sql.Time; | |
import java.util.Random; | |
/** | |
* Created by mustafa on 01/07/2017. | |
*/ | |
public class SQLTester { | |
private static void listenAndGenerateNumbers(int port) { | |
try { | |
ServerSocket serverSocket = new ServerSocket(port); | |
Socket clientSocket = serverSocket.accept(); | |
System.out.println("Accepted connection"); | |
Random random = new Random(); | |
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true); | |
String rooms[] = new String[]{"living room", "kitchen", "outside", "bedroom", "attic"}; | |
for (int i = 0; i < 10000; i++) { | |
String room = rooms[random.nextInt(rooms.length)]; | |
double temp = random.nextDouble() * 30 + 20; | |
out.println(room + "," + temp); | |
Thread.sleep(random.nextInt(10) + 50); | |
} | |
System.out.println("Closing server"); | |
clientSocket.close(); | |
serverSocket.close(); | |
} catch (Exception ex) { | |
ex.printStackTrace(); | |
} | |
} | |
private static final MapFunction<String, Tuple3<String, Double, Time>> mapFunction | |
= new MapFunction<String, Tuple3<String, Double, Time>>() { | |
@Override | |
public Tuple3<String, Double, Time> map(String s) throws Exception { | |
// data is: <roomname>,<temperature> | |
String p[] = s.split(","); | |
String room = p[0]; | |
Double temperature = Double.parseDouble(p[1]); | |
Time creationDate = new Time(System.currentTimeMillis()); | |
return new Tuple3<>(room, temperature, creationDate); | |
} | |
}; | |
private final static AscendingTimestampExtractor extractor = new AscendingTimestampExtractor<Tuple3<String, Double, Time>>() { | |
@Override | |
public long extractAscendingTimestamp(Tuple3<String, Double, Time> element) { | |
return element.f2.getTime(); | |
} | |
}; | |
public static void main(String[] args) throws Exception { | |
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
env.setParallelism(1); | |
final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); | |
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); | |
final int port = 3901; | |
new Thread(() -> listenAndGenerateNumbers(port)).start(); | |
Thread.sleep(1000); // wait the socket for a little; | |
DataStream<String> text = env.socketTextStream("localhost", port, "\n"); | |
DataStream<Tuple3<String, Double, Time>> dataset = text | |
.map(mapFunction) | |
.assignTimestampsAndWatermarks(extractor); | |
// Register it so we can use it in SQL | |
tableEnv.registerDataStream("sensors", dataset, "room, temperature, creationDate, rowtime.rowtime"); | |
String query = "SELECT room, TUMBLE_END(rowtime, INTERVAL '10' SECOND), AVG(temperature) AS avgTemp FROM sensors GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND), room"; | |
Table table = tableEnv.sql(query); | |
// Just for printing purposes, in reality you would need something other than Row | |
tableEnv.toAppendStream(table, Row.class).print(); | |
env.execute(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment