I am looking for resources on how to read/write data from/to Postgres using Beam Python SDK.So far, I have learned that apache_beam.io.jdbc is our best bet (let me know if I there's a better alternative).
I tried using it, and it was able to handle primitive data types, like integer and strings. However, it could not handle LogicalTypes like "timestamp without time zone" type from Postgres.
Here's some details on my little experiment. Appreciate any help!
Python v3.11.4
apache-beam v2.51.0 (Python SDK)
postgres v11.5
DirectRunner
Here is the pipeline code:
with beam.Pipeline(options=None) as p:
pipeline = (
p
| ReadFromJdbc(
table_name="table_name",
driver_class_name='org.postgresql.Driver',
jdbc_url='jdbc:{}://{}:{}/{}'.format("postgresql", "127.0.0.1", "5432", "db_name"),
username="postgres",
password="redacted",
query="SELECT * FROM table_name")
| beam.Map(print)
)
And it runs into this below error when trying to parse "timestamp without time zone" column. My understanding is the LogicalType MicrosInstant
is not able to parse the timestamp. I can confirm the value of my timestamp field is not NULL.
File "apache_beam/coders/coder_impl.py", line 1890, in apache_beam.coders.coder_impl.LogicalTypeCoderImpl.decode_from_stream
File "/Users/archit.shah/PycharmProjects/duplopy-pysql-beam/venv-3.11/lib/python3.11/site-packages/apache_beam/typehints/schemas.py", line 873, in to_language_type
return Timestamp(seconds=int(value.seconds), micros=int(value.micros))
^^^^^^^^^^^^^^^^^^
TypeError: int() argument must be a string, a bytes-like object or a real number, not 'NoneType'
I also see this below Java warning:
WARNING:root:severity: WARN
timestamp {
seconds: 1697642143
nanos: 162000000
}
message: "Hanged up for url: \"host.docker.internal:58970\"\n."
log_location: "org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer"
thread: "16"
May be I am missing something? I also tried registering a coder, but same result.
Add this two lines before pipeline creation should resolve the issue
from apache_beam.typehints.schemas import MillisInstant
LogicalType.register_logical_type(MillisInstant)
This was due to Java JdbcIO is using joda timestamp. Until https://github.com/apache/beam/issues/28359 has been resolved, this workaround is needed