I have a nice straight working pipe, where the task I run via luigi on the command line triggers all the required upstream data fetch and processing in its proper sequence till it trickles out into my database.
import os

import luigi
import luigi.contrib.postgres
from luigi import LocalTarget

class IMAP_Fetch(luigi.Task):
    """fetch a bunch of email messages with data in them"""
    date = luigi.DateParameter()
    uid = luigi.Parameter()
    …

    def output(self):
        loc = os.path.join(self.data_drop, str(self.date))
        # target for requested message
        yield LocalTarget(os.path.join(loc, self.uid + ".msg"))

    def run(self):
        # code to connect to IMAP server and run FETCH on given UID
        # message gets written to self.output()
        …

class RecordData(luigi.contrib.postgres.CopyToTable):
    """copy the data in one email message to the database table"""
    uid = luigi.Parameter()
    date = luigi.DateParameter()
    table = 'msg_data'
    columns = [('id', 'int'), …]

    def requires(self):
        # a task (not shown) that extracts data from one message,
        # which in turn requires the IMAP_Fetch to pull down the message
        return MsgData(self.date, self.uid)

    def rows(self):
        # code to read self.input() and yield lists of data values
        …
Great stuff. Unfortunately that first data fetch talks to a remote IMAP server, and every fetch opens a new connection and runs a new query: very slow. I know how to get all the individual message files in one session (one task instance). What I don't understand is how to keep the downstream tasks just as they are, working on one message at a time, since the task that requires one message triggers a fetch of just that one message, not a fetch of all the messages available. I apologize in advance if I'm missing an obvious solution, but it has stumped me so far how to keep my nice simple stupid pipe mostly the way it is while having the funnel at the top suck in all the data in one call. Thanks for your help.
What is missing from your explanation is where the list of uid values being sent to the RecordData task comes from. For this explanation, I will assume you have a group of uid values that you would like to consolidate into a single ImapFetch request.
One possible approach is to define a batch_id, in addition to your uid, whereby the batch_id refers to the group of messages you would like to fetch in a single session. Where the association between a uid and a batch_id is stored is up to you; it can be a parameter passed to the pipeline, or stored externally. The task you left out, MsgData, the one whose requires method currently returns a single ImapFetch task with a uid parameter, should instead require an ImapFetch task that takes a batch_id parameter. The first ImapFetch task required by a MsgData task would retrieve all uid values associated with that batch_id, and then retrieve those messages in a single session. All of the other MsgData tasks would require (and be waiting on) this one batch ImapFetch to complete, and then they would all be able to execute on their individual messages, as would the rest of the pipeline (see the sketch below). As such, tuning the batch size may prove important to the overall processing throughput.
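A minimal sketch of that shape, not your actual code: uids_for_batch() and fetch_batch() are hypothetical helpers standing in for however you store the batch_id-to-uid association and however you pull a set of messages down in one IMAP session.

import os

import luigi
from luigi import LocalTarget

class ImapFetch(luigi.Task):
    """fetch every message in the batch over one IMAP session"""
    date = luigi.DateParameter()
    batch_id = luigi.Parameter()
    data_drop = luigi.Parameter(default="data")

    def output(self):
        loc = os.path.join(self.data_drop, str(self.date))
        # one target per message in the batch
        return [LocalTarget(os.path.join(loc, str(uid) + ".msg"))
                for uid in uids_for_batch(self.batch_id)]

    def run(self):
        # one connection, one FETCH pass over every uid in the batch
        fetch_batch(uids_for_batch(self.batch_id), self.output())

class MsgData(luigi.Task):
    """extract data from one message"""
    date = luigi.DateParameter()
    uid = luigi.Parameter()
    batch_id = luigi.Parameter()

    def requires(self):
        # every MsgData in the batch waits on the same batch fetch
        return ImapFetch(date=self.date, batch_id=self.batch_id)

    def run(self):
        # pick this task's message out of the batch outputs by uid
        ...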
Another downside is that it is less atomic: atomic at the batch level rather than the individual item level, since the batch ImapFetch would fail if even one of the uid values was not successfully retrieved.
A second approach is to open the IMAP session as more of a singleton resource per process (worker), and have the ImapFetch tasks reuse the same session.
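A rough sketch of that idea, using Python's standard imaplib with a placeholder host and credentials; a module-level cache keyed by process id is one way to make the session a singleton per worker:

import imaplib
import os

import luigi
from luigi import LocalTarget

# One cache entry per (process, host): each Luigi worker process builds
# its own connection once and reuses it across ImapFetch tasks.
_sessions = {}

def get_imap_session(host="imap.example.com"):
    key = (os.getpid(), host)
    if key not in _sessions:
        conn = imaplib.IMAP4_SSL(host)
        conn.login("user", "password")  # placeholder credentials
        conn.select("INBOX")
        _sessions[key] = conn
    return _sessions[key]

class ImapFetch(luigi.Task):
    """fetch one message, but over the shared per-process session"""
    uid = luigi.Parameter()

    def output(self):
        return LocalTarget(str(self.uid) + ".msg")  # simplified drop location

    def run(self):
        session = get_imap_session()
        # reuse the open session instead of reconnecting per task
        typ, data = session.uid("FETCH", self.uid, "(RFC822)")
        with self.output().open("w") as out:
            out.write(data[0][1].decode("utf-8", "replace"))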