I am trying to build an application script to parse through new incoming emails I have check my script on already existing emails from specific sender and it runs fine but since I have modified it into live running script it's throwing me an error saying: An error occurred: 'utf-8' codec can't decode byte 0xa9 in position 802: invalid start byte
Following is the code for it :
import imaplib
import email
import re
import yaml
from datetime import datetime
import time
# Function to extract the promo code from email body
def extract_promo_code(body):
promo_code_pattern = r'(?s)Enter (?:code|promo).*?\b([A-Z\d]{10,})'
match = re.search(promo_code_pattern, body, re.MULTILINE)
if match:
return match.group(1)
else:
return None
# Function to extract the expiry date from email body
def extract_expiry_date(body):
expiry_date_pattern = r'Offer valid until ([A-Za-z]+ \d{1,2}, \d{4})'
match = re.search(expiry_date_pattern, body)
if match:
original_date = match.group(1)
parsed_date = datetime.strptime(original_date, '%B %d, %Y')
formatted_date = parsed_date.strftime('%d/%m/%Y')
return formatted_date
else:
return None
# Read credentials from a YAML file
with open('credentials.yaml') as f:
content = f.read()
my_credentials = yaml.load(content, Loader=yaml.FullLoader)
user, password = my_credentials['user'], my_credentials['password']
imap_url = 'imap.gmail.com'
while True:
try:
my_mail = imaplib.IMAP4_SSL(imap_url)
my_mail.login(user, password)
my_mail.select('Inbox')
_, data = my_mail.search(None, 'ALL')
mail_id_list = data[0].split()
for num in mail_id_list:
typ, data = my_mail.fetch(num, '(RFC822)')
msgs = []
for msg in data:
if isinstance(msg, tuple):
my_msg = email.message_from_bytes(msg[1])
# Initialize data fields
msg_to = my_msg['to']
date = my_msg['date']
expiry_date = None
subject = my_msg['subject']
promo_code = None
exclusions = None
supplier = "Supplier ID"
message_id = my_msg['Message-ID']
# Extract promo code from email body
for part in my_msg.walk():
if part.get_content_type() == 'text/plain':
body = part.get_payload(decode=True).decode('utf-8')
promo_code = extract_promo_code(body)
# Extract exclusions if present (you can modify this part)
exclusions_match = re.search(r'\*\s*EXCLUSIONS AND DISCLAIMERS\s*(.*?)Some exclusions apply\.', body, re.IGNORECASE | re.MULTILINE | re.DOTALL)
if exclusions_match:
exclusions = exclusions_match.group(1).strip()
# Extract expiry date from email body
expiry_date = extract_expiry_date(body)
# Check if a promo code was found before printing or saving the extracted data
if promo_code:
# Print or save the extracted data
print('______________________________')
print("msg_to:", msg_to)
print("date:", date)
print("expiry_date:", expiry_date)
print("subject:", subject)
print("promo_code:", promo_code)
print("exclusions:", exclusions)
print("supplier:", supplier)
print("message_id:", message_id)
print('______________________________')
# Close the mailbox
my_mail.logout()
# Sleep for a while before checking for new emails again
time.sleep(60) # Sleep for 60 seconds before checking again
except Exception as e:
print(f"An error occurred: {str(e)}")
The code runs into the exception every 60 sec even though when no email is received. I just want it to check for new incoming email that is unread and extract the required data from it if it exists.
I'm not exactly sure, but I would guess you have an email with a non-UTF-8 character somewhere in your inbox. Since your script is running in a continuous loop with a 60-second sleep interval between iterations, every minute it attempts to read this email which then causes the exception.
I would recommend wrapping your decoding statement in a try-catch block to handle errors gracefully:
try:
body = part.get_payload(decode=True).decode('utf-8')
except UnicodeDecodeError:
print(f"Error decoding email {message_id}. Skipping.")
continue
Also it might make sense to only process unread emails instead of all emails:
_, data = my_mail.search(None, 'UNSEEN')