
Parallelizing list filtering


I have a list of items that I need to filter based on some conditions. I'm wondering whether Dask could do this filtering in parallel, as the list is very long (tens of millions of records).

Basically, what I need to do is this:

items = [
    {'type': 'dog', 'weight': 10},
    {'type': 'dog', 'weight': 20},
    {'type': 'cat', 'weight': 15},
    {'type': 'dog', 'weight': 30},
]

def item_is_valid(item):
    item_is_valid = True

    if item['type']=='cat':
        item_is_valid = False
    elif item['weight']>20:
        item_is_valid = False
    # ...
    # elif for n conditions

    return item_is_valid

items_filtered = [item for item in items if item_is_valid(item)]

With Dask, this is what I have managed to do so far:

import dask

def item_is_valid_v2(item):
    """Return the whole item if valid, otherwise None."""
    item_is_valid = True

    if item['type']=='cat':
        item_is_valid = False
    elif item['weight']>20:
        item_is_valid = False
    # ...
    # elif for n conditions

    if item_is_valid:
        return item
    # invalid items fall through and implicitly return None

results = []
for item in items:
    # one delayed task per item, using the item-returning version
    delayed = dask.delayed(item_is_valid_v2)(item)
    results.append(delayed)

results = dask.compute(*results)

However, the result contains None for every invalid item, and these values then have to be filtered out afterwards in a non-parallel way:

({'type': 'dog', 'weight': 10}, {'type': 'dog', 'weight': 20}, None, None)
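For comparison, a common way to avoid both the None placeholders and the cost of scheduling tens of millions of tiny tasks is to give each dask.delayed call a whole chunk of the list and have it return only the valid items. This is a minimal sketch assuming the data is an in-memory list; `filter_chunk`, `parallel_filter` and the `chunk_size` default are hypothetical names and values chosen for illustration.

```python
import dask


def item_is_valid(item):
    """Return True if the item passes all conditions (same rules as above)."""
    if item['type'] == 'cat':
        return False
    elif item['weight'] > 20:
        return False
    return True


def filter_chunk(chunk):
    """Hypothetical helper: filter one chunk serially inside a single Dask task."""
    return [item for item in chunk if item_is_valid(item)]


def parallel_filter(items, chunk_size=100_000):
    """Hypothetical helper: filter the list chunk by chunk with dask.delayed.

    chunk_size is an assumed default; tune it for the real data size.
    """
    # Split the list into contiguous chunks so each task does a meaningful amount of work
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
    tasks = [dask.delayed(filter_chunk)(chunk) for chunk in chunks]
    # compute() returns one already-filtered list per chunk, in the original order
    filtered_chunks = dask.compute(*tasks)
    # Concatenating the chunk results is cheap; no None values need removing
    return [item for chunk in filtered_chunks for item in chunk]


items = [
    {'type': 'dog', 'weight': 10},
    {'type': 'dog', 'weight': 20},
    {'type': 'cat', 'weight': 15},
    {'type': 'dog', 'weight': 30},
]

items_filtered = parallel_filter(items, chunk_size=2)
print(items_filtered)
# [{'type': 'dog', 'weight': 10}, {'type': 'dog', 'weight': 20}]
```

Note that dask.delayed uses a thread-based scheduler by default, and pure-Python filtering like this is limited by the GIL, so passing scheduler="processes" to dask.compute may be needed to see a real speedup. The chunk size trades per-task overhead against load balancing.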

Solution

  • Perhaps the bag API will work for you. Here is a rough sketch:

    import dask.bag as db
    
    bag = db.from_sequence(items)  # or, better yet, read the data from disk
    result = bag.filter(item_is_valid)  # note: this uses the first (boolean) version
    

    To check whether this works, inspect the output of result.take(5); if it looks right, compute the full result:

    computed_result = result.compute()
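A self-contained version of the bag approach, run on the sample data from the question, might look like the sketch below. The npartitions=2 value is an arbitrary assumption for this tiny list; for real data the partition count would be chosen to fit the input size.

```python
import dask.bag as db


def item_is_valid(item):
    """Return True if the item passes all conditions (first, boolean version)."""
    if item['type'] == 'cat':
        return False
    elif item['weight'] > 20:
        return False
    return True


items = [
    {'type': 'dog', 'weight': 10},
    {'type': 'dog', 'weight': 20},
    {'type': 'cat', 'weight': 15},
    {'type': 'dog', 'weight': 30},
]

# Partition the in-memory list; npartitions=2 is an arbitrary choice for this example
bag = db.from_sequence(items, npartitions=2)

# filter() keeps only the items for which the predicate returns True,
# so no None placeholders end up in the result. Nothing is computed yet.
result = bag.filter(item_is_valid)

if __name__ == "__main__":
    # Bags use the multiprocessing scheduler by default; this guard keeps
    # worker processes from re-running the compute calls when they import the module.
    print(result.take(1))    # peek at the first partition only
    computed_result = result.compute()
    print(computed_result)   # [{'type': 'dog', 'weight': 10}, {'type': 'dog', 'weight': 20}]
```

Because the filtering happens inside each partition, the result never contains None values, and take() only computes the first partition, which keeps the sanity check cheap on a large input.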