Skip to content

Instantly share code, notes, and snippets.

@Abacn
Created September 8, 2023 15:24
Show Gist options
  • Save Abacn/4e0375826ee762bbc532a2089eb4fbe9 to your computer and use it in GitHub Desktop.
Save Abacn/4e0375826ee762bbc532a2089eb4fbe9 to your computer and use it in GitHub Desktop.
Beam Python Jdbc read write Date
# Code snippet for reading/writing the Date logical type, which is currently unsupported in Beam Python,
# without the need for a cast, by implementing your own LogicalType (DateType).
import datetime
from decimal import Decimal
import logging
import typing
from apache_beam.coders.row_coder import LogicalType
from apache_beam.typehints.schemas import MillisInstant
from apache_beam.utils.timestamp import Timestamp
import sqlalchemy
import apache_beam as beam
from apache_beam import coders
from apache_beam.io.jdbc import ReadFromJdbc, WriteToJdbc
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.pipeline import Pipeline
# Row type for the JDBC test table: one typed field per column.
# Deliberately declared without a docstring so the generated ``__doc__``
# stays the same as with the functional ``typing.NamedTuple(...)`` form.
class JdbcWriteTestRow(typing.NamedTuple):
    f_id: int
    f_real: float
    f_str: str
    f_str2: str
    f_bin: bytes
    f_bin2: bytes
    f_decimal: Decimal
    f_date: datetime.date
@LogicalType.register_logical_type
class DateType(LogicalType[datetime.date, MillisInstant, str]):
    """Logical type mapping ``datetime.date`` values to ``Timestamp`` values.

    The decorator registers the class with ``LogicalType`` when the module is
    imported. The type takes a string argument that is always empty.
    """

    def __init__(self, unused=""):
        """Accept and ignore one optional argument; the instance is stateless."""

    # --- static description of the logical type ---------------------------

    @classmethod
    def urn(cls):
        """Return the URN identifying this logical type."""
        # NOTE(review): presumably the URN the Java SDK attaches to its
        # date type in cross-language schemas -- confirm.
        return "beam:logical_type:javasdk:v1"

    @classmethod
    def language_type(cls):
        """Return the Python type users work with (``datetime.date``)."""
        return datetime.date

    @classmethod
    def representation_type(cls) -> type:
        """Return the type used to represent values (``Timestamp``)."""
        return Timestamp

    @classmethod
    def argument_type(cls):
        """Return the type of the logical type's argument (``str``)."""
        return str

    def argument(self):
        """Return the argument value, which is always the empty string."""
        return ""

    @classmethod
    def _from_typing(cls, typ):
        """Build an instance for ``typ``; ``typ`` itself is not inspected."""
        return cls()

    # --- value conversions ------------------------------------------------

    def to_representation_type(self, value: datetime.date) -> Timestamp:
        """Convert a date to the ``Timestamp`` of that day's midnight in UTC."""
        midnight_utc = datetime.datetime.combine(
            value, datetime.time.min, tzinfo=datetime.timezone.utc
        )
        return Timestamp.from_utc_datetime(midnight_utc)

    def to_language_type(self, value: Timestamp) -> datetime.date:
        """Convert a ``Timestamp`` to the calendar date of its UTC datetime."""
        as_utc = value.to_utc_datetime()
        return as_utc.date()
