I ran into this issue when I ingested/wrote data to a FeatureSet (part of the MLRun FeatureStore) and then read the data back via PySpark (it looks like invalid parquet). See the exception:
AnalysisException Traceback (most recent call last)
<ipython-input-8-a8c688f9ceb5> in <module>
----> 1 newDF1 = spark.read.parquet(f"v3io://projects/{project_name}/FeatureStore/FS-ingest")
2 newDF1.show()
/spark/python/pyspark/sql/readwriter.py in parquet(self, *paths, **options)
299 int96RebaseMode=int96RebaseMode)
300
--> 301 return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
302
303 def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,
/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py in __call__(self, *args)
1320 answer = self.gateway_client.send_command(command)
1321 return_value = get_return_value(
-> 1322 answer, self.gateway_client, self.target_id, self.name)
1323
1324 for temp_arg in temp_args:
/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
115 # Hide where the exception came from that shows a non-Pythonic
116 # JVM exception message.
--> 117 raise converted from None
118 else:
119 raise
AnalysisException: Unable to infer schema for Parquet. It must be specified manually.
Here is the key part of the source code (which generated the exception):
...
feature_set1 = fstore.FeatureSet(name="FS-ingest", entities=[fstore.Entity('app'), fstore.Entity('id')], engine="spark", timestamp_key='time')
feature_set1.set_targets(targets=[ParquetTarget(name="s1", partitioned=False), NoSqlTarget(name="s2")], with_defaults=False)
feature_set1.save()
fstore.ingest(f"store://feature-sets/{project_name}/FS-ingest", sparkDF, spark_context=spark, overwrite=True)
...
newDF1 = spark.read.parquet(f"v3io://projects/{project_name}/FeatureStore/FS-ingest")
newDF1.show()
Have you seen a similar issue?
NOTE: The parquet path contains parquet files (all of them valid), which means the ingestion was successful.
The source code (the parquet read) contains a mistake. The FeatureSet uses two targets, an online and an offline store, and in this case spark.read.parquet also hits the online storage, which uses a different format than parquet. I see two possible solutions.
1. Update the parquet read path
This is the easy way to solve the issue. Simply extend the current path with /parquet; see the updated code:
...
newDF1 = spark.read.parquet(f"v3io://projects/{project_name}/FeatureStore/FS-ingest/parquet")
newDF1.show()
...
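If you prefer not to hardcode the /parquet suffix, the offline target location can also be resolved from the feature set itself. A minimal sketch, assuming the standard MLRun API (fstore.get_feature_set and FeatureSet.get_target_path) and that the ParquetTarget is still named "s1" as in the definition above:

import mlrun.feature_store as fstore

# Load the stored feature set and resolve the path of its offline (parquet) target.
# "s1" is the ParquetTarget name used in the FeatureSet definition above.
fs = fstore.get_feature_set(f"store://feature-sets/{project_name}/FS-ingest")
parquet_path = fs.get_target_path(name="s1")

newDF1 = spark.read.parquet(parquet_path)
newDF1.show()

This way the read keeps working even if the target layout under the feature set directory changes.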
2. Remove the online/NoSql target
This means updating the FeatureSet definition (removing NoSqlTarget(name="s2")) and keeping the spark.read.parquet part unchanged; see the updated code:
...
feature_set1 = fstore.FeatureSet(name="FS-ingest", entities=[fstore.Entity('app'), fstore.Entity('id')], engine="spark", timestamp_key='time')
feature_set1.set_targets(targets=[ParquetTarget(name="s1", partitioned=False)], with_defaults=False)
feature_set1.save()
newDF1 = spark.read.parquet(f"v3io://projects/{project_name}/FeatureStore/FS-ingest")
newDF1.show()
...
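As a quick sanity check (a sketch, assuming the usual MLRun object model where feature_set1.spec.targets lists the configured targets), you can confirm before ingesting that only the parquet target remains, so no nosql directory is written next to it:

# List the targets configured on the feature set; with with_defaults=False and
# only the ParquetTarget set, no NoSql/online directory should be created.
for target in feature_set1.spec.targets:
    print(target.name, target.kind, target.path)

Keep in mind that without the NoSql target the feature set has no online store, so this option only fits when you need the offline/parquet data.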
BTW: The same solution is also valid for this different exception, which describes the issue more precisely (note the different paths for the online and offline store):
Py4JJavaError: An error occurred while calling o3233.parquet.
: java.lang.AssertionError: assertion failed: Conflicting directory structures detected. Suspicious paths:
v3io://projects/spark-parquet-test2/featurestore/FS-ingest/nosql/sets/FS-ingest/1674747966078_84
v3io://projects/spark-parquet-test2/featurestore/FS-ingest/parquet/sets/FS-ingest/1674747966078_84
If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them.
at scala.Predef$.assert(Predef.scala:223)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:178)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:110)
at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning(PartitioningAwareFileIndex.scala:158)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.partitionSpec(InMemoryFileIndex.scala:73)
at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.partitionSchema(PartitioningAwareFileIndex.scala:50)
at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:169)