
gRPC StreamObserver threads have no CDI context in Jakarta EE Open Liberty server


I have an Open Liberty server (23.0.0.9) running Jakarta EE 10 with CDI 3.0, which opens a gRPC channel with push-pull based streams to an external server. A ManagedExecutorService is supplied to the channel that spawns the streams. In theory this should make the threads container-managed and give them CDI context, but that does not seem to be the case, or my understanding of it is wrong. The streams are created in the ordinary fashion with a defined responseObserver:

ManagedChannel channel = ManagedChannelBuilder.forAddress(grpcHost, port)
            .enableRetry()
            .keepAliveWithoutCalls(true)
            .executor(executorService)
            .build();

PubSubGrpc.newStub(channel).withCallCredentials(credentials).subscribe(responseObserver);

And the responseObserver is defined as follows:

public StreamObserver<FetchResponse> getDefaultResponseStreamObserver() {

        return new StreamObserver<FetchResponse>() {

            @Override
            public void onNext(FetchResponse fetchResponse) {
                for (ConsumerEvent ce : fetchResponse.getEventsList()) {
                    try {
                        
                        injectedBean.methodThatRequiresCDI(ce);  // This method does not work
                        
                    } catch (Exception e) {
                        logger.info(e.toString());
                    }

                }

                if (fetchResponse.getPendingNumRequested() == 0 && subscriptionReference.isActive()) {
                    subscriptionReference.getRequestObserver().onNext(FetchRequest.newBuilder().setNumRequested(100).build());
                }

            }

            @Override
            public void onError(Throwable t) {
                subscriptionReference = subscriptionReference.closedSubscription();
                logger.info("big bad error :(");

            }

            @Override
            public void onCompleted() {
                subscriptionReference = subscriptionReference.closedSubscription();
                logger.info("Call completed by server. Closing AsyncSubscriptionFactory. Goodbye.");
            }
        };
}

The "methodThatRequiresCDI" calls CDI.current() further downstream, in another library that I cannot substitute, and that call throws an IllegalStateException. To keep the question focused, I have included the stack trace from calling CDI.current() directly at a breakpoint in onNext(); it illustrates the same issue. Whenever onNext() is invoked for a response from the gRPC stream, CDI.current() fails and no context is active:

java.lang.IllegalStateException: Could not find deployment
    at com.ibm.ws.cdi.impl.AbstractCDIRuntime.getCDI(AbstractCDIRuntime.java:163)
    at jakarta.enterprise.inject.spi.CDI.getCDIProvider(CDI.java:78)
    at jakarta.enterprise.inject.spi.CDI.current(CDI.java:65)
    at ----.subscribe.control.ExampleSubscriber$1.onNext(ExampleSubscriber.java:107)
    at ----.subscribe.control.ExampleSubscriber$1.onNext(ExampleSubscriber.java:100)
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:466)
    at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
    at io.openliberty.grpc.internal.client.monitor.GrpcMonitoringClientCallListener.onMessage(GrpcMonitoringClientCallListener.java:59)
    at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
    at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
    at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
    at io.grpc.internal.DelayedClientCall$DelayedListener.onMessage(DelayedClientCall.java:447)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:661)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:646)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at com.ibm.ws.threading.internal.PolicyTaskFutureImpl.run(PolicyTaskFutureImpl.java:762)
    at com.ibm.ws.threading.internal.PolicyExecutorImpl.runTask(PolicyExecutorImpl.java:1172)
    at com.ibm.ws.threading.internal.PolicyExecutorImpl$GlobalPoolTask.run(PolicyExecutorImpl.java:198)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

The method works fine (as expected) in any other context, for example when invoked as part of a JAX-RS request or by an MDB: as long as CDI.current() works and the BeanManager can be accessed, the invocation above succeeds.

There are two workarounds that would be acceptable in the absence of another solution:

  1. Make the injected bean that holds the method an EJB @Singleton. The invocation then seems to run in its own container-managed thread, and CDI.current() works fine. At the moment the bean is @ApplicationScoped and I would like to keep it that way. I intend to invoke many different beans and don't want to change them all to EJB singletons.

  2. Fire an event with the payload to an @Observes method that invokes the method instead. This also works: CDI.current() succeeds in that method and in the downstream method. The drawback is that it adds an odd indirection and feels like a hack. It is also harder to refactor when there are many different types of subscriptions, since each one would need its own event and @Observes method.

I would rather understand why the thread that invokes onNext() has no CDI context (also for my own understanding of CDI) and how to fix that, if possible. I expected not to have this issue as long as I let the container create the threads that run my tasks, via a ManagedExecutorService. I have also tried using a ManagedThreadFactory, submitting the task to a ManagedThreadManager inside onNext(), adding @ActivateRequestContext, and several other options, to no avail.


Solution

  • I talked with some other people who work in these areas. This isn't a complete answer, but hopefully it's enough to help.

    The CDI code that throws the IllegalStateException is checking the thread context to see what the current application is. Since we're getting this exception, it seems that the correct thread context is not present.

    The ManagedExecutorService copies thread context (including the information that CDI is looking for) from the thread that submits the task and applies it to the thread that runs the task, for as long as the task runs. As long as tasks are always submitted through the ManagedExecutorService, the context is carried from task to task.

    However, if at any point a task runs on a non-managed executor, no thread context is applied to it, so even if that task then submits the next task to the ManagedExecutorService, there is no context on the submitting thread to copy. Your stack trace has PolicyExecutorImpl near the bottom, which suggests you are running on a managed thread at this point, so I suspect that somewhere along the line gRPC has run a task on a different executor and lost the context.
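The copy-on-submit behaviour and the way one unmanaged hop breaks it can be modelled in plain Java. This is a simplified sketch, not Liberty's implementation: the `ThreadLocal` named `APP_CONTEXT` is a hypothetical stand-in for the thread context CDI inspects, and the hypothetical `ContextPropagatingExecutor` plays the role of the ManagedExecutorService by capturing the submitter's context and applying it while the task runs. The second scenario submits from a thread without context, which is roughly what is suspected to happen inside gRPC.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextPropagationDemo {

    // Hypothetical stand-in for the per-thread context CDI.current() looks up.
    static final ThreadLocal<String> APP_CONTEXT = new ThreadLocal<>();

    // Hypothetical model of a ManagedExecutorService: captures the submitter's
    // context at submit time and applies it on the worker while the task runs.
    static final class ContextPropagatingExecutor implements Executor {
        private final Executor delegate;

        ContextPropagatingExecutor(Executor delegate) {
            this.delegate = delegate;
        }

        @Override
        public void execute(Runnable task) {
            String captured = APP_CONTEXT.get(); // null if the submitter has no context
            delegate.execute(() -> {
                String previous = APP_CONTEXT.get();
                APP_CONTEXT.set(captured);
                try {
                    task.run();
                } finally {
                    APP_CONTEXT.set(previous); // restore the worker's own state
                }
            });
        }
    }

    // Submits a task to the "managed" executor and returns the context it saw.
    static String contextSeenBy(Executor managed) {
        CompletableFuture<String> seen = new CompletableFuture<>();
        managed.execute(() -> seen.complete(String.valueOf(APP_CONTEXT.get())));
        return seen.join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        ExecutorService unmanaged = Executors.newSingleThreadExecutor();
        Executor managed = new ContextPropagatingExecutor(pool);

        APP_CONTEXT.set("myApp");

        // 1. Submitted from a thread that has context: the context is copied.
        System.out.println("direct: " + contextSeenBy(managed));

        // 2. A thread without context submits to the same managed executor:
        //    there is nothing to capture, so the task runs without context.
        String viaHop = CompletableFuture
                .supplyAsync(() -> contextSeenBy(managed), unmanaged)
                .join();
        System.out.println("via unmanaged hop: " + viaHop);

        pool.shutdown();
        unmanaged.shutdown();
    }
}
```

The point of the model is that the managed executor never invents context; it can only copy what the submitting thread already has.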

    We'd suggest trying the following things:

    ManagedChannelBuilder has an offloadExecutor method. You could try setting it to the ManagedExecutorService as well.

    If that doesn't work, we would try injecting a ManagedThreadFactory and using it to create an ExecutorService:

    @Resource(lookup = "java:comp/DefaultManagedThreadFactory")
    private ManagedThreadFactory managedThreadFactory;
    
    ExecutorService executor = Executors.newFixedThreadPool(numThreads, managedThreadFactory);
    

    That should have the application context applied for any tasks run on that executor.
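A thread factory can help where per-task copying does not, because the context travels with the pool's threads rather than with each submission: a task handed over by a thread that has no context still runs with it. The plain-Java sketch below models that idea with a hypothetical `ContextBoundThreadFactory`; `APP_CONTEXT` is again a stand-in for Liberty's thread context. Capturing the context when the factory is constructed is an assumption of this model, not a statement of exactly when ManagedThreadFactory captures it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class ThreadFactoryContextDemo {

    // Hypothetical stand-in for the thread context CDI looks up.
    static final ThreadLocal<String> APP_CONTEXT = new ThreadLocal<>();

    // Hypothetical model of a managed thread factory: every thread it creates
    // runs its work with the context captured when the factory was built.
    static final class ContextBoundThreadFactory implements ThreadFactory {
        private final String captured = APP_CONTEXT.get();

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(() -> {
                APP_CONTEXT.set(captured);
                r.run(); // for a pool, r is the worker loop that runs every task
            });
        }
    }

    // Submits to the pool from a thread that has no context of its own.
    static String contextOnPool(ExecutorService pool, ExecutorService contextless) {
        return CompletableFuture
                .supplyAsync(() -> CompletableFuture
                        .supplyAsync(() -> String.valueOf(APP_CONTEXT.get()), pool)
                        .join(), contextless)
                .join();
    }

    public static void main(String[] args) {
        APP_CONTEXT.set("myApp"); // e.g. while running inside the application
        ThreadFactory factory = new ContextBoundThreadFactory();
        APP_CONTEXT.remove();

        ExecutorService pool = Executors.newFixedThreadPool(2, factory);
        ExecutorService contextless = Executors.newSingleThreadExecutor();

        System.out.println("seen on pool: " + contextOnPool(pool, contextless));

        pool.shutdown();
        contextless.shutdown();
    }
}
```

In the question's terms, `contextless` plays the part of whatever gRPC thread dispatches the callback, and `pool` is the executor passed to the channel builder.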

    Edit: a change went into this area in 24.0.0.11 that allows the CDI code to fall back to checking the thread context classloader (TCCL). That might help if the TCCL is somehow set correctly when onNext is called, even though the thread context CDI is looking for is missing.
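The fallback described in the edit amounts to a two-step lookup. The sketch below only models that order in plain Java; `resolveApp`, `APP_CONTEXT` and the classloader-to-application map are hypothetical stand-ins, not Liberty's code. It illustrates why a correctly set TCCL on the callback thread could be enough even when the other thread context is absent.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TcclFallbackDemo {

    // Hypothetical stand-in for the thread context checked first.
    static final ThreadLocal<String> APP_CONTEXT = new ThreadLocal<>();

    // Hypothetical registry mapping each application's classloader to its name.
    static final Map<ClassLoader, String> APPS_BY_LOADER = new ConcurrentHashMap<>();

    // Models the lookup order: thread context first, then the TCCL.
    static String resolveApp() {
        String fromContext = APP_CONTEXT.get();
        if (fromContext != null) {
            return fromContext;
        }
        ClassLoader tccl = Thread.currentThread().getContextClassLoader();
        String fromLoader = (tccl == null) ? null : APPS_BY_LOADER.get(tccl);
        if (fromLoader != null) {
            return fromLoader;
        }
        // Same message as in the question's stack trace.
        throw new IllegalStateException("Could not find deployment");
    }

    public static void main(String[] args) throws InterruptedException {
        ClassLoader appLoader = new URLClassLoader(new URL[0]);
        APPS_BY_LOADER.put(appLoader, "myApp");

        // Callback thread with the application's TCCL: the fallback succeeds.
        Thread withTccl = new Thread(() -> System.out.println("resolved: " + resolveApp()));
        withTccl.setContextClassLoader(appLoader);
        withTccl.start();
        withTccl.join();

        // Callback thread that inherited an unrelated TCCL: lookup fails.
        Thread withoutTccl = new Thread(() -> {
            try {
                resolveApp();
            } catch (IllegalStateException e) {
                System.out.println("failed: " + e.getMessage());
            }
        });
        withoutTccl.start();
        withoutTccl.join();
    }
}
```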