I'm using tensorflow_transform
to pre-process text data using a TF Hub Module and later use the derived features for model training. I tried to provide a minimum working example below.
1) embeds two texts using NNLM
2) calculates the cosine distance between them
3) writes the preprocessed data into a .csv
file.
4) exports the transform_fn
function/preprocessing graph to be used later for serving
5) run python pipeline.py
import tensorflow as tf
import apache_beam as beam
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import WriteToText
import tensorflow_transform.beam.impl as beam_impl
from tensorflow_transform.coders.csv_coder import CsvCoder
from tensorflow_transform.tf_metadata import dataset_metadata, dataset_schema
import tensorflow_hub as hub
tf_input_raw_feature_spec = {
'text_1': tf.FixedLenFeature([], tf.string),
'text_2': tf.FixedLenFeature([], tf.string),
'y': tf.FixedLenFeature([], tf.float32),
}
SAMPLE_INPUT = [({
'text_1': 'Help me embed this!',
'text_2': 'Help me embed this!',
'y': 1
}), ({
'text_1': 'And this as well',
'text_2': 'Lunch Lunch Lunch',
'y': 0
})]
tf_input_metadata = dataset_metadata.DatasetMetadata(dataset_schema.from_feature_spec(tf_input_raw_feature_spec))
def tf_transform_preprocessing(inputs):
outputs = {}
module = hub.Module("https://tfhub.dev/google/nnlm-de-dim128-with-normalization/1")
text_1_embed = module(inputs['text_1'])
text_2_embed = module(inputs['text_2'])
# Calculate Cosine Similarity
question_normalized = tf.nn.l2_normalize(text_1_embed, 1)
content_normalized = tf.nn.l2_normalize(text_2_embed, 1)
outputs['cosine_similarity'] = tf.reduce_sum(tf.multiply(question_normalized, content_normalized),
keepdims=True,
axis=1)
outputs['y'] = inputs['y']
return outputs
def run():
pipeline_options = PipelineOptions()
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p,\
beam_impl.Context(temp_dir='./tmp'):
pcoll_text = p | beam.Create(SAMPLE_INPUT)
transformed_dataset, transform_fn = (
(pcoll_text, tf_input_metadata)
| 'Analyze and Transform' >> beam_impl.AnalyzeAndTransformDataset(tf_transform_preprocessing))
transformed_data, transformed_metadata = transformed_dataset
column_names = transformed_metadata.schema.as_feature_spec().keys()
(transformed_data | ' Write PCollection to GCS, csv' >> WriteToText(
file_path_prefix='./preprocessed_output',
num_shards=1,
coder=CsvCoder(column_names=column_names, schema=transformed_metadata.schema),
compression_type='uncompressed',
header=','.join(column_names)))
transform_fn | 'Write transformFn' >> transform_fn_io.WriteTransformFn('./metadata')
if __name__ == '__main__':
run()
Input:
SAMPLE_INPUT = [({
'text_1': 'Help me embed this!',
'text_2': 'Help me embed this!',
'y': 1
}), ({
'text_1': 'And this as well',
'text_2': 'Lunch Lunch Lunch',
'y': 0
})]
Preprocessed Output in preprocessed_output-00000-of-00001.csv
:
y,cosine_similarity
1.0,1.0000001
0.0,0.1290714
1) trains a tf.estimator.LinearRegressor
on the pre-processed data
2) Periodically evaluates and exports the model using Checkpoints
3) During this evaluation it also exports the serving_input_receiver_fn
that I later want to use for serving it in production. Since I want to feed
raw data to the model while serving, I apply the exported tf-transform
transformations in the serving_input_fn
.
4) run python train.py
from sys import argv
import tensorflow as tf
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema
tf_input_raw_feature_spec = {
'text_1': tf.FixedLenFeature([], tf.string),
'text_2': tf.FixedLenFeature([], tf.string),
'y': tf.FixedLenFeature([], tf.float32),
}
tf_input_metadata = dataset_metadata.DatasetMetadata(dataset_schema.from_feature_spec(tf_input_raw_feature_spec))
def make_input_fn(input_file_pattern, num_epochs, batch_size, label_variable, shuffle=False):
return tf.contrib.data.make_csv_dataset(file_pattern=input_file_pattern,
batch_size=batch_size,
label_name=label_variable,
num_epochs=num_epochs,
shuffle=shuffle)
def make_serving_input_fn(tf_transform_output):
tf_transform_output.load_transform_graph()
raw_feature_spec = tf_input_metadata.schema.as_feature_spec()
raw_feature_spec.pop('y')
def serving_input_fn():
raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(raw_feature_spec,
default_batch_size=None)
serving_input_receiver = raw_input_fn()
# Apply the transform function on raw input
raw_features = serving_input_receiver.features
transformed_features = tf_transform_output.transform_raw_features(raw_features)
return tf.estimator.export.ServingInputReceiver(transformed_features, serving_input_receiver.receiver_tensors)
return serving_input_fn
def train(args):
tf.logging.set_verbosity(tf.logging.INFO)
tf_transform_output = tft.TFTransformOutput(args['tf_transform'])
# model and all outputs under this relative path
model_dir = './logs/'
train_input_files = ['preprocessed_output-00000-of-00001']
tf.logging.info(train_input_files)
def train_input_fn():
return make_input_fn(input_file_pattern=train_input_files,
num_epochs=args['num_epochs'],
batch_size=args['batch_size'],
label_variable=args['label_variable'],
shuffle=True)
eval_input_files = ['preprocessed_output-00000-of-00001']
tf.logging.info(eval_input_files)
def eval_input_fn():
return make_input_fn(input_file_pattern=eval_input_files,
num_epochs=1,
batch_size=args['batch_size'],
label_variable=args['label_variable'])
feature_columns = [tf.feature_column.numeric_column(key='cosine_similarity')]
estimator = tf.estimator.LinearRegressor(feature_columns=feature_columns, model_dir=model_dir)
train_spec = tf.estimator.TrainSpec(train_input_fn, max_steps=args['train_max_steps'])
serving_input_receiver_fn = make_serving_input_fn(tf_transform_output)
exporter = tf.estimator.LatestExporter(name='model_export', serving_input_receiver_fn=serving_input_receiver_fn)
eval_spec = tf.estimator.EvalSpec(eval_input_fn, steps=None, exporters=[exporter], throttle_secs=150)
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
if __name__ == '__main__':
args = {
'tf_transform': './metadata',
'num_epochs': 10,
'batch_size': 1,
'label_variable': 'y',
'train_max_steps': 1000
}
train(args)
Whenever I run train.py
it successfully
Checkpoint
,but always fails when it tries to restore from Checkpoint
and
continue training with the following Error message:
NotFoundError (see above for traceback): Restoring from checkpoint failed. This is most likely due to a Variable name or other graph key that is missing from the checkpoint. Please ensure that you have not altered the graph expected based on the checkpoint. Original error:
Key transform/module/embeddings not found in checkpoint
[[node save/RestoreV2_1 (defined at /.../env/lib/python2.7/site-packages/tensorflow_estimator/python/estimator/estimator.py:924) ]]
From what I understand it fails to restore parts of the TF Hub
module graph used in the preprocessing step (transform/module/embeddings
). Removing the exporter
from eval_spec = tf.estimator.EvalSpec(eval_input_fn, steps=None, exporters=[exporter], throttle_secs=150)
lets training complete successfully, but obviously doesn't export any saved_model
.
How do I use a TF Hub
module in tf-transform
preprocessing and apply those data transformations in a serving
environment in conjunction with a trained model?
requirements.txt
apache-beam[gcp]==2.11
tensorflow-transform==0.13
tensorflow==1.13.1
tensorflow-hub==0.4.0
Thanks a lot in advance!
Answered in Github. Following is the link, https://github.com/tensorflow/transform/issues/125#issuecomment-514558533.
Posting the answer here for the benefit of community.
Adding tftransform_output.load_transform_graph()
to train_input_fn
will resolve the issue. This relates to the way tf.Learn works. In your serving graph
, it tries to read from the training checkpoint
, but because you are using materialized data, your training graph doesn't contain the embedding.
Below is the code for the same:
def train_input_fn():
tf_transform_output.load_transform_graph()
return make_input_fn(input_file_pattern=train_input_files,
num_epochs=args['num_epochs'],
batch_size=args['batch_size'],
label_variable=args['label_variable'],
shuffle=True)