I am aware that there have been questions regarding wildcards in pySpark's .load() function, like here or here.
Anyhow, none of the questions/answers I found dealt with my variation of it.
In pySpark I want to load files directly from HDFS because I have to use the Databricks Avro library for Spark 2.3.x. I'm doing so like this:
partition_stamp = "202104"

df = spark.read.format("com.databricks.spark.avro") \
    .load(f"/path/partition={partition_stamp}*") \
    .select("...")
As you can see, the partitions derive from timestamps in the format yyyyMMdd. Currently I only get the partitions for April 2021 (partition_stamp = "202104").
However, I need all partitions starting from April 2021.
Written in pseudo-code, I'd need a solution something like this:
.load(f"/path/partition >= {partition_stamp}*")
Since there are actually several hundred partitions, any approach that requires hard-coding them is not an option.
So my question is: Is there a function for conditional file-loading?
As I learned, there exist only the following options to dynamically process paths inside the .load() function (see the example calls after the list):
*: Wildcard for any character or sequence of characters until the end of the path or a new sub-directory ('/') -> (/path/20200*)
[1-3]: Regex-like inclusion of a defined character-range -> (/path/20200[1-3]/...)
{1,2,3}: Set-like inclusion of a defined set of characters -> (/path/20200{1,2,3}/...)
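For illustration, this is roughly how those variants look in actual .load() calls. The paths and partition values here are made up for the example; Spark delegates this kind of globbing to Hadoop's path resolution:

    # Made-up paths illustrating the three glob variants:

    # '*' matches anything up to the next '/': all partitions starting with 20200
    df_star = spark.read.format("com.databricks.spark.avro") \
        .load("/path/partition=20200*")

    # '[1-3]' matches one character from the range: partitions 202001-202003
    df_range = spark.read.format("com.databricks.spark.avro") \
        .load("/path/partition=20200[1-3]")

    # '{1,2,3}' matches one element of the set: the same three partitions
    df_set = spark.read.format("com.databricks.spark.avro") \
        .load("/path/partition=20200{1,2,3}")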
Thus, to answer my question: There is no built-in function for conditional file-loading.
import pandas as pd  # Utilize pandas date-functions

# start_date and end_date are the desired bounds of the date range (e.g. ISO date strings)
partition_stamp = ",".join(set(
    str(_range.year) + "{:02}".format(_range.month)
    for _range in pd.date_range(start=start_date, end=end_date, freq='D')
))
df = spark.read.format("com.databricks.spark.avro") \
    .load(f"/path/partition={{{partition_stamp}}}*") \
    .select("...")
This way, the restriction to timestamps of format yyyyMM is generated dynamically for a given start and end date, and the string-based .load() remains usable.
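For example, with hypothetical bounds start_date = "2021-04-01" and end_date = "2021-06-30", the snippet above yields the months 202104, 202105 and 202106 (in arbitrary order, since it is a set), and the resulting path is the set-style glob from the list above:

    import pandas as pd

    start_date = "2021-04-01"  # hypothetical example bounds
    end_date = "2021-06-30"

    partition_stamp = ",".join(set(
        str(_range.year) + "{:02}".format(_range.month)
        for _range in pd.date_range(start=start_date, end=end_date, freq='D')
    ))

    # e.g. "202105,202104,202106" (set order is not guaranteed, which is fine for the glob)
    print(partition_stamp)

    # e.g. "/path/partition={202105,202104,202106}*"
    print(f"/path/partition={{{partition_stamp}}}*")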