I have a train step for a Vertex AI pipeline that uses template code for setting up distributed training. When fitting any model, I get the following message during the first epoch:
2025-07-01 17:55:35.569062: I tensorflow/core/framework/local_rendezvous.cc:407] Local rendezvous is aborting with status: CANCELLED: GetNextFromShard was cancelled [[{{node MultiDeviceIteratorGetNextFromShard}}]] [[RemoteCall]] [type.googleapis.com/tensorflow.DerivedStatus='']
I have tried using ds = strategy.experimental_distribute_dataset(ds), but that doesn't solve the issue. Despite this message the model does train, and the message doesn't appear in later epochs. Is this really a problem?
Sharing the code below:
# Single machine, single compute device
if args.distribute == 'single':
    if tf.test.is_gpu_available():
        strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
    else:
        strategy = tf.distribute.OneDeviceStrategy(device="/cpu:0")
# Single machine, multiple compute devices
elif args.distribute == 'mirror':
    strategy = tf.distribute.MirroredStrategy()
# Multiple machines, multiple compute devices
elif args.distribute == 'multi':
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
def get_data_set(generator_fn) -> tuple[tf.data.Dataset, int]:
    data_tmp = generator_fn()
    for sample_X_train, sample_y_train in data_tmp:
        break
    total = sum(1 for _ in data_tmp) + 1
    ds = tf.data.Dataset.from_generator(
        generator_fn,
        output_signature=(
            tf.TensorSpec(shape=sample_X_train.shape, dtype=tf.float16),
            tf.TensorSpec(shape=sample_y_train.shape, dtype=tf.float16),
        )
    )
    ds = ds.batch(BATCH_SIZE).repeat()
    # ds = strategy.experimental_distribute_dataset(ds)
    return ds, total
and the fit step is:
with strategy.scope():
    train_generator, train_samples = get_data_set(lambda: multi_window.train)
    val_generator, val_samples = get_data_set(lambda: multi_window.val)
    print(train_samples, val_samples)
    train_steps = train_samples // BATCH_SIZE
    val_steps = val_samples // BATCH_SIZE
    model.fit(
        train_generator,
        validation_data=val_generator,
        epochs=epochs,
        validation_steps=val_steps,
        steps_per_epoch=train_steps,
    )
I tried strategy.experimental_distribute_dataset(ds) without success.
This is an informational message. It is a normal side-effect of how tf.data prefetching and the model.fit training loop interact in a distributed environment at the end of an epoch.
Your training starts for epoch 1. All workers begin requesting and processing batches of data, and the tf.data pipeline prefetches data in the background on each worker to keep the compute devices busy. Once the main training loop completes train_steps, model.fit declares the epoch finished. At that moment, one or more workers may already have sent a request for the next batch as part of prefetching. Because the epoch is over, model.fit tears down the data iterator for the current epoch, and that termination cancels any outstanding data requests. The TensorFlow runtime on the affected worker then logs: Local rendezvous is aborting with status: CANCELLED: GetNextFromShard was cancelled. This is just the system reporting that a data request it was working on was told to stop, which is exactly what was supposed to happen.
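So nothing in your training code needs to change. If the line clutters your Vertex AI logs, you can filter it at the C++ log level with the standard TF_CPP_MIN_LOG_LEVEL environment variable. A minimal sketch, assuming it is acceptable in your setup to set the variable before TensorFlow is first imported (the threshold value 1 is my suggestion; adjust it if you also want to hide warnings):

import os

# Filter C++ backend INFO logs such as the local_rendezvous CANCELLED message.
# 0 = show everything, 1 = filter INFO, 2 = also filter WARNING, 3 = also filter ERROR.
# Must be set before the first `import tensorflow`.
os.environ.setdefault("TF_CPP_MIN_LOG_LEVEL", "1")

import tensorflow as tf  # imported after setting the variable on purpose

This only hides the message; it does not change how the dataset is sharded or how the epoch ends.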