I am trying to transform a dictionary into a Spark DataFrame, but it is appending all my values to a single row. For my final result I want a Spark DataFrame containing 3 rows, one for each unique_survey_id.
Write PySpark code for the same.
inferenced_df = {
    'unique_survey_id': ['0001', '0002', '0003'],
    'verbatim': ["My name is John", "I am 23 yrs old", "I live in US"],
    'classification_critical_process_fg': [0, 0, 0],
    'reason_critical_process_fg': [
        {"Customer's Issue": "I wish there were more providers ", 'Status of Resolution': 'Unresolved', "Verbatim chunk explaining customer's efforts": '', 'Reason for classification': "Although the issue is unresolved, So flag is 0"},
        {"Customer's Issue": 'I am trying to make a payment', 'Status of Resolution': 'Unresolved', "Verbatim chunk explaining customer's efforts": '', 'Reason for classification': "Although the issue is unresolved So flag is 0"},
        {"Customer's Issue": '', 'Status of Resolution': '', "Verbatim chunk explaining customer's efforts": '', 'Reason for classification': 'The review does not mention any issue or negative experience. So the flag is 0'}
    ],
    'classification_critical_technical_fg': ['No', 'No', 'No'],
    'reason_critical_technical_fg': ['The review mentions difficulty in finding provider.', 'The review mentions an unresolved issue ', 'The review does not mention any technical issues'],
    'classification_critical_crc_escalation_fg': ['Yes', 'Yes', 'No'],
    'reason_critical_crc_escalation_fg': ['The customer is expressing frustration.', 'The customer is expressing dissatisfaction', 'The review does not mention any unresolved issues.'],
    'classification_insight_experience_fg': ['Yes', 'No', 'Yes'],
    'reason_insight_experience_fg': ["The review mentions a suggestion", 'The review mentions an unresolved', "The review explicitly mentions positive feedback"],
    'classification_insight_process_fg': [0, 0, 0],
    'reason_insight_process_fg': [
        {"Customer's Issue": "I need a diabetic eye exam ", 'Status of Resolution': 'Unresolved', "Verbatim chunk explaining customer's efforts": '', 'Reason for classification': 'Customer has just stated the issue.'},
        {"Customer's Issue": 'I am trying to make a payment ', 'Status of Resolution': 'Unresolved', "Verbatim chunk explaining customer's efforts": '', 'Reason for classification': 'Customer has just stated the issue.'},
        {"Customer's Issue": '', 'Status of Resolution': '', "Verbatim chunk explaining customer's efforts": '', 'Reason for classification': "The customer review does not mention any issue or negative experience."}
    ]
}
Current Output
The code is:
from pyspark.sql.types import StructType, StructField, StringType

def alerts_inference(inferenced_df):
    schema = StructType([
        StructField("unique_survey_id", StringType(), True),
        StructField("verbatim", StringType(), True),
        StructField("classification_critical_crc_escalation_fg", StringType(), True),
        StructField("reason_critical_crc_escalation_fg", StringType(), True),
        StructField("classification_critical_technical_fg", StringType(), True),
        StructField("reason_critical_technical_fg", StringType(), True),
        StructField("classification_critical_process_fg", StringType(), True),
        StructField("reason_critical_process_fg", StringType(), True),
        StructField("classification_insight_experience_fg", StringType(), True),
        StructField("reason_insight_experience_fg", StringType(), True),
        StructField("classification_insight_process_fg", StringType(), True),
        StructField("reason_insight_process_fg", StringType(), True)
    ])
    inferenced_df = spark.createDataFrame([inferenced_df], schema)
    return inferenced_df
**EXPECTED O/P**: A DataFrame containing 3 rows, one per unique_survey_id, with the corresponding columns.
Passing `[inferenced_df]` to `spark.createDataFrame` treats the whole dictionary as a single record, which is why everything lands in one row. You can instead create a Spark DataFrame from a pandas DataFrame; pandas interprets a dict of equal-length lists as columns, so each index becomes its own row:
import pandas as pd

pandas_df = pd.DataFrame(inferenced_df)
inferenced_df = spark.createDataFrame(pandas_df, schema)
Note that the reason_* columns hold Python dicts; if your schema declares them as StringType, convert them to strings first (e.g. with json.dumps).
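If you would rather not depend on pandas, a plain-Python sketch is to transpose the column-oriented dict into a list of row tuples (one per survey id) and pass those to `spark.createDataFrame` with the schema from the question. The `dict_to_rows` helper below is illustrative, not a Spark API; it assumes the existing `spark` session and `schema`:

```python
def dict_to_rows(data, field_names):
    """Transpose a dict of equal-length column lists into row tuples,
    ordered to match the given field names."""
    return list(zip(*(data[name] for name in field_names)))

# Small demo with two of the columns from the question:
sample = {
    "unique_survey_id": ["0001", "0002", "0003"],
    "verbatim": ["My name is John", "I am 23 yrs old", "I live in US"],
}
rows = dict_to_rows(sample, ["unique_survey_id", "verbatim"])
# rows is now [("0001", "My name is John"), ("0002", "I am 23 yrs old"),
#              ("0003", "I live in US")]
```

With the full dict you would order the fields from the schema itself, e.g. `rows = dict_to_rows(inferenced_df, [f.name for f in schema.fields])` followed by `spark.createDataFrame(rows, schema)`, which yields one row per unique_survey_id.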