I'm trying to use the Dedupe
package to merge a small messy data to a canonical table. Since the canonical table is very large (122 million rows), I can't load it all into memory.
The current approach that I'm using based off this takes an entire day on test data: a 300k row table of messy data stored in a dict, and a 600k row table of canonical data stored in mysql. If I do it all in memory (read the canonical table in as a dict) it only takes half an hour.
Is there a way to make this more efficient?
blocked_pairs = block_data(messy_data, canonical_db_cursor, gazetteer)
clustered_dupes = gazetteer.matchBlocks(blocked_pairs, 0)
def block_data(messy_data, c, gazetteer):
block_groups = itertools.groupby(gazetteer.blocker(messy_data.viewitems()),
lambda x: x[1])
for (record_id, block_keys) in block_groups:
a = [(record_id, messy_data[record_id], set())]
c.execute("""SELECT *
FROM canonical_table
WHERE record_id IN
(SELECT DISTINCT record_id
FROM blocking_map
WHERE block_key IN %s)""",
(tuple(block_key for block_key, _ in block_keys),))
b = [(row[self.key], row, set()) for row in c]
if b:
yield (a, b)
Sped it up dramatically by splitting up the query into two queries. I'm using mysql
and all the columns used in the example are indexed...
def block_data(messy_data, c, gazetteer):
block_groups = itertools.groupby(gazetteer.blocker(messy_data.viewitems()),
lambda x: x[1])
for (record_id, block_keys) in block_groups:
a = [(record_id, messy_data[record_id], set())]
c.execute("""SELECT DISTINCT record_id
FROM blocking_map
WHERE block_key IN %s""",
(tuple(block_key for block_key, _ in block_keys),))
values = tuple(row['record_id'] for row in c)
if values:
c.execute("""SELECT *
FROM canonical_table
WHERE record_id IN %s""",
(values,))
b = [(row['record_id'], row, set())
for row in c]
if b:
yield (a, b)