I'm new to Spark :) I'm trying to use CrossValidator. My model is as follows:
training
# Training data -- several split ratios were tested; 50/50 seems the best.
(trainData, testData) = modelData.randomSplit([0.5, 0.5])
# Report how many rows landed in each split.
print("Training dataset count : " + f"{trainData.count()}")
print("Test dataset count : " + f"{testData.count()}")
# Cache both splits: they are reused by the fit/transform calls below.
for split in (trainData, testData):
    split.cache()
Model
from pyspark.ml.classification import LogisticRegression

# Fit a logistic-regression classifier on the training split.
lr = LogisticRegression(featuresCol='features', labelCol='v4_Indexer', maxIter=5)
lrModel = lr.fit(trainData)

# Score the held-out split and inspect the first 2500 rows as pandas.
predictions = lrModel.transform(testData)
cols = ['v4_Indexer', 'features', 'rawPrediction', 'prediction', 'probability']
predictions.select(*cols).toPandas().head(2500)
I tried this code for cross-validation:
# Cross-validate the logistic regression over a small hyper-parameter grid.
# (The original fused two import statements on one line, which is a SyntaxError.)
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

pipeline = Pipeline(stages=[lr])

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0, 0.5, 1])
             .addGrid(lr.elasticNetParam, [0, 0.5, 1])
             .addGrid(lr.maxIter, [1, 10])
             .build())

# The evaluator must point at the actual label column. Without this (or with
# the default labelCol="label") Spark raises
# "IllegalArgumentException: label does not exist. Available: v4_Indexer, ..."
# -- exactly the error reported above.
evaluator = MulticlassClassificationEvaluator(labelCol='v4_Indexer')

# Pass the pipeline as the estimator (the original passed `lr`, leaving the
# pipeline it just built unused).
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5)
cvModel = cv.fit(trainData)

# NOTE(review): despite the name, this is the fitted best PipelineModel,
# not a training-summary object.
trainingSummary = cvModel.bestModel
I have a warning /databricks/spark/python/pyspark/ml/util.py:92: UserWarning: CrossValidator_7ba8c8c903af fit call failed but some spark jobs may still running for unfinished trials. To address this issue, you should enable pyspark pinned thread mode. warnings.warn("{} fit call failed but some spark jobs "
And an error : IllegalArgumentException: label does not exist. Available: v4_Indexer, features, CrossValidator_7ba8c8c903af_rand
This model worked for a while; I do not understand why it doesn't work now.
Thanks in advance for any help you can give me =)
I've solved the issue by rewriting my code entirely. This is what it looks like now (prerequisite: %pip install mlflow):
from pyspark.ml.classification import DecisionTreeClassifier, DecisionTreeClassificationModel
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import mlflow
import mlflow.spark

# StringIndexer: convert the input column "v4_Indexer" to categorical indices.
indexer = StringIndexer(inputCol="v4_Indexer", outputCol="indexedLabel")

# Create an evaluator; here we score with "weightedPrecision".
evaluator = MulticlassClassificationEvaluator(labelCol="v4_Indexer", metricName="weightedPrecision")

# DecisionTreeClassifier: predict "indexedLabel" from the "features" column.
dtc = DecisionTreeClassifier(labelCol="indexedLabel")

# Chain indexer + dtc into a single ML Pipeline so both are re-fit per fold.
pipeline = Pipeline(stages=[indexer, dtc])

# Define the parameter grid to examine.
grid = (ParamGridBuilder()
        .addGrid(dtc.maxDepth, [2, 3, 4, 5, 6, 7, 8])
        .addGrid(dtc.maxBins, [2, 4, 8])
        .build())

# Cross validator over the pipeline, evaluator, and grid defined above.
cv = CrossValidator(estimator=pipeline, evaluator=evaluator, estimatorParamMaps=grid, numFolds=3)

# Explicitly create a new run so this cell can be re-run without conflicting
# with (attempting to overwrite) an earlier run.
# BUG FIX: the statements below were originally not indented under the `with`
# block, which is a SyntaxError in Python.
# NOTE(review): assumes `train` and `test` DataFrames are defined elsewhere --
# presumably the trainData/testData splits from above; confirm.
with mlflow.start_run():
    # Run cross validation on the training dataset; fit() returns the best model found.
    cvModel = cv.fit(train)
    # Evaluate the best model on the test dataset and log the result.
    test_metric = evaluator.evaluate(cvModel.transform(test))
    mlflow.log_metric('test_' + evaluator.getMetricName(), test_metric)
    # Log the best model.
    mlflow.spark.log_model(spark_model=cvModel.bestModel, artifact_path='best-model')