I am trying to connect to Cloud SQL by using Python SDK io.jdbc module, more specifically ReadFromJdbc class, which is documented here- https://beam.apache.org/releases/pydoc/current/apache_beam.io.jdbc.html
Based on it and info on connecting to Cloud MySQL using JDBC here- https://github.com/GoogleCloudPlatform/cloud-sql-jdbc-socket-factory/blob/main/docs/jdbc-mysql.md I wrote the following code
import apache_beam as beam
import apache_beam.io.jdbc as jdbc
import typing
import apache_beam.coders as coders
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = {
'project': 'project-name',
'runner': 'DataflowRunner',
'region': 'europe-central2',
'staging_location':"gs://temp",
'temp_location':"gs://temp",
'template_location':"gs://templates/temp_name"
}
pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
serviceAccount = r'path\to\serviceaccount.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = serviceAccount
ExampleRow = typing.NamedTuple('ExampleRow',
[('id', int), ('migration', str)])
coders.registry.register_coder(ExampleRow, coders.RowCoder)
with beam.Pipeline(options=pipeline_options) as p:
res = (
p
| "Read database list" >> jdbc.ReadFromJdbc(
table_name='table',
driver_class_name='com.mysql.jdbc.Driver',
jdbc_url='jdbc:mysql:///<DATABASE_NAME>?cloudSqlInstance=<INSTANCE_CONNECTION_NAME>&socketFactory=com.google.cloud.sql.mysql.SocketFactory&user=<MYSQL_USER_NAME>&password=<MYSQL_USER_PASSWORD>',
username='user',
password='pass',
query = "select id, migration from db.table;",
fetch_size=1,
classpath=["com.google.cloud.sql:mysql-socket-factory-connector-j-8:1.7.2"],
expansion_service = 'host:6666'
)
| "Print results" >> beam.io.WriteToText(r'gs://output/out.csv')
)
For the expansion service I have set up WLS2 python environment as documented here- https://beam.apache.org/documentation/sdks/java-multi-language-pipelines/#advanced-start-an-expansion-service
Unfortunately, I get this error:
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses; last error: UNAVAILABLE: ipv4:127.0.0.1:6666: WSA Error"
debug_error_string = "UNKNOWN:failed to connect to all addresses; last error: UNAVAILABLE: ipv4:127.0.0.1:6666: WSA Error {grpc_status:14, created_time:"2022-12-08T15:43:05.445755053+00:00"}"
I tried to switch expansion_service
to a specific IP that I got from wls hostname -I
but it produced the same result, even though you can reach it (tested with ping and hosted a webserver).
Am I doing something completely wrong? I find it hard to believe that it's so hard to connect to Cloud SQL, so I must be...
Transforms under apache_beam.io.jdbc
module are cross-language transforms implemented in the Beam Java SDK. Hence, during the pipeline construction, Python SDK will connect to a Java expansion service
to expand these transforms. You followed the instructions to create a Python expansion service
.
I think the easiest thing to do will be to use the default expansion service.
java
command is available. p | "Read database list" >> jdbc.ReadFromJdbc(
table_name='table',
driver_class_name='com.mysql.jdbc.Driver',
jdbc_url='jdbc:mysql:///<DATABASE_NAME>?cloudSqlInstance=<INSTANCE_CONNECTION_NAME>&socketFactory=com.google.cloud.sql.mysql.SocketFactory&user=<MYSQL_USER_NAME>&password=<MYSQL_USER_PASSWORD>',
username='user',
password='pass',
query = "select id, migration from db.table;",
fetch_size=1,
classpath=["com.google.cloud.sql:mysql-socket-factory-connector-j-8:1.7.2"]
)