I am working on a PySpark script to perform a simple word count. The script runs fine until it tries to save the results with saveAsTextFile, at which point it fails (I'm running this on Ubuntu). Here's the error I get:
py4j.protocol.Py4JJavaError: An error occurred while calling o48.saveAsTextFile.
org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/home/pyspark_python/wordcount/output_new already exists
Here are the steps I have taken so far:
- Verified that the output directory does not contain any data (ls shows it is empty).
- Deleted and recreated the directory using rm -r and mkdir -p.
- Ensured no other Spark jobs are running (ps aux | grep spark).
Despite this, the error persists when I re-run the script.
Here is the code I am using:
from pyspark import SparkConf, SparkContext
import os
def main(input_file, output_dir):
    # Spark configuration
    conf = SparkConf().setAppName("WordCountTask").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # Read the input file
    text_file = sc.textFile(input_file)

    # Count the words
    counts = (
        text_file.flatMap(lambda line: line.split(" "))
        .map(lambda word: (word, 1))
        .reduceByKey(lambda a, b: a + b)
    )

    # Save the results
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    counts.saveAsTextFile(output_dir)
    print(f"Results saved to directory: {output_dir}")


if __name__ == "__main__":
    # Define the input and output paths
    input_file = r"/home/othniel/pyspark_python/wordcount/input/loremipsum.txt"
    output_dir = "/home/othniel/pyspark_python/wordcount/output_new"

    # Run the WordCount task
    main(input_file, output_dir)
How can I resolve this error and ensure PySpark successfully writes to the output directory? Is there something specific I need to configure in my script or environment?
Thank you for your help!
Your main issue is that Spark's RDD saveAsTextFile method attempts to create a new directory at the path given by its first argument [0]. Your script pre-creates that directory with os.makedirs right before the write, so Spark finds it already in place and raises FileAlreadyExistsException. That is also why deleting the directory by hand doesn't help: the script recreates it on every run before saveAsTextFile is called.
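As an aside, if you wanted Spark to keep writing directly into output_dir, you would have to let Spark create that directory itself. A minimal sketch, assuming you replace the os.makedirs block in your script and import shutil, is to delete any leftover output from a previous run instead of pre-creating the directory:

import shutil

# Let Spark create output_dir itself: remove anything left over from a
# previous run instead of calling os.makedirs up front.
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)

counts.saveAsTextFile(output_dir)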
To solve this, you can instead add an additional path element to the argument you pass to saveAsTextFile:
counts.saveAsTextFile(os.path.join(output_dir, "counts.txt"))
This will create a 'container' directory called counts.txt which contains one text file for each partition of the RDD (you can check how many partitions an RDD has with the getNumPartitions method [1] before writing; see the snippet after the listing below). For an RDD with two partitions, you will end up with two part files in that directory, typically alongside a _SUCCESS marker. The important ones are the part-* files, which contain the actual text data for each partition:
./output_new/counts.txt/part-00000
./output_new/counts.txt/part-00001
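For example, a quick check before writing (counts and output_dir are the names from the script above; the coalesce call is optional and only shown as a way to force a single part file):

# How many part-* files will saveAsTextFile produce?
print(counts.getNumPartitions())

# Optionally collapse everything into one partition so that only a
# single part-00000 file is written inside the container directory.
counts.coalesce(1).saveAsTextFile(os.path.join(output_dir, "counts.txt"))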
If you want to read all of the data back into a single RDD [2] later on, you can use this:
rdd = sc.textFile(os.path.join(output_dir, "counts.txt"))
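One note on [2]: the lines come back as plain strings, so you have to parse them yourself. Since saveAsTextFile writes the string representation of each element, every line will look like ('word', 3); a minimal sketch for turning those back into (word, count) tuples is:

import ast

# Each line is the string form of a (word, count) tuple, e.g. "('spark', 3)";
# ast.literal_eval turns it back into a real Python tuple.
parsed = rdd.map(ast.literal_eval)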
While a counts.txt directory name may look odd at first glance, it is good practice for two reasons:
- It indicates the format of the partition data files stored inside. For example, a *.txt container directory likely contains files in a different format to a container directory suffixed with *.parquet.
- When saving multiple RDDs from the same script, you want each RDD written into its own container directory; otherwise the data from one write operation can overwrite the data from another.
Across the Spark APIs, a container directory is usually treated as a single "file" abstraction, so you rarely need to deal with the individual partition files yourself; this is especially true of the DataFrame API.
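For instance (a sketch only; the SparkSession and the Parquet format are not part of your original script), the DataFrame reader and writer both take the container directory path and handle the individual part files for you:

from pyspark.sql import SparkSession

# Reuses the existing SparkContext if one is already running
spark = SparkSession.builder.appName("WordCountTask").getOrCreate()

# Write the (word, count) pairs as a directory of Parquet part files
df = spark.createDataFrame(counts, ["word", "count"])
df.write.parquet(os.path.join(output_dir, "counts.parquet"))

# Read the whole container directory back as a single DataFrame
df_back = spark.read.parquet(os.path.join(output_dir, "counts.parquet"))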
[0]: saveAsTextFile docs: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.saveAsTextFile.html
[1]: getNumPartitions docs: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.getNumPartitions.html
[2]: be aware that the data will be read back in as strings, and you'll need to parse whatever you stored back into its proper types.