infosphere-splibm-infosphere

using Java custom operator within java API in infosphere streams


I have been searching for awhile now on how to use a customized Java operator withing infosphere streams Java API

What I need is after writing a customized operator like below ...

public class Test extends AbstractOperator {
private int i;
private int num;
@Override
public synchronized void initialize(OperatorContext context) throws Exception {
super.initialize(context);
i = 0; ....

I want to use it like the below ....

        Topology topology = new Topology("toplogy_test");
        TStream<String> inDataFileName = ...
//call the "Test" operator here

Solution

  • You can call a Java operator / C++ operator from the topology API by doing the following:

    1. Add the toolkit of the Java operator:

      SPL.addToolkit(topology, new File("/home/streamsadmin/myTk"));

    2. Convert the incoming stream to SPL stream:

      StreamSchema rstringSchema = Type.Factory.getStreamSchema("tuple<rstring rstring_attr_name>");
      
      SPLStream splInputStream = SPLStreams.convertStream(inDataFileName, new BiFunction<String, OutputTuple, OutputTuple>(){
      
        @Override
        public OutputTuple apply(String input_string, OutputTuple output_rstring) {
          output_rstring.setString("rstring_attr_name", input_string);
          return output_rstring;
        }}, rstringSchema);
      
    3. Invoke the operator:

      SPLStream splOutputStream = SPL.invokeOperator("OperatorNamespace::YourOperatorName", splInputStream, rstringSchema, new HashMap());

    You can find more information about this here:

    http://ibmstreams.github.io/streamsx.documentation/docs/4.2/java/java-appapi-devguide/#integrating-spl-operators-with-the-java-application-api

    On a side note, if you are thinking about using the Topology API to write the Streams topology, then it's easier to write a regular Java class, and invoke it directly from the Topology API.

    For example:

    MyJavaCode someObj = new MyJavaCode();
    
    Topology topology = new Topology("MyTopology");
    TStream<String> inDataFileName = ...
    inDataFileName.transform(new Function<String, String>(){
            @Override
            public String apply(String word) {
                return someObj.someFunction(word);
            }
        });
    

    The only requirement here is that your Java class needs to implement Serializable.