Created
September 8, 2023 15:24
-
-
Save Abacn/4e0375826ee762bbc532a2089eb4fbe9 to your computer and use it in GitHub Desktop.
Beam Python Jdbc read write Date
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Code snippet for reading/writing the Date logical type, which is currently unsupported in Beam Python, | |
| # without needing a cast, by implementing your own LogicalType (DateType) | |
| import datetime | |
| from decimal import Decimal | |
| import logging | |
| import typing | |
| from apache_beam.coders.row_coder import LogicalType | |
| from apache_beam.typehints.schemas import MillisInstant | |
| from apache_beam.utils.timestamp import Timestamp | |
| import sqlalchemy | |
| import apache_beam as beam | |
| from apache_beam import coders | |
| from apache_beam.io.jdbc import ReadFromJdbc, WriteToJdbc | |
| from apache_beam.options.pipeline_options import PipelineOptions | |
| from apache_beam.pipeline import Pipeline | |
# Row schema written to / read from the JDBC table; the field names and order
# line up with the columns of the CREATE TABLE statement used by the demo.
class JdbcWriteTestRow(typing.NamedTuple):
    f_id: int
    f_real: float
    f_str: str
    f_str2: str
    f_bin: bytes
    f_bin2: bytes
    f_decimal: Decimal
    f_date: datetime.date
@LogicalType.register_logical_type
class DateType(LogicalType[datetime.date, Timestamp, str]):
    """Logical type mapping Python ``datetime.date`` values to a Beam ``Timestamp``.

    A date is encoded as the ``Timestamp`` of UTC midnight on that day and is
    decoded by taking the UTC calendar date of the timestamp.

    The class is registered under the URN ``beam:logical_type:javasdk:v1`` so
    that the JDBC date values coming from the Java side can be read and written
    from Beam Python without a SQL cast.
    NOTE(review): judging by its name, that URN is generic for Java-SDK-only
    logical types, so this registration presumably applies to every such type
    and not only to DATE — confirm before reusing this in a larger pipeline.
    """

    def __init__(self, unused=""):
        # The type argument is accepted and ignored: this type has no
        # parameters of its own.
        pass

    @classmethod
    def representation_type(cls):
        # type: () -> type
        # Values travel as Timestamps (UTC midnight of the date).
        return Timestamp

    @classmethod
    def urn(cls):
        # Presumably the URN the Java SDK uses for its SDK-specific logical
        # types; it must match what the Java side emits for this to be picked up.
        return "beam:logical_type:javasdk:v1"

    @classmethod
    def language_type(cls):
        return datetime.date

    def to_representation_type(self, value):
        # type: (datetime.date) -> Timestamp
        """Encode ``value`` as the Timestamp of midnight UTC on that date."""
        midnight_utc = datetime.datetime.combine(
            value, datetime.time.min, tzinfo=datetime.timezone.utc)
        return Timestamp.from_utc_datetime(midnight_utc)

    def to_language_type(self, value):
        # type: (Timestamp) -> datetime.date
        """Decode ``value`` to its calendar date in UTC (time of day is dropped)."""
        # NOTE(review): if the Java side encodes dates at a non-UTC midnight,
        # taking the UTC date can be off by one day — confirm against real data.
        return value.to_utc_datetime().date()

    @classmethod
    def argument_type(cls):
        return str

    def argument(self):
        # No meaningful argument; an empty string satisfies argument_type().
        return ""

    @classmethod
    def _from_typing(cls, typ):
        # Presumably invoked when a schema is inferred from the ``datetime.date``
        # type hint; the hint carries no parameters, so a default instance is used.
        return cls()
