Data -
The training and testing data I have is very large (~150 GB) and highly imbalanced (99% negative labels / 1% positive labels). I cannot downsample because the minority class carries very important information, so I am currently using a weighted estimator.
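(For reference, by "weighted estimator" I mean passing a per-example weight column to the estimator, roughly like the sketch below; the feature columns and architecture are simplified placeholders, not my actual model.)

# Simplified sketch of a weighted estimator: the 'weight' feature gives the
# rare positive class a larger per-example weight (placeholder columns).
import tensorflow as tf

feature_columns = [tf.feature_column.numeric_column("f1"),
                   tf.feature_column.numeric_column("f2")]

estimator = tf.estimator.DNNClassifier(
    hidden_units=[64, 32],
    feature_columns=feature_columns,
    weight_column="weight",   # per-example weights come from this input column
    n_classes=2)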
Problem -
If we split the data using Spark's sample() function and save it to multiple files, there is a high chance that the positive samples end up in only one of the files out of many (say 1 in 100). That causes a problem while ingesting the data: the estimator is fed only negative samples, which drives the loss to zero, and the model cannot learn.
Moreover, I do shuffle when making a batch, but the input function takes multiple files as input and batches are created by shuffling data within each file, so the model is fed only negative cases for a very, very long time until the shuffle reaches the file that has the positive samples.
Is there a better way to make sure that, when saving the data with PySpark, each file Spark writes contains samples from both classes/labels (preferably in the same ratio as the overall pos/neg ratio of the data)?
I have tried feeding one big file, and shuffling works fine in that case, but when many files are fed it creates the zero-loss problem because only samples from one class are being fed into the model.
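One workaround I have been thinking about (not sure whether it is the right approach) is to add a random key and repartition on it before writing, so that every output part file gets a random, and therefore roughly proportionally mixed, subset of rows. A rough sketch; df, the number of partitions and the output path are just placeholders from my setup:

from pyspark.sql import functions as F

# Add a random key and hash-repartition on it: each output part file then
# receives a random subset of rows, so the pos/neg ratio per file should be
# close to the overall ratio.
mixed = (df
         .withColumn("rand_key", F.rand(seed=42))
         .repartition(100, "rand_key")
         .drop("rand_key"))

mixed.write.mode("overwrite").csv("/path/to/train_mixed")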
I am using the following input function in the TensorFlow code -
import multiprocessing

import tensorflow as tf

# MULTI_THREADING and parse_csv_row are defined elsewhere in my code.

def csv_input_fn(files_name_pattern, mode=tf.estimator.ModeKeys.EVAL,
                 skip_header_lines=0,
                 num_epochs=None,
                 batch_size=1000):
    shuffle = True if mode == tf.estimator.ModeKeys.TRAIN else False
    num_threads = multiprocessing.cpu_count() if MULTI_THREADING else 1

    print("")
    print("* data input_fn:")
    print("================")
    print("Input file(s): {}".format(files_name_pattern))
    print("Batch size: {}".format(batch_size))
    print("Epoch Count: {}".format(num_epochs))
    print("Mode: {}".format(mode))
    print("Thread Count: {}".format(num_threads))
    print("Shuffle: {}".format(shuffle))
    print("================")
    print("")

    # Expand the glob pattern and read all matching CSV files line by line.
    file_names = tf.matching_files(files_name_pattern)
    dataset = tf.data.TextLineDataset(filenames=file_names)

    # Note: skip() acts on the concatenated dataset, so it only skips the
    # header lines of the first file.
    dataset = dataset.skip(skip_header_lines)

    if shuffle:
        # Only 2 * batch_size + 1 rows fit in the shuffle buffer, so rows are
        # still consumed (almost) file by file.
        dataset = dataset.shuffle(buffer_size=2 * batch_size + 1)

    dataset = dataset.batch(batch_size)
    dataset = dataset.map(lambda csv_row: parse_csv_row(csv_row),
                          num_parallel_calls=num_threads)
    dataset = dataset.repeat(num_epochs)

    iterator = dataset.make_one_shot_iterator()
    features, target = iterator.get_next()
    return features, target
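For context, this is roughly how the input function is wired into training; estimator and TRAIN_FILES_PATTERN are placeholders for the real objects in my code:

# Rough training call (placeholders: estimator, TRAIN_FILES_PATTERN).
estimator.train(
    input_fn=lambda: csv_input_fn(TRAIN_FILES_PATTERN,
                                  mode=tf.estimator.ModeKeys.TRAIN,
                                  num_epochs=None,
                                  batch_size=1000),
    steps=10000)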
Any suggestions would be appreciated, thanks!
Found the answer to my own question: you can set buffer_size to the total number of elements/rows in the complete dataset. That way the shuffle buffer covers the whole dataset, so the randomly shuffled order is uniform across all files rather than only within each file.
Code changed -

if shuffle:
    # buffer_size should equal the total number of rows/instances in the
    # training data (total_training_instances is a placeholder for that count)
    dataset = dataset.shuffle(buffer_size=total_training_instances)
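For reference, one way to get that count (assuming the files are readable via tf.gfile and, as in my case, have no header lines) is to count the lines once up front:

# Count the total number of rows across all input files once and use it as the
# shuffle buffer size (TRAIN_FILES_PATTERN is a placeholder; subtract header
# lines per file if your CSVs have them).
import tensorflow as tf

total_training_instances = 0
for file_name in tf.gfile.Glob(TRAIN_FILES_PATTERN):
    with tf.gfile.GFile(file_name) as f:
        total_training_instances += sum(1 for _ in f)

Keep in mind that a shuffle buffer of that size holds the entire dataset in memory, so this only works if the machine has enough RAM for it.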