I am trying to create a simple .hdf file in the Databricks environment. I can create the file on the driver, but when the same code is executed with rdd.map() it throws the following exception.
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 287.0 failed 4 times, most recent failure: Lost task 3.3 in stage 287.0 (TID 1080) (10.67.238.26 executor 1): org.apache.spark.api.python.PythonException: 'RuntimeError: Can't decrement id ref count (file write failed: time = Tue Nov 1 11:38:44 2022
, filename = '/dbfs/mnt/demo.hdf', file descriptor = 7, errno = 95, error message = 'Operation not supported', buf = 0x30ab998, total write size = 40, bytes this sub-write = 40, bytes actually written = 18446744073709551615, offset = 0)', from <command-1134257524229717>, line 13. Full traceback below:
I can write the same file to a worker's local disk and copy it back to /dbfs/mnt. However, I was looking for a way to write/modify the .hdf files stored in the /dbfs/mnt location directly from the worker nodes.
def create_hdf_file_tmp(x):
    import numpy as np
    import h5py, os, subprocess
    import pandas as pd
    dummy_data = [1, 2, 3, 4, 5]
    df_data = pd.DataFrame(dummy_data, columns=['Numbers'])
    with h5py.File('/dbfs/mnt/demo.hdf', 'w') as f:
        dset = f.create_dataset('default', data=df_data)  # write to .hdf file
    return True
def read_hdf_file(file_name, dname):
    import numpy as np
    import h5py, os, subprocess
    import pandas as pd
    with h5py.File(file_name, 'r') as f:
        data = f[dname][:]  # copy into memory; the dataset is not readable after the file closes
        print(data[:5])
    return data
#driver code
rdd = spark.sparkContext.parallelize(['/dbfs/mnt/demo.hdf'])
result = rdd.map(lambda x: create_hdf_file_tmp(x)).collect()
Above is the minimal code that I am trying to run in a Databricks notebook with 1 driver and 2 worker nodes.
Judging from the error message Operation not supported (errno 95), the HDF5 library most probably relies on random writes when producing the file, and random writes aren't supported by DBFS (see the DBFS local file API limitations in the docs). You will need to write the file to a local disk first and then move it to the DBFS mount, but that approach works only on the driver node...
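If the file really has to be produced inside the worker tasks, the workaround the question already mentions (write to the worker's local disk, then copy the finished file back to /dbfs/mnt) can be folded into the map function itself. Below is a minimal sketch of that idea; the local /tmp path and the use of shutil for the copy are my assumptions, everything else reuses the names from the question:

def create_hdf_file_tmp(x):
    # Sketch only: build the HDF5 file on the worker's local disk, where
    # random writes are fine, then copy the finished file to the DBFS mount.
    import shutil
    import h5py
    import pandas as pd

    local_path = '/tmp/demo.hdf'       # assumed local path on the worker
    dbfs_path = '/dbfs/mnt/demo.hdf'   # final destination on the FUSE mount

    dummy_data = [1, 2, 3, 4, 5]
    df_data = pd.DataFrame(dummy_data, columns=['Numbers'])

    with h5py.File(local_path, 'w') as f:
        f.create_dataset('default', data=df_data)

    # Copying a finished file is a sequential write, which the mount supports.
    shutil.copy(local_path, dbfs_path)
    return True

Note that if several tasks copy to the same target path they will overwrite each other, so in practice you would derive a distinct file name per task, for example from x.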