mongodb, dataframe, pyspark, azure-databricks, databricks-notebook

Py4JJavaError on Azure Databricks notebook


I am trying to read data from MongoDB in batch mode. My compute cluster configuration is:

I have installed the MongoDB Spark connector library through Maven; the installed version is:

I have my schemas defined in a separate cell in the notebook:

    StructField("_id", StringType(), True),
    StructField("channelId", StringType(), True),
    StructField("channelInformation", StructType([
        StructField("channelDefinition", StructType([
            StructField("channelName", StringType(), True),
            StructField("subChannelName", StringType(), True)
        ]), True)
    ]), True),
    StructField("componentId", StringType(), True),
    StructField("connectorId", StringType(), True),
    StructField("createdAt", StringType(), True),
    StructField("customer", StructType([
        StructField("email", StringType(), True),
        StructField("displayName", StringType(), True)
    ]), True),
    StructField("displayFinancialStatus", StringType(), True),
    StructField("displayFulfillmentStatus", StringType(), True),
    StructField("disputes", ArrayType(StructType([
        StructField("id", StringType(), True),
        StructField("status", StringType(), True),
        StructField("initiatedAs", StringType(), True)
    ])), True),
    StructField("disputesInternal", ArrayType(StringType()), True),
    StructField("id", StringType(), True),
    StructField("lineItems", StructType([
        StructField("edges", ArrayType(StructType([
            StructField("node", StructType([
                StructField("variant", StructType([
                    StructField("sku", StringType(), True)
                ]), True)
            ]), True)
        ])), True)
    ]), True),
    StructField("name", StringType(), True),
    StructField("note", StringType(), True),
    StructField("riskLevel", StringType(), True),
    StructField("tags", ArrayType(StringType()), True),
    StructField("tenantId", StringType(), True),
    StructField("transactions", ArrayType(StructType([
        StructField("authorizationExpiresAt", StringType(), True)
    ])), True)
])
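
The read loop below iterates over a schemas dict, so each schema like the one above is registered under its component id. A minimal sketch of that wiring, where the key is only an illustrative component id, not one from my actual setup:

# Hypothetical registry mapping component ids to their schemas;
# "orders" is an illustrative key, not a real component id.
schemas = {
    "orders": order_schema,
}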

In another cell I have the code that reads data from MongoDB:


# Define your MongoDB connection details
connectionString = "mongodb+srv://user:xyz@connection_string/"
database = tenant  # tenant is defined earlier in the notebook


for component_id, schema in schemas.items():
    options = {
        "spark.mongodb.connection.uri": connectionString,
        "spark.mongodb.database": database,
        "spark.mongodb.collection": f"Persistence_{component_id}",
    }
    df = (spark.read.format("mongodb")
          .options(**options)
          .schema(schema)
          .load())
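
Since df is reassigned on every iteration, only the last collection read is available afterwards. A small sketch of a variant that keeps one DataFrame per component (the dataframes name is illustrative):

dataframes = {}
for component_id, schema in schemas.items():
    options = {
        "spark.mongodb.connection.uri": connectionString,
        "spark.mongodb.database": database,
        "spark.mongodb.collection": f"Persistence_{component_id}",
    }
    # keep every component's DataFrame instead of overwriting df
    dataframes[component_id] = (spark.read.format("mongodb")
                                .options(**options)
                                .schema(schema)
                                .load())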

Now when I call df.show() in the next cell, it gives the following error:

: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolveAndBind(Lscala/collection/immutable/Seq;Lorg/apache/spark/sql/catalyst/analysis/Analyzer;)Lorg/apache/spark/sql/catalyst/encoders/ExpressionEncoder;
    at com.mongodb.spark.sql.connector.schema.SchemaToExpressionEncoderFunction.apply(SchemaToExpressionEncoderFunction.java:97)
    at com.mongodb.spark.sql.connector.schema.RowToInternalRowFunction.<init>(RowToInternalRowFunction.java:41)
    at com.mongodb.spark.sql.connector.schema.BsonDocumentToRowConverter.<init>(BsonDocumentToRowConverter.java:111)
    at com.mongodb.spark.sql.connector.read.MongoBatch.<init>(MongoBatch.java:46)
    at com.mongodb.spark.sql.connector.read.MongoScan.toBatch(MongoScan.java:79)
    at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch$lzycompute(BatchScanExec.scala:54)
    at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch(BatchScanExec.scala:54)
    at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.$anonfun$inputPartitions$2(BatchScanExec.scala:72)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputPartitions$lzycompute(BatchScanExec.scala:72)
    at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputPartitions(BatchScanExec.scala:70)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar(DataSourceV2ScanExecBase.scala:172)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar$(DataSourceV2ScanExecBase.scala:168)
    at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.supportsColumnar(BatchScanExec.scala:44)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.apply(DataSourceV2Strategy.scala:184)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:78)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:78)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:119)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:106)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$5(QueryPlanner.scala:104)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$4(QueryPlanner.scala:101)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:119)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:106)
    at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:1112)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:488)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:471)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$5(QueryExecution.scala:613)
    at org.apache.spark.sql.execution.SQLExecution$.withExecutionPhase(SQLExecution.scala:143)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$4(QueryExecution.scala:613)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1177)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:612)
    at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:608)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1184)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:608)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhaseWithTracker$1(QueryExecution.scala:625)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:195)
    at org.apache.spark.sql.execution.QueryExecution.executePhaseWithTracker(QueryExecution.scala:625)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:475)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:471)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$_executedPlan$1(QueryExecution.scala:504)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1184)
    at org.apache.spark.sql.execution.QueryExecution._executedPlan$lzycompute(QueryExecution.scala:504)
    at org.apache.spark.sql.execution.QueryExecution._executedPlan(QueryExecution.scala:499)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:573)
    at com.databricks.sql.transaction.tahoe.metering.DeltaMetering$.reportUsage(DeltaMetering.scala:229)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$10(SQLExecution.scala:655)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:793)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:333)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1184)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:204)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:730)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4805)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:3544)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:3775)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:397)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:433)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
    at py4j.Gateway.invoke(Gateway.java:306)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
    at java.lang.Thread.run(Thread.java:750)
File <command-1524629330570308>, line 1
----> 1 df.show()
File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     45 start = time.perf_counter()
     46 try:
---> 47     res = func(*args, **kwargs)
     48     logger.log_success(
     49         module_name, class_name, function_name, time.perf_counter() - start, signature
     50     )
     51     return res
File /databricks/spark/python/pyspark/sql/dataframe.py:1072, in DataFrame.show(self, n, truncate, vertical)
    983 def show(self, n: int = 20, truncate: Union[bool, int] = True, vertical: bool = False) -> None:
    984     """
    985     Prints the first ``n`` rows of the DataFrame to the console.
    986 
   (...)
   1070     name | This is a super l...
   1071     """
-> 1072     print(self._show_string(n, truncate, vertical))
File /databricks/spark/python/pyspark/sql/dataframe.py:1090, in DataFrame._show_string(self, n, truncate, vertical)
   1084     raise PySparkTypeError(
   1085         error_class="NOT_BOOL",
   1086         message_parameters={"arg_name": "vertical", "arg_type": type(vertical).__name__},
   1087     )
   1089 if isinstance(truncate, bool) and truncate:
-> 1090     return self._jdf.showString(n, 20, vertical)
   1091 else:
   1092     try:
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
   1349 command = proto.CALL_COMMAND_NAME +\
   1350     self.command_header +\
   1351     args_command +\
   1352     proto.END_COMMAND_PART
   1354 answer = self.gateway_client.send_command(command)
-> 1355 return_value = get_return_value(
   1356     answer, self.gateway_client, self.target_id, self.name)
   1358 for temp_arg in temp_args:
   1359     if hasattr(temp_arg, "_detach"):
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:255, in capture_sql_exception.<locals>.deco(*a, **kw)
    252 from py4j.protocol import Py4JJavaError
    254 try:
--> 255     return f(*a, **kw)
    256 except Py4JJavaError as e:
    257     converted = convert_exception(e.java_exception)
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Solution

  • I installed the Spark MongoDB connector version 2.12:10.3.0 (Maven coordinate org.mongodb.spark:mongo-spark-connector_2.12:10.3.0) on the compute cluster through Maven, and the code started working for me.

    Previously I tested with versions 2.12:10.4.0 and 2.13:10.4.0; neither of them worked.

    The likely reason is that the cluster's Spark version (3.5.0) is not compatible with version 10.4.0 of the connector.

    You can install the library by going to the Libraries tab in the cluster settings, as shown below:

    Select Maven as the library source and click Search Packages.

    Now select Maven Central and type mongo-spark-connector in the search bar.

    Now select version 10.3.0 and make sure that your cluster's Scala version matches the connector's Scala version (for example, the _2.12 artifact for a Scala 2.12 runtime).
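
    After the library is installed, a quick sanity check using the same reader options as in the question, but with placeholder connection details, database, and collection names, should return rows without the NoSuchMethodError:

    # connector: org.mongodb.spark:mongo-spark-connector_2.12:10.3.0
    df = (spark.read.format("mongodb")
          .options(**{
              "spark.mongodb.connection.uri": "mongodb+srv://user:xyz@connection_string/",  # placeholder
              "spark.mongodb.database": "my_database",        # placeholder
              "spark.mongodb.collection": "my_collection",    # placeholder
          })
          .load())  # schema is inferred when none is supplied
    df.show()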