Last active
January 4, 2023 15:30
-
-
Save gunnarmorling/eff2ea093d47fc30d0b975cf691113f6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/pyflink-walkthrough/Dockerfile b/pyflink-walkthrough/Dockerfile | |
index eb9afc1..c93b29a 100644 | |
--- a/pyflink-walkthrough/Dockerfile | |
+++ b/pyflink-walkthrough/Dockerfile | |
@@ -28,7 +28,7 @@ ARG FLINK_VERSION=1.16.0 | |
# debian bullseye which ships with that version, so build python3.7 here. | |
RUN set -ex; \ | |
apt-get update && \ | |
- apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev && \ | |
+ apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev lzma liblzma-dev && \ | |
wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \ | |
tar -xvf Python-3.7.9.tgz && \ | |
cd Python-3.7.9 && \ | |
diff --git a/pyflink-walkthrough/docker-compose.yml b/pyflink-walkthrough/docker-compose.yml | |
index 9120ec3..fe89ff6 100644 | |
--- a/pyflink-walkthrough/docker-compose.yml | |
+++ b/pyflink-walkthrough/docker-compose.yml | |
@@ -47,6 +47,10 @@ services: | |
- JOB_MANAGER_RPC_ADDRESS=jobmanager | |
zookeeper: | |
image: wurstmeister/zookeeper:3.4.6 | |
+ ulimits: | |
+ nofile: | |
+ soft: 65536 | |
+ hard: 65536 | |
ports: | |
- "2181:2181" | |
kafka: | |
diff --git a/pyflink-walkthrough/payment_msg_proccessing.py b/pyflink-walkthrough/payment_msg_proccessing.py | |
index 759175a..bc2a1c7 100644 | |
--- a/pyflink-walkthrough/payment_msg_proccessing.py | |
+++ b/pyflink-walkthrough/payment_msg_proccessing.py | |
@@ -18,6 +18,7 @@ | |
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic | |
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings | |
+from pyflink.table.expressions import call, col | |
from pyflink.table.udf import udf | |
@@ -73,9 +74,9 @@ def log_processing(): | |
t_env.register_function('province_id_to_name', province_id_to_name) | |
t_env.from_path("payment_msg") \ | |
- .select("province_id_to_name(provinceId) as province, payAmount") \ | |
- .group_by("province") \ | |
- .select("province, sum(payAmount) as pay_amount") \ | |
+ .select(call('province_id_to_name', col('provinceId')).alias("province"), col('payAmount')) \ | |
+ .group_by(col('province')) \ | |
+ .select(col('province'), call('sum', col('payAmount')).alias("pay_amount")) \ | |
.execute_insert("es_sink") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[build@control pyflink-walkthrough (FLINK-30441 *)]$ docker-compose exec jobmanager ./bin/flink run -py /opt/pyflink-walkthrough/payment_msg_proccessing.py -d | |
Traceback (most recent call last): | |
File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main | |
"__main__", mod_spec) | |
File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code | |
exec(code, run_globals) | |
File "/tmp/pyflink/2329ce35-5274-4e29-95fa-9fa7e9a2c7bc/1779f581-f1d1-40a9-9d18-9aa2cf52f97e/payment_msg_proccessing.py", line 83, in <module> | |
log_processing() | |
File "/tmp/pyflink/2329ce35-5274-4e29-95fa-9fa7e9a2c7bc/1779f581-f1d1-40a9-9d18-9aa2cf52f97e/payment_msg_proccessing.py", line 76, in log_processing | |
.select("province_id_to_name(provinceId) as province, payAmount") \ | |
File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table.py", line 131, in select | |
File "/opt/flink/opt/python/pyflink.zip/pyflink/table/utils.py", line 104, in to_expression_jarray | |
File "/opt/flink/opt/python/pyflink.zip/pyflink/table/utils.py", line 104, in <listcomp> | |
AttributeError: 'str' object has no attribute '_j_expr' | |
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1 | |
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140) | |
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) | |
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) | |
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) | |
at java.lang.reflect.Method.invoke(Method.java:498) | |
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) | |
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) | |
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) | |
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:846) | |
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:240) | |
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1090) | |
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1168) | |
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) | |
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1168) | |
Caused by: java.lang.RuntimeException: Python process exits with code: 1 | |
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130) | |
... 13 more |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment