python, gpu, joblib, multi-gpu, tesla

Multiprocessing for Python parallelization error - "'function' object is not iterable"


We have an NVIDIA Tesla K80 GPU accelerator for computing in our data center with the following characteristics: Intel(R) Xeon(R) CPU E5-2670 v3 @ 2.30GHz, 48 CPU processors, 128GB RAM, 12 CPU cores, running under 64-bit Linux.

I am running the following code, which does GridSearchCV for a RandomForestRegressor model after vertically appending different sets of dataframes into a single series of data. As an example, the two sample datasets I am considering are found in this link

from joblib import Parallel, delayed
import multiprocessing
import sys
import imp
import glob
import os
import pandas as pd
import math
from sklearn.feature_extraction.text import CountVectorizer 
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
import matplotlib
from sklearn.model_selection import cross_val_score
import matplotlib.pyplot as plt
import numpy as np
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.naive_bayes import GaussianNB
from sklearn.model_selection import GridSearchCV
from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import LassoCV
from sklearn.metrics import r2_score, mean_squared_error, make_scorer
from sklearn.model_selection import train_test_split
from math import sqrt

df = pd.concat(map(pd.read_csv, glob.glob(os.path.join('', "cubic*.csv"))), ignore_index=True)
#df = pd.read_csv('cubic31.csv')

for i in range(1,3):
    df['X_t'+str(i)] = df['X'].shift(i)

print(df)

df.dropna(inplace=True)

X = (pd.DataFrame({ 'X_%d'%i : df['X'].shift(i) for i in range(3)}).apply(np.nan_to_num, axis=0).values)

X = df.drop('Y', axis=1)
y = df['Y']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.40)

X_train = X_train.drop('time', axis=1)
X_test = X_test.drop('time', axis=1)


print(X.shape)
print(df['Y'].shape)

print()
print("Size of X_train:",(len(X_train)))
print("Size of Y_train:",(len(X_train)))
print("Size of X_test:",(len(X_test)))
print("Size of Y_test:",(len(y_test)))

print()

def gridSearchCVParallel():
    #Fit models with some grid search CV=5 (not to low), use the best model
    parameters = {'n_estimators': [10,30,100,500,1000]}
    clf_rf = RandomForestRegressor(random_state=1)
    clf = GridSearchCV(clf_rf, parameters, cv=5, scoring='neg_mean_squared_error')
    model = clf.fit(X_train, y_train)
    model.cv_results_['params'][model.best_index_]
    math.sqrt(model.best_score_*-1)
    model.grid_scores_

    #####
    print()
    print(model.grid_scores_)
    print("The best score: ",model.best_score_)

    print("RMSE:",math.sqrt(model.best_score_*-1))

    #reg = RandomForestRegressor(criterion='mse')
    clf_rf.fit(X_train,y_train)
    modelPrediction = clf_rf.predict(X_test)
    print(modelPrediction)

    print("Number of predictions:",len(modelPrediction))

    meanSquaredError=mean_squared_error(y_test, modelPrediction)
    print("Mean Square Error (MSE):", meanSquaredError)
    rootMeanSquaredError = sqrt(meanSquaredError)
    print("Root-Mean-Square Error (RMSE):", rootMeanSquaredError)


    ####### to add the trendline
    fig, ax = plt.subplots()
    #df.plot(x='time', y='Y', ax=ax)
    ax.plot(df['time'].values, df['Y'].values)


    fig, ax = plt.subplots()
    index_values=range(0,len(y_test))

    y_test.sort_index(inplace=True)
    X_test.sort_index(inplace=True)

    modelPred_test = clf_rf.predict(X_test)
    ax.plot(pd.Series(index_values), y_test.values)


    PlotInOne=pd.DataFrame(pd.concat([pd.Series(modelPred_test), pd.Series(y_test.values)], axis=1))

    plt.figure(); PlotInOne.plot(); plt.legend(loc='best')
NumberOfCores = multiprocessing.cpu_count()

gridResults = Parallel(n_jobs=NumberOfCores)(delayed(gridSearchCVParallel))

print(gridResults)

When I finally run this program on a huge dataset (around 2 million rows), it takes more than 4 days to do the GridSearchCV. After a bit of searching, I found out that Python can utilize more than one CPU using either concurrent.futures or multiprocessing. As shown in my code, I tried to make use of multiprocessing via joblib, but I am getting this error: TypeError: 'function' object is not iterable. It seems that the function should take a single parameter as input and that we should pass an iterable as the argument. How can I fix this issue so as to utilize more than one CPU and get the task done faster?
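For reference, my understanding of the joblib calling convention is that Parallel expects an iterable of delayed calls rather than a bare delayed(function). A minimal toy sketch (with a hypothetical square function, not my actual workload):

from joblib import Parallel, delayed

# Hypothetical toy example: delayed(func)(arg) builds one task,
# and Parallel consumes an iterable of such tasks.
def square(x):
    return x * x

results = Parallel(n_jobs=4)(delayed(square)(i) for i in range(10))
print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]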

Thank you in advance.


Solution

  • Do not attempt to parallelize this on your own, and do not use joblib.Parallel directly. You would be reinventing the wheel, since GridSearchCV is already parallelized. Just pass the n_jobs parameter, which defaults to 1, i.e. it defaults to using a single job. To take advantage of the multi-core architecture, pass n_jobs=number_of_cores, where number_of_cores is the number of cores you want to use.

    And if you check the source code, you'll see that it basically wraps a call to joblib.Parallel, so n_jobs=-1 should work for "all cores". A minimal sketch of that change follows below.
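    The sketch reuses the parameter grid and estimator from the question and only adds the n_jobs argument (it assumes X_train and y_train are defined as in the question); treat it as an illustration rather than a tuned setup:

    import math
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.model_selection import GridSearchCV

    parameters = {'n_estimators': [10, 30, 100, 500, 1000]}
    clf_rf = RandomForestRegressor(random_state=1)

    # n_jobs=-1 tells GridSearchCV to run the cross-validation fits on all
    # available cores; the parallelism is handled internally via joblib.
    clf = GridSearchCV(clf_rf, parameters, cv=5,
                       scoring='neg_mean_squared_error', n_jobs=-1)
    model = clf.fit(X_train, y_train)

    print("Best parameters:", model.best_params_)
    print("RMSE:", math.sqrt(model.best_score_ * -1))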