I have been searching for a while for how to use a custom Java operator within the InfoSphere Streams Java API.
Given a custom operator like the one below ...
public class Test extends AbstractOperator {
private int i;
private int num;
@Override
public synchronized void initialize(OperatorContext context) throws Exception {
super.initialize(context);
i = 0; ....
I want to use it from a topology like this ...
Topology topology = new Topology("toplogy_test");
TStream<String> inDataFileName = ...
//call the "Test" operator here
You can invoke a Java or C++ operator from the Topology API as follows:
Add the toolkit of the Java operator:
SPL.addToolkit(topology, new File("/home/streamsadmin/myTk"));
Convert the incoming stream to an SPL stream:
StreamSchema rstringSchema = Type.Factory.getStreamSchema("tuple<rstring rstring_attr_name>");
SPLStream splInputStream = SPLStreams.convertStream(inDataFileName, new BiFunction<String, OutputTuple, OutputTuple>(){
@Override
public OutputTuple apply(String input_string, OutputTuple output_rstring) {
output_rstring.setString("rstring_attr_name", input_string);
return output_rstring;
}}, rstringSchema);
Invoke the operator:
SPLStream splOutputStream = SPL.invokeOperator("OperatorNamespace::YourOperatorName", splInputStream, rstringSchema, new HashMap<String, Object>());
You can find more information about this here:
On a side note, if you are writing the whole Streams topology with the Topology API anyway, it is easier to write a regular Java class and invoke it directly from the topology.
For example:
MyJavaCode someObj = new MyJavaCode();
Topology topology = new Topology("MyTopology");
TStream<String> inDataFileName = ...
TStream<String> transformed = inDataFileName.transform(new Function<String, String>(){
@Override
public String apply(String word) {
return someObj.someFunction(word);
}
});
The only requirement here is that your Java class (MyJavaCode in this example) implements Serializable.
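To check the Serializable requirement before submitting a topology, a plain Java serialization round trip is usually enough. The sketch below uses only the JDK and makes some assumptions: the body of MyJavaCode (a prefix field and a someFunction that prepends it) is hypothetical, and SerializableCheck and roundTrip are illustrative names, not part of the Topology API. It assumes the function you pass to the topology is serialized together with the objects it captures, which is why someObj has to survive this kind of round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for the answer's MyJavaCode; only the class and
// method names come from the example above, the body is an assumption.
class MyJavaCode implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String prefix;

    MyJavaCode(String prefix) {
        this.prefix = prefix;
    }

    String someFunction(String word) {
        return prefix + word;
    }
}

// Illustrative helper (not part of the Topology API): serializes an object
// to a byte array and reads it back, as a quick check that it is serializable.
class SerializableCheck {
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            // Throws NotSerializableException if obj or any non-transient
            // field it references is not serializable.
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        MyJavaCode copy = roundTrip(new MyJavaCode("file:"));
        System.out.println(copy.someFunction("data.txt")); // prints "file:data.txt"
    }
}
```

If MyJavaCode holds a field whose type is not serializable, writeObject fails with a NotSerializableException, so running this check locally surfaces the problem early. Marking such a field transient and rebuilding it lazily is one common way around that.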