Let's say I have this data frame saved in a parquet format
import numpy as np
import pandas as pd
data = pd.DataFrame(dict(
a=[1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
b=[1.0, 1.0, 1.0, np.NaN, 0.0, np.NaN],
c=[0.9, np.NaN, 1.0, 0.0, 0.0, 0.0]
))
data.to_parquet('data.parquet')
along with a dictionary that tells me which values I should use for the imputation. Then I can write a preprocessing function.
import tensorflow as tf
impute_dictionary = dict(b=1.0, c=0.0)
def preprocessing_fn(inputs):
outputs = inputs.copy()
for key, value in impute_dictionary.items():
outputs[key] = tf.where(
tf.math.is_nan(outputs[key]),
tf.constant(value, shape=outputs[key].shape),
outputs[key]
)
return outputs
and use it in Apache Beam pipeline
import tempfile
import apache_beam as beam
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.tf_metadata import dataset_metadata, schema_utils
temp = tempfile.gettempdir()
RAW_DATA_FEATURE_SPEC = dict(
[(name, tf.io.FixedLenFeature([], tf.float32)) for name in ['a', 'b', 'c']]
)
RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(schema_utils.schema_from_feature_spec(RAW_DATA_FEATURE_SPEC))
with beam.Pipeline() as pipeline:
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
raw_data = pipeline | 'ReadTrainData' >> beam.io.ReadFromParquet('data.parquet')
raw_dataset = (raw_data, RAW_DATA_METADATA)
transformed_dataset, transform_fn = (raw_dataset | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
transformed_data, transformed_metadata = transformed_dataset
transformed_data_coder = tft.coders.ExampleProtoCoder(transformed_metadata.schema)
I get this error: TypeError: int() argument must be a string, a bytes-like object or a number, not 'NoneType'
It seems that outputs[key].shape
is (None,)
Any suggestion?
Package versions:
tensorflow==2.1.0
tensorflow-transform==0.21.0
pandas==1.0.0
numpy==1.18.1
apache-beam==2.19.0
Entire error message:
WARNING: Logging before flag parsing goes to stderr.
W0204 10:36:03.793034 140735593104256 interactive_environment.py:113] Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features.
W0204 10:36:03.793169 140735593104256 interactive_environment.py:125] You have limited Interactive Beam features since your ipython kernel is not connected any notebook frontend.
W0204 10:36:03.929135 140735593104256 impl.py:360] Tensorflow version (2.1.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended.
W0204 10:36:03.929914 140735593104256 impl.py:360] Tensorflow version (2.1.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended.
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-5-465b3f61784c> in <module>
17 raw_data = pipeline | 'ReadTrainData' >> beam.io.ReadFromParquet('data.parquet')
18 raw_dataset = (raw_data, RAW_DATA_METADATA)
---> 19 transformed_dataset, transform_fn = (raw_dataset | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
20 transformed_data, transformed_metadata = transformed_dataset
21 transformed_data_coder = tft.coders.ExampleProtoCoder(transformed_metadata.schema)
/usr/local/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py in __ror__(self, left, label)
547 pvalueish = _SetInputPValues().visit(pvalueish, replacements)
548 self.pipeline = p
--> 549 result = p.apply(self, pvalueish, label)
550 if deferred:
551 return result
/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
575 transform.type_check_inputs(pvalueish)
576
--> 577 pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
578
579 if type_options is not None and type_options.pipeline_type_check:
/usr/local/lib/python3.7/site-packages/apache_beam/runners/runner.py in apply(self, transform, input, options)
193 m = getattr(self, 'apply_%s' % cls.__name__, None)
194 if m:
--> 195 return m(transform, input, options)
196 raise NotImplementedError(
197 'Execution of [%s] not implemented in runner %s.' % (transform, self))
/usr/local/lib/python3.7/site-packages/apache_beam/runners/runner.py in apply_PTransform(self, transform, input, options)
223 def apply_PTransform(self, transform, input, options):
224 # The base case of apply is to call the transform's expand.
--> 225 return transform.expand(input)
226
227 def run_transform(self,
/usr/local/lib/python3.7/site-packages/tensorflow_transform/beam/impl.py in expand(self, dataset)
861 # e.g. caching the values of expensive computations done in AnalyzeDataset.
862 transform_fn = (
--> 863 dataset | 'AnalyzeDataset' >> AnalyzeDataset(self._preprocessing_fn))
864
865 if Context.get_use_deep_copy_optimization():
/usr/local/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py in __ror__(self, pvalueish, _unused)
987
988 def __ror__(self, pvalueish, _unused=None):
--> 989 return self.transform.__ror__(pvalueish, self.label)
990
991 def expand(self, pvalue):
/usr/local/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py in __ror__(self, left, label)
547 pvalueish = _SetInputPValues().visit(pvalueish, replacements)
548 self.pipeline = p
--> 549 result = p.apply(self, pvalueish, label)
550 if deferred:
551 return result
/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
534 try:
535 old_label, transform.label = transform.label, label
--> 536 return self.apply(transform, pvalueish)
537 finally:
538 transform.label = old_label
/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
575 transform.type_check_inputs(pvalueish)
576
--> 577 pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
578
579 if type_options is not None and type_options.pipeline_type_check:
/usr/local/lib/python3.7/site-packages/apache_beam/runners/runner.py in apply(self, transform, input, options)
193 m = getattr(self, 'apply_%s' % cls.__name__, None)
194 if m:
--> 195 return m(transform, input, options)
196 raise NotImplementedError(
197 'Execution of [%s] not implemented in runner %s.' % (transform, self))
/usr/local/lib/python3.7/site-packages/apache_beam/runners/runner.py in apply_PTransform(self, transform, input, options)
223 def apply_PTransform(self, transform, input, options):
224 # The base case of apply is to call the transform's expand.
--> 225 return transform.expand(input)
226
227 def run_transform(self,
/usr/local/lib/python3.7/site-packages/tensorflow_transform/beam/impl.py in expand(self, dataset)
808 input_values, input_metadata = dataset
809 result, cache = super(AnalyzeDataset, self).expand((input_values, None,
--> 810 None, input_metadata))
811 assert not cache
812 return result
/usr/local/lib/python3.7/site-packages/tensorflow_transform/beam/impl.py in expand(self, dataset)
681 copied_inputs = impl_helper.copy_tensors(input_signature)
682
--> 683 output_signature = self._preprocessing_fn(copied_inputs)
684
685 # At this point we check that the preprocessing_fn has at least one
<ipython-input-2-205d9abf4136> in preprocessing_fn(inputs)
9 outputs[key] = tf.where(
10 tf.math.is_nan(outputs[key]),
---> 11 tf.constant(value, shape=outputs[key].shape),
12 outputs[key]
13 )
/usr/local/lib/python3.7/site-packages/tensorflow_core/python/framework/constant_op.py in constant(value, dtype, shape, name)
256 """
257 return _constant_impl(value, dtype, shape, name, verify_shape=False,
--> 258 allow_broadcast=True)
259
260
/usr/local/lib/python3.7/site-packages/tensorflow_core/python/framework/constant_op.py in _constant_impl(value, dtype, shape, name, verify_shape, allow_broadcast)
294 tensor_util.make_tensor_proto(
295 value, dtype=dtype, shape=shape, verify_shape=verify_shape,
--> 296 allow_broadcast=allow_broadcast))
297 dtype_value = attr_value_pb2.AttrValue(type=tensor_value.tensor.dtype)
298 const_tensor = g._create_op_internal( # pylint: disable=protected-access
/usr/local/lib/python3.7/site-packages/tensorflow_core/python/framework/tensor_util.py in make_tensor_proto(values, dtype, shape, verify_shape, allow_broadcast)
446 # If shape is None, numpy.prod returns None when dtype is not set, but
447 # raises exception when dtype is set to np.int64
--> 448 if shape is not None and np.prod(shape, dtype=np.int64) == 0:
449 nparray = np.empty(shape, dtype=np_dt)
450 else:
<__array_function__ internals> in prod(*args, **kwargs)
/usr/local/lib/python3.7/site-packages/numpy/core/fromnumeric.py in prod(a, axis, dtype, out, keepdims, initial, where)
2960 """
2961 return _wrapreduction(a, np.multiply, 'prod', axis, dtype, out,
-> 2962 keepdims=keepdims, initial=initial, where=where)
2963
2964
/usr/local/lib/python3.7/site-packages/numpy/core/fromnumeric.py in _wrapreduction(obj, ufunc, method, axis, dtype, out, **kwargs)
88 return reduction(axis=axis, out=out, **passkwargs)
89
---> 90 return ufunc.reduce(obj, axis, dtype, out, **passkwargs)
91
92
TypeError: int() argument must be a string, a bytes-like object or a number, not 'NoneType'
The problem is that I set the shape in tf.constant(value, shape=outputs[key].shape)
. I should have only used tf.constant(value, dtype=tf.float32)
.