I'm building a pipeline that fires a couple of REST POST calls in parallel and then updates a table in a database. It will be run through an ULTRA task. I'm using a script for certain transformations and mappings that I can't do with the standard snaps, as most of them are not ULTRA compatible (for example, Tail and Group By Fields).
I'm aware that I need to maintain the lineage information for such pipelines to work through ULTRA tasks. Refer to the following documentation.
Whenever I write the result to the output, I use something like the following.
this.output.write(doc, wrapper);
But the documentation does not mention how to maintain lineage information in case of errors. I build a similar wrapper in the catch block and write it to error as follows.
this.error.write(wrapper);
I'm getting the following error from this script.
Document created by 'Pre-DB Script[08755fd3-46b0-4f3a-b353-4a0685b649c4 -- 2b2bfab7-18f7-49ab-9a98-41bf0bc694eb]' does not contain lineage information and cannot be used as output from an Ultra pipeline
Resolution:
Use a Join snap to merge static documents into an Ultra input document or request that the snap be made Ultra-compatible
Reason:
The document was not created from an Ultra input document or was created by a Snap that is not Ultra-compatible
Following is the relevant bit of code.
var impl = {
    input: input,
    output: output,
    error: error,
    log: log,
    execute: function () {
        // Some variables
        var error = false;
        this.log.info("Executing Transform Script");
        while (this.input.hasNext()) {
            try {
                // Read the next document, wrap it in a map and write out the wrapper
                var doc = this.input.next();
                // Some operations
                if (!doc.successful) {
                    error = true;
                    throw { message: doc.errors.errorDetailMessage, original: doc };
                }
                var wrapper = new java.util.HashMap();
                // Add required fields to wrapper
                wrapper.put("original", doc);
                if (/* Should this wrapper be sent to output? */) {
                    if (!error) this.output.write(doc, wrapper);
                    error = false;
                }
                this.log.info("Transform Script finished");
            }
            catch (err) {
                var wrapper = new java.util.HashMap();
                wrapper.put("error", err.message);
                wrapper.put("original", err.original);
                this.log.error(err);
                this.error.write(wrapper);
            }
        }
    }
};
So, how can I resolve lineage information issues when documents are routed to the error path?
Turns out you can pass lineage information in error, similar to output.
this.error.write(doc, wrapper);
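For anyone landing here later, this is a minimal sketch of how the catch block might look with that change applied. It is not the exact script from the question: makeView, makeInput and makeImpl are hypothetical stand-ins for the snap's input, output and error views so the snippet runs outside SnapLogic, and plain objects replace java.util.HashMap for the same reason. The only part taken from the post is the two-argument write(doc, wrapper) call on both output and error.

```javascript
// Hypothetical stub for an output or error view. It only records what is written.
function makeView() {
  var writes = [];
  return {
    writes: writes,
    // Same argument order as in the post: (incoming document, new document).
    write: function (original, wrapper) {
      writes.push({ original: original, wrapper: wrapper });
    }
  };
}

// Hypothetical stub for the input view, backed by an array of documents.
function makeInput(docs) {
  var i = 0;
  return {
    hasNext: function () { return i < docs.length; },
    next: function () { return docs[i++]; }
  };
}

function makeImpl(input, output, error) {
  return {
    input: input,
    output: output,
    error: error,
    execute: function () {
      while (this.input.hasNext()) {
        // Reset per iteration so the catch block never sees a stale document.
        var doc = null;
        try {
          doc = this.input.next();
          if (!doc.successful) {
            throw { message: doc.errors.errorDetailMessage };
          }
          this.output.write(doc, { original: doc });
        } catch (err) {
          // Pass the incoming document first, exactly as for output.
          this.error.write(doc, { error: err.message, original: doc });
        }
      }
    }
  };
}

var output = makeView();
var error = makeView();
var input = makeInput([
  { successful: true },
  { successful: false, errors: { errorDetailMessage: "boom" } }
]);
makeImpl(input, output, error).execute();
```

Declaring doc before the try means the catch block always refers to the document from the current iteration. If input.next() itself were to throw, doc would still be null, so a real script would probably want to handle that case separately.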