I'm attempting to send data via an actor to a runnable graph that contains a fan out.
I define the source as :
final Source<Integer, ActorRef> integerSource =
Source.actorRef(
elem -> {
if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
else return Optional.empty();
},
elem -> Optional.empty(),
10,
OverflowStrategy.dropHead());
But I'm unsure how to get a handle on an ActoRef
to send data via an actor to the source so that the runnable graph will process messages asynchronously as they are received :
RunnableGraph<CompletionStage<Done>> graph = RunnableGraph.fromGraph(
GraphDSL.create(sink, (builder, out) -> {
SourceShape<Integer> sourceShape = builder.add(integerSource);
FlowShape<Integer, Integer> flow1Shape = builder.add(flow1);
FlowShape<Integer, Integer> flow2Shape = builder.add(flow1);
UniformFanOutShape<Integer, Integer> broadcast =
builder.add(Broadcast.create(2));
UniformFanInShape<Integer, Integer> merge =
builder.add(Merge.create(2));
builder.from(sourceShape)
.viaFanOut(broadcast)
.via(flow1Shape);
builder.from(broadcast).via(flow2Shape);
builder.from(flow1Shape)
.viaFanIn(merge)
.to(out);
builder.from(flow2Shape).viaFanIn(merge);
return ClosedShape.getInstance();
} )
);
Entire src :
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.*;
import akka.stream.javadsl.*;
import lombok.extern.slf4j.Slf4j;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
@Slf4j
public class GraphActorSource {
private final static ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "flowActorSystem");
public void runFlow() {
final Source<Integer, ActorRef> integerSource =
Source.actorRef(
elem -> {
if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
else return Optional.empty();
},
elem -> Optional.empty(),
10,
OverflowStrategy.dropHead());
Flow<Integer, Integer, NotUsed> flow1 = Flow.of(Integer.class)
.map (x -> {
System.out.println("Flow 1 is processing " + x);
return (x * 2);
});
Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(x -> {
System.out.println(x);
});
RunnableGraph<CompletionStage<Done>> graph = RunnableGraph.fromGraph(
GraphDSL.create(sink, (builder, out) -> {
SourceShape<Integer> sourceShape = builder.add(integerSource);
FlowShape<Integer, Integer> flow1Shape = builder.add(flow1);
FlowShape<Integer, Integer> flow2Shape = builder.add(flow1);
UniformFanOutShape<Integer, Integer> broadcast =
builder.add(Broadcast.create(2));
UniformFanInShape<Integer, Integer> merge =
builder.add(Merge.create(2));
builder.from(sourceShape)
.viaFanOut(broadcast)
.via(flow1Shape);
builder.from(broadcast).via(flow2Shape);
builder.from(flow1Shape)
.viaFanIn(merge)
.to(out);
builder.from(flow2Shape).viaFanIn(merge);
return ClosedShape.getInstance();
} )
);
graph.run(actorSystem);
}
public static void main(String args[]){
new GraphActorSource().runFlow();
}
}
How to send data to the Runnable graph via an actor?
Something like ? :
integerSource.tell(1)
integerSource.tell(2)
integerSource.tell(3)
ActorRef.tell
works. Construct the graph blueprint so the source ActorRef
will be returned when the blueprint is materialized and run.
For just one materialized object, use that materialized type for the materialized type parameter of the Graph
.
Here the materialized type parameter for integerSource
is ActorRef
.
The materialized type parameter for Graph
is also ActorRef
.
Only integerSource
is passed to GraphDSL.create
.
Source<Integer, ActorRef> integerSource = ...
Graph<ClosedShape, ActorRef> graph =
GraphDSL.create(integerSource, (builder, src) -> {
...
});
RunnableGraph<ActorRef> runnableGraph = RunnableGraph.fromGraph(graph);
ActorRef actorRef = runnableGraph.run(actorSystem);
actorRef.tell(1, ActorRef.noSender());
To access more than one materialized object, a tuple must be constructed to capture them. If two objects from the materialized graph are desired, say src and snk, then Pair<A,B>
can capture both types.
Here both integersource
and sink
are passed to GraphDSL.create
.
The materialized ActorRef
and CompletionStage
are paired for the result of run
with Pair::new
.
The type Pair<ActorRef,CompletionStage<Done>>
is the materialized type parameter of the Graph
.
Source<Integer, ActorRef> integerSource = ...
Sink<Integer, CompletionStage<Done>> sink = ...
Graph<ClosedShape, Pair<ActorRef, CompletionStage<Done>>> graph =
GraphDSL.create(integerSource, sink, Pair::new, (builder, src, snk) -> {
....
});
RunnableGraph<Pair<ActorRef, CompletionStage<Done>>> runnableGraph =
RunnableGraph.fromGraph(graph);
Pair<ActorRef, CompletionStage<Done>> pair =
runnableGraph.run(actorSystem);
ActorRef actorRef = pair.first();
CompletionStage<Done> completionStage = pair.second();
actorRef.tell(1, ActorRef.noSender());
Full example:
(build.gradle)
apply plugin: "java"
apply plugin: "application"
mainClassName = "GraphActorSource"
repositories {
mavenCentral()
}
dependencies {
implementation "com.typesafe.akka:akka-actor-typed_2.13:2.6.19"
implementation "com.typesafe.akka:akka-stream-typed_2.13:2.6.19"
implementation 'org.slf4j:slf4j-jdk14:1.7.36'
}
compileJava {
options.compilerArgs << "-Xlint:unchecked"
}
(src/main/java/GraphActorSource.java)
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.Status.Success;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.stream.*;
import akka.stream.javadsl.*;
import akka.util.Timeout;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
public class GraphActorSource {
private final static ActorSystem actorSystem =
ActorSystem.create(Behaviors.empty(), "flowActorSystem");
public void runFlow() {
// 1. Create graph (blueprint)
// 1a. Define source, flows, and sink
final Source<Integer, ActorRef> integerSource =
Source.actorRef
(
elem -> {
if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
else return Optional.empty();
},
elem -> Optional.empty(),
10,
OverflowStrategy.dropHead()
);
Flow<Integer, Integer, NotUsed> flow1 = Flow.of(Integer.class)
.map (x -> {
System.out.println("Flow 1 is processing " + x);
return (100 + x);
});
Flow<Integer, Integer, NotUsed> flow2 = Flow.of(Integer.class)
.map (x -> {
System.out.println("Flow 2 is processing " + x);
return (200 + x);
});
Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(x -> {
System.out.println("Sink received "+x);
});
// 1b. Connect nodes and flows into a graph.
// Inputs and output nodes (source, sink) will be produced at run start.
Graph<ClosedShape, Pair<ActorRef, CompletionStage<Done>>> graph =
GraphDSL.create(integerSource, sink, Pair::new, (builder, src, snk) -> {
UniformFanOutShape<Integer, Integer> broadcast =
builder.add(Broadcast.create(2));
FlowShape<Integer, Integer> flow1Shape = builder.add(flow1);
FlowShape<Integer, Integer> flow2Shape = builder.add(flow2);
UniformFanInShape<Integer, Integer> merge =
builder.add(Merge.create(2));
builder.from(src)
.viaFanOut(broadcast);
builder.from(broadcast.out(0))
.via(flow1Shape)
.toInlet(merge.in(0));
builder.from(broadcast.out(1))
.via(flow2Shape)
.toInlet(merge.in(1));
builder.from(merge)
.to(snk);
return ClosedShape.getInstance();
} );
RunnableGraph<Pair<ActorRef, CompletionStage<Done>>> runnableGraph =
RunnableGraph.fromGraph(graph);
// 2. Start run,
// which produces materialized source ActorRef and sink CompletionStage.
Pair<ActorRef, CompletionStage<Done>> pair =
runnableGraph.run(actorSystem);
ActorRef actorRef = pair.first();
CompletionStage<Done> completionStage = pair.second();
// On completion, terminates actor system (optional).
completionStage.thenRun(() -> {
System.out.println("Done, terminating.");
actorSystem.terminate();
});
// 3. Send messages to source actor
actorRef.tell(1, ActorRef.noSender());
actorRef.tell(2, ActorRef.noSender());
// The stream completes successfully with the following message
actorRef.tell(Done.done(), ActorRef.noSender());
}
public static void main(String args[]){
new GraphActorSource().runFlow();
}
}
Reference Akka Documentation (accessed Version 2.6.19)