I have a list of items that I need to filter based on some conditions. I'm wondering whether Dask could do this filtering in parallel, as the list is very long (a few dozen million records).
Basically, what I need to do is this:
items = [
    {'type': 'dog', 'weight': 10},
    {'type': 'dog', 'weight': 20},
    {'type': 'cat', 'weight': 15},
    {'type': 'dog', 'weight': 30},
]
def item_is_valid(item):
    item_is_valid = True
    if item['type'] == 'cat':
        item_is_valid = False
    elif item['weight'] > 20:
        item_is_valid = False
    # ...
    # elif for n conditions
    return item_is_valid
items_filtered = [item for item in items if item_is_valid(item)]
With Dask, this is what I have managed to do so far:
def item_is_valid_v2(item):
    """Return the whole item if it is valid, otherwise None."""
    item_is_valid = True
    if item['type'] == 'cat':
        item_is_valid = False
    elif item['weight'] > 20:
        item_is_valid = False
    # ...
    # elif for n conditions
    if item_is_valid:
        return item
import dask

results = []
for item in items:
    delayed = dask.delayed(item_is_valid_v2)(item)
    results.append(delayed)
results = dask.compute(*results)
However, the result I get contains a few None
values, which then need to be filtered out somehow in a non-parallel way.
({'type': 'dog', 'weight': 10}, {'type': 'dog', 'weight': 20}, None, None)
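The only way I see to remove them right now is a plain serial comprehension over the computed results, which is exactly the kind of extra non-parallel step I would like to avoid, something like:

results_filtered = [r for r in results if r is not None]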
Perhaps the bag API will work for you; this is rough pseudo-code:
import dask.bag as db

bag = db.from_sequence(items)  # or better yet, read it from disk
result = bag.filter(item_is_valid)  # note this uses the first version (returns a bool)
To check whether this is working, inspect the outcome of result.take(5) and, if that is satisfactory:

computed_result = result.compute()
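Putting the pieces together, here is a minimal self-contained sketch; the npartitions value is only an assumption to illustrate the idea, so tune it to your data (or omit it and let Dask choose):

import dask.bag as db

# Build a bag from the in-memory list; npartitions=8 is an illustrative
# assumption, not a recommendation; aim for partitions of a sensible size.
bag = db.from_sequence(items, npartitions=8)

# filter() applies the boolean predicate to each element in parallel and
# keeps only the items for which it returns True, so no None values appear.
result = bag.filter(item_is_valid)

print(result.take(5))               # peek at a few filtered items
items_filtered = result.compute()   # materialize the full filtered list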