Hello, I have a command bus and a query bus. Each one basically holds key-value pairs that map the name of a command or query to its handler. I then execute the command, which should publish my event. But I have some doubts about how to build my event bus. Is the command bus part of an event bus? How could I build an event bus with handlers?
command-bus:
export interface ICommand {
}
export interface ICommandHandler<
  TCommand extends ICommand = any,
  TResult = any
> {
  execute(command: TCommand): Promise<TResult>
}
export interface ICommandBus<CommandBase extends ICommand = ICommand> {
  execute<T extends CommandBase>(command: T): Promise<any>
  register(data: { commandHandler: ICommandHandler; command: ICommand }[]): void
}
command-bus implementation:
export class CommandBus<Command extends ICommand = ICommand>
  implements ICommandBus<Command> {
  // Handlers keyed by the class name of the command they handle
  private handlers = new Map<string, ICommandHandler<Command>>()

  public execute<T extends Command>(command: T): Promise<any> {
    const commandName = this.getCommandName(command as any)
    const handler = this.handlers.get(commandName)
    if (!handler) throw new Error(`No handler registered for command "${commandName}"`)
    return handler.execute(command)
  }

  public register(
    data: { commandHandler: ICommandHandler; command: ICommand }[],
  ): void {
    data.forEach(({ command, commandHandler }) => {
      this.bind(commandHandler, this.getCommandName(command as any))
    })
  }

  private bind<T extends Command>(handler: ICommandHandler<T>, name: string) {
    this.handlers.set(name, handler)
  }

  // Resolves the class name from a command instance via its prototype
  private getCommandName(command: Function): string {
    const { constructor } = Object.getPrototypeOf(command)
    return constructor.name as string
  }
}
This raised another question: who should be responsible for publishing events to my event DB, or for reading a stream from it? Is that the job of my event-store class?
event-store class:
export class EventStoreClient {
  [x: string]: any;

  /**
   * @constructor
   */
  constructor(private readonly config: TCPConfig) {
    this.type = 'event-store';
    this.eventFactory = new EventFactory();
    this.connect();
  }

  connect() {
    this.client = new TCPClient(this.config);
    return this;
  }

  getClient() {
    return this.client;
  }

  newEvent(name: any, payload: any) {
    return this.eventFactory.newEvent(name, payload);
  }

  close() {
    this.client.close();
    return this;
  }
}
I also have doubts about how to implement my event bus with my event handlers and my events.
I would be happy if someone could help me.
event-interface:
export interface IEvent {
  readonly aggregateVersion: number
  readonly aggregateId: string
}
export interface IEventHandler<T extends IEvent = any> {
  handle(event: T): any
}
maybe usage:
commandBus.execute(new Command())
class commandHandler {
  constructor(repository: IRepository, eventBus ????){}
  execute(){
    // How can I publish an event through the event bus here, after the command handler logic has run?
  }
}
I see there's some confusion between the various Buses and the Event Store. Before attempting to implement an Event Bus, you need to answer one important question that lies at the foundation of any Event Sourcing implementation:
That is, your Event Store contains the complete state of the domain. This also means that the consumers of the Event Bus (whatever it ends up being - a message queue, a streaming platform, Redis, etc.) should only get the events that are persisted. Therefore, the goals become:
These two goals intuitively translate to "I want atomic commit between the Event Store and the Event Bus". This is simplest to achieve when they're the same thing!
So, instead of thinking about how to connect an "Event Bus" to command handlers and send events back and forth, think about how to retrieve already persisted events from the Event Store and subscribe to that. This also removes any dependency between command handlers and event subscribers - they live on different sides of the Event Store (writer vs. reader), and could be in different processes, on different machines.
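To make the "subscribe to the Event Store" idea concrete, here is a minimal in-memory sketch. Everything in it is an assumption for illustration: `StoredEvent`, `InMemoryEventStore`, `OpenAccount` and `OpenAccountHandler` are hypothetical names, and the store is a stand-in for a real event database, not the API of your `EventStoreClient` or of any particular product. The point it tries to show is that the command handler only appends, and subscribers only ever see events that are already in the log.

```typescript
// Hypothetical event shape, loosely modelled on the IEvent from the question.
interface DomainEvent {
  readonly aggregateId: string
  readonly aggregateVersion: number
}

// An event as it sits in the log, with its global position.
interface StoredEvent {
  readonly position: number
  readonly type: string
  readonly data: DomainEvent
}

type Subscriber = (event: StoredEvent) => void

// In-memory stand-in for an event store that also plays the "event bus" role.
class InMemoryEventStore {
  private readonly log: StoredEvent[] = []
  private readonly subscribers = new Set<Subscriber>()

  // Write side: persist first, then notify. Nothing is delivered unless it is in the log.
  append(type: string, data: DomainEvent): StoredEvent {
    const stored: StoredEvent = { position: this.log.length, type, data }
    this.log.push(stored)
    this.subscribers.forEach((subscriber) => subscriber(stored))
    return stored
  }

  // Read side: replay persisted events from `fromPosition`, then keep receiving new ones.
  // Returns a function that cancels the subscription.
  subscribe(fromPosition: number, subscriber: Subscriber): () => void {
    this.log.slice(fromPosition).forEach(subscriber)
    this.subscribers.add(subscriber)
    return () => {
      this.subscribers.delete(subscriber)
    }
  }
}

// Hypothetical command and handler: the handler depends on the store, not on a bus.
class OpenAccount {
  readonly accountId: string
  constructor(accountId: string) {
    this.accountId = accountId
  }
}

class OpenAccountHandler {
  private readonly store: InMemoryEventStore
  constructor(store: InMemoryEventStore) {
    this.store = store
  }

  async execute(command: OpenAccount): Promise<void> {
    // Domain logic would go here; the resulting event is simply appended.
    this.store.append('AccountOpened', {
      aggregateId: command.accountId,
      aggregateVersion: 1,
    })
  }
}

async function main(): Promise<void> {
  const store = new InMemoryEventStore()
  const handler = new OpenAccountHandler(store)
  const seen: string[] = []

  // This event is persisted before any subscriber exists.
  await handler.execute(new OpenAccount('acc-1'))

  // The subscriber catches up from position 0, then receives live events.
  store.subscribe(0, (e) => {
    seen.push(`${e.position}:${e.type}:${e.data.aggregateId}`)
  })
  await handler.execute(new OpenAccount('acc-2'))

  console.log(seen) // both events, in log order
}

main().catch(console.error)
```

With a real event database, `subscribe` would typically be a subscription or stream read offered by that database, and the subscriber could run in a separate process that stores its own last-seen position. The in-memory version only illustrates the shape of the dependency: writers append, readers subscribe, and neither knows about the other.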