I'm trying to parallelize the training step of my model with TensorFlow's ParameterServerStrategy. I use GCP AI Platform to create the cluster and launch the task.
As my dataset is huge, I use the BigQuery TensorFlow connector included in tensorflow-io.
My script follows the documentation of the TensorFlow BigQuery reader and the documentation of ParameterServerStrategy.
Locally my script works well, but when I launch it on AI Platform I get the following error:
{"created":"@1633444428.903993309","description":"Error received from peer ipv4:10.46.92.135:2222","file":"external/com_github_grpc_grpc/src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Op type not registered \'IO>BigQueryClient\' in binary running on gke-cml-1005-141531--n1-standard-16-2-644bc3f8-7h8p. Make sure the Op and Kernel are registered in the binary running in this process. Note that if you are loading a saved graph which used ops from tf.contrib, accessing (e.g.) `tf.contrib.resampler` should be done before importing the graph, as contrib ops are lazily registered when the module is first accessed.","grpc_status":5}
The script works with fake data on AI Platform, and works locally with the BigQuery connector. I suspect that compiling the model, including the BigQuery connector and its calls, and shipping it to the other devices is what triggers the bug, but I don't know how to fix it.
I read that this error can happen when devices don't run the same TensorFlow version, so I checked the tensorflow and tensorflow-io versions on each device:
tensorflow : 2.5.0
tensorflow-io : 0.19.1
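A minimal way to confirm what each task actually loads is to log the versions from inside the process itself (a quick sketch; compare the output across the chief, worker and ps logs):

import tensorflow as tf
import tensorflow_io as tfio

# Versions loaded in this very process, not just what pip installed
print("tensorflow:", tf.__version__)
print("tensorflow-io:", tfio.__version__)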
I created a minimal example which reproduces the bug on AI Platform:
import os
from tensorflow_io.bigquery import BigQueryClient
from tensorflow_io.bigquery import BigQueryReadSession
import tensorflow as tf
import multiprocessing
import portpicker
from tensorflow.keras.layers.experimental import preprocessing
from google.cloud import bigquery
from tensorflow.python.framework import dtypes
import numpy as np
import pandas as pd

client = bigquery.Client()

PROJECT_ID = '<your_project>'  # replace with your project ID
DATASET_ID = 'tmp'
TABLE_ID = 'bq_tf_io'
BATCH_SIZE = 32
# BigQuery requirements
def init_bq_table():
    table = '%s.%s.%s' % (PROJECT_ID, DATASET_ID, TABLE_ID)

    # Create toy data
    def create_toy_data(N):
        x = np.random.random(size=N)
        y = 0.2 + x + np.random.normal(loc=0, scale=0.3, size=N)
        return x, y

    x, y = create_toy_data(1000)
    df = pd.DataFrame(data={'x': x, 'y': y})
    job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")
    job = client.load_table_from_dataframe(df, table, job_config=job_config)
    job.result()

# Create initial data
#init_bq_table()

CSV_SCHEMA = [
    bigquery.SchemaField("x", "FLOAT64"),
    bigquery.SchemaField("y", "FLOAT64"),
]
def transform_row(row_dict):
    dataset_x = row_dict
    dataset_x['constant'] = tf.cast(1, tf.float64)
    # Extract the label column
    dataset_y = dataset_x.pop('y')
    # Stack the feature columns into a single tensor
    dataset_x = tf.stack([dataset_x[column] for column in dataset_x], axis=-1)
    return (dataset_x, dataset_y)

def read_bigquery(table_name):
    tensorflow_io_bigquery_client = BigQueryClient()
    read_session = tensorflow_io_bigquery_client.read_session(
        "projects/" + PROJECT_ID,
        PROJECT_ID, TABLE_ID, DATASET_ID,
        list(field.name for field in CSV_SCHEMA),
        list(dtypes.double if field.field_type == 'FLOAT64'
             else dtypes.string for field in CSV_SCHEMA),
        requested_streams=2)
    dataset = read_session.parallel_read_rows()
    return dataset

def get_data():
    dataset = read_bigquery(TABLE_ID)
    dataset = dataset.map(transform_row, num_parallel_calls=4)
    dataset = dataset.batch(BATCH_SIZE).prefetch(2)
    return dataset
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

# Parameter servers and workers just start a server and wait for ops
# dispatched by the coordinator (chief)
if cluster_resolver.task_type == "worker":
    worker_config = tf.compat.v1.ConfigProto()
    server = tf.distribute.Server(
        cluster_resolver.cluster_spec(),
        job_name=cluster_resolver.task_type,
        task_index=cluster_resolver.task_id,
        config=worker_config,
        protocol="grpc")
    server.join()
elif cluster_resolver.task_type == "ps":
    server = tf.distribute.Server(
        cluster_resolver.cluster_spec(),
        job_name=cluster_resolver.task_type,
        task_index=cluster_resolver.task_id,
        protocol="grpc")
    server.join()
elif cluster_resolver.task_type == 'chief':
    strategy = tf.distribute.experimental.ParameterServerStrategy(
        cluster_resolver=cluster_resolver)
if cluster_resolver.task_type == 'chief':
    learning_rate = 0.01
    with strategy.scope():
        # Model
        model_input = tf.keras.layers.Input(shape=(2,), dtype=tf.float64)
        layer_1 = tf.keras.layers.Dense(8, activation='relu')(model_input)
        dense_output = tf.keras.layers.Dense(1)(layer_1)
        model = tf.keras.Model(model_input, dense_output)
        # Optimizer and metric (named accuracy, but it tracks the MSE)
        optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate)
        accuracy = tf.keras.metrics.MeanSquaredError()

    @tf.function
    def distributed_train_step(iterator):
        def train_step(x_batch_train, y_batch_train):
            with tf.GradientTape() as tape:
                y_predict = model(x_batch_train, training=True)
                loss_value = tf.keras.losses.MeanSquaredError(
                    reduction=tf.keras.losses.Reduction.NONE)(y_batch_train, y_predict)
            grads = tape.gradient(loss_value, model.trainable_weights)
            optimizer.apply_gradients(zip(grads, model.trainable_weights))
            accuracy.update_state(y_batch_train, y_predict)
            return loss_value

        x_batch_train, y_batch_train = next(iterator)
        return strategy.run(train_step, args=(x_batch_train, y_batch_train))

    coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
    # Toy in-memory dataset, used to test the pipeline without BigQuery
    def dataset_fn(_):
        def create_toy_data(N):
            x = np.random.random(size=N)
            y = 0.2 + x + np.random.normal(loc=0, scale=0.3, size=N)
            return np.c_[x, y]

        def toy_transform_row(row):
            dataset_x = tf.stack([row[0], tf.cast(1, tf.float64)], axis=-1)
            dataset_y = row[1]
            return dataset_x, dataset_y

        N = 1000
        data = create_toy_data(N)
        dataset = tf.data.Dataset.from_tensor_slices(data)
        dataset = dataset.map(toy_transform_row, num_parallel_calls=4)
        dataset = dataset.batch(BATCH_SIZE)
        dataset = dataset.prefetch(2)
        return dataset

    @tf.function
    def per_worker_dataset_fn():
        return strategy.distribute_datasets_from_function(lambda x: get_data())  # <-- Not working with AI Platform
        #return strategy.distribute_datasets_from_function(dataset_fn)  # <-- Working with AI Platform

    per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)

    # Train the model
    for epoch in range(5):
        per_worker_iterator = iter(per_worker_dataset)
        accuracy.reset_states()
        for step in range(5):
            coordinator.schedule(distributed_train_step, args=(per_worker_iterator,))
        coordinator.join()
        print("Finished epoch %d, accuracy is %f." % (epoch, accuracy.result().numpy()))
In per_worker_dataset_fn() I can either use the BigQuery connector (which triggers the bug) or create the dataset in memory (which works).
AI Platform cluster configuration:
runtimeVersion: "2.5"
pythonVersion: "3.7"
Did anyone encounter this issue? The BigQuery connector worked well with MirroredStrategy on AI Platform. Tell me if I should report the issue somewhere else.
I think this is due to lazy loading of libtensorflow_io.so (see https://github.com/tensorflow/io/commit/85d018ee59ceccfae06914ec2a2f6d6583775ff7).
Can you try adding something like this to your code?
import tensorflow_io
tensorflow_io.experimental.oss()
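If lazy loading is indeed the cause, the tf-io ops (including IO>BigQueryClient) only get registered in a process once the shared library is loaded, and on your workers and parameter servers nothing touches the tf-io API before server.join(), so the op is never registered there. A sketch of where this would go in your reproduction script, so the registration happens in every task before its server starts:

import tensorflow as tf
import tensorflow_io
tensorflow_io.experimental.oss()  # force loading of libtensorflow_io.so in this process

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

# With the ops registered here too, graphs dispatched by the coordinator
# that reference IO>BigQueryClient can run on the workers.
if cluster_resolver.task_type == "worker":
    server = tf.distribute.Server(
        cluster_resolver.cluster_spec(),
        job_name=cluster_resolver.task_type,
        task_index=cluster_resolver.task_id,
        protocol="grpc")
    server.join()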