I'm trying to set up FX modules, but I have doubts and can't find a way forward. Basically, I have one module for the server:
func NewGRPCServer(
lc fx.Lifecycle, log *zap.Logger, tracer trace.Tracer,
srvsInterceptors []grpc.UnaryServerInterceptor, serverOpt []grpc.ServerOption,
) *grpc.Server {
defaultRecoveryHandler := func(ctx context.Context, r interface{}) (err error) {
logger.FromContext(ctx).Error("recovered from panic", zap.Any("panic", r), zap.Stack("stacktrace"))
return status.Error(codes.Internal, "unexpected error")
}
interceptors := append([]grpc.UnaryServerInterceptor{
LoggerToContextInterceptor(log),
TracerToContextInterceptor(tracer),
grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandlerContext(defaultRecoveryHandler)),
}, srvsInterceptors...)
otelHandler := otelgrpc.NewServerHandler(
otelgrpc.WithTracerProvider(otel.GetTracerProvider()),
)
serverOpts := []grpc.ServerOption{
grpc.ChainUnaryInterceptor(interceptors...),
grpc.StatsHandler(otelHandler),
grpc.KeepaliveEnforcementPolicy(
keepalive.EnforcementPolicy{
MinTime: 60 * time.Second,
PermitWithoutStream: true,
}),
grpc.KeepaliveParams(
keepalive.ServerParameters{
Time: 60 * time.Second,
Timeout: 10 * time.Second,
},
),
}
serverOpts = append(serverOpts, serverOpt...)
server := grpc.NewServer(serverOpts...)
grpc_health_v1.RegisterHealthServer(server, health.NewServer())
reflection.Register(server)
return server
}
// NewListener creates a new network listener for the gRPC server using the gRPC server address parsed from the config.
func NewListener(cfg Config) (net.Listener, error) {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.GRPC))
if err != nil {
return nil, fmt.Errorf("dial connection: %w", err)
}
return lis, nil
}
func GRPCModule() fx.Option {
return fx.Module(
"grpc",
fx.Provide(
fx.Annotate(
NewGRPCServer,
fx.ParamTags(``, ``, ``, `optional:"true"`, `optional:"true"`),
),
),
fx.Invoke(func(lc fx.Lifecycle, server *grpc.Server, config Config, log *zap.Logger) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
lis, err := NewListener(config)
if err != nil {
return err
}
go func(srv *grpc.Server, logger *zap.Logger) {
logger.Info("Starting gRPC server")
if err := srv.Serve(lis); err != nil && err != grpc.ErrServerStopped {
logger.Error("gRPC server failed", zap.Error(err))
}
}(server, log)
return nil
},
OnStop: func(ctx context.Context) error {
server.GracefulStop()
return nil
},
})
}),
)
}
And a module for UI:
func NewGRPCUIServer(
lc fx.Lifecycle,
logger *zap.Logger,
tracer trace.Tracer,
config Config,
) (*http.Server, error) {
logger.Info("enter on new grpc ui server.")
rpcGrpcHost := fmt.Sprintf("0.0.0.0:%d", config.GRPC)
keepAliveOpt := grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 60 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: true,
})
cc, err := grpc.NewClient(
rpcGrpcHost,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
keepAliveOpt,
)
if err != nil {
logger.Error("Failed to connect to gRPC server for UI", zap.Error(err))
return nil, err
}
h, err := standalone.HandlerViaReflection(context.Background(), cc, rpcGrpcHost)
if err != nil {
logger.Error("Failed to create UI handler", zap.Error(err))
return nil, err
}
mux := http.NewServeMux()
mux.Handle("/grpc-ui/", http.StripPrefix("/grpc-ui", h))
return httplib.NewHTTPServerFx(
lc,
httplib.Config{
ServerAddr: fmt.Sprintf(":%d", config.UI),
ServerReadTimeout: 15 * time.Second,
ServerWriteTimeout: 15 * time.Second,
},
logger,
tracer,
mux,
)
}
type GRPCUIParams struct {
fx.In
WebServer *http.Server `name:"grpc-ui-server"`
Logger *zap.Logger
}
func GRPCUIModule() fx.Option {
return fx.Module(
"x:ui",
fx.Provide(
NewGRPCUIServer,
fx.Annotate(
NewGRPCUIServer,
fx.ParamTags(``, ``, ``, ``, ``),
fx.ResultTags(`name:"grpc-ui-server"`),
),
),
fx.Invoke(func(params GRPCUIParams) {
params.Logger.Info("gRPC UI Server initialized:", zap.String("address", params.WebServer.Addr))
}),
)
}
But for some reason it's failing at handlerViaReflection:
h, err := standalone.HandlerViaReflection(context.Background(), cc, rpcGrpcHost)
if err != nil {
logger.Error("Failed to create UI handler", zap.Error(err))
return nil, err
}
And it's giving an error because the gRPC server hasn't started yet. I placed a breakpoint here in the server module:
fx.Invoke(func(lc fx.Lifecycle, server *grpc.Server, config Config, log *zap.Logger) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
lis, err := NewListener(config)
if err != nil {
return err
}
go func(srv *grpc.Server, logger *zap.Logger) {
logger.Info("Starting gRPC server")
if err := srv.Serve(lis); err != nil && err != grpc.ErrServerStopped {
logger.Error("gRPC server failed", zap.Error(err))
}
}(server, log)
return nil
},
And it doesn't enter; it always goes to the UI module first. It creates the server: NewGRPCServer, but never initializes it. Does anyone know how I can solve this?
Create ≠ Start: NewGRPCServer
only creates a *grpc.Server
object but does not start listening on the port.
In your code, both NewGRPCServer
and NewGRPCUIServer
are provided through fx.Provide
. The UI server attempts to connect to the gRPC server immediately upon construction. However, the actual startup of your gRPC server occurs in the OnStart
lifecycle hook, which happens after all initializations are complete.
You can try moving the reflection connection logic of the UI service to the OnStart
hook as well, to ensure that the connection attempt happens only after the gRPC server has started. For example:
func NewGRPCUIServer(lc fx.Lifecycle, ...) (*http.Server, error) {
server := &http.Server{...}
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
cc, err := grpc.NewClient(...)
h, err := standalone.HandlerViaReflection(...)
// ...
},
})
return server, nil
}
btw, if it always goes to the UI module first, check whether the dependency order is correct. Ensure that the gRPC server module is registered before the UI module when registering Fx modules.