I'm using RabbitMQ and Celery to process email attachments fetched through the Gmail API. My first Celery task fetches batches of emails whose attachments arrive as base64 strings, sometimes larger than 25 MB per file. RabbitMQ's default message size limit is 16 MB, and I don't want to raise it, because the articles I've read say keeping messages small is better practice.
What is the best practice here? While the first task is pulling emails, I want multiple other Celery workers to process those files concurrently (running OCR and storing the results in a database) to speed up the whole pipeline.
A few solutions I came up with (I'm not sure whether they're good practice, because I'm a newbie):
Raising the RabbitMQ message size limit
Storing the file in memory and referencing it in the second Celery task (not sure if this is a good idea, because the server I'm running on has only 32 GB of RAM)
In the first Celery task that pulls emails, uploading each file directly to a cloud storage service and passing the file's URL to the second task. The downside is that I'd have to upload the file and then re-download it to run OCR on it, which doesn't seem efficient (and increases costs because of the bandwidth usage).
Is there another solution for my design problem here?
In your case, it might make sense to split your first large task, which downloads the email messages and their attachments, into two smaller tasks. Since your attachments are large, they can be downloaded from a separate endpoint (in the second task).
In the first task you extract and process emails from the Gmail API and pass only the identifiers the next task needs: userId, messageId, attachmentId, access_token.
In the second task, you download an attachment, immediately perform the OCR processing, and save the results to the database.
Here is the simplified code for this solution:
import base64

import requests
from celery import shared_task

API_URL = 'https://gmail.googleapis.com'


@shared_task
def process_user_mails(user_id, access_token):
    headers = {'Authorization': f'Bearer {access_token}'}

    def iter_user_mail_ids_from_first_page():
        # Lists only the first page of message ids; follow the response's
        # 'nextPageToken' if you need to paginate through the whole mailbox.
        url = f'{API_URL}/gmail/v1/users/{user_id}/messages'
        mails_data = requests.get(url=url, headers=headers).json()
        for mail in mails_data['messages']:
            yield mail['id']

    def load_user_mail_message(mail_id):
        url = f'{API_URL}/gmail/v1/users/{user_id}/messages/{mail_id}'
        return requests.get(url=url, headers=headers).json()

    def iter_attachments_meta(payload):
        # A part with a filename and an attachmentId is a downloadable
        # attachment; very small attachments may carry inline 'data' instead.
        if payload.get('filename') and payload.get('body', {}).get('attachmentId'):
            yield {
                'mime_type': payload['mimeType'],
                'filename': payload['filename'],
                'attachment_id': payload['body']['attachmentId'],
            }
        for part in payload.get('parts', []):
            yield from iter_attachments_meta(part)

    for mail_id in iter_user_mail_ids_from_first_page():
        mail_message = load_user_mail_message(mail_id)
        attachments_it = iter_attachments_meta(mail_message.get('payload', {}))
        for attachment_meta in attachments_it:
            process_attachment.delay(
                user_id=user_id,
                access_token=access_token,
                mail_id=mail_id,
                attachment_meta=attachment_meta,
            )


@shared_task
def process_attachment(user_id, access_token, mail_id, attachment_meta):
    attachment_id = attachment_meta['attachment_id']
    headers = {'Authorization': f'Bearer {access_token}'}
    url = (
        f'{API_URL}/gmail/v1/users/{user_id}/messages/'
        f'{mail_id}/attachments/{attachment_id}'
    )
    attachment = requests.get(url=url, headers=headers).json()
    # The Gmail API returns the payload base64url-encoded; decode it to bytes.
    attachment_data = base64.urlsafe_b64decode(attachment['data'])
    # Do the necessary OCR processing, save everything you need to the database.
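For the OCR step itself, here is a minimal sketch, assuming the attachments are images and that pytesseract and Pillow are installed (ocr_attachment and save_to_database are illustrative names; substitute your own OCR library and persistence code):

import io

import pytesseract
from PIL import Image


def ocr_attachment(attachment_bytes):
    # Interpret the decoded attachment bytes as an image and run Tesseract.
    image = Image.open(io.BytesIO(attachment_bytes))
    return pytesseract.image_to_string(image)

# Inside process_attachment you would then do something like:
#     text = ocr_attachment(attachment_data)
#     save_to_database(mail_id, attachment_meta['filename'], text)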
The algorithm itself is as simple as possible: list the message ids from the Gmail API, fetch each message, and fan each attachment out to its own task. The endpoints involved are users.messages.list, users.messages.get, and users.messages.attachments.get.
The advantages of this approach are obvious: you pass only the minimum necessary information to the second task, you don't need to persist any intermediate state anywhere, and attachments can be processed in parallel (divide and conquer).
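If you ever need to wait for or aggregate the per-attachment results, Celery's group primitive fits naturally here; a sketch under the assumption that process_attachment is the task defined above:

from celery import group

# Fan all attachments of one message out as a single group of tasks,
# which also lets you collect their results as one unit.
job = group(
    process_attachment.s(
        user_id=user_id,
        access_token=access_token,
        mail_id=mail_id,
        attachment_meta=meta,
    )
    for meta in attachments_it
)
job.apply_async()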
The second solution that comes to my mind is to create a shared folder on the server into which your Celery workers can save attachments and from which they can read them back. The flow would look something like this: the first task downloads an attachment, writes it to the shared folder, and passes only the file path to the second Celery task. What I see as the advantages here: messages stay tiny, so you never come near the RabbitMQ message size limit, and you skip the upload/re-download round trip (and bandwidth costs) of a cloud storage service.
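A minimal sketch of that variant, assuming every worker can see the same filesystem (the shared path and task names are illustrative):

import base64
import uuid
from pathlib import Path

from celery import shared_task

SHARED_DIR = Path('/var/shared/attachments')  # must be visible to all workers


@shared_task
def save_attachment(attachment_b64, filename):
    # Decode once, write to the shared folder, and pass only the path onward.
    path = SHARED_DIR / f'{uuid.uuid4()}-{filename}'
    path.write_bytes(base64.urlsafe_b64decode(attachment_b64))
    ocr_from_path.delay(str(path))


@shared_task
def ocr_from_path(path):
    data = Path(path).read_bytes()
    # Run OCR on `data` and store the result, then clean up the file.
    Path(path).unlink(missing_ok=True)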