Consider the following:
S1
, S2
and S3
S1P1
, S1P2
, S2P1
, S2P2
, S3P1
, S3P2
ZeroMQ
broker running in a single process and reachable by all service processes A logical service, let's say S1
, publishes a message M1
that is of interest to logical services S2
and S3
. Only one process of each logical service must receive M1
, so let's say S2P1
and S3P2
.
I have tried the following, but without success:
XSUB/XPUB
proxyROUTER/DEALER
proxy with the ROUTER
connected to the XPUB
socket and subscribed to everything (for logical S1
)ROUTER/DEALER
proxy with the ROUTER
connected to the XPUB
socket and subscribed to everything (for logical S2
)ROUTER/DEALER
proxy with the ROUTER
connected to the XPUB socket and subscribed to everything (for logical S3
)REP
socket thread connected to the broker DEALER
socketI figured that the XSUB/XPUB
proxy would give me publish/subscribe semantics and that the ROUTER/DEALER
proxies would introduce a competition between the REP
sockets for the messages sent by the XSUB/XPUB
proxy.
How can I combine ZeroMQ
sockets to accomplish this?
Update1
I know "without success" isn't helpful, I've tried different configurations and got different errors. The latest configuration I tried is the following:
(XSUB proxy=> XPUB) => (SUB copyLoop=> REQ) => (ROUTER proxy=> DEALER) => REP
The copyLoop goes like this:
public void start() {
context = ZMQ.context(1);
subSocket = context.socket(ZMQ.SUB);
subSocket.connect(subSocketUrl);
subSocket.subscribe("".getBytes());
reqSocket = context.socket(ZMQ.REQ);
reqSocket.connect(reqSocketUrl);
while (!Thread.currentThread().isInterrupted()) {
final Message msg = receiveNextMessage();
resendMessage(msg);
}
}
private Message receiveNextMessage() {
final String header = subSocket.recvStr();
final String entity = subSocket.recvStr();
return new Message(header, entity);
}
private void resendMessage(Message msg) {
reqSocket.sendMore(msg.getKey());
reqSocket.send(msg.getData(), 0);
}
The exception I get is the following:
java.lang.IllegalStateException: Cannot send another request
at zmq.Req.xsend(Req.java:51) ~[jeromq-0.3.4.jar:na]
at zmq.SocketBase.send(SocketBase.java:613) ~[jeromq-0.3.4.jar:na]
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1206) ~[jeromq-0.3.4.jar:na]
at org.zeromq.ZMQ$Socket.sendMore(ZMQ.java:1189) ~[jeromq-0.3.4.jar:na]
at com.xyz.messaging.zeromq.SubReqProxyConnector.resendMessage(SubReqProxyConnector.java:47) ~[classes/:na]
at com.xyz.messaging.zeromq.SubReqProxyConnector.start(SubReqProxyConnector.java:35) ~[classes/:na]
I'm running JeroMQ 0.3.4, Oracle Java 8 JVM and Windows 7.
You seem to be adding in some complexity with your ROUTER
connection - you should be able to do everything connected directly to your publisher.
The error you're currently running into is that REQ
sockets have a strict message ordering pattern - you are not allowed to send()
twice in a row, you must send/receive/send/receive/etc (likewise, REP
sockets must receive/send/receive/send/etc). From what it looks like, you're just doing send/send/send/etc on your REQ
socket without ever receiving a response. If you don't care about a response from your peer, then you must receive and discard it or use DEALER
(or ROUTER
, but DEALER
makes more sense in your current diagram).
I've created a diagram of how I would accomplish this architecture below - using your basic process structure.
Broker T1 Broker T2 Broker T3 Broker T4
(PUB*)------>(*SUB)[--](DEALER*) -->(*SUB)[--](DEALER*) -->(*SUB)[--](DEALER*)
|_____________________||____| || | ||
|_____________________||_______________________||____| ||
|| || ||
========================|| ==================|| ===========||=
|| || || || || ||
|| || || || || ||
|| || || || || ||
(REP*) (REP*) (REP*) (REP*) (REP*) (REP*)
S1P1 S1P2 S2P1 S2P2 S3P1 S3P2
So, the main difference is that I've ditched your (SUB copyLoop=> REQ)
step. Whether you choose XPUB/XSUB
vs PUB/SUB
is up to you, but I would tend to start simpler unless you currently want to make use of the extra features of XPUB/XSUB
.
Obviously this diagram doesn't deal with how information enters your broker, where you currently show an XSUB
socket - that's out of scope for the information you've provided thus far, presumably you're able to receive information into your broker successfully already so I won't deal with that.
I assume your broker threads that are dedicated to each service are making intelligent choices on whether to send the message to their service or not? If so, then your choice of having them subscribed to everything should work fine, otherwise more intelligent subscription setups might be necessary.
If you're using a REP
socket on your service processes, then the service process must take that message and deal with it asynchronously, never communicating back any details about that message to the broker. It must then respond to each message with an acknowledgement (like "RECEIVED") so that it follows the strict receive/send/receive/send pattern for REP
sockets.
If you want any other type of communication about how the service handles that message sent back to the broker, REP
is no longer the appropriate socket type for your service processes, and DEALER
may no longer be the correct socket type for your broker. If you want some form of load balancing so that you send to the next open service process, you'll need to use ROUTER/REQ
and have each service indicate its availability and have the broker hold on to the message until the next service process says its available by sending results back. If you want some other type of message handling, you'll have to indicate what that is so a suitable architecture can be proposed.