class JdbcioTest:
    """Demo: write ``JdbcWriteTestRow`` rows to Postgres via JDBC, then read them back.

    Connection settings are hard-coded for a local Postgres instance. The table
    is created from Python with SQLAlchemy. Rows are written and read through
    Beam's cross-language ``WriteToJdbc`` / ``ReadFromJdbc`` transforms.
    """

    ROW_COUNT = 10

    def __init__(self):
        self.table_name = 'datetest'  # testxlangnodate
        self.username = 'postgres'
        self.password = ''
        self.driver = 'org.postgresql.Driver'
        self.jdbc_url = 'jdbc:postgresql://localhost:5432/postgres'  # ?stringtype=unspecified
        # SQLAlchemy URL, used only to create the table from Python.
        # NOTE(review): unlike the JDBC connection this URL names no user, so the
        # driver's default user (presumably the OS user) is used — confirm it
        # matches self.username.
        self.rfc1738_url = 'postgresql://localhost:5432/postgres'
        self.engine = sqlalchemy.create_engine(self.rfc1738_url)

    def _pipeline_jdbc_url(self):
        """Return ``jdbc_url`` with ``localhost`` swapped for ``host.docker.internal``.

        The JDBC transforms presumably run their Java side in a Docker container,
        where the host machine is reachable as ``host.docker.internal``.
        """
        return self.jdbc_url.replace('localhost', 'host.docker.internal')

    def run_write(self):
        """Create the table if needed, then write ROW_COUNT rows through WriteToJdbc.

        Row ``i`` stores ``today + i days`` in ``f_date``. The table is never
        truncated, so every call appends another ROW_COUNT rows.
        """
        options = PipelineOptions()
        # Compute the reference date once so all rows agree even across midnight.
        today = datetime.date.today()
        inserted_rows = [
            JdbcWriteTestRow(
                i, i + 0.1, str(i - 1) + ".23", str(i - 1) + ".25", b'123', b'123',
                Decimal(str(i - 1) + ".23"), today + datetime.timedelta(days=i))
            for i in range(self.ROW_COUNT)
        ]
        create_table_sql = (
            "CREATE TABLE IF NOT EXISTS {}(f_id INTEGER, f_real FLOAT, f_str VARCHAR(5), "
            "f_str2 CHAR(5), f_bin bytea, f_bin2 bytea, f_decimal DECIMAL(10,2), "
            "f_date DATE)".format(self.table_name))
        # Engine.execute() was removed in SQLAlchemy 2.0. engine.begin() opens a
        # connection and commits the DDL on exit; it also works on 1.x.
        with self.engine.begin() as conn:
            conn.execute(sqlalchemy.text(create_table_sql))
        # this line is needed for write JdbcWriteTestRow to Jdbc
        coders.registry.register_coder(JdbcWriteTestRow, coders.RowCoder)
        with Pipeline(options=options) as p:
            _ = (
                p
                | beam.Create(inserted_rows).with_output_types(JdbcWriteTestRow)
                | 'Write to jdbc' >> WriteToJdbc(
                    table_name=self.table_name,
                    driver_class_name=self.driver,
                    jdbc_url=self._pipeline_jdbc_url(),
                    username=self.username,
                    password=self.password,
                ))

    def run_read(self):
        """Read every row of ``table_name`` through ReadFromJdbc and print some fields.

        Expected output after a single ``run_write`` executed on 2023-09-08.
        Dates shift with the day the write ran. Beam does not guarantee element
        order, so lines may appear in any order.

            PrintFn 0 0.1 -1.23 -1.23 2023-09-08
            PrintFn 1 1.1 0.23 0.23 2023-09-09
            PrintFn 2 2.1 1.23 1.23 2023-09-10
            PrintFn 3 3.1 2.23 2.23 2023-09-11
            PrintFn 4 4.1 3.23 3.23 2023-09-12
            PrintFn 5 5.1 4.23 4.23 2023-09-13
            PrintFn 6 6.1 5.23 5.23 2023-09-14
            PrintFn 7 7.1 6.23 6.23 2023-09-15
            PrintFn 8 8.1 7.23 7.23 2023-09-16
            PrintFn 9 9.1 8.23 8.23 2023-09-17
        """
        options = PipelineOptions()
        # This line is needed to overwrite the correspondence Timestamp<->MillisInstant
        # (it changes the process-wide logical type registry, not just this pipeline).
        LogicalType.register_logical_type(MillisInstant)
        with Pipeline(options=options) as p:
            result = (
                p
                | 'Read from jdbc' >> ReadFromJdbc(
                    table_name=self.table_name,
                    driver_class_name=self.driver,
                    jdbc_url=self._pipeline_jdbc_url(),
                    username=self.username,
                    password=self.password))
            _ = result | beam.Map(
                lambda x: print("PrintFn", x.f_id, x.f_real, x.f_str, x.f_decimal, x.f_date))
if __name__ == '__main__':
    logging.getLogger().setLevel('INFO')
    # Write first so the subsequent read has rows to return.
    demo = JdbcioTest()
    for stage in (demo.run_write, demo.run_read):
        stage()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment