javaakkaakka-streamakka-cluster

How to use an Actor based source with an Akka Graph?


I'm attempting to send data via an actor to a runnable graph that contains a fan out.

I define the source as :

final Source<Integer, ActorRef> integerSource =
        Source.actorRef(
                elem -> {
                    if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
                    else return Optional.empty();
                },
                elem -> Optional.empty(),
                10,
                OverflowStrategy.dropHead());

But I'm unsure how to get a handle on an ActoRef to send data via an actor to the source so that the runnable graph will process messages asynchronously as they are received :

RunnableGraph<CompletionStage<Done>> graph = RunnableGraph.fromGraph(
        GraphDSL.create(sink, (builder, out) -> {
            SourceShape<Integer> sourceShape = builder.add(integerSource);
            FlowShape<Integer, Integer> flow1Shape = builder.add(flow1);
            FlowShape<Integer, Integer> flow2Shape = builder.add(flow1);
            UniformFanOutShape<Integer, Integer> broadcast =
                    builder.add(Broadcast.create(2));
            UniformFanInShape<Integer, Integer> merge =
                    builder.add(Merge.create(2));

            builder.from(sourceShape)
                    .viaFanOut(broadcast)
                    .via(flow1Shape);

            builder.from(broadcast).via(flow2Shape);

            builder.from(flow1Shape)
                    .viaFanIn(merge)
                    .to(out);

            builder.from(flow2Shape).viaFanIn(merge);


            return ClosedShape.getInstance();
        } )
);

Entire src :

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.*;
import akka.stream.javadsl.*;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;
import java.util.concurrent.CompletionStage;

@Slf4j
public class GraphActorSource {

    private final static ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "flowActorSystem");

    public void runFlow() {


        final Source<Integer, ActorRef> integerSource =
                Source.actorRef(
                        elem -> {
                            if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
                            else return Optional.empty();
                        },
                        elem -> Optional.empty(),
                        10,
                        OverflowStrategy.dropHead());


        Flow<Integer, Integer, NotUsed> flow1 = Flow.of(Integer.class)
                .map (x -> {
                    System.out.println("Flow 1 is processing " + x);
                    return (x * 2);
                });

        Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(x -> {
            System.out.println(x);
        });

        RunnableGraph<CompletionStage<Done>> graph = RunnableGraph.fromGraph(
                GraphDSL.create(sink, (builder, out) -> {
                    SourceShape<Integer> sourceShape = builder.add(integerSource);
                    FlowShape<Integer, Integer> flow1Shape = builder.add(flow1);
                    FlowShape<Integer, Integer> flow2Shape = builder.add(flow1);
                    UniformFanOutShape<Integer, Integer> broadcast =
                            builder.add(Broadcast.create(2));
                    UniformFanInShape<Integer, Integer> merge =
                            builder.add(Merge.create(2));

                    builder.from(sourceShape)
                            .viaFanOut(broadcast)
                            .via(flow1Shape);

                    builder.from(broadcast).via(flow2Shape);

                    builder.from(flow1Shape)
                            .viaFanIn(merge)
                            .to(out);

                    builder.from(flow2Shape).viaFanIn(merge);


                    return ClosedShape.getInstance();
                } )
        );

        graph.run(actorSystem);

    }

    public static void main(String args[]){
        new GraphActorSource().runFlow();
    }
}

How to send data to the Runnable graph via an actor?

Something like ? :

integerSource.tell(1)
integerSource.tell(2)
integerSource.tell(3)

Solution

  • ActorRef.tell works. Construct the graph blueprint so the source ActorRef will be returned when the blueprint is materialized and run.

    For just one materialized object, use that materialized type for the materialized type parameter of the Graph.

    Here the materialized type parameter for integerSource is ActorRef. The materialized type parameter for Graph is also ActorRef. Only integerSource is passed to GraphDSL.create.

    Source<Integer, ActorRef> integerSource = ...
    
    Graph<ClosedShape, ActorRef> graph =
      GraphDSL.create(integerSource, (builder, src) -> {
         ...
      });
    
    RunnableGraph<ActorRef> runnableGraph = RunnableGraph.fromGraph(graph);
    
    ActorRef actorRef = runnableGraph.run(actorSystem);
    
    actorRef.tell(1, ActorRef.noSender());
    

    To access more than one materialized object, a tuple must be constructed to capture them. If two objects from the materialized graph are desired, say src and snk, then Pair<A,B> can capture both types.

    Here both integersource and sink are passed to GraphDSL.create. The materialized ActorRef and CompletionStage are paired for the result of run with Pair::new. The type Pair<ActorRef,CompletionStage<Done>> is the materialized type parameter of the Graph.

    Source<Integer, ActorRef> integerSource = ...
    Sink<Integer, CompletionStage<Done>> sink = ...
    
    Graph<ClosedShape, Pair<ActorRef, CompletionStage<Done>>> graph =
          GraphDSL.create(integerSource, sink, Pair::new, (builder, src, snk) -> {
    ....
    });
    
    RunnableGraph<Pair<ActorRef, CompletionStage<Done>>> runnableGraph =
      RunnableGraph.fromGraph(graph);
    
    Pair<ActorRef, CompletionStage<Done>> pair =
      runnableGraph.run(actorSystem);
    ActorRef actorRef = pair.first();
    CompletionStage<Done> completionStage = pair.second();
    
    actorRef.tell(1, ActorRef.noSender());
    

    Full example:

    (build.gradle)

    apply plugin: "java"
    apply plugin: "application"
    mainClassName = "GraphActorSource"
    repositories {
      mavenCentral()
    }
    dependencies { 
      implementation "com.typesafe.akka:akka-actor-typed_2.13:2.6.19"
      implementation "com.typesafe.akka:akka-stream-typed_2.13:2.6.19"
      implementation 'org.slf4j:slf4j-jdk14:1.7.36'
    }
    compileJava {
      options.compilerArgs << "-Xlint:unchecked"
    }
    

    (src/main/java/GraphActorSource.java)

    import akka.Done;
    import akka.NotUsed;
    import akka.actor.ActorRef;
    import akka.actor.Status.Success;
    import akka.actor.typed.ActorSystem;
    import akka.actor.typed.javadsl.Behaviors;
    import akka.japi.Pair;
    import akka.stream.*;
    import akka.stream.javadsl.*;
    import akka.util.Timeout;
    
    import java.util.Optional;
    import java.util.concurrent.CompletionStage;
    import java.util.concurrent.TimeUnit;
    
    public class GraphActorSource {
    
      private final static ActorSystem actorSystem =
        ActorSystem.create(Behaviors.empty(), "flowActorSystem");
    
      public void runFlow() {
    
        // 1. Create graph (blueprint)
    
        // 1a. Define source, flows, and sink
    
        final Source<Integer, ActorRef> integerSource =
          Source.actorRef
          (
            elem -> {
              if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
              else return Optional.empty();
            },
            elem -> Optional.empty(),
            10,
            OverflowStrategy.dropHead()
          );
    
    
        Flow<Integer, Integer, NotUsed> flow1 = Flow.of(Integer.class)
          .map (x -> {
              System.out.println("Flow 1 is processing " + x);
              return (100 + x);
            });
        Flow<Integer, Integer, NotUsed> flow2 = Flow.of(Integer.class)
          .map (x -> {
              System.out.println("Flow 2 is processing " + x);
              return (200 + x);
            });
    
        Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(x -> {
            System.out.println("Sink received "+x);
          });
    
        // 1b. Connect nodes and flows into a graph.
        // Inputs and output nodes (source, sink) will be produced at run start.
    
        Graph<ClosedShape, Pair<ActorRef, CompletionStage<Done>>> graph =
          GraphDSL.create(integerSource, sink, Pair::new, (builder, src, snk) -> {
              UniformFanOutShape<Integer, Integer> broadcast =
              builder.add(Broadcast.create(2));
              FlowShape<Integer, Integer> flow1Shape = builder.add(flow1);
              FlowShape<Integer, Integer> flow2Shape = builder.add(flow2);
              UniformFanInShape<Integer, Integer> merge =
              builder.add(Merge.create(2));
    
              builder.from(src)
              .viaFanOut(broadcast);
    
              builder.from(broadcast.out(0))
              .via(flow1Shape)
              .toInlet(merge.in(0));
    
              builder.from(broadcast.out(1))
              .via(flow2Shape)
              .toInlet(merge.in(1));
    
              builder.from(merge)
              .to(snk);
    
              return ClosedShape.getInstance();
            } );
    
        RunnableGraph<Pair<ActorRef, CompletionStage<Done>>> runnableGraph =
          RunnableGraph.fromGraph(graph);
    
        // 2. Start run,
        // which produces materialized source ActorRef and sink CompletionStage.
    
        Pair<ActorRef, CompletionStage<Done>> pair =
          runnableGraph.run(actorSystem);
        ActorRef actorRef = pair.first();
        CompletionStage<Done> completionStage = pair.second();
    
        // On completion, terminates actor system (optional).
        completionStage.thenRun(() -> {
            System.out.println("Done, terminating.");
            actorSystem.terminate();
          });
              
        // 3. Send messages to source actor
    
        actorRef.tell(1, ActorRef.noSender());
        actorRef.tell(2, ActorRef.noSender());
    
        // The stream completes successfully with the following message
        actorRef.tell(Done.done(), ActorRef.noSender());
    
      }
    
      public static void main(String args[]){
        new GraphActorSource().runFlow();
      }
    }
    

    Reference Akka Documentation (accessed Version 2.6.19)

    Streams / Operators / Source.actorRef

    Streams / Streams Cookbook / Working with operators