In the Apache PLC4X project (https://plc4x.apache.org) we are implementing drivers for industrial PLCs using Netty. Here usually a variety of protocols are layered. Some times one layer requires us to split up one message into multiple messages of the underlying layer. Now we are facing one big problem: One protocol negotiates a maximum number of unconfirmed messages per connection. So we can't send more messages to that than this maximum or the receiver will simply send an error response.
Now we would need to not add things to "out" in the encode method, but to add them to some sort of queue and have some Netty mechanism take care of draining that queue ... is there such a mechanism in Netty? If not, what would be the best way to implement this?
Would also be cool if someone with good Netty insight could join our project mailing list (dev@plc4x.apache.org) as we're also working on some really cool additions for Netty (Raw Socket transport on Ethernet Frame and one on IP packet base) ... I bet both projects could benefit greatly from each other.
While Netty does not provide such a handler out of the box, but because of the internal design, it is really easy to make such max concurrent pending requests out of the box.
Making such handler can be done using the PendingWriteQueue
class from the Netty framework in combination with a generic handler:
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.PendingWriteQueue;
public class MaxPendingRequestHandler extends ChannelHandlerAdapter {
private PendingWriteQueue queue;
private int freeSlots;
public MaxPendingRequestHandler(int maxRequests) {
this.freeSlots = maxRequests;
}
private synchronized void trySendMessages(ChannelHandlerContext ctx) {
if(this.freeSlots > 0) {
while(this.freeSlots > 0) {
if(this.queue.removeAndWrite() == null) {
ctx.flush();
return;
}
this.freeSlots--;
}
ctx.flush();
}
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
this.queue = new PendingWriteQueue(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// Send everything so we get a proper failurefor those pending writes
this.queue.removeAndWriteAll();
super.channelInactive(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
this.queue.removeAndWriteAll();
super.channelUnregistered(ctx);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
this.queue.add(msg, promise);
trySendMessages(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
synchronized(this) {
this.freeSlots++;
trySendMessages(ctx);
}
super.channelRead(ctx, msg);
}
}
This handler works on the fact that it saves every new message in a queue, and it checks the free slots on the wire on every write/read.
Notice that handler should be placed in the pipeline after the packet decoders/encoders, else problems happen with counting incoming packets as potential multiple packets, example:
pipeline.addLast(new PacketCodex()); // A codex exists of an encoder and decoder, you can also ass them seperately
// pipeline.addLast(new TrafficShapingHandler()) // Optional, depending on your required protocols
// pipeline.addLast(new IdleStateHandler()) // Optional, depending on your required protocols
pipeline.addLast(new MaxPendingRequestHandler())
pipeline.addLast(new Businesshandler())
Of course, you also want to verify that our handler works, this can be done using a Unit test containing a EmbeddedChannel
& JUnit:
public class MaxPendingRequestHandlerTest {
@Test
public void testMaxPending() {
EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));
// channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly
channel.write("1");
channel.write("2");
channel.write("3");
channel.write("4");
channel.write("5");
channel.write("6");
Assert.assertEquals(channel.readOutbound(), "1");
Assert.assertEquals(channel.readOutbound(), "2");
Assert.assertEquals(channel.readOutbound(), "3");
Assert.assertEquals(channel.readOutbound(), (Object)null);
}
@Test
public void testMaxPendingWhenAResponseHasReceived() {
EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));
// channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly
channel.write("1");
channel.write("2");
channel.write("3");
channel.write("4");
channel.write("5");
channel.write("6");
channel.writeInbound("RE: 1");
Assert.assertEquals(channel.readOutbound(), "1");
Assert.assertEquals(channel.readOutbound(), "2");
Assert.assertEquals(channel.readOutbound(), "3");
Assert.assertEquals(channel.readOutbound(), "4");
Assert.assertEquals(channel.readOutbound(), (Object)null);
}
@Test
public void testMaxPendingWhenAllResponseHasReceived() {
EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));
// channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly
channel.write("1");
channel.write("2");
channel.write("3");
channel.write("4");
channel.write("5");
channel.write("6");
channel.writeInbound("RE: 1");
channel.writeInbound("RE: 2");
channel.writeInbound("RE: 3");
channel.writeInbound("RE: 4");
channel.writeInbound("RE: 5");
channel.writeInbound("RE: 6");
Assert.assertEquals(channel.readOutbound(), "1");
Assert.assertEquals(channel.readOutbound(), "2");
Assert.assertEquals(channel.readOutbound(), "3");
Assert.assertEquals(channel.readOutbound(), "4");
Assert.assertEquals(channel.readOutbound(), "5");
Assert.assertEquals(channel.readOutbound(), "6");
Assert.assertEquals(channel.readOutbound(), (Object)null);
}
@Test
public void testMaxPendingWhenAllResponseHasReceivedAndNewMessagesAreSend() {
EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));
// channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly
channel.write("1");
channel.write("2");
channel.write("3");
channel.write("4");
channel.write("5");
channel.write("6");
channel.writeInbound("RE: 1");
channel.writeInbound("RE: 2");
channel.writeInbound("RE: 3");
channel.writeInbound("RE: 4");
channel.writeInbound("RE: 5");
channel.writeInbound("RE: 6");
channel.write("7");
channel.write("8");
channel.write("9");
channel.write("10");
Assert.assertEquals(channel.readOutbound(), "1");
Assert.assertEquals(channel.readOutbound(), "2");
Assert.assertEquals(channel.readOutbound(), "3");
Assert.assertEquals(channel.readOutbound(), "4");
Assert.assertEquals(channel.readOutbound(), "5");
Assert.assertEquals(channel.readOutbound(), "6");
Assert.assertEquals(channel.readOutbound(), "7");
Assert.assertEquals(channel.readOutbound(), "8");
Assert.assertEquals(channel.readOutbound(), "9");
Assert.assertEquals(channel.readOutbound(), (Object)null);
}
}