I've managed to plug the GCP Pub/Sub dependency into the Flink StateFun JAR and then build the Docker image.
I've added the following to the pom.xml:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-gcp-pubsub</artifactId>
  <version>1.16.0</version>
  <!-- default (compile) scope: "test" scope would keep the connector out of the packaged JAR -->
</dependency>
It's not clear to me how I now specify my Pub/Sub ingress and egress in the module.yaml that we use with the StateFun image.
https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/overview/
For example, for Kafka you use:
kind: io.statefun.kafka.v1/egress
spec:
  id: com.example/my-egress
  address: kafka-broker:9092
  deliverySemantic:
    type: exactly-once
    transactionTimeout: 15min
I can see that the official connectors have a Kind constant in the Java code, which is what you reference in module.yaml, but I can't find anything in the docs on how to reference Flink connectors that you plug into the StateFun image yourself.
GCP Pub/Sub is not officially supported as a standard StateFun I/O component; only Kafka and Kinesis are, for now. However, you can write your own custom ingress/egress relatively easily. Unfortunately, you won't be able to expose it as a new YAML-based config kind, as the module configurators for Kafka and Kinesis seem to be hard-coded in the runtime, so you'll have to do the configuration in code:
Looking at the source/ingress example:
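import java.util.Map;
import org.apache.flink.statefun.flink.io.datastream.SourceFunctionSpec;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

public class ModuleWithSourceSpec implements StatefulFunctionModule {
  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    IngressIdentifier<TypedValue> id =
        new IngressIdentifier<>(TypedValue.class, "com.example", "custom-source");
    // FlinkSource and CustomRouter are placeholders for your own SourceFunction and Router.
    IngressSpec<TypedValue> spec = new SourceFunctionSpec<>(id, new FlinkSource<>());
    binder.bindIngress(spec);
    binder.bindIngressRouter(id, new CustomRouter());
  }
}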
Your goal is then to replace new FlinkSource<>() with a real org.apache.flink.streaming.api.functions.source.SourceFunction. For Pub/Sub, you could declare it like this:
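// projectName, subscriptionName and messageTypeName are your own values.
// TypedValueDeserializer is a DeserializationSchema<TypedValue> you write yourself: the element
// type must match the IngressIdentifier<TypedValue> bound in ModuleWithSourceSpec.
// build() throws IOException, so call this from code that handles it.
SourceFunction<TypedValue> source =
    PubSubSource.newBuilder()
        .withDeserializationSchema(new TypedValueDeserializer(messageTypeName))
        .withProjectName(projectName)
        .withSubscriptionName(subscriptionName)
        .withMessageRateLimit(1) // max messages per second, per parallel source instance
        .build();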
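A Pub/Sub message body is just bytes, so the source needs a DeserializationSchema<TypedValue> to feed a TypedValue ingress. Here is a minimal sketch of such a TypedValueDeserializer (a hypothetical name, not part of Flink or StateFun), assuming every message on the subscription carries one payload type whose typename you pass in:

```java
import com.google.protobuf.ByteString;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;

/**
 * Hypothetical helper: wraps each raw Pub/Sub message body in a TypedValue tagged
 * with a fixed typename, so the source matches an IngressIdentifier<TypedValue>.
 */
public class TypedValueDeserializer implements DeserializationSchema<TypedValue> {
  private final String typename;

  public TypedValueDeserializer(String typename) {
    this.typename = typename; // e.g. "com.example/User" (your own type name)
  }

  @Override
  public TypedValue deserialize(byte[] message) {
    return TypedValue.newBuilder()
        .setTypename(typename)
        .setHasValue(true)
        .setValue(ByteString.copyFrom(message)) // payload bytes, unchanged
        .build();
  }

  @Override
  public boolean isEndOfStream(TypedValue nextElement) {
    return false; // a Pub/Sub subscription is an unbounded stream
  }

  @Override
  public TypeInformation<TypedValue> getProducedType() {
    // The protobuf class is not a Flink POJO, so this yields generic (Kryo) type info.
    return TypeInformation.of(TypedValue.class);
  }
}
```

The remote function then sees each message with the typename you configured, so that name should match the type your function checks with message.is(...).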
You'll also have to write the CustomRouter, which determines which function instance should initially handle each event. You can take inspiration from this example:
public static class GreetingsStateBootstrapDataRouter implements Router<Tuple2<String, Integer>> {
  @Override
  public void route(
      Tuple2<String, Integer> message, Downstream<Tuple2<String, Integer>> downstream) {
    downstream.forward(new Address(GREETER_FUNCTION_TYPE, message.f0), message);
  }
}
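For the Pub/Sub ingress, the router receives TypedValue messages instead of tuples. A minimal sketch of CustomRouter, assuming (hypothetically) that each message body is a UTF-8 key such as a user name, and that the target is a function of type com.example.fns/greeter served by one of your remote endpoints:

```java
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.Router;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;

/**
 * Hypothetical router for the Pub/Sub ingress: the UTF-8 message body is used
 * as the id of the function instance that should receive the message.
 */
public class CustomRouter implements Router<TypedValue> {
  // Assumed function type; it must match a function your module.yaml endpoints serve.
  static final FunctionType GREETER = new FunctionType("com.example.fns", "greeter");

  // Pure address computation, kept separate so it can be tested without a Downstream.
  static Address addressFor(TypedValue message) {
    return new Address(GREETER, message.getValue().toStringUtf8());
  }

  @Override
  public void route(TypedValue message, Downstream<TypedValue> downstream) {
    downstream.forward(addressFor(message), message);
  }
}
```

Messages with the same key always reach the same function instance, which is what lets that instance keep per-key state.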
The sink/egress side works the same way, except that there is no router to provide:
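import java.util.Map;
import org.apache.flink.statefun.flink.io.datastream.SinkFunctionSpec;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

public class ModuleWithSinkSpec implements StatefulFunctionModule {
  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    EgressIdentifier<TypedValue> id = new EgressIdentifier<>("com.example", "custom-sink", TypedValue.class);
    // FlinkSink is a placeholder for your own SinkFunction.
    EgressSpec<TypedValue> spec = new SinkFunctionSpec<>(id, new FlinkSink<>());
    binder.bindEgress(spec);
  }
}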
with new FlinkSink<>() replaced by a Pub/Sub sink:
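// TypedValueSerializer is a SerializationSchema<TypedValue> you write yourself, matching
// the EgressIdentifier<TypedValue> in ModuleWithSinkSpec; build() throws IOException.
SinkFunction<TypedValue> sink =
    PubSubSink.newBuilder()
        .withSerializationSchema(new TypedValueSerializer())
        .withProjectName(projectName)
        .withTopicName(outputTopicName)
        .build();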
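Likewise, the sink needs a SerializationSchema<TypedValue>. A minimal sketch of a hypothetical TypedValueSerializer that publishes each TypedValue's value bytes unchanged:

```java
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;

/**
 * Hypothetical helper: publishes the raw value bytes of each TypedValue sent to the
 * egress. The typename is dropped, and the bytes keep whatever encoding the sending
 * function's type produced.
 */
public class TypedValueSerializer implements SerializationSchema<TypedValue> {
  @Override
  public byte[] serialize(TypedValue element) {
    return element.getValue().toByteArray();
  }
}
```

If downstream consumers need to know the payload type, you would have to carry the typename some other way, since this sketch publishes only the value.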
You would then send messages to this egress from a function written with the StateFun Java SDK like so:
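import java.util.concurrent.CompletableFuture;
import org.apache.flink.statefun.sdk.java.Context;
import org.apache.flink.statefun.sdk.java.StatefulFunction;
import org.apache.flink.statefun.sdk.java.TypeName;
import org.apache.flink.statefun.sdk.java.ValueSpec;
import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
import org.apache.flink.statefun.sdk.java.message.Message;

public class GreeterFn implements StatefulFunction {
  static final TypeName TYPE = TypeName.typeNameFromString("com.example.fns/greeter");
  // Must match the EgressIdentifier bound in ModuleWithSinkSpec.
  static final TypeName CUSTOM_EGRESS = TypeName.typeNameFromString("com.example/custom-sink");
  static final ValueSpec<Integer> SEEN = ValueSpec.named("seen").withIntType();

  @Override
  public CompletableFuture<Void> apply(Context context, Message message) {
    // User is your own message type, exposing a Type<User> constant named TYPE.
    if (!message.is(User.TYPE)) {
      throw new IllegalStateException("Unknown type");
    }
    User user = message.as(User.TYPE);
    String name = user.getName();

    var storage = context.storage();
    var seen = storage.get(SEEN).orElse(0);
    storage.set(SEEN, seen + 1);

    context.send(
        EgressMessageBuilder.forEgress(CUSTOM_EGRESS)
            .withUtf8Value("Hello " + name + " for the " + seen + "th time!")
            .build());
    return context.done();
  }
}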
Finally, you have to make your modules known to the runtime through a file named org.apache.flink.statefun.sdk.spi.StatefulFunctionModule in the META-INF/services directory of your JAR, listing one module class per line:
com.example.your.path.ModuleWithSourceSpec
com.example.your.path.ModuleWithSinkSpec
Alternatively, if you prefer annotations, you can use Google AutoService to generate that file for you.
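A sketch of the annotation-based approach, assuming the com.google.auto.service:auto-service annotation processor is on your compile path; PubSubModule is a hypothetical module name:

```java
import com.google.auto.service.AutoService;
import java.util.Map;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

// At compile time, AutoService generates
// META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule
// listing this class, so the hand-written services file is no longer needed.
@AutoService(StatefulFunctionModule.class)
public class PubSubModule implements StatefulFunctionModule {
  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    // Bind the Pub/Sub ingress, its router and the egress here,
    // as in ModuleWithSourceSpec and ModuleWithSinkSpec.
  }
}
```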
I hope it helps!