apache-spark, pyspark, azure-databricks, file-management

Looping through files in Databricks fails


Continuation of: Managing huge zip files in Databricks

Databricks hangs after 30 files. What to do?

I have split a huge 32 GB zip into 100 stand-alone pieces. I've split the header off the file, so I can process each piece like any CSV file. I need to filter the data based on columns. The files are in Azure Data Lake Storage Gen1 and must stay there.

Trying to read a single file (or all 100 files) at once fails after working for ~30 minutes (see the linked question above).

What I've done:

import time
import pandas as pd

def lookup_csv(CR_nro, hlo_lista=[], output=my_output_dir):

    base_lib = 'adl://azuredatalakestore.net/<address>'
    # List the input pieces and the files that are already done in the output dir
    all_files = pd.DataFrame(dbutils.fs.ls(base_lib + f'CR{CR_nro}'), columns=['full', 'name', 'size'])
    done = pd.DataFrame(dbutils.fs.ls(output), columns=['full', 'name', 'size'])
    all_files = all_files[~all_files['name'].isin(done['name'].str.replace('/', ''))]
    all_files = all_files[~all_files['name'].str.contains('header')]

    # The schema comes from the separate header file; keep the fixed columns plus the requested ones
    my_schema = spark.read.csv(base_lib + f'CR{CR_nro}/header.csv', sep='\t', header=True, maxColumns=1000000).schema
    tmp_lst = ['CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO', 'FORMAT'] + [i for i in hlo_lista if i in my_schema.fieldNames()]

    for my_file in all_files.iterrows():
        print(my_file[1]['name'], time.ctime(time.time()))
        data = spark.read.option('comment', '#').option('maxColumns', 1000000).schema(my_schema).csv(my_file[1]['full'], sep='\t').select(tmp_lst)
        data.write.csv(output + my_file[1]['name'], header=True, sep='\t')

This works... kinda. It works through ~30 files and then hangs with:

Py4JJavaError: An error occurred while calling o70690.csv. Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 154.0 failed 4 times, most recent failure: Lost task 0.3 in stage 154.0 (TID 1435, 10.11.64.46, executor 7): com.microsoft.azure.datalake.store.ADLException: Error creating file <my_output_dir>CR03_pt29.vcf.gz/_started_1438828951154916601 Operation CREATE failed with HTTP401 : null Last encountered exception thrown after 2 tries. [HTTP401(null),HTTP401(null)]

I tried adding some cleanup and sleeps at the end of the loop:

   data.unpersist()
   data = []
   time.sleep(5)

I also tried wrapping the call in try/except with retries:

for j in range(1, 24):
    for i in range(4):
        try:
            lookup_csv(j, hlo_lista=FN_list, output=blake + f'<my_output>/CR{j}/')
            break  # success, no need to retry this CR
        except Exception as e:
            print(i, j, e)
            time.sleep(60)

No luck with these. Once it fails, it keeps failing.

Any idea how to handle this issue? My guess is that the connection to the ADL store fails after some time, but if I queue the commands in separate cells:

lookup_csv(<inputs>) 
<next cell> 
lookup_csv(<inputs>) 

it works; and when one cell fails, the next cell still runs just fine. I can live with this, but it is highly annoying that a basic loop fails to work in this environment.


Solution

  • The best solution would be to permanently mount the ADLS storage and use an Azure app registration (service principal) for that.

    In Azure, go to App registrations and register an app named, for example, "databricks_mount". Then assign the IAM role "Storage Blob Data Contributor" to that app on your data lake storage account.

    configs = {"fs.azure.account.auth.type": "OAuth",
               "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
               "fs.azure.account.oauth2.client.id": "<your-client-id>",
               "fs.azure.account.oauth2.client.secret": "<your-secret>",
               "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<your-endpoint>/oauth2/token"}

    dbutils.fs.mount(
        source = "abfss://delta@yourdatalake.dfs.core.windows.net/",
        mount_point = "/mnt/delta",
        extra_configs = configs)
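
    Two small hardening tweaks I'd suggest on top of that, shown below as a sketch reusing the configs dict from above (the secret scope and key names are hypothetical): pull the client secret from a Databricks secret scope instead of pasting it into the notebook, and skip the mount call when /mnt/delta is already mounted, so the cell can be re-run safely:

    # Hypothetical secret scope/key names; keep the client secret out of the notebook source
    client_secret = dbutils.secrets.get(scope="databricks-secrets", key="databricks-mount-secret")
    configs["fs.azure.account.oauth2.client.secret"] = client_secret

    # Only mount if /mnt/delta is not mounted yet, so re-running the cell does not fail
    if not any(m.mountPoint == "/mnt/delta" for m in dbutils.fs.mounts()):
        dbutils.fs.mount(
            source = "abfss://delta@yourdatalake.dfs.core.windows.net/",
            mount_point = "/mnt/delta",
            extra_configs = configs)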
    

    You can also access the storage without mounting it, but you still need to register an app and apply the config via Spark settings in your notebook to get access to ADLS. Thanks to the Azure app, the access should stay valid for the whole session:

    spark.conf.set("fs.azure.account.auth.type", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"), 
    spark.conf.set("fs.azure.account.oauth2.client.id", "<your-client-id>")
    spark.conf.set("fs.azure.account.oauth2.client.secret", "<your-secret>")
    spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<your-endpoint>/oauth2/token")
    

    This explanation is the best: https://docs.databricks.com/data/data-sources/azure/adls-gen2/azure-datalake-gen2-sp-access.html#access-adls-gen2-directly, although I remember that I also had problems with it the first few times. That page also explains how to register an app. Maybe it will be acceptable under your company policies.
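
    If you go the mount route, the original loop then only needs its paths pointed at the mount point; a rough sketch, assuming hypothetical folders /mnt/delta/CR03/ and /mnt/delta/output/CR03/ and the my_schema built as in lookup_csv above:

    # Sketch only: read each piece and write the filtered output through the mount point,
    # so every access in the loop reuses the service-principal credentials
    base_lib = '/mnt/delta/CR03/'         # hypothetical input folder under the mount
    output = '/mnt/delta/output/CR03/'    # hypothetical output folder under the mount

    for f in dbutils.fs.ls(base_lib):
        if 'header' in f.name:
            continue
        data = spark.read.option('comment', '#').schema(my_schema).csv(f.path, sep='\t')
        data.write.csv(output + f.name, header=True, sep='\t')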