I'm a newbie in PySpark, and I want to translate the preprocessing including encoding and normalizing part scripts which are pythonic, into PySpark for synthetic data. (Columns A & C are categorical) At first, I have Spark data frame so-called sdf
including 5 columns:
Below is the example:
#+----------+-----+---+-------+----+
#|A |B |C |D |E |
#+----------+-----+---+-------+----+
#|Sentence |92 |6 |False |49 |
#|Sentence |17 |3 |False |15 |
#|Sentence |17 |3 |False |15 |
#|- |0 |0 |False |0 |
#|- |0 |0 |False |0 |
#|- |0 |0 |False |0 |
#+----------+-----+---+-------+----+
Now I want to allocate statistic frequency besides other features and concat the results with sdf
. So far, I can do it using pythonic scripts:
#import libs
import copy
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import scale
from sklearn import preprocessing
#Statistical Preprocessing
def add_freq_to_features(df):
frequencies_df = df.groupby(list(df.columns)).size().to_frame().rename(columns={0: "Freq"})
frequencies_df["Freq"] = frequencies_df["Freq"] / frequencies_df["Freq"].sum() # Normalzing 0 & 1
new_df = pd.merge(df, frequencies_df, how='left', on=list(df.columns))
return new_df
# Encode and Normalize
def normalize_features(df):
temp_df = df.copy()
le = preprocessing.LabelEncoder()
#le.fit(temp_df)
temp_df[["A", "C"]] = temp_df[["A", "C"]].apply(le.fit_transform)
for column in ["A", "B", "C", "D", "E"]:
#-1: all rows selected into 1
# reshape(1, -1) select one row contains all columns/features
temp_df[column] = MinMaxScaler().fit_transform(temp_df[column].values.reshape(-1, 1))
return temp_df
# Apply frequency allocation and merge with extracted features df
features_df = add_freq_to_features(features_df)
#Apply Encoding and Normalizing function
normalized_features_df = normalize_features(features_df)
to_numeric_columns = ["A", "B" , "C", "D", "E", "Freq"]
normalized_features_df[to_numeric_columns] = normalized_features_df[to_numeric_columns].apply(pd.to_numeric)
#normalized_features_df
Problem: what is the best approach to translating Preprocessing without converting Spark dataframe to Pandas datafarame toPandas()
to optimize the pipeline and process it 100% spark form?
The expected output is shown below in the form of a Spark dataframe:
#+----------+-----+---+-------+----+----------+
#|A |B |C |D |E |Freq |
#+----------+-----+---+-------+----+----------+
#|Sentence |92 |6 |False |49 |0.166667 |
#|Sentence |17 |3 |False |15 |0.333333 |
#|Sentence |17 |3 |False |15 |0.333333 |
#|- |0 |0 |False |0 |0.500000 |
#|- |0 |0 |False |0 |0.500000 |
#|- |0 |0 |False |0 |0.500000 |
#+----------+-----+---+-------+----+----------+
Spark has Spark MLlib package that is designed for feature engineering and Machine Learning purpose. That being said, you shouldn't build features manually like what you're doing with Pandas. At the end of the day, you still have to use Spark to build your models, so why not start using Spark ML properly? I strongly suggest reading through few sections like building features, building pipelines, then classification/regression, and few other algorithms.
Going back to your original question, this is Spark version of your sample code (I also ran it in your notebook with a little change to fit with your variables.)
# this is to build "raw" Freq
sdf2 = (sdf
.groupBy(sdf.columns)
.agg(F.count('*').alias('Freq'))
.withColumn('Encoding_type', F.col('Encoding_type').cast('string'))
)
sdf2.cache().count()
sdf2.show()
# this is to normalize features using MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
type_indexer = StringIndexer(inputCol='Type', outputCol='Type_Cat')
encoding_indexer = StringIndexer(inputCol='Encoding_type', outputCol='Encoding_Type_Cat')
assembler = VectorAssembler(inputCols=['Type_Cat', 'Length', 'Token_number', 'Encoding_Type_Cat', 'Character_feature', 'Freq'], outputCol='features')
scaler = MinMaxScaler(inputCol='features', outputCol='scaled_features')
pipeline = Pipeline(stages=[type_indexer, encoding_indexer, assembler, scaler])
# Compute summary statistics and generate model
model = pipeline.fit(sdf2)
# rescale each feature to range [min, max].
model.transform(sdf2).show(10, False)
# Output
# +------+------+------------+-------------+-----------------+----+--------+-----------------+-------------------------+-------------------------+
# |Type |Length|Token_number|Encoding_type|Character_feature|Freq|Type_Cat|Encoding_Type_Cat|features |scaled_features |
# +------+------+------------+-------------+-----------------+----+--------+-----------------+-------------------------+-------------------------+
# |String|8 |0 |true |7 |1 |0.0 |0.0 |[0.0,8.0,0.0,0.0,7.0,1.0]|[0.5,1.0,0.5,0.5,1.0,0.5]|
# |String|0 |0 |true |0 |1 |0.0 |0.0 |(6,[5],[1.0]) |[0.5,0.0,0.5,0.5,0.0,0.5]|
# +------+------+------------+-------------+-----------------+----+--------+-----------------+-------------------------+-------------------------+