I have a PySpark DataFrame, shown below, from a Data Quality results table.
+-------------------+---------------+---------+---------------+-------------+---------------------+
|table_name |key_missing |key_value|retailer_name |detail |report_generated_date|
+-------------------+---------------+---------+---------------+-------------+---------------------+
|customer |customer_id |118 |Apple |Missing |2024-06-05 |
|customer |customer_id |349 |Mueller |Missing |2024-06-05 |
|product_line |product_id |XX097h5 |ECOMEDIA AG |Missing |2024-06-05 |
|purchase_master |purchase_id |907 |kit_retailer_id|Duplicates |2024-06-05 |
|activity_summary |act_id |1208vtt |Media Markt |Duplicates |2024-06-05 |
+-------------------+---------------+---------+---------------+-------------+---------------------+
Now, I would like to pick the rows belonging to each table_name
and write them to separate sections of an Excel file, each with a header, using PySpark.
How can I achieve this? Thanks in advance.
You can write multiple Excel files, if that fits your needs, and then combine them afterwards. You first need to install the Maven Excel package for Spark by crealytics:
https://github.com/crealytics/spark-excel
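On Databricks the package can be attached to the cluster as a Maven library; outside Databricks, a minimal sketch of pulling it in when the session starts (the version below is only an example, substitute the release that matches your Spark and Scala versions):
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("dq-excel-export")  # hypothetical app name
    # Example coordinate; pick the build for your Spark/Scala version
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.5")
    .getOrCreate()
)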
You can then iterate over the data in Spark if you want:
from pyspark.sql import Row
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType

# Small sample standing in for the Data Quality results table
data = [
    Row(table_name="customer", key_missing="customer_id"),
    Row(table_name="customer", key_missing="customer_id"),
    Row(table_name="product_line", key_missing="product_id"),
]
schema = StructType([
    StructField(name="table_name", dataType=StringType()),
    StructField(name="key_missing", dataType=StringType()),
])
df = spark.createDataFrame(data=data, schema=schema)

# One Excel file per distinct table_name
table_names = [row.table_name for row in df.select("table_name").distinct().collect()]

header = "true"
mode = "overwrite"
path = "dbfs:/mnt/your_path/"  # this is a Databricks file path
# see https://github.com/crealytics/spark-excel for the available options

for table_name in table_names:
    df_selected_table_rows = df.filter(f.col("table_name") == table_name)
    (
        df_selected_table_rows
        .write.format("com.crealytics.spark.excel")
        .option("header", header)
        .option("maxRowsInMemory", 5)
        .mode(mode)
        .save(path + table_name + ".xlsx")
    )
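To check the result you can read one of the generated files back with the same package; a quick sketch, assuming the customer file written above exists:
df_check = (
    spark.read.format("com.crealytics.spark.excel")
    .option("header", "true")
    .load(path + "customer.xlsx")
)
df_check.show()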
It ends up in the following files:
# Result table customer
+----------+-----------+
|table_name|key_missing|
+----------+-----------+
| customer|customer_id|
| customer|customer_id|
+----------+-----------+
# Result table product_line
+------------+-----------+
| table_name|key_missing|
+------------+-----------+
|product_line| product_id|
+------------+-----------+
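If you would rather have everything in a single workbook with one sheet per table (the "separate sections" from the question), the spark-excel dataAddress option can target a named sheet. This is only a sketch, under the assumption that your spark-excel version supports dataAddress and append mode for adding sheets to an existing file; check the README linked above for your version:
single_file = path + "dq_report.xlsx"  # hypothetical combined workbook

for table_name in table_names:
    (
        df.filter(f.col("table_name") == table_name)
        .write.format("com.crealytics.spark.excel")
        .option("header", header)
        .option("dataAddress", f"'{table_name}'!A1")  # one sheet per table
        .mode("append")  # keep the sheets written in earlier iterations
        .save(single_file)
    )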