class JdbcioTest:
    """Round-trip example: write rows with a DATE column via JDBC, then read them back.

    ``run_write`` creates the table (if missing) and writes ``ROW_COUNT`` rows
    with ``WriteToJdbc``; ``run_read`` reads the table with ``ReadFromJdbc``
    and prints selected fields of every row.
    """

    # Number of rows generated by run_write.
    ROW_COUNT = 10

    def __init__(self):
        """Store the connection settings and create the SQLAlchemy engine."""
        self.table_name = 'datetest'  # testxlangnodate
        self.username = 'postgres'
        self.password = ''
        self.driver = 'org.postgresql.Driver'
        self.jdbc_url = 'jdbc:postgresql://localhost:5432/postgres'  # ?stringtype=unspecified
        # NOTE(review): this URL carries no credentials, so the engine does not
        # use self.username / self.password -- confirm that is intended.
        self.rfc1738_url = 'postgresql://localhost:5432/postgres'
        self.engine = sqlalchemy.create_engine(self.rfc1738_url)

    def _pipeline_jdbc_url(self):
        """Return the JDBC URL handed to the Beam JDBC transforms.

        'localhost' is replaced with 'host.docker.internal'.
        NOTE(review): presumably because the transform runs inside a Docker
        container and must reach the database on the host -- confirm.
        """
        return self.jdbc_url.replace('localhost', 'host.docker.internal')

    def run_write(self):
        """Create the table if it does not exist and write ROW_COUNT rows to it.

        Row ``i`` carries a date ``i`` days after today.
        """
        options = PipelineOptions()
        # Read the clock once so all rows are offsets from the same day.
        today = datetime.date.today()
        inserted_rows = [
            JdbcWriteTestRow(
                i,
                i + 0.1,
                str(i - 1) + ".23",
                str(i - 1) + ".25",
                b'123',
                b'123',
                Decimal(str(i - 1) + ".23"),
                today + datetime.timedelta(days=i),
            )
            for i in range(self.ROW_COUNT)
        ]

        create_table = (
            "CREATE TABLE IF NOT EXISTS {}("
            "f_id INTEGER, f_real FLOAT, f_str VARCHAR(5), f_str2 CHAR(5), "
            "f_bin bytea, f_bin2 bytea, f_decimal DECIMAL(10,2), f_date DATE)"
        ).format(self.table_name)
        # Engine.execute() was deprecated in SQLAlchemy 1.4 and removed in 2.0.
        # Run the DDL on an explicit connection instead; engine.begin() commits
        # when the block exits without error.
        with self.engine.begin() as connection:
            connection.execute(sqlalchemy.text(create_table))

        # This line is needed to write JdbcWriteTestRow to JDBC.
        coders.registry.register_coder(JdbcWriteTestRow, coders.RowCoder)

        with Pipeline(options=options) as p:
            _ = (
                p
                | beam.Create(inserted_rows).with_output_types(JdbcWriteTestRow)
                | 'Write to jdbc' >> WriteToJdbc(
                    table_name=self.table_name,
                    driver_class_name=self.driver,
                    jdbc_url=self._pipeline_jdbc_url(),
                    username=self.username,
                    password=self.password,
                )
            )

    def run_read(self):
        """Read the table back and print f_id, f_real, f_str, f_decimal, f_date per row.

        Expected result (dates depend on the day run_write was executed):
            PrintFn 0 0.1 -1.23 -1.23 2023-09-08
            PrintFn 1 1.1 0.23 0.23 2023-09-09
            PrintFn 2 2.1 1.23 1.23 2023-09-10
            PrintFn 3 3.1 2.23 2.23 2023-09-11
            PrintFn 4 4.1 3.23 3.23 2023-09-12
            PrintFn 5 5.1 4.23 4.23 2023-09-13
            PrintFn 6 6.1 5.23 5.23 2023-09-14
            PrintFn 7 7.1 6.23 6.23 2023-09-15
            PrintFn 8 8.1 7.23 7.23 2023-09-16
            PrintFn 9 9.1 8.23 8.23 2023-09-17
        """
        options = PipelineOptions()
        # This line is needed to override the Timestamp <-> MillisInstant
        # correspondence.
        LogicalType.register_logical_type(MillisInstant)

        with Pipeline(options=options) as p:
            result = (
                p
                | 'Read from jdbc' >> ReadFromJdbc(
                    table_name=self.table_name,
                    driver_class_name=self.driver,
                    jdbc_url=self._pipeline_jdbc_url(),
                    username=self.username,
                    password=self.password,
                )
            )
            _ = result | beam.Map(
                lambda x: print("PrintFn", x.f_id, x.f_real, x.f_str, x.f_decimal, x.f_date)
            )
if __name__ == '__main__':
    # Let INFO-level records through the root logger.
    root_logger = logging.getLogger()
    root_logger.setLevel('INFO')

    # Write the rows first, then read them back from the same table.
    jdbc_test = JdbcioTest()
    jdbc_test.run_write()
    jdbc_test.run_read()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment