Skip to content

Instantly share code, notes, and snippets.

@gunnarmorling
Last active January 4, 2023 15:30
Show Gist options
Save gunnarmorling/eff2ea093d47fc30d0b975cf691113f6 to your computer and use it in GitHub Desktop.
diff --git a/pyflink-walkthrough/Dockerfile b/pyflink-walkthrough/Dockerfile
index eb9afc1..c93b29a 100644
--- a/pyflink-walkthrough/Dockerfile
+++ b/pyflink-walkthrough/Dockerfile
@@ -28,7 +28,7 @@ ARG FLINK_VERSION=1.16.0
# debian bullseye which ships with that version, so build python3.7 here.
RUN set -ex; \
apt-get update && \
- apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev && \
+ apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev lzma liblzma-dev && \
wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
tar -xvf Python-3.7.9.tgz && \
cd Python-3.7.9 && \
diff --git a/pyflink-walkthrough/docker-compose.yml b/pyflink-walkthrough/docker-compose.yml
index 9120ec3..fe89ff6 100644
--- a/pyflink-walkthrough/docker-compose.yml
+++ b/pyflink-walkthrough/docker-compose.yml
@@ -47,6 +47,10 @@ services:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
zookeeper:
image: wurstmeister/zookeeper:3.4.6
+ ulimits:
+ nofile:
+ soft: 65536
+ hard: 65536
ports:
- "2181:2181"
kafka:
diff --git a/pyflink-walkthrough/payment_msg_proccessing.py b/pyflink-walkthrough/payment_msg_proccessing.py
index 759175a..bc2a1c7 100644
--- a/pyflink-walkthrough/payment_msg_proccessing.py
+++ b/pyflink-walkthrough/payment_msg_proccessing.py
@@ -18,6 +18,7 @@
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
+from pyflink.table.expressions import call, col
from pyflink.table.udf import udf
@@ -73,9 +74,9 @@ def log_processing():
t_env.register_function('province_id_to_name', province_id_to_name)
t_env.from_path("payment_msg") \
- .select("province_id_to_name(provinceId) as province, payAmount") \
- .group_by("province") \
- .select("province, sum(payAmount) as pay_amount") \
+ .select(call('province_id_to_name', col('provinceId')).alias("province"), col('payAmount')) \
+ .group_by(col('province')) \
+ .select(col('province'), col('payAmount').sum.alias("pay_amount")) \
.execute_insert("es_sink")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment