In my NestJS app, I am working on a process that can take anywhere from 10 to 20 minutes to complete. The user clicks a button, which calls an API that starts this long-running process.
To paint a better picture: think of a web app with one process that takes a while, where the frontend shows the user progress updates. The user might see "Generating summary of your document", then "Doing some more relevant work", and so on.
While this goes on, I would also like the progress to be saved, so that if the user navigates away they can still come back and pick up where the progress left off (i.e. the UI would be updated). Once it is all complete, the flow would move forward.
When the user calls that API, I want the request not to block, and I do not want the process itself to be interrupted. I thought about using Server-Sent Events (SSE) for this, but the SSE connection would just be dropped if the user navigates away. So, how would I go about doing this? In another API, where I generate a response from OpenAI, I am using SSE for a more responsive feel (this is a trimmed snippet of my code):
@Sse(':id/generate-answer')
async answerWithMagic(
  @Param('id') id,
) {
  const messages = this.aiService.resolvePrompt();
  const stream = new Subject();
  this.aiService
    .generateCompletion({ messages, listenTokens: true, isJsonParsable: false })
    .pipe(
      takeWhile((data) => {
        if (data.eventType === LLM_END_EVENT) {
          stream.next({
            eventType: data.eventType,
            data: { token: data.token },
          });
          return false;
        } else {
          return true;
        }
      }),
    )
    .subscribe((data) => {
      stream.next({ eventType: data.eventType, data: { token: data.token } });
    });
  return stream;
}
How do I save the progress here? Making repeated calls to the database would not be very efficient, so I thought about storing the progress in Redis instead, but I am not sure which direction to take with this.
I think that your approach is okay.
The idea is to run the long task in the background and use Redis to keep track of its progress. For the task itself you can use BullMQ (a Redis-based queue), which is directly supported by NestJS (see https://docs.nestjs.com/techniques/queues).
So, technically, you have to create a RedisService that contains the methods to set and get the progress of the task:
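import { Injectable } from '@nestjs/common';
import Redis from 'ioredis'; // assumes the ioredis client

@Injectable()
export class RedisService {
  private redis: Redis;

  constructor() {
    this.redis = new Redis({
      host: process.env.REDIS_HOST || 'localhost',
      // Falls back to 6379 when REDIS_PORT is unset or not a number
      port: parseInt(process.env.REDIS_PORT ?? '', 10) || 6379,
    });
  }

  // Store the latest progress as JSON; the key expires so finished tasks do not pile up
  async setProgress(taskId: string, progress: any) {
    await this.redis.set(`progress:${taskId}`, JSON.stringify(progress), 'EX', 3600); // TTL 1 hour
  }

  // Return the parsed progress, or null if nothing is stored (or the key expired)
  async getProgress(taskId: string) {
    const progress = await this.redis.get(`progress:${taskId}`);
    return progress ? JSON.parse(progress) : null;
  }
}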
Then, you have to inject the RedisService into the service that handles the task, so that it can store progress updates as the task runs.
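As a rough sketch of what "storing progress updates" could look like inside the task, the code below writes one update before each step and a final one when everything is done. The names `ProgressStore`, `InMemoryProgressStore`, `TaskStep` and `runLongTask` are hypothetical and not part of NestJS or BullMQ; the in-memory store only stands in for the RedisService so the sketch runs without a Redis server.

```typescript
// Hypothetical contract mirroring the RedisService methods (setProgress / getProgress).
interface ProgressStore {
  setProgress(taskId: string, progress: unknown): Promise<void>;
  getProgress(taskId: string): Promise<unknown>;
}

// In-memory stand-in for RedisService (assumption: used here only to keep the sketch runnable).
class InMemoryProgressStore implements ProgressStore {
  private readonly entries = new Map<string, string>();

  async setProgress(taskId: string, progress: unknown): Promise<void> {
    this.entries.set(`progress:${taskId}`, JSON.stringify(progress));
  }

  async getProgress(taskId: string): Promise<unknown> {
    const raw = this.entries.get(`progress:${taskId}`);
    return raw ? JSON.parse(raw) : null;
  }
}

// One unit of work plus the message the UI should show while it runs.
interface TaskStep {
  message: string;
  run: () => Promise<void>;
}

// Hypothetical task body: one store write per step, then a final "done" write.
async function runLongTask(
  taskId: string,
  steps: TaskStep[],
  store: ProgressStore,
): Promise<void> {
  for (let i = 0; i < steps.length; i++) {
    // Record which step is starting and how far along the task is.
    await store.setProgress(taskId, {
      progress: Math.round((i / steps.length) * 100),
      message: steps[i].message,
      done: false,
    });
    await steps[i].run();
  }
  await store.setProgress(taskId, { progress: 100, message: 'Completed', done: true });
}
```

With only one write per step, a 10-20 minute task produces a handful of Redis calls in total, so the write load stays small. In a real setup the same function body would presumably live in the queue processor, with the RedisService passed in as the store.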
After that, you just have to create another endpoint to retrieve the task updates:
@Post(':id/generate-answer')
async startTask(@Param('id') taskId: string) {
  // Run task with BullMQ
}

@Get(':id/progress')
async getProgress(@Param('id') taskId: string) {
  const progress = await this.redisService.getProgress(taskId);
  return progress ?? { progress: 0, message: 'Not started' };
}
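The frontend can then poll /tasks/:id/progress every few seconds (or reopen the SSE connection if the user stays on the page). Because the progress lives in Redis, a user who navigates away and comes back simply resumes polling and sees the current state.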
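The polling side could be sketched roughly as follows. `pollProgress`, `ProgressFetcher` and `TaskProgress` are hypothetical names, and treating `progress >= 100` as "finished" is an assumption about the stored payload, which only follows the `{ progress, message }` shape returned by the endpoint above. The fetcher is passed in so the loop does not depend on any particular HTTP client.

```typescript
// Assumed shape of the payload returned by the progress endpoint.
interface TaskProgress {
  progress: number;
  message: string;
}

// Any function that loads the current progress for a task, e.g. a wrapper around fetch().
type ProgressFetcher = (taskId: string) => Promise<TaskProgress>;

const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Polls until the task reports 100% (assumption: 100 means finished) or attempts run out.
async function pollProgress(
  taskId: string,
  fetchProgress: ProgressFetcher,
  onUpdate: (current: TaskProgress) => void,
  intervalMs = 3000,
  maxAttempts = 600,
): Promise<TaskProgress> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const current = await fetchProgress(taskId);
    onUpdate(current); // let the UI render the latest message
    if (current.progress >= 100) {
      return current;
    }
    await sleep(intervalMs);
  }
  throw new Error(`Task ${taskId} did not finish after ${maxAttempts} polls`);
}
```

In a browser, the fetcher could be something like `(id) => fetch(`/tasks/${id}/progress`).then((res) => res.json())`. Calling `pollProgress` again when the page loads is what lets a returning user pick up the current state.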