python, tensorflow, tensorflow-serving, tensorflow-estimator, tensorflow-transform

TF Hub module variables used in preprocessing are not exported in checkpoints during training


I'm using tensorflow_transform to preprocess text data with a TF Hub module and later use the derived features for model training. I've tried to provide a minimal working example below.

pipeline.py

1) Embeds two texts using NNLM.
2) Calculates the cosine similarity between them.
3) Writes the preprocessed data to a .csv file.
4) Exports the transform_fn function/preprocessing graph for later use in serving.
5) Run with python pipeline.py.

    import tensorflow as tf

    import apache_beam as beam
    from tensorflow_transform.beam.tft_beam_io import transform_fn_io
    from apache_beam.options.pipeline_options import SetupOptions
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.io import WriteToText

    import tensorflow_transform.beam.impl as beam_impl
    from tensorflow_transform.coders.csv_coder import CsvCoder
    from tensorflow_transform.tf_metadata import dataset_metadata, dataset_schema

    import tensorflow_hub as hub

    tf_input_raw_feature_spec = {
        'text_1': tf.FixedLenFeature([], tf.string),
        'text_2': tf.FixedLenFeature([], tf.string),
        'y': tf.FixedLenFeature([], tf.float32),
    }

    SAMPLE_INPUT = [({
        'text_1': 'Help me embed this!',
        'text_2': 'Help me embed this!',
        'y': 1
    }), ({
        'text_1': 'And this as well',
        'text_2': 'Lunch Lunch Lunch',
        'y': 0
    })]

    tf_input_metadata = dataset_metadata.DatasetMetadata(dataset_schema.from_feature_spec(tf_input_raw_feature_spec))


    def tf_transform_preprocessing(inputs):
        outputs = {}

        module = hub.Module("https://tfhub.dev/google/nnlm-de-dim128-with-normalization/1")

        text_1_embed = module(inputs['text_1'])
        text_2_embed = module(inputs['text_2'])

        # Calculate Cosine Similarity
        question_normalized = tf.nn.l2_normalize(text_1_embed, 1)
        content_normalized = tf.nn.l2_normalize(text_2_embed, 1)
        outputs['cosine_similarity'] = tf.reduce_sum(tf.multiply(question_normalized, content_normalized),
                                                     keepdims=True,
                                                     axis=1)
        outputs['y'] = inputs['y']

        return outputs


    def run():
        pipeline_options = PipelineOptions()
        pipeline_options.view_as(SetupOptions).save_main_session = True

        with beam.Pipeline(options=pipeline_options) as p,\
                beam_impl.Context(temp_dir='./tmp'):

            pcoll_text = p | beam.Create(SAMPLE_INPUT)

            transformed_dataset, transform_fn = (
                (pcoll_text, tf_input_metadata)
                | 'Analyze and Transform' >> beam_impl.AnalyzeAndTransformDataset(tf_transform_preprocessing))

            transformed_data, transformed_metadata = transformed_dataset

            column_names = transformed_metadata.schema.as_feature_spec().keys()

            (transformed_data | ' Write PCollection to GCS, csv' >> WriteToText(
                file_path_prefix='./preprocessed_output',
                num_shards=1,
                coder=CsvCoder(column_names=column_names, schema=transformed_metadata.schema),
                compression_type='uncompressed',
                header=','.join(column_names)))

            transform_fn | 'Write transformFn' >> transform_fn_io.WriteTransformFn('./metadata')


    if __name__ == '__main__':
        run()

Input:

    SAMPLE_INPUT = [({
        'text_1': 'Help me embed this!',
        'text_2': 'Help me embed this!',
        'y': 1
    }), ({
        'text_1': 'And this as well',
        'text_2': 'Lunch Lunch Lunch',
        'y': 0
    })]

Preprocessed Output in preprocessed_output-00000-of-00001.csv:

    y,cosine_similarity
    1.0,1.0000001
    0.0,0.1290714
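
As a quick sanity check (not part of the original post), you can load what WriteTransformFn wrote to ./metadata and print the feature spec of the transformed data, i.e. what the model will be trained on:

    import tensorflow_transform as tft

    # Load the transform graph and metadata written by WriteTransformFn.
    tf_transform_output = tft.TFTransformOutput('./metadata')

    # Feature spec of the transformed data; expect the keys
    # 'cosine_similarity' and 'y' from tf_transform_preprocessing.
    print(tf_transform_output.transformed_feature_spec())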

train.py

1) Trains a tf.estimator.LinearRegressor on the preprocessed data.
2) Periodically evaluates and exports the model using checkpoints.
3) During evaluation it also exports the serving_input_receiver_fn that I later want to use for serving in production. Since I want to feed raw data to the model at serving time, I apply the exported tf-transform transformations in the serving_input_fn.
4) Run with python train.py.

    import tensorflow as tf
    import tensorflow_transform as tft
    from tensorflow_transform.tf_metadata import dataset_metadata
    from tensorflow_transform.tf_metadata import dataset_schema

    tf_input_raw_feature_spec = {
        'text_1': tf.FixedLenFeature([], tf.string),
        'text_2': tf.FixedLenFeature([], tf.string),
        'y': tf.FixedLenFeature([], tf.float32),
    }

    tf_input_metadata = dataset_metadata.DatasetMetadata(dataset_schema.from_feature_spec(tf_input_raw_feature_spec))


    def make_input_fn(input_file_pattern, num_epochs, batch_size, label_variable, shuffle=False):
        return tf.contrib.data.make_csv_dataset(file_pattern=input_file_pattern,
                                                batch_size=batch_size,
                                                label_name=label_variable,
                                                num_epochs=num_epochs,
                                                shuffle=shuffle)


    def make_serving_input_fn(tf_transform_output):
        tf_transform_output.load_transform_graph()
        raw_feature_spec = tf_input_metadata.schema.as_feature_spec()
        raw_feature_spec.pop('y')

        def serving_input_fn():
            raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(raw_feature_spec,
                                                                                       default_batch_size=None)
            serving_input_receiver = raw_input_fn()

            # Apply the transform function on raw input
            raw_features = serving_input_receiver.features
            transformed_features = tf_transform_output.transform_raw_features(raw_features)
            return tf.estimator.export.ServingInputReceiver(transformed_features, serving_input_receiver.receiver_tensors)

        return serving_input_fn


    def train(args):
        tf.logging.set_verbosity(tf.logging.INFO)
        tf_transform_output = tft.TFTransformOutput(args['tf_transform'])

        # model and all outputs under this relative path
        model_dir = './logs/'

        train_input_files = ['preprocessed_output-00000-of-00001']

        tf.logging.info(train_input_files)

        def train_input_fn():
            return make_input_fn(input_file_pattern=train_input_files,
                                 num_epochs=args['num_epochs'],
                                 batch_size=args['batch_size'],
                                 label_variable=args['label_variable'],
                                 shuffle=True)

        eval_input_files = ['preprocessed_output-00000-of-00001']

        tf.logging.info(eval_input_files)

        def eval_input_fn():
            return make_input_fn(input_file_pattern=eval_input_files,
                                 num_epochs=1,
                                 batch_size=args['batch_size'],
                                 label_variable=args['label_variable'])

        feature_columns = [tf.feature_column.numeric_column(key='cosine_similarity')]

        estimator = tf.estimator.LinearRegressor(feature_columns=feature_columns, model_dir=model_dir)

        train_spec = tf.estimator.TrainSpec(train_input_fn, max_steps=args['train_max_steps'])

        serving_input_receiver_fn = make_serving_input_fn(tf_transform_output)

        exporter = tf.estimator.LatestExporter(name='model_export', serving_input_receiver_fn=serving_input_receiver_fn)

        eval_spec = tf.estimator.EvalSpec(eval_input_fn, steps=None, exporters=[exporter], throttle_secs=150)

        tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)


    if __name__ == '__main__':
        args = {
            'tf_transform': './metadata',
            'num_epochs': 10,
            'batch_size': 1,
            'label_variable': 'y',
            'train_max_steps': 1000
        }
        train(args)

Issue

Whenever I run train.py, training starts successfully, but it always fails when it tries to restore from the checkpoint and continue training, with the following error message:

    NotFoundError (see above for traceback): Restoring from checkpoint failed. This is most likely due to a Variable name or other graph key that is missing from the checkpoint. Please ensure that you have not altered the graph expected based on the checkpoint. Original error:

    Key transform/module/embeddings not found in checkpoint
             [[node save/RestoreV2_1 (defined at /.../env/lib/python2.7/site-packages/tensorflow_estimator/python/estimator/estimator.py:924) ]]

From what I understand, it fails to restore parts of the TF Hub module graph used in the preprocessing step (transform/module/embeddings). Removing the exporter from eval_spec = tf.estimator.EvalSpec(eval_input_fn, steps=None, exporters=[exporter], throttle_secs=150) lets training complete successfully, but then obviously no saved_model is exported.
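
One quick way to confirm this diagnosis (not in the original question) is to list the variables actually stored in the training checkpoint; the key the restore op is looking for simply isn't there. The './logs/' path is the model_dir from train.py:

    import tensorflow as tf

    # Print every variable name and shape stored in the latest
    # checkpoint under the Estimator's model_dir.
    for name, shape in tf.train.list_variables('./logs/'):
        print(name, shape)

    # 'transform/module/embeddings' is missing from this list, which is
    # why the exporter's serving graph fails to restore it.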

TLDR

How do I use a TF Hub module in tf-transform preprocessing and apply those data transformations in a serving environment in conjunction with a trained model?

Appendix

requirements.txt

    apache-beam[gcp]==2.11
    tensorflow-transform==0.13
    tensorflow==1.13.1
    tensorflow-hub==0.4.0

Thanks a lot in advance!


Solution

  • Answered on GitHub; here is the link: https://github.com/tensorflow/transform/issues/125#issuecomment-514558533.

    Posting the answer here for the benefit of the community.

    Adding tf_transform_output.load_transform_graph() to train_input_fn resolves the issue. This relates to the way tf.Learn works: the serving graph tries to restore the TF Hub embedding variables from the training checkpoint, but because you train on the materialized (pre-transformed) data, your training graph doesn't contain the embedding, so those variables never end up in the checkpoint. Calling load_transform_graph() pulls the transform graph into the training graph, which ensures its variables are also saved to (and restored from) the checkpoint.

    Here is the corresponding code:

        def train_input_fn():
            tf_transform_output.load_transform_graph()
            return make_input_fn(input_file_pattern=train_input_files,
                                 num_epochs=args['num_epochs'],
                                 batch_size=args['batch_size'],
                                 label_variable=args['label_variable'],
                                 shuffle=True)
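
    As a follow-up (not part of the original answer), once the exporter succeeds you can sanity-check that the exported SavedModel really accepts raw text by feeding it serialized tf.Example protos via tf.contrib.predictor. The export path and the 'inputs' signature key below are assumptions based on the model_dir and LatestExporter name used in train.py; inspect the export with saved_model_cli if your keys differ:

        import glob

        import tensorflow as tf
        from tensorflow.contrib import predictor

        # Pick the newest export written by LatestExporter ('model_export').
        export_dir = sorted(glob.glob('./logs/export/model_export/*'))[-1]
        predict_fn = predictor.from_saved_model(export_dir)

        # Build a raw tf.Example with the two text features the parsing
        # serving input receiver expects; the transform graph embedded in
        # the SavedModel computes cosine_similarity on the fly.
        example = tf.train.Example(features=tf.train.Features(feature={
            'text_1': tf.train.Feature(bytes_list=tf.train.BytesList(value=[b'Help me embed this!'])),
            'text_2': tf.train.Feature(bytes_list=tf.train.BytesList(value=[b'Help me embed this!'])),
        }))

        print(predict_fn({'inputs': [example.SerializeToString()]}))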