netty netty4

How to parallelize processing of partially ordered messages?


My Netty-based app:

  1. ingests hundreds of thousands of messages per second from a single TCP connection
  2. processes those messages in a couple of inbound handlers
  3. sends processing results somewhere downstream

Currently, all of this runs on a single thread, since it is a single TCP connection. I would like to know how I can parallelize step 2. The difficulty is that messages cannot just be processed in parallel willy-nilly, because there is a partial order on the messages. You can think of this as there being a key(message) function: all messages for which this function returns the same result need to be processed sequentially, but messages with different results may be processed in parallel. So I am thinking of mapping each message to a thread with something like hash(key(message)) % threadCount.
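A minimal sketch of that mapping in Java, assuming the key is any object with a stable hashCode(). KeyShard and indexFor are hypothetical names used only for illustration. One detail worth noting: hashCode() can be negative, and the plain % operator would then produce a negative index, so the sketch uses Math.floorMod instead.

```java
/** Maps a message key to a worker index so that equal keys always share a worker. */
final class KeyShard {
    private KeyShard() {}

    /**
     * Returns an index in [0, workerCount).
     * Math.floorMod is used instead of % because hashCode() may be negative,
     * in which case % would return a negative remainder.
     */
    static int indexFor(Object key, int workerCount) {
        if (workerCount <= 0) {
            throw new IllegalArgumentException("workerCount must be positive");
        }
        return Math.floorMod(key.hashCode(), workerCount);
    }
}
```

Messages with equal keys always get the same index, so they stay in order as long as each index is served by exactly one thread.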

Imagine this pipeline:

pipeline.addLast(deframer);
pipeline.addLast(new IdleStateHandler(...));
pipeline.addLast(decoder);
pipeline.addLast(bizLogicHandler1);
pipeline.addLast(bizLogicHandler2);

In the decoder, I am able to compute the result of key(message), so I would like to parallelize everything downstream of decoder. It is documented that in order to use multiple threads I can do

static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
...
pipeline.addLast(group, "bizLogicHandler1", bizLogicHandler1);
pipeline.addLast("bizLogicHandler2", bizLogicHandler2);

which I guess means bizLogicHandler1 and everything below it (in the above example that would be bizLogicHandler2) will be able to run in parallel? (Or would I have to specify group for bizLogicHandler2 as well?)

The above will, however, still run completely serially, as the documentation explains. It offers UnorderedThreadPoolEventExecutor as an alternative that maximizes parallelism at the cost of giving up ordering completely, which does not work in my case.

Looking at the interfaces EventExecutorGroup and EventExecutor, I don't see how it would be possible to convey which messages can be processed in parallel, and which must be processed sequentially.

Any idea?


Solution

  • It turned out this is quite easy to do with one LocalServerChannel and as many LocalChannels as the desired degree of parallelism.

    The server channel will receive messages and dispatch them to one of the client channels. The other direction (from client channels to the server channel) works as well. I successfully parallelized an app this way to allow for much higher throughput, scaling out to more cores.

    Here's a bare-bones version, with most of the error handling, logging, and business logic stripped away:

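    import io.netty.bootstrap.Bootstrap;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.local.LocalAddress;
    import io.netty.channel.local.LocalChannel;
    import io.netty.channel.local.LocalServerChannel;
    import java.lang.reflect.Array;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;

    // Message is the application's own decoded message type; it exposes getDistributionKey().
    public class Parallelizer extends SimpleChannelInboundHandler<Message> {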
    
        private static final AtomicInteger EPHEMERAL_PORT = new AtomicInteger(0);
        private final Channel[] internalChannels;
        private final AtomicReference<ChannelHandlerContext> upstreamCtx = new AtomicReference<>(null);
    
        public Parallelizer(EventLoopGroup eventLoopGroup, List<ChannelInitializer<Channel>> channelInitializers) throws InterruptedException {
            internalChannels = (Channel[]) Array.newInstance(Channel.class, channelInitializers.size());
    
            int port = EPHEMERAL_PORT.getAndIncrement();
            final LocalAddress addr = new LocalAddress("PARALLELIZER-" + port);
            createServerChannel(eventLoopGroup, addr);
    
            channelInitializers.forEach(channelChannelInitializer -> createClientChannel(eventLoopGroup, addr, channelChannelInitializer));
    
            waitForInternalClientsToConnect();
        }
    
        private void waitForInternalClientsToConnect() throws InterruptedException {
            synchronized (internalChannels) {
                while (internalChannels[internalChannels.length - 1] == null) {
                    internalChannels.wait();
                }
            }
        }
    
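        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            // Assumption: the stripped-down listing never assigns upstreamCtx, so record
            // this handler's context here to let InternalHandler write results back.
            upstreamCtx.set(ctx);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            ctx.close();
        }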
    
        private void dispatch(Message msg, int clientChannelIdx) {
            Channel channel = internalChannels[clientChannelIdx];
            channel.writeAndFlush(msg, channel.voidPromise());
        }
    
        private void createClientChannel(EventLoopGroup eventLoopGroup, LocalAddress addr, ChannelInitializer<Channel> channelInitializer) {
            Bootstrap cb = new Bootstrap();
            cb.group(eventLoopGroup)
                    .channel(LocalChannel.class)
                    .handler(channelInitializer);
            cb.connect(addr);
        }
    
        private void createServerChannel(EventLoopGroup eventLoopGroup, LocalAddress addr) throws InterruptedException {
            ServerBootstrap sb = new ServerBootstrap();
            sb.group(eventLoopGroup)
                    .channel(LocalServerChannel.class)
                    .childHandler(new ChannelInitializer<LocalChannel>() {
                        @Override
                        public void initChannel(LocalChannel ch) {
                            ch.pipeline().addLast(new InternalHandler());
                        }
                    });
            sb.bind(addr).sync();
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Message msg) {
            final int hash = msg.getDistributionKey().hashCode();
            int clientChannelIdx = Integer.remainderUnsigned(hash, internalChannels.length);
            dispatch(msg, clientChannelIdx);
        }
    
        private class InternalHandler extends SimpleChannelInboundHandler<ByteBuf> {
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                ctx.close();
            }
    
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                synchronized (internalChannels) {
                    final int firstNull = Arrays.asList(internalChannels).indexOf(null);
                    internalChannels[firstNull] = ctx.channel();
                    internalChannels.notify();
                }
                super.channelActive(ctx);
            }
    
            @Override
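            protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
                final ChannelHandlerContext upstream = Parallelizer.this.upstreamCtx.get();
                if (upstream != null) {
                    // SimpleChannelInboundHandler releases msg after this method returns,
                    // so retain it only when it is actually handed on; otherwise it would leak.
                    upstream.writeAndFlush(msg.retain(), upstream.channel().voidPromise());
                }
            }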
        }
    }
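
The ordering guarantee this relies on can also be illustrated without Netty. The following is a rough sketch using only java.util.concurrent, not the code from the app above: KeyedExecutor, its lanes, and submit are hypothetical names. Each lane is a single-thread executor, which plays roughly the role that one internal channel bound to one event loop thread plays in the Parallelizer.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Runs tasks with equal keys sequentially; tasks with different keys may run in parallel. */
final class KeyedExecutor implements AutoCloseable {
    private final ExecutorService[] lanes;

    KeyedExecutor(int parallelism) {
        lanes = new ExecutorService[parallelism];
        for (int i = 0; i < parallelism; i++) {
            // A single-thread executor runs its tasks one at a time, in submission order.
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Hands the task to the lane selected by the key's hash. */
    void submit(Object key, Runnable task) {
        lanes[Math.floorMod(key.hashCode(), lanes.length)].execute(task);
    }

    /** Stops accepting tasks and waits up to 10 seconds per lane for queued tasks to finish. */
    @Override
    public void close() {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        try {
            for (ExecutorService lane : lanes) {
                lane.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
```

Tasks submitted from a single thread under the same key run in submission order, while different keys may land on different lanes and proceed concurrently. Two different keys can still hash to the same lane, which costs some parallelism but never breaks ordering.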