Tags: google-cloud-platform, google-cloud-dataflow, apache-beam, google-cloud-sql, apache-beam-io

Cannot connect to Cloud SQL using Apache-Beam JDBC


I am trying to connect to Cloud SQL using the Python SDK's apache_beam.io.jdbc module, specifically the ReadFromJdbc class, which is documented here: https://beam.apache.org/releases/pydoc/current/apache_beam.io.jdbc.html

Based on that page and on the guide to connecting to Cloud SQL for MySQL over JDBC (https://github.com/GoogleCloudPlatform/cloud-sql-jdbc-socket-factory/blob/main/docs/jdbc-mysql.md), I wrote the following code:

import os
import typing

import apache_beam as beam
import apache_beam.coders as coders
import apache_beam.io.jdbc as jdbc

from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = {
    'project': 'project-name',
    'runner': 'DataflowRunner',
    'region': 'europe-central2',
    'staging_location':"gs://temp",
    'temp_location':"gs://temp",
    'template_location':"gs://templates/temp_name"
}
pipeline_options = PipelineOptions.from_dictionary(pipeline_options)


serviceAccount = r'path\to\serviceaccount.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = serviceAccount

ExampleRow = typing.NamedTuple('ExampleRow',
                               [('id', int), ('migration', str)])
coders.registry.register_coder(ExampleRow, coders.RowCoder)


with beam.Pipeline(options=pipeline_options) as p:
    res = (
        p
        | "Read database list" >> jdbc.ReadFromJdbc(
            table_name='table',
            driver_class_name='com.mysql.jdbc.Driver',
            jdbc_url='jdbc:mysql:///<DATABASE_NAME>?cloudSqlInstance=<INSTANCE_CONNECTION_NAME>&socketFactory=com.google.cloud.sql.mysql.SocketFactory&user=<MYSQL_USER_NAME>&password=<MYSQL_USER_PASSWORD>',
            username='user',
            password='pass',
            query = "select id, migration from db.table;",
            fetch_size=1,
            classpath=["com.google.cloud.sql:mysql-socket-factory-connector-j-8:1.7.2"],
            expansion_service = 'host:6666'
        )
        | "Print results" >> beam.io.WriteToText(r'gs://output/out.csv')
    )
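Because the JDBC URL above packs several query parameters into one string, it can help to build it programmatically and keep the credentials in ReadFromJdbc's separate username/password arguments instead of the URL (a password containing characters such as & or # would otherwise corrupt the query string). A minimal sketch, assuming the cloudSqlInstance and socketFactory parameter names from the socket factory guide linked above; the helper name build_cloud_sql_jdbc_url and the example values are hypothetical:

```python
from urllib.parse import quote, urlencode

# Socket factory class name, as used in the question's JDBC URL.
SOCKET_FACTORY = "com.google.cloud.sql.mysql.SocketFactory"


def build_cloud_sql_jdbc_url(database: str, instance_connection_name: str) -> str:
    """Build a MySQL JDBC URL that routes through the Cloud SQL socket factory.

    Hypothetical helper: credentials are deliberately left out of the URL so
    they can be passed via ReadFromJdbc's username/password arguments.
    """
    params = {
        # Instance connection name has the form "<project>:<region>:<instance>";
        # ':' is kept unescaped so the value stays readable.
        "cloudSqlInstance": instance_connection_name,
        "socketFactory": SOCKET_FACTORY,
    }
    query = urlencode(params, safe=":", quote_via=quote)
    # Empty host ("///") follows the format in the socket factory guide:
    # the socket factory, not a hostname, decides where to connect.
    return f"jdbc:mysql:///{quote(database)}?{query}"


if __name__ == "__main__":
    print(build_cloud_sql_jdbc_url("mydb", "my-project:europe-central2:my-instance"))
    # prints: jdbc:mysql:///mydb?cloudSqlInstance=my-project:europe-central2:my-instance&socketFactory=com.google.cloud.sql.mysql.SocketFactory
```

The resulting string would be passed as jdbc_url, with username='user' and password='pass' supplying the credentials.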

For the expansion service, I set up a WSL2 Python environment as documented here: https://beam.apache.org/documentation/sdks/java-multi-language-pipelines/#advanced-start-an-expansion-service

Unfortunately, I get this error:

grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "failed to connect to all addresses; last error: UNAVAILABLE: ipv4:127.0.0.1:6666: WSA Error"
        debug_error_string = "UNKNOWN:failed to connect to all addresses; last error: UNAVAILABLE: ipv4:127.0.0.1:6666: WSA Error {grpc_status:14, created_time:"2022-12-08T15:43:05.445755053+00:00"}"

I tried setting expansion_service to the specific IP address reported by hostname -I in WSL, but it produced the same result, even though that address is reachable (I tested it with ping and by hosting a web server on it).
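The gRPC UNAVAILABLE status means nothing accepted a TCP connection at that address and port. Note that a successful ping only shows the host answers ICMP, not that anything listens on port 6666. A minimal stdlib sketch for checking the port itself from the Windows side; the helper name port_is_open is hypothetical:

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection resolves the host and performs a TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Refused, unreachable, or timed out: nothing is accepting connections there.
        return False


if __name__ == "__main__":
    # The address from the gRPC error; swap in the WSL2 IP to test that instead.
    print(port_is_open("127.0.0.1", 6666))
```

If this prints False for both 127.0.0.1 and the WSL2 address, no service is listening where the pipeline expects the expansion service, which matches the error above.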

Am I doing something completely wrong? I find it hard to believe that it's so hard to connect to Cloud SQL, so I must be...


Solution

  • Transforms in the apache_beam.io.jdbc module are cross-language transforms implemented in the Beam Java SDK. During pipeline construction, the Python SDK therefore connects to a Java expansion service to expand them. The instructions you followed, however, set up a Python expansion service, which is not what these transforms need.

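    The easiest fix is probably to drop the expansion_service argument and let Beam use the default expansion service. Beam then downloads the Java expansion service jar and starts it locally, so a Java runtime must be installed on the machine that constructs the pipeline: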

        with beam.Pipeline(options=pipeline_options) as p:
            res = (
                p
                | "Read database list" >> jdbc.ReadFromJdbc(
                    table_name='table',
                    driver_class_name='com.mysql.jdbc.Driver',
                    jdbc_url='jdbc:mysql:///<DATABASE_NAME>?cloudSqlInstance=<INSTANCE_CONNECTION_NAME>&socketFactory=com.google.cloud.sql.mysql.SocketFactory&user=<MYSQL_USER_NAME>&password=<MYSQL_USER_PASSWORD>',
                    username='user',
                    password='pass',
                    query="select id, migration from db.table;",
                    fetch_size=1,
                    classpath=["com.google.cloud.sql:mysql-socket-factory-connector-j-8:1.7.2"],
                    # No expansion_service argument: Beam starts the default
                    # Java expansion service itself.
                )
                | "Print results" >> beam.io.WriteToText(r'gs://output/out.csv')
            )
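Since the default expansion service is a Java process that Beam launches locally during pipeline construction, it may be worth confirming that a java executable is available before building the pipeline. A minimal pre-flight sketch using only the standard library; the helper name java_version is hypothetical, and the assumption that Beam looks for java on PATH should be checked against the Beam version in use:

```python
import shutil
import subprocess
from typing import Optional


def java_version(executable: str = "java") -> Optional[str]:
    """Return the first line of `<executable> -version`, or None if it is not on PATH."""
    path = shutil.which(executable)
    if path is None:
        return None
    # `java -version` writes its banner to stderr rather than stdout.
    result = subprocess.run([path, "-version"], capture_output=True, text=True, check=False)
    lines = (result.stderr or result.stdout).splitlines()
    return lines[0] if lines else ""


if __name__ == "__main__":
    version = java_version()
    if version is None:
        print("No 'java' on PATH: the default expansion service cannot be started.")
    else:
        print(f"Found Java: {version}")
```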