I'm working on a PySpark application running on Amazon EMR. The task is to download files based on a URL from a DataFrame, keep downloading on an EMR executor until a specified total file size is reached, and then bundle the downloaded files into a tar archive. The challenge is enforcing the file size limit correctly before starting the tar creation. I've tried using mapPartitions to batch multiple files until they collectively reach the size limit, after which they should be archived into a single tar file.
The process should work as follows: download each file, keep a running total of the downloaded sizes, and once that total reaches the limit, write the current batch into a tar archive and start a new batch.
Here's a simplified version of my approach:
from pyspark.sql import SparkSession
import os
import uuid
from pyspark.sql.functions import spark_partition_id

# Initialize Spark session
spark = SparkSession.builder.appName("DownloadandTar").getOrCreate()

# Sample DataFrame of files to download
files = [("file1",), ("file2",), ("file3",), ...]
schema = ["filename"]
df = spark.createDataFrame(files, schema=schema)

# Define the file size limit (e.g., 100 MB)
FILE_SIZE_LIMIT = 100 * 1024 * 1024  # 100 MB

def download_and_tar_files(partition):
    accumulated_size = 0
    files_to_tar = []
    for row in partition:
        file = row.filename
        # download_file returns the local path and size (in bytes) of the downloaded file
        file_path, file_size = download_file(file)
        files_to_tar.append(file_path)
        accumulated_size += file_size
        # Once the batch reaches the limit, archive it and start a new batch
        if accumulated_size >= FILE_SIZE_LIMIT:
            tar_file_name = f"{uuid.uuid4()}.tar"
            create_tar_file(files_to_tar, tar_file_name)
            files_to_tar = []
            accumulated_size = 0
    # Archive any remaining files that never reached the limit
    if files_to_tar:
        tar_file_name = f"{uuid.uuid4()}.tar"
        create_tar_file(files_to_tar, tar_file_name)

# Apply the function to each partition
df.repartition(spark_partition_id()).foreachPartition(download_and_tar_files)
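For context, download_file and create_tar_file are not shown here; a minimal sketch of what they are meant to do (download a URL to a local temp file and return the path plus its size in bytes, then pack a batch of local paths into one tar) would look something like this:

import os
import tarfile
import tempfile
import urllib.request

def download_file(url):
    # Download to a local temp file and return (local_path, size_in_bytes)
    fd, local_path = tempfile.mkstemp()
    os.close(fd)
    urllib.request.urlretrieve(url, local_path)
    return local_path, os.path.getsize(local_path)

def create_tar_file(file_paths, tar_file_name):
    # Pack the downloaded files into a single uncompressed tar archive
    with tarfile.open(tar_file_name, "w") as tar:
        for path in file_paths:
            tar.add(path, arcname=os.path.basename(path))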
Could anyone provide insight into potential issues with my size calculation or batching logic? I also tried using map to process one file name per row, but that implementation creates a tar file for each individual file instead of accumulating files up to the size threshold, so every tar archive ends up containing just one file rather than a batch.
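The map-based attempt was roughly the following (a sketch of the pattern rather than my exact code, reusing the helpers and imports above). Because map sees one row at a time, there is no running total to compare against the limit, so every call produces its own archive:

def download_and_tar_single(row):
    # Each call handles exactly one row, so there is nothing to accumulate
    # across files: each file ends up in its own tar archive.
    file_path, file_size = download_file(row.filename)
    create_tar_file([file_path], f"{uuid.uuid4()}.tar")

df.rdd.foreach(download_and_tar_single)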
The issue I encountered with the FILE_SIZE_LIMIT not being respected on AWS EMR's core nodes stems from the way global variables are managed and accessed in Spark's distributed architecture. Specifically, FILE_SIZE_LIMIT, being a global variable, was initialized on the EMR cluster's primary (master) node, but it was not directly accessible on the core (executor) nodes, since each node operates in its own memory and process space.
After digging deeper, I found that in Spark applications the driver (running on the primary node) and the executors (running on the core nodes) do not share the same memory space. Global variables declared in the driver's script are therefore not automatically propagated to the executors, so any logic relying on FILE_SIZE_LIMIT inside the executors did not behave as expected, because the executors did not have the correct value of the variable.
To resolve this, I needed to explicitly pass the FILE_SIZE_LIMIT value to the executor nodes, which I did by including it as an argument to the function executed on the executors.
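Roughly, the change looks like this (a sketch using the same download_and_tar_files logic and helpers as above, with the limit turned into an explicit parameter):

def download_and_tar_files(partition, size_limit):
    accumulated_size = 0
    files_to_tar = []
    for row in partition:
        file_path, file_size = download_file(row.filename)
        files_to_tar.append(file_path)
        accumulated_size += file_size
        if accumulated_size >= size_limit:
            create_tar_file(files_to_tar, f"{uuid.uuid4()}.tar")
            files_to_tar = []
            accumulated_size = 0
    if files_to_tar:
        create_tar_file(files_to_tar, f"{uuid.uuid4()}.tar")

# The limit is bound on the driver; the lambda (and the value it captures)
# is serialized and shipped to the executors along with the task.
df.foreachPartition(lambda partition: download_and_tar_files(partition, FILE_SIZE_LIMIT))

Using functools.partial(download_and_tar_files, size_limit=FILE_SIZE_LIMIT) works just as well if you prefer to avoid the lambda.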