pythonemailweb-scrapingimaplibemail-parsing

Python live script not functioning to check for new emails in gmail


I am trying to build an application script to parse through new incoming emails I have check my script on already existing emails from specific sender and it runs fine but since I have modified it into live running script it's throwing me an error saying: An error occurred: 'utf-8' codec can't decode byte 0xa9 in position 802: invalid start byte

Following is the code for it :

import imaplib
import email
import re
import yaml
from datetime import datetime
import time

# Function to extract the promo code from email body
def extract_promo_code(body):
    promo_code_pattern = r'(?s)Enter (?:code|promo).*?\b([A-Z\d]{10,})'
    match = re.search(promo_code_pattern, body, re.MULTILINE)
    if match:
        return match.group(1)
    else:
        return None

# Function to extract the expiry date from email body
def extract_expiry_date(body):
    expiry_date_pattern = r'Offer valid until ([A-Za-z]+ \d{1,2}, \d{4})'
    match = re.search(expiry_date_pattern, body)
    if match:
        original_date = match.group(1)
        parsed_date = datetime.strptime(original_date, '%B %d, %Y')
        formatted_date = parsed_date.strftime('%d/%m/%Y')
        return formatted_date
    else:
        return None

# Read credentials from a YAML file
with open('credentials.yaml') as f:
    content = f.read()

my_credentials = yaml.load(content, Loader=yaml.FullLoader)

user, password = my_credentials['user'], my_credentials['password']

imap_url = 'imap.gmail.com'

while True:
    try:
        my_mail = imaplib.IMAP4_SSL(imap_url)
        my_mail.login(user, password)
        my_mail.select('Inbox')

        _, data = my_mail.search(None, 'ALL')

        mail_id_list = data[0].split()
        
        for num in mail_id_list:
            typ, data = my_mail.fetch(num, '(RFC822)')
            msgs = []

            for msg in data:
                if isinstance(msg, tuple):
                    my_msg = email.message_from_bytes(msg[1])

                    # Initialize data fields
                    msg_to = my_msg['to']
                    date = my_msg['date']
                    expiry_date = None
                    subject = my_msg['subject']
                    promo_code = None
                    exclusions = None
                    supplier = "Supplier ID"
                    message_id = my_msg['Message-ID']

                    # Extract promo code from email body
                    for part in my_msg.walk():
                        if part.get_content_type() == 'text/plain':
                            body = part.get_payload(decode=True).decode('utf-8')
                            promo_code = extract_promo_code(body)

                            # Extract exclusions if present (you can modify this part)
                            exclusions_match = re.search(r'\*\s*EXCLUSIONS AND DISCLAIMERS\s*(.*?)Some exclusions apply\.', body, re.IGNORECASE | re.MULTILINE | re.DOTALL)
                            if exclusions_match:
                                exclusions = exclusions_match.group(1).strip()

                            # Extract expiry date from email body
                            expiry_date = extract_expiry_date(body)

                    # Check if a promo code was found before printing or saving the extracted data
                    if promo_code:
                        # Print or save the extracted data
                        print('______________________________')
                        print("msg_to:", msg_to)
                        print("date:", date)
                        print("expiry_date:", expiry_date)
                        print("subject:", subject)
                        print("promo_code:", promo_code)
                        print("exclusions:", exclusions)
                        print("supplier:", supplier)
                        print("message_id:", message_id)
                        print('______________________________')

        # Close the mailbox
        my_mail.logout()

        # Sleep for a while before checking for new emails again
        time.sleep(60)  # Sleep for 60 seconds before checking again
    except Exception as e:
        print(f"An error occurred: {str(e)}")

The code runs into the exception every 60 sec even though when no email is received. I just want it to check for new incoming email that is unread and extract the required data from it if it exists.


Solution

  • I'm not exactly sure, but I would guess you have an email with a non-UTF-8 character somewhere in your inbox. Since your script is running in a continuous loop with a 60-second sleep interval between iterations, every minute it attempts to read this email which then causes the exception.

    I would recommend wrapping your decoding statement in a try-catch block to handle errors gracefully:

    try:
        body = part.get_payload(decode=True).decode('utf-8')
    except UnicodeDecodeError:
        print(f"Error decoding email {message_id}. Skipping.")
        continue
    

    Also it might make sense to only process unread emails instead of all emails:

    _, data = my_mail.search(None, 'UNSEEN')