mysqlgoogle-cloud-platformpython-multithreadingexecutemanypython-dedupe

Values are not inserted into MySQL table using pool.apply_async in python2.7


I am trying to run the following code to populate a table in parallel for a certain application. First the following function is defined which is supposed to connect to my db and execute the sql command with the values given (to insert into table).

def dbWriter(sql, rows) :
   # load cnf file
    MYSQL_CNF = os.path.abspath('.') + '/mysql.cnf'
    conn = MySQLdb.connect(db='dedupe',
                       charset='utf8',
                       read_default_file = MYSQL_CNF)

    cursor = conn.cursor()
    cursor.executemany(sql, rows)
    conn.commit()
    cursor.close()

    conn.close()

And then there is this piece:

pool = dedupe.backport.Pool(processes=2)

done = False

while not done :
    chunks = (list(itertools.islice(b_data, step)) for step in 
      [step_size]*100)


    results = []

    for chunk in chunks :
        print len(chunk)
        results.append(pool.apply_async(dbWriter,
                                    ("INSERT INTO blocking_map VALUES (%s, %s)",
                                     chunk)))

    for r in results :

        r.wait()

    if len(chunk) < step_size :
        done = True


pool.close()

Everything works and there are no errors. But at the end, my table is empty, meaning somehow the insertions were not successful. I have tried so many things to fix this (including adding column names for insertion) after many google searches and have not been successful. Any suggestions would be appreciated. (running code in python2.7, gcloud (ubuntu). note that indents may be a bit messed up after pasting here)

Please also note that "chunk" follows exactly the required data format.

Note. This is part of this example Please note that the only thing I am changing in the above example (linked) is that I am separating the steps for creation of and inserting into the tables since I am running my code on gcloud platform and it enforces GTID standards.


Solution

  • Solution was changing dbwriter function to:

    conn = MySQLdb.connect(host = # host ip,
                     user = # username, 
                     passwd = # password,
                     db = 'dedupe')
    cursor = conn.cursor()
    cursor.executemany(sql, rows)
    cursor.close()
    conn.commit()
    conn.close()