Last active
December 8, 2022 18:48
-
-
Save ottomata/6f4493042331f051f1651b8cd3da1e53 to your computer and use it in GitHub Desktop.
flink-kubernetes-operator python example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Image for running the PyFlink demo job under the flink-kubernetes-operator.
# Base image is WMF's Flink 1.16.0 build, pinned to an explicit tag.
FROM docker-registry.wikimedia.org/flink:1.16.0-37

# Add the PyFlink job script. COPY creates /srv/flink_app if it does not
# exist, so no separate `RUN mkdir` layer is needed; COPY is preferred over
# ADD for plain local files (hadolint DL3020). Files are owned by the
# unprivileged `flink` user so the job can read them at runtime.
USER root
COPY --chown=flink python_demo.py /srv/flink_app/python_demo.py

# Drop back to the non-root user the base image runs Flink as.
USER flink
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# FlinkDeployment manifest for the PyFlink demo job.
#
# To deploy this:
#   kubectl apply -f ./python-example.yaml
# To undeploy:
#   kubectl delete flinkdeployment python-example
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink-app0
  name: python-example
spec:
  # Image built from the Dockerfile in this gist (adds python_demo.py).
  image: pyflink_demo:wmf_flink39
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    # jarURI: local:///opt/flink/opt/flink-python_2.12-1.16.0.jar  # Note, this jarURI is actually a placeholder
    # PythonDriver is Flink's Java entry point that bootstraps the Python
    # interpreter and runs the -py script inside the cluster.
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/bin/python3", "-py", "/srv/flink_app/python_demo.py"]
    parallelism: 1
    upgradeMode: stateless
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
import sys

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment


def python_demo():
    """Run a minimal PyFlink Table API pipeline: datagen source -> print sink.

    Creates a `datagen` table emitting one random `orders` row per second
    and streams it into a `print` connector table, so output appears in the
    TaskManager logs. Intended to be submitted via the flink-kubernetes-operator
    using org.apache.flink.client.python.PythonDriver.
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    # Source: randomly generated orders, 1 row/second.
    t_env.execute_sql("""
    CREATE TABLE orders (
        order_number BIGINT,
        price        DECIMAL(32,2),
        buyer        ROW<first_name STRING, last_name STRING>,
        order_time   TIMESTAMP(3)
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1'
    )""")

    # Sink: writes every row to stdout of the TaskManager.
    t_env.execute_sql("""
    CREATE TABLE print_table (
        order_number BIGINT,
        price        DECIMAL(32,2),
        buyer        ROW<first_name STRING, last_name STRING>,
        order_time   TIMESTAMP(3)
    ) WITH (
      'connector' = 'print'
    )""")

    # Submits the streaming INSERT job. NOTE(review): the TableResult is not
    # waited on here — fine under the kubernetes operator's application mode,
    # where the deployment keeps the job alive; confirm if running standalone.
    t_env.execute_sql("""
        INSERT INTO print_table
        SELECT * FROM orders""")


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    python_demo()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment