
Dask bag from multiple files into Dask dataframe with columns


I am given a list of filenames, files, that contain comma-delimited data which has to be cleaned and then extended by columns derived from the filenames. I therefore implemented a small read_file function that handles both the initial cleaning and the computation of the additional columns. Using db.from_sequence(files).map(read_file), I map the read function over all files and obtain one list of dictionaries per file.

However, rather than one list of dictionaries per file, I want my bag to contain each individual line of the input files as a separate entry. Subsequently, I want to map the keys of the dictionaries to column names in a Dask dataframe.

from dask import bag as db

def read_file(filename):
    ret = []
    with open(filename, 'r') as fp:
        ...  # read and clean each line, computing val_a, val_b, val_c
        ret.append({'a': val_a, 'b': val_b, 'c': val_c})

    return ret

files = ['a.txt', 'b.txt', 'c.txt']
my_bag = db.from_sequence(files).map(read_file)
# a,b,c are the keys of the dictionaries returned by read_file
my_df = my_bag.to_dataframe(columns=['a', 'b', 'c'])

Could someone let me know what I have to change to get this code running? Are there different approaches that would be more suitable?

Edit: I have created three test files, a_20160101.txt, a_20160102.txt, and a_20160103.txt. Each contains just a few lines with a single string per line.

asdf
sadfsadf
sadf
fsadff
asdf
sadfasd
fa
sf
ads 
f

Previously I had a small error in read_file, but that is fixed now, and calling my_bag.take(10) after mapping the reader works fine:

([{'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'},
  {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfsadf', 'c': 'XY'},
  {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadf', 'c': 'XY'},
  {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fsadff', 'c': 'XY'},
  {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'},
  {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfasd', 'c': 'XY'},
  {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fa', 'c': 'XY'},
  {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sf', 'c': 'XY'},
  {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'ads', 'c': 'XY'},
  {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'f', 'c': 'XY'}],)

However, my_df = my_bag.to_dataframe(columns=['a', 'b', 'c']) followed by my_df.head(10) still raises dask.async.AssertionError: 3 columns passed, passed data had 10 columns


Solution

  • You probably need to call flatten

    Your bag of filenames looks like this:

    ['a.txt', 
     'b.txt', 
     'c.txt']
    

    After you call map your bag looks like this:

    [[{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}],
     [{'a': 1, 'b': 2, 'c': 3}],
     [{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}]]
    

    Each file was turned into a list of dicts, so your bag is now a list-of-lists-of-dicts. That shape is exactly what produces the assertion error, as the small illustration below shows.
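    A rough plain-pandas reproduction (with hypothetical data; the exact exception type varies with the pandas version, and dask surfaces it as dask.async.AssertionError):

    import pandas as pd

    # One element of the outer list is a whole list of 10 dicts, so pandas
    # treats it as a single row with 10 values, not as 10 rows of 3 columns.
    data = [[{'a': 1, 'b': 2, 'c': 3}] * 10]
    pd.DataFrame(data, columns=['a', 'b', 'c'])
    # -> 3 columns passed, passed data had 10 columns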

    The .to_dataframe method wants you to have a flat list-of-dicts. So let's flatten our bag into a single flat collection of dicts:

    my_bag = db.from_sequence(files).map(read_file).flatten()

    After flattening, the bag looks like this:

    [{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30},
     {'a': 1, 'b': 2, 'c': 3},
     {'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}]
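
    For completeness, here is a minimal end-to-end sketch of the whole pipeline. The filename parsing and the constant 'XY' column are guesses modelled on the test files above (a_20160101.txt and friends), not your actual cleaning logic:

    import datetime
    import os

    from dask import bag as db

    def read_file(filename):
        # Hypothetical parsing: derive a date from names like 'a_20160101.txt'
        # and attach a constant placeholder value for column 'c'.
        date_str = os.path.basename(filename).split('_')[1].split('.')[0]
        file_date = datetime.datetime.strptime(date_str, '%Y%m%d')
        ret = []
        with open(filename, 'r') as fp:
            for line in fp:
                ret.append({'a': line.strip(), 'b': file_date, 'c': 'XY'})
        return ret

    files = ['a_20160101.txt', 'a_20160102.txt', 'a_20160103.txt']

    # flatten() turns the bag of per-file lists into a flat bag of dicts,
    # which is the shape to_dataframe expects.
    my_bag = db.from_sequence(files).map(read_file).flatten()
    my_df = my_bag.to_dataframe(columns=['a', 'b', 'c'])
    print(my_df.head(10))

    With the flattened bag, my_df.head(10) returns the first rows instead of raising the assertion error.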