luigi, data-pipeline

luigi upstream task should run once to create input for a set of downstream tasks


I have a nice, straight, working pipe: the task I run via luigi on the command line triggers all the required upstream data fetching and processing in its proper sequence, until the data trickles out into my database.

class IMAP_Fetch(luigi.Task):
  """fetch a bunch of email messages with data in them"""
  date = luigi.DateParameter()
  uid = luigi.Parameter()
…
  def output(self):
    loc = os.path.join(self.data_drop, str(self.date))
    # target for requested message
    return LocalTarget(os.path.join(loc, self.uid + ".msg"))

  def run(self):
     # code to connect to IMAP server and run FETCH on given UID
     # message gets written to self.output()
…

class RecordData(luigi.contrib.postgres.CopyToTable):
  """copy the data in one email message to the database table"""
  uid = luigi.Parameter()
  date = luigi.DateParameter()
  table = 'msg_data'
  columns = [('id', 'INT'), …]

  def requires(self):
    # a task (not shown) that extracts data from one message  
    # which in turn requires the IMAP_Fetch to pull down the message
    return MsgData(self.date, self.uid) 

  def rows(self):
    # code to read self.input() and yield lists of data values
…

Great stuff. Unfortunately, that first data fetch talks to a remote IMAP server, and every fetch is a new connection and a new query: very slow. I know how to get all the individual message files in one session (one task instance). What I don't understand is how to keep the downstream tasks just as they are, working on one message at a time, since a task that requires one message triggers a fetch of just that one message, not a fetch of all the messages available. I apologize in advance if I am missing an obvious solution, but so far it has stumped me how to keep my nice, simple, stupid pipe mostly the way it is while having the funnel at the top pull in all the data in one call. Thanks for your help.


Solution

What is missing from your explanation is where the list of uid values being sent to the RecordData tasks comes from. For this answer, I will assume you have a group of uid values that you would like to consolidate into a single IMAP_Fetch request.

One possible approach is to define a batch_id in addition to your uid, where the batch_id refers to the group of messages you would like to fetch in a single session. Where the association between a uid and a batch_id is stored is up to you: it can be a parameter passed to the pipeline, or it can be stored externally.

The task you left out, MsgData, whose requires method currently returns a single IMAP_Fetch task with a uid parameter, should instead require an IMAP_Fetch task that takes a batch_id parameter. The first such task required by any MsgData task would look up all uid values associated with that batch_id and retrieve those messages in a single session. All of the other MsgData tasks would require (and wait on) this one batch fetch to complete, and then each would execute on its individual message, as would the rest of the pipeline. Tuning the batch size may therefore prove important to overall processing throughput. A minimal sketch follows.
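Here is one way that could look. This is a sketch under assumptions: the IMAP_FetchBatch task name, the lookup_uids_for_batch helper, and the data_drop parameter are all hypothetical stand-ins for however you store the batch association and configure paths.

import os
import luigi
from luigi import LocalTarget

def lookup_uids_for_batch(batch_id):
  """hypothetical helper: return the uid values that belong to batch_id"""
  ...

class IMAP_FetchBatch(luigi.Task):
  """fetch every message in one batch over a single IMAP session"""
  date = luigi.DateParameter()
  batch_id = luigi.Parameter()
  data_drop = luigi.Parameter(default="data")  # assumed parameter

  def output(self):
    loc = os.path.join(self.data_drop, str(self.date))
    # one target per message; the task is complete only when all exist
    return [LocalTarget(os.path.join(loc, uid + ".msg"))
            for uid in lookup_uids_for_batch(self.batch_id)]

  def run(self):
    # connect once, FETCH every uid in the batch, and write each message
    # to its own target; the IMAP code itself is the same as in the
    # original IMAP_Fetch, just looped inside one session
    ...

class MsgData(luigi.Task):
  """extract data from one message, as in the original pipeline"""
  date = luigi.DateParameter()
  uid = luigi.Parameter()
  batch_id = luigi.Parameter()

  def requires(self):
    # every MsgData in the batch requires the same IMAP_FetchBatch,
    # so the fetch runs once and all of its siblings wait on it
    return IMAP_FetchBatch(self.date, self.batch_id)

  def run(self):
    # pick this task's message file out of the batch outputs
    msg_target = next(t for t in self.input()
                      if t.path.endswith(self.uid + ".msg"))
    ...

RecordData would then accept a batch_id as well and pass it through to MsgData in its requires method, leaving the rest of the per-message pipeline unchanged.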

Another downside is that this approach is less atomic: atomic at the batch level rather than at the individual item level, since the batch fetch fails if even one of the uid values is not retrieved successfully.

A second approach is to open the IMAP session as more of a singleton resource per worker process and have the IMAP_Fetch tasks reuse the same session.
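A minimal sketch of that idea, assuming the connection details arrive as task parameters (the host, user, and password names here are illustrative, e.g. they could come from luigi.cfg). The session lives at module level, so it is shared by every IMAP_Fetch run within one worker process; if you run multiple worker processes, each opens its own connection.

import imaplib
import os
import luigi
from luigi import LocalTarget

_SESSION = None

def get_imap_session(host, user, password):
  """open one IMAP connection per worker process and reuse it"""
  global _SESSION
  if _SESSION is None:
    _SESSION = imaplib.IMAP4_SSL(host)
    _SESSION.login(user, password)
    _SESSION.select("INBOX")
  return _SESSION

class IMAP_Fetch(luigi.Task):
  """same task as before, but run() borrows the shared session"""
  date = luigi.DateParameter()
  uid = luigi.Parameter()
  data_drop = luigi.Parameter(default="data")  # assumed parameter
  host = luigi.Parameter()      # assumed connection parameters,
  user = luigi.Parameter()      # e.g. supplied via luigi.cfg
  password = luigi.Parameter()

  def output(self):
    loc = os.path.join(self.data_drop, str(self.date))
    return LocalTarget(os.path.join(loc, self.uid + ".msg"))

  def run(self):
    session = get_imap_session(self.host, self.user, self.password)
    # only the FETCH itself is per-task; the connection is reused
    status, data = session.uid("fetch", self.uid, "(RFC822)")
    with self.output().open("w") as out:
      out.write(data[0][1].decode("utf-8", errors="replace"))

This keeps the rest of the pipeline exactly as it is, one message per task, and only amortizes the connection cost across tasks running in the same process.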