apache-spark, pyspark, apache-kafka, etl, debezium

PySpark streaming of a Kafka Debezium topic: garbled value format (ETL)


I have successfully created a MariaDB database connection using Debezium and Kafka.

When I try to stream the topic using PySpark, this is the output I get:

-------------------------------------------
Batch: 0
-------------------------------------------
+------+--------------------------------------------------------------------------------------------------------------------------+
|key   |value                                                                                                                     |
+------+--------------------------------------------------------------------------------------------------------------------------+
||MaxDoe1.4.2.Final\nmysqlmariadb\btruebasecampemployees mysql-bin.000032�r�ȯݭd                   |
||\bJane\bMary1.4.2.Final\nmysqlmariadb\btruebasecampemployees mysql-bin.000032�r�ȯݭd               |
||\nAliceJohnson1.4.2.Final\nmysqlmariadb\blastbasecampemployees mysql-bin.000032�r�ȯݭd            |
||MaxDoe\bMaxxDoe1.4.2.Final\nmysqlmariadb���߭d\nfalsebasecampemployees mysql-bin.000032�\bu���߭d|
||\bMaxxDoeMaxDoe1.4.2.Final\nmysqlmariadb���߭d\nfalsebasecampemployees mysql-bin.000032�\ru��߭d |
||\bMaxxDoeMaxDoe1.4.2.Final\nmysqlmariadb���߭d\nfalsebasecampemployees mysql-bin.000032�\ru����d|
+------+--------------------------------------------------------------------------------------------------------------------------+

Would this be a problem when I want to load/stream this data into the target database? Basically, what I'm trying to do is: MariaDB database -> Kafka/Debezium -> PySpark -> MariaDB database.

This is the code that produced the output above:

# pysparkkafka1

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import traceback
import time

try:
    # Initialize the Spark session
    spark = SparkSession.builder \
        .appName("Kafka Spark Integration") \
        .getOrCreate()

    print("Spark session started")

    # Define Kafka topic and server
    kafka_topic = "mariadb.basecamp.employees"
    kafka_server = "kafka:9092"  # Replace with your Kafka server address if different

    # Print the Kafka topic
    print(f"Reading from Kafka topic: {kafka_topic}")

    # Read from the Kafka topic
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_server) \
        .option("subscribe", kafka_topic) \
        .option("startingOffsets", "earliest") \
        .load()

    print("DataFrame created from Kafka topic")

    # Select key and value and convert from bytes to string
    df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    print("DataFrame transformed")

    # Display the dataframe in console
    query = df.writeStream \
        .outputMode("append") \
        .format("console") \
        .option("truncate", False) \
        .start()

    print("Query started")

    # Timeout for termination
    timeout = 60  # 1 minute timeout
    start_time = time.time()
    while time.time() - start_time < timeout:
        if query.isActive:
            print("Streaming...")
            time.sleep(10)  # Check every 10 seconds
        else:
            break

    query.stop()
    print("Query stopped")
    
except Exception as e:
    print("An error occurred:", e)
    traceback.print_exc()

finally:
    # Guard in case the SparkSession was never created
    if "spark" in locals():
        spark.stop()
        print("Spark session stopped")

This is my docker-compose file:

version: '3.8'

services:
  mariadb:
    image: mariadb:10.5
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: password
      MYSQL_DATABASE: mydatabase
      MYSQL_USER: root
      MYSQL_PASSWORD: password
    volumes:
      - ./mariadb-config:/etc/mysql/mariadb.conf.d
      - D:\mariakafka\my.cnf:/etc/mysql/my.cnf
      - ./mysql:/var/lib/mysql
    ports:
      - "3306:3306"

  phpmyadmin:
    image: phpmyadmin
    restart: always
    ports:
      - 8080:80
    environment:
      - PMA_ARBITRARY=1
      - UPLOAD_LIMIT=2G
      - MEMORY_LIMIT=2G

  postgres:
    image: debezium/postgres:13
    restart: always
    volumes:
      - ./postgres:/var/lib/postgresql/data
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=docker
      - POSTGRES_PASSWORD=docker
      - POSTGRES_DB=exampledb

  pgadmin:
    image: dpage/pgadmin4
    restart: always
    environment:
      PGADMIN_DEFAULT_EMAIL: admin@admin.com
      PGADMIN_DEFAULT_PASSWORD: root
    ports:
      - "5050:80"
    depends_on:
      - postgres
 
  zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-server:5.5.1
    restart: always
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT"
      KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092"
      KAFKA_LISTENER_NAME: PLAINTEXT
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9991
    ports:
      - "9092:9092"

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.3
    restart: always
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8087,http://localhost:8087
    ports:
      - 8087:8087
    depends_on: [zookeeper, kafka]

  debezium:
    image: debezium/connect:1.4
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_status
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8087
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8087
      OFFSET_FLUSH_INTERVAL_MS: 60000
      OFFSET_FLUSH_TIMEOUT_MS: 5000
      SHUTDOWN_TIMEOUT: 10000
    ports:
      - 8083:8083
    depends_on:
      - kafka
      - schema-registry
    volumes:
      - D:\mariakafka\kafka\config:/kafka/config
      - D:/mariakafka/mysql-connector:/usr/share/java/kafka-connect

  pyspark:
    image: jupyter/pyspark-notebook:latest
    ports:
      - "8888:8888"
    environment:
      - PYSPARK_SUBMIT_ARGS=--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 --jars /home/jovyan/work/mysql-connector-j-9.0.0.jar pyspark-shell
    volumes:
      - D:/mariakafka/pyspark:/home/jovyan/work
      - D:/mysql-connector-j-9.0.0.jar:/home/jovyan/work/mysql-connector-j-9.0.0.jar
    depends_on:
      - kafka

volumes:
  postgres_data:
  mariadb_data:
              
networks:
  flink-network:
    driver: bridge

This is what I was hoping to see. I got this output by exec-ing into the Schema Registry container:

docker exec -it  3179874d15c23934fc55b841a5650d6e07a33a72cbdd74de308615a0c11c45e0 bash

and then running:

kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic mariadb.basecamp.employees --from-beginning --property schema.registry.url=http://schema-registry:8087

But I don't understand how to replicate this in PySpark. Here is the console consumer output:

{"before":null,"after":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Max","last_name":"Doe"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":0,"snapshot":{"string":"true"},"db":"basecamp","table":{"string":"employees"},"server_id":0,"gtid":null,"file":"mysql-bin.000032","pos":342,"row":0,"thread":null,"query":null},"op":"r","ts_ms":{"long":1724124623364},"transaction":null}
{"before":null,"after":{"mariadb.basecamp.employees.Value":{"employee_id":2,"first_name":"Jane","last_name":"Mary"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":0,"snapshot":{"string":"true"},"db":"basecamp","table":{"string":"employees"},"server_id":0,"gtid":null,"file":"mysql-bin.000032","pos":342,"row":0,"thread":null,"query":null},"op":"r","ts_ms":{"long":1724124623367},"transaction":null}
{"before":null,"after":{"mariadb.basecamp.employees.Value":{"employee_id":3,"first_name":"Alice","last_name":"Johnson"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":0,"snapshot":{"string":"last"},"db":"basecamp","table":{"string":"employees"},"server_id":0,"gtid":null,"file":"mysql-bin.000032","pos":342,"row":0,"thread":null,"query":null},"op":"r","ts_ms":{"long":1724124623369},"transaction":null}
{"before":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Max","last_name":"Doe"}},"after":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Maxx","last_name":"Doe"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":1724126944000,"snapshot":{"string":"false"},"db":"basecamp","table":{"string":"employees"},"server_id":1,"gtid":null,"file":"mysql-bin.000032","pos":549,"row":0,"thread":{"long":0},"query":null},"op":"u","ts_ms":{"long":1724126944214},"transaction":null}
{"before":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Maxx","last_name":"Doe"}},"after":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Max","last_name":"Doe"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":1724127005000,"snapshot":{"string":"false"},"db":"basecamp","table":{"string":"employees"},"server_id":1,"gtid":null,"file":"mysql-bin.000032","pos":855,"row":0,"thread":{"long":0},"query":null},"op":"u","ts_ms":{"long":1724127006261},"transaction":null}
{"before":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Maxx","last_name":"Doe"}},"after":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Max","last_name":"Doe"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":1724127005000,"snapshot":{"string":"false"},"db":"basecamp","table":{"string":"employees"},"server_id":1,"gtid":null,"file":"mysql-bin.000032","pos":855,"row":0,"thread":{"long":0},"query":null},"op":"u","ts_ms":{"long":1724127912824},"transaction":null}

Solution

  • You're casting the data in Spark to a String, not deserializing the Avro or using the Schema Registry as part of your Spark consumer. Therefore you get unparsable UTF-8 strings.
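
    For context, the Confluent Avro wire format prefixes every message with a magic byte (0x00) and a 4-byte big-endian schema ID before the Avro binary itself, which is why a plain CAST(value AS STRING) can never produce readable text. A minimal sketch (the helper name here is made up for illustration) to peek at one raw record:

        import struct

        def inspect_confluent_avro(raw: bytes):
            # Confluent wire format: 1 magic byte (0x00) + 4-byte
            # big-endian schema ID + Avro binary payload
            magic = raw[0]
            schema_id = struct.unpack(">I", raw[1:5])[0]
            print(f"magic byte: {magic}, schema id: {schema_id}")
            print(f"avro payload starts with: {raw[5:20]!r}")

        # e.g. grab one raw record (no CAST) from a batch read of the topic:
        # row = spark.read.format("kafka") \
        #     .option("kafka.bootstrap.servers", "kafka:9092") \
        #     .option("subscribe", "mariadb.basecamp.employees") \
        #     .option("startingOffsets", "earliest") \
        #     .load().limit(1).collect()[0]
        # inspect_confluent_avro(bytes(row["value"]))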

    You could fix your Debezium config to use JsonConverter rather than Avro; otherwise you'll have to write a UDF to deserialize the Avro appropriately. See Integrating Spark Structured Streaming with the Confluent Schema Registry.
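
    A sketch of the JSON route, assuming you switch KEY_CONVERTER and VALUE_CONVERTER to org.apache.kafka.connect.json.JsonConverter on the debezium container, and with the struct fields trimmed to what your sample events show:

        from pyspark.sql.functions import from_json, col
        from pyspark.sql.types import (StructType, StructField,
                                       IntegerType, StringType, LongType)

        # Row payload as it appears in Debezium's before/after fields
        row_schema = StructType([
            StructField("employee_id", IntegerType()),
            StructField("first_name", StringType()),
            StructField("last_name", StringType()),
        ])

        # Trimmed Debezium change-event envelope
        envelope_schema = StructType([
            StructField("before", row_schema),
            StructField("after", row_schema),
            StructField("op", StringType()),
            StructField("ts_ms", LongType()),
        ])

        parsed = (df.selectExpr("CAST(value AS STRING) AS json")
                    .select(from_json(col("json"), envelope_schema).alias("ev"))
                    .select("ev.op", "ev.after.*"))

    Note that JsonConverter wraps each message in a {"schema": ..., "payload": ...} envelope unless schemas are disabled, so you'd likely also set CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false" and CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false" on the debezium container, or parse the payload field out first.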

    Debezium and Confluent both offer a JDBC sink connector as well, so you could also choose to remove Spark from the pipeline entirely.
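
    For illustration, registering a Confluent JDBC sink through the Kafka Connect REST API would look roughly like the sketch below. Treat the connection URL, credentials, and connector name as assumptions to adapt to your setup; it also assumes the JDBC sink connector jar is on the Connect worker's plugin path.

        import json
        import requests

        # Hypothetical sink config; adjust to your target database
        sink = {
            "name": "employees-jdbc-sink",
            "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
                "topics": "mariadb.basecamp.employees",
                "connection.url": "jdbc:mariadb://mariadb:3306/basecamp",
                "connection.user": "root",
                "connection.password": "password",
                "insert.mode": "upsert",
                "pk.mode": "record_key",
                "auto.create": "true",
                # Debezium change events must be flattened to plain rows
                # before the JDBC sink can write them
                "transforms": "unwrap",
                "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            },
        }

        resp = requests.post("http://localhost:8083/connectors",
                             headers={"Content-Type": "application/json"},
                             data=json.dumps(sink))
        print(resp.status_code, resp.text)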