With Spark 3.1 reaching its end of life, I'm in the process of switching all my Notebooks in Azure Synapse Analytics from 3.1 to 3.3. Some of these Notebooks use data from the SQL database in Synapse, but I've run into an issue with some boolean/bit values. After importing data with spark.read.sqlanalytics(), the Notebook crashes as soon as I filter on a boolean column. This didn't happen with Spark 3.1 (I'm not sure about 3.2), and it also doesn't happen if I manually create a DataFrame and filter on a boolean.
So for example, this code works in both Spark 3.1 and 3.3
# Imports
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, BooleanType
# Create Spark session
spark = SparkSession.builder.appName("DatatypeTest").getOrCreate()
# Define the schema with columns of different data types
schema = StructType([
StructField("string", StringType(), True),
StructField("integer", IntegerType(), True),
StructField("float", FloatType(), True),
StructField("boolean", BooleanType(), True)
])
# Create the DataFrame
data = [("Hello", 123, 3.14, True),
("World", 456, 9.81, False)]
df = spark.createDataFrame(data, schema=schema)
# All of these work
df = df.filter(df.boolean)
df = df.filter(df.boolean == 'true')
df = df.filter(df.boolean == True)
# Show the DataFrame
display(df)
Whereas the following doesn't work on 3.3, even though it's essentially the same table:
CREATE TABLE [DatatypeTest]
(
    [string] [varchar](100) NULL,
    [integer] [int] NULL,
    [float] [decimal](9, 2) NULL,
    [boolean] [bit] NULL
);
INSERT INTO [DatatypeTest]
VALUES ('Hello', 123, 3.14, 1),
       ('World', 456, 9.81, 0);
%%spark
val df = spark.read.sqlanalytics("DatatypeTest")
df.createOrReplaceTempView("test")
%%pyspark
from pyspark.sql import functions as F
df = spark.sql("SELECT * FROM test")
#df = df.filter(F.col('boolean')) # doesn't work
#df = df.filter(df.boolean) # doesn't work
#df = df.filter(df.boolean == 'true') # doesn't work
#df = df.filter(df.boolean == True) # doesn't work
display(df)
Running df.dtypes shows column "boolean" as type boolean. In the SQL database, column "boolean" has datatype bit.
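For reference, this is a quick way to check the mapping; the commented output is what I'd expect given the table definition above (exact decimal precision aside):
# Inspect the Spark types the connector produced
print(df.dtypes)
# Expected: [('string', 'string'), ('integer', 'int'),
#            ('float', 'decimal(9,2)'), ('boolean', 'boolean')]
df.printSchema()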
Has anyone else run into this problem? The only thing I could find was Weird pyspark behavior when filtering on booleans, but it doesn't answer the question.
This is due to an issue with the Azure Synapse Dedicated SQL Pool Connector.
Instead of reading the whole table and then filtering in Spark, you can push the boolean filter into the SQL query itself, using the same connector:
import com.microsoft.spark.sqlanalytics
from com.microsoft.spark.sqlanalytics.Constants import Constants
from pyspark.sql.functions import col
dfToReadFromQueryAsOption = (spark.read
    .option(Constants.DATABASE, "<database_name>")
    .option(Constants.SERVER, "<server_name>.sql.azuresynapse.net")
    .option(Constants.TEMP_FOLDER, "abfss://<container>@<storage_acc>.dfs.core.windows.net/tmp_dedicated_pool/")
    .option(Constants.QUERY, "select * from dbo.DatatypeTest where boolean = 'true'")
    .synapsesql())
dfToReadFromQueryAsOption.show()
Output:
| string | integer | float | boolean |
|--------|---------|-------|---------|
| Hello  | 123     | 3.14  | true    |
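Because Constants.QUERY is executed on the dedicated SQL pool, the boolean = 'true' comparison runs as T-SQL (where the string 'true' is implicitly converted to bit 1), so the rows are filtered server-side and the problematic Spark-side boolean filter is never evaluated.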
For more information about the connector, refer to Azure Synapse Dedicated SQL Pool Connector for Apache Spark - Azure Synapse Analytics | Microsoft Learn.
Alternatively, you can read the table over JDBC, passing the connection string as below.
from pyspark.sql import functions as F
jdbc_url = ("jdbc:sqlserver://<server-name>.sql.azuresynapse.net:1433;"
            "database=<database_name>;user=<user>;password=<password>;encrypt=true;"
            "trustServerCertificate=false;hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30;")
df = spark.read.jdbc(url=jdbc_url, table="DatatypeTest")
# All of these filters now work
df = df.filter(F.col('boolean'))
df = df.filter(df.boolean)
df = df.filter(df.boolean == 'true')
df = df.filter(df.boolean == True)
display(df)
Output:

| string | integer | float | boolean |
|--------|---------|-------|---------|
| Hello  | 123     | 3.14  | true    |
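If the table is large, you can push the filter down over JDBC as well. A minimal sketch, reusing the jdbc_url defined above (Spark accepts a parenthesized subquery with an alias in place of a table name):
# Filter on the bit column in T-SQL before the data reaches Spark
pushed = spark.read.jdbc(
    url=jdbc_url,
    table="(SELECT * FROM dbo.DatatypeTest WHERE boolean = 1) AS filtered")
display(pushed)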