Context: Read a group of files and create a Scala Spark dataset. The number of files is around 500,000. To avoid exceeding the maximum connection pool, the files need to be read in batches, 3,000 per batch.
This code should work, and allDS should end up with data from all 500K files. But it's a bit odd to create the var allDS from the first file.
// fileList contains 500K file names (e.g. ["/file01.orc", ..., "/file0x.orc"])
var allDS = spark.read.orc(fileList(0))
for (fromIdx <- 1 until 500000 by 3000) {
  val toIdx = fromIdx + 3000
  val subList = fileList.slice(fromIdx, toIdx)
  val subDS = spark.read.orc(subList: _*)
  allDS = allDS.union(subDS)
}
How can I make the code more concise?
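For reference, the same batching can be written without seeding the var from the first file, e.g. by grouping the list and reducing with union (a sketch, assuming fileList is a Seq[String]):
// Split the file list into batches of 3,000, read each batch, and union the results.
val allDS = fileList
  .grouped(3000)                           // iterator over batches of up to 3,000 paths
  .map(batch => spark.read.orc(batch: _*)) // one DataFrame per batch
  .reduce(_ union _)                       // fold the batches into a single DataFrame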
It would probably be faster if you could reorganize your files in S3 first.
E.g. say you have
s3-root
├── file01.orc
├── file02.orc
├── .....
├── file3000.orc
├── file3001.orc
├── file3002.orc
├── .....
├── file6000.orc
└── .....
Rename that to the following (one way to script the rename is sketched after the tree):
s3-root
├── part=1
│ ├── file01.orc
│ ├── file02.orc
│ ├── .....
│ └── file3000.orc
├── part=3001
│ ├── file3001.orc
│ ├── file3002.orc
│ ├── .....
│ └── file6000.orc
└── .....
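One way to script that reorganization, as a sketch (assuming the bucket is reachable through the s3a:// connector; s3a://my-bucket is a placeholder, not a path from the original post):
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Placeholder bucket URI; replace with the real one.
val bucket = "s3a://my-bucket"
val fs = FileSystem.get(new URI(bucket), spark.sparkContext.hadoopConfiguration)

// Move each batch of 3,000 files into its own part=<startIdx> folder.
fileList.grouped(3000).zipWithIndex.foreach { case (batch, i) =>
  val partDir = new Path(s"$bucket/s3-root/part=${i * 3000 + 1}")
  fs.mkdirs(partDir)
  batch.foreach { file =>
    val src = new Path(s"$bucket$file")            // e.g. s3a://my-bucket/file01.orc
    fs.rename(src, new Path(partDir, src.getName)) // on S3 a rename is a copy + delete
  }
}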
Then let Spark do the parallelism and partitioning for you, instead of you doing union():
val allDS = spark.read.orc("s3-root")
This should give you a dataset with an extra part column, which you can drop if you don't need it.
This assumes the Scala API works the same as the Python one when it comes to reading data from a partitioned folder structure.
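A minimal Scala sketch of that read, with s3a://my-bucket/s3-root as a placeholder for the reorganized root:
// Read the whole partitioned layout in one call and drop the synthetic partition column.
val allDS = spark.read
  .orc("s3a://my-bucket/s3-root")
  .drop("part")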