database tensorflow output tfx

How do I get a dataframe or database write from TFX BulkInferrer?


I'm very new to TFX, but have an apparently-working ML Pipeline which is to be used via BulkInferrer. That seems to produce output exclusively in Protobuf format, but since I'm running bulk inference I want to pipe the results to a database instead. (DB output seems like it should be the default for bulk inference, since both Bulk Inference & DB access take advantage of parallelization... but Protobuf is a per-record, serialized format.)

I assume I could use something like Parquet-Avro-Protobuf to do the conversion (though that's in Java and the rest of the pipeline's in Python). Or I could write something myself to consume all the protobuf messages one by one, convert them into JSON, deserialize the JSON into a list of dicts, and load the dicts into a Pandas DataFrame, or store them as a bunch of key-value pairs that I treat like a single-use DB... but that sounds like a lot of work and pain around parallelization and optimization for a very common use case. The top-level Protobuf message definition is TensorFlow's PredictionLog.
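
For concreteness, here is roughly what I picture that manual route looking like (the file path and output layout are guesses about where BulkInferrer writes its results; I haven't confirmed them):

    import pandas as pd
    import tensorflow as tf
    from google.protobuf.json_format import MessageToDict
    from tensorflow_serving.apis import prediction_log_pb2

    # Guessing at the artifact location and file name produced by BulkInferrer.
    log_files = ['/path/to/bulkinferrer/inference_result/prediction_logs-00000-of-00001.gz']
    raw_logs = tf.data.TFRecordDataset(log_files, compression_type='GZIP')

    rows = []
    for raw in raw_logs:
        log = prediction_log_pb2.PredictionLog.FromString(raw.numpy())
        rows.append(MessageToDict(log))  # one dict per record (nested fields stay nested)

    df = pd.DataFrame(rows)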

This must be a common use case, because TensorFlow Model Analysis functions like this one consume Pandas DataFrames. I'd rather be able to write directly to a DB (preferably Google BigQuery) or a Parquet file (since Parquet / Spark seems to parallelize better than Pandas), and again, those seem like they should be common use cases, but I haven't found any examples. Maybe I'm using the wrong search terms?

I also looked at the PredictExtractor, since "extracting predictions" sounds close to what I want... but the official documentation appears silent on how that class is supposed to be used. I thought TFTransformOutput sounded like a promising verb, but instead it's a noun.

I'm clearly missing something fundamental here. Is there a reason no one wants to store BulkInferrer results in a database? Is there a configuration option that lets me write the results to a DB? Maybe I want to add a ParquetIO or BigQueryIO instance to the TFX pipeline? (The TFX docs say it uses Beam "under the hood", but that doesn't say much about how I should use the two together, and the syntax in the Beam docs looks different enough from my TFX code that I'm not sure whether they're compatible.)
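
For context, this is the sort of thing I picture for the Beam side, if it can be bolted on at all (the table name, schema, file pattern, and row-conversion function are placeholders I made up):

    import apache_beam as beam
    from tensorflow_serving.apis import prediction_log_pb2

    def prediction_log_to_row(log):
        # Placeholder: pull whichever fields are needed out of the PredictionLog.
        return {'prediction': str(log.predict_log.response.outputs)}

    with beam.Pipeline() as pipeline:
        (pipeline
         | 'ReadLogs' >> beam.io.ReadFromTFRecord(
               '/path/to/bulkinferrer/inference_result/prediction_logs-*.gz',
               coder=beam.coders.ProtoCoder(prediction_log_pb2.PredictionLog))
         | 'ToRows' >> beam.Map(prediction_log_to_row)
         | 'WriteToBQ' >> beam.io.WriteToBigQuery(
               'my-project:my_dataset.predictions',
               schema='prediction:STRING',
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))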

Help?


Solution

  • (Copied from the related issue for greater visibility)

    After some digging, here is an alternative approach, which assumes no knowledge of the feature_spec beforehand. Do the following:

    from tfx.components import BulkInferrer, SchemaGen, StatisticsGen
    from tfx.proto import bulk_inferrer_pb2

    # Ask BulkInferrer to emit tf.Examples (output_examples) rather than only
    # PredictionLog protos, mapping the model's output key to a named column.
    bulk_inferrer = BulkInferrer(
        ...,
        output_example_spec=bulk_inferrer_pb2.OutputExampleSpec(
            output_columns_spec=[bulk_inferrer_pb2.OutputColumnsSpec(
                predict_output=bulk_inferrer_pb2.PredictOutput(
                    output_columns=[bulk_inferrer_pb2.PredictOutputCol(
                        output_key='original_label_name',
                        output_column='output_label_column_name')]))]))

    # Compute statistics and infer a schema over the inferred examples, so the
    # feature_spec can be recovered later without knowing it up front.
    statistics = StatisticsGen(
        examples=bulk_inferrer.outputs['output_examples'])

    schema = SchemaGen(
        statistics=statistics.outputs['statistics'])
    

    After that, one can do the following:

    import tensorflow as tf
    from tfx.utils import io_utils
    from tensorflow_transform.tf_metadata import schema_utils
    
    # read schema from SchemaGen
    schema_path = '/path/to/schemagen/schema.pbtxt'
    schema_proto = io_utils.SchemaReader().read(schema_path)
    spec = schema_utils.schema_as_feature_spec(schema_proto).feature_spec
    
    # read inferred results
    data_files = ['/path/to/bulkinferrer/output_examples/examples/examples-00000-of-00001.gz']
    dataset = tf.data.TFRecordDataset(data_files, compression_type='GZIP')
    
    # parse dataset with spec
    def parse(raw_record):
        return tf.io.parse_example(raw_record, spec)
    
    dataset = dataset.map(parse)
    

    At this point, the dataset is like any other parsed dataset, so it's trivial to write it out to a CSV, a BigQuery table, or whatever else from there (see the sketch below). It certainly helped us in ZenML with our BatchInferencePipeline.
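
    For example, a minimal sketch of that last hop, assuming every feature parses to a scalar dense tensor (sparse or variable-length features would need extra handling) and that pandas-gbq is installed for the BigQuery write:

    import pandas as pd

    # Materialize the parsed dataset into a DataFrame (fine for modest sizes;
    # for very large outputs, stream batches instead of collecting everything).
    rows = [{name: value.numpy() for name, value in example.items()}
            for example in dataset]
    df = pd.DataFrame(rows)

    df.to_csv('/path/to/predictions.csv', index=False)
    # df.to_gbq('my_dataset.predictions', project_id='my-project')  # via pandas-gbq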