I'm trying to create a graph with edges only between nodes (record indices in a dataframe) that share the same values in any 2 or more columns.
What I'm doing: I build a list of all possible pairs of column names and go through them searching for duplicates; for each duplicate group I extract the indexes and create edges between them.
The problem is that for huge datasets (millions of records) this solution is too slow and requires too much memory.
What I do:
import pandas as pd
import numpy as np
import networkit as nk

df = pd.DataFrame({
    'A': [1, 2, 3, 4, 5],
    'B': [1, 1, 1, 1, 2],
    'C': [1, 1, 2, 3, 3],
    'D': [2, 7, 9, 8, 4]})
   | A | B | C | D
---|---|---|---|---
 0 | 1 | 1 | 1 | 2
 1 | 2 | 1 | 1 | 7
 2 | 3 | 1 | 2 | 9
 3 | 4 | 1 | 3 | 8
 4 | 5 | 2 | 3 | 4
Here, rows 0 and 1 have the same values in two columns, B and C.
So, for nodes 0, 1, 2, 3, 4 I need to create the edge 0-1. Every other pair of records shares at most one equal value, so no other edges are needed.
column_names = np.array(df.columns)
num_nodes = len(df)

graph = nk.Graph(num_nodes, directed=False, weighted=False)

# Get the indices of all unique pairs
indices = np.triu_indices(len(column_names), k=1)
# Get the unique pairs of column names
unique_pairs = np.column_stack((column_names[indices[0]], column_names[indices[1]]))

for col1, col2 in unique_pairs:
    # Keep only rows that appear more than once for this column pair
    duplicated_rows = df[[col1, col2]].dropna()
    duplicated_rows = duplicated_rows[duplicated_rows.duplicated(subset=[col1, col2], keep=False)]
    # Add an edge between every pair of rows in each duplicate group
    for _, group in duplicated_rows.groupby([col1, col2]):
        tb_ids = group.index.tolist()
        for i in range(len(tb_ids)):
            for j in range(i + 1, len(tb_ids)):
                graph.addEdge(tb_ids[i], tb_ids[j])
Main question: how can I speed up / improve this solution? I was thinking about parallelizing by column combination, but in that case I can't figure out how to add edges to the graph safely.
Appreciate any help.
Memory Problem
Your multi-million record input generates so many pairs that they cannot all be kept in memory.
You will have to give up storing everything in memory and keep the data in a highly optimized database instead. I suggest SQLite. Bring input data into memory as required and store the pairs to the database as they are found. If you properly optimize your use of SQLite, the performance hit will be minimal and you will not run out of memory.
Performance Problem
Storing pairs to a database will slow the performance slightly.
You will need to optimize how you use the database. The two most important optimizations are:
Transaction Grouping. Initially, keep the pairs as they are found in memory. When the pair count reaches a specified number, write them all to the database in one transaction.
Asynchronous Write. Once you have handed off the writes to the db engine, do not wait for confirmation that the write succeeded - just blaze ahead with the pair search. Both optimizations are sketched below.
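In SQLite's C API, both optimizations come down to a few calls. A minimal sketch (error checks omitted; insertPair is a hypothetical helper standing in for the prepared INSERT shown in full in writeDB() further down):
// Asynchronous write: hand the page flushes to the OS
// and do not wait for confirmation of each one.
sqlite3_exec(db, "PRAGMA synchronous = OFF;", 0, 0, &dbErrMsg);

// Transaction grouping: wrap a whole batch of inserts in one transaction,
// so SQLite journals and syncs once per batch instead of once per row.
sqlite3_exec(db, "BEGIN TRANSACTION;", 0, 0, &dbErrMsg);
for (auto &p : vPair)
    insertPair(db, p.first, p.second); // hypothetical helper; see writeDB() below
sqlite3_exec(db, "END TRANSACTION;", 0, 0, &dbErrMsg);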
You forgot to state your performance requirement! Whatever it might be, I will assume that you need to squeeze out a significant improvement.
I see that you are using Python. This is an interpreted language, so performance will be sluggish. Switching to a compiled language will give you a significant boost; well-coded C++, for example, can be up to 50 times faster.
Algorithm
SET T = number of pairs to write in one DB transaction
LOOP N over all records
    IF N has 2 or more identical values
        LOOP M over records N+1 to last
            LOOP C over columns
                LOOP D over columns C+1 to last
                    IF N[C] == N[D] == M[C] == M[D]
                        SAVE M,N to memory pair store
                        IF memory pair store size >= T
                            WRITE memory pair store to DB
                            CLEAR memory pair store
WRITE memory pair store to DB
Example:
Here is an implementation of these ideas in C++ that finds ~6,000,000 pairs in 100,000 records in 40 seconds on a modest laptop.
#include <string>
#include <fstream>
#include <sstream>
#include <iostream>
#include <vector>
#include <algorithm>
#include <time.h>
#include "sqlite3.h"
#include "cRunWatch.h" // https://ravenspoint.wordpress.com/2010/06/16/timing/
std::vector<std::vector<int>> vdata;
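// Stores found pairs: collects them in memory and flushes them
// to the SQLite database in batched transactions.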
class cPairStorage
{
    std::vector<std::pair<int, int>> vPair;
    sqlite3 *db;
    char *dbErrMsg;
    int transactionCount;

public:
    cPairStorage();

    void add(int r1, int r2)
    {
        vPair.push_back(std::make_pair(r1, r2));
        if ((int)vPair.size() >= transactionCount)
            writeDB();
    }

    void writeDB();
    int count();
    std::pair<int, int> get(int index);
};
cPairStorage pairStore;
cPairStorage::cPairStorage()
    : transactionCount(500)
{
    int ret = sqlite3_open("pair.db", &db);
    if (ret)
        throw std::runtime_error("failed to open db");

    ret = sqlite3_exec(db,
        "CREATE TABLE IF NOT EXISTS pair (r1, r2);",
        0, 0, &dbErrMsg);
    ret = sqlite3_exec(db,
        "DELETE FROM pair;",
        0, 0, &dbErrMsg);
    ret = sqlite3_exec(db,
        "PRAGMA synchronous = OFF;",
        0, 0, &dbErrMsg);
}
void cPairStorage::writeDB()
{
    //raven::set::cRunWatch aWatcher("writeDB");

    sqlite3_stmt *stmt;
    int ret = sqlite3_prepare_v2(
        db,
        "INSERT INTO pair VALUES ( ?1, ?2 );",
        -1, &stmt, 0);
    ret = sqlite3_exec(
        db,
        "BEGIN TRANSACTION;",
        0, 0, &dbErrMsg);
    for (auto &p : vPair)
    {
        ret = sqlite3_bind_int(stmt, 1, p.first);
        ret = sqlite3_bind_int(stmt, 2, p.second);
        ret = sqlite3_step(stmt);
        ret = sqlite3_reset(stmt);
    }
    ret = sqlite3_exec(
        db,
        "END TRANSACTION;",
        0, 0, &dbErrMsg);
    sqlite3_finalize(stmt); // release the prepared statement

    //std::cout << "stored " << vPair.size() << "\n";
    vPair.clear();
}
int cPairStorage::count()
{
    int ret;
    sqlite3_stmt *stmt;
    ret = sqlite3_prepare_v2(
        db,
        "SELECT count(*) FROM pair;",
        -1, &stmt, 0);
    ret = sqlite3_step(stmt);
    int count = sqlite3_column_int(stmt, 0);
    sqlite3_finalize(stmt); // release the prepared statement
    return count;
}
std::pair<int, int> cPairStorage::get(int index)
{
    // rowids are 1-based, so valid indexes run from 1 to count()
    if (index < 1 || index > count())
        throw std::runtime_error("bad pair index");

    std::pair<int, int> pair;
    int ret;
    sqlite3_stmt *stmt;
    ret = sqlite3_prepare_v2(
        db,
        "SELECT * FROM pair WHERE rowid = ?1;",
        -1, &stmt, 0);
    ret = sqlite3_bind_int(stmt, 1, index);
    ret = sqlite3_step(stmt);
    pair.first = sqlite3_column_int(stmt, 0);
    pair.second = sqlite3_column_int(stmt, 1);
    sqlite3_finalize(stmt); // release the prepared statement
    return pair;
}
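// Fill vdata with rowCount rows of colCount random values in 1..maxValue.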
void generateRandom(
    int colCount,
    int rowCount,
    int maxValue)
{
    srand(time(NULL));
    for (int krow = 0; krow < rowCount; krow++)
    {
        std::vector<int> vrow;
        for (int kcol = 0; kcol < colCount; kcol++)
            vrow.push_back(rand() % maxValue + 1);
        vdata.push_back(vrow);
    }
}
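// True if rows r1 and r2 match, i.e. there are two columns c1 < c2
// where all four values v1[c1], v1[c2], v2[c1], v2[c2] are equal.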
bool isPair(int r1, int r2)
{
    auto &v1 = vdata[r1];
    auto &v2 = vdata[r2];
    for (int kc1 = 0; kc1 < (int)v1.size(); kc1++)
    {
        for (int kc2 = kc1 + 1; kc2 < (int)v1.size(); kc2++)
        {
            int tv = v1[kc1];
            if (tv != v1[kc2])
                continue;
            if (tv != v2[kc1])
                continue;
            if (tv != v2[kc2])
                continue;
            return true;
        }
    }
    return false;
}
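// Scan every row pair, skipping first rows that cannot possibly match
// (no two equal values within the row), saving matches to the pair store.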
void findPairs()
{
    raven::set::cRunWatch aWatcher("findPairs");

    int colCount = vdata[0].size();
    for (int kr1 = 0; kr1 < (int)vdata.size(); kr1++)
    {
        bool pairPossible = false;
        for (int kc1 = 0; kc1 < colCount; kc1++)
        {
            for (int kc2 = kc1 + 1; kc2 < colCount; kc2++)
            {
                if (vdata[kr1][kc1] == vdata[kr1][kc2])
                {
                    // row has two cols with equal values
                    // so it can be part of a row pair
                    pairPossible = true;
                    break;
                }
            }
            if (pairPossible)
                break;
        }
        if (!pairPossible)
            continue;

        for (int kr2 = kr1 + 1; kr2 < (int)vdata.size(); kr2++)
            if (isPair(kr1, kr2))
                pairStore.add(kr1, kr2);
    }

    // flush any pairs still waiting in memory
    pairStore.writeDB();
}
void display()
{
    std::cout << "\nFound " << pairStore.count() << " pairs in " << vdata.size() << " records\n\n";
    std::cout << "First 2 pairs found:\n\n";
    for (int kp = 0; kp < 2; kp++)
    {
        auto p = pairStore.get(kp + 1); // rowids are 1-based
        for (int v : vdata[p.first])
            std::cout << v << " ";
        std::cout << "\n";
        for (int v : vdata[p.second])
            std::cout << v << " ";
        std::cout << "\n\n";
    }
    raven::set::cRunWatch::Report();
}
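// Usage: matcher <rowCount>
// (the full github repo version also parses the --rows, --trans and --seed
// options shown in the test runs below)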
int main(int ac, char *argv[])
{
    int rowCount = 10;
    if (ac == 2)
        rowCount = atoi(argv[1]);

    raven::set::cRunWatch::Start();

    generateRandom(
        5,        // columns
        rowCount, // rows
        20);      // max value

    findPairs();

    display();

    return 0;
}
Output from a test run
>matcher --rows 100000 --trans 10000 --seed 571
unit tests passed
Found 6238872 pairs in 100000 records
First 2 pairs found:
4 4 13 18 18
4 4 1 10 7
4 4 13 18 18
4 4 11 3 1
raven::set::cRunWatch code timing profile
Calls Mean (secs) Total Scope
1 40.3924 40.3924 findPairs
Complete application with documentation in github repo https://github.com/JamesBremner/RecordMatcher
Multithreading
It is straightforward to split the rows to be searched into two parts and search each part in its own thread. As often happens with multithreaded applications, the performance results were at first disappointing. However, by tuning the configuration parameters, I have achieved what looks like a worthwhile improvement: ~6,000,000 pairs in 100,000 records in 30 seconds on the same modest laptop.
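A minimal sketch of the split, reusing the vdata, isPair and pairStore globals from above (an illustration, not the repo's exact code; requires <thread> and <mutex>):
// Each thread scans its own range of first-row indices. cPairStorage::add
// is not thread safe, so the store is guarded with a mutex here, and the
// pairPossible pre-check from findPairs() is omitted for brevity.
std::mutex storeMutex;

void findPairsRange(int firstRow, int lastRow)
{
    for (int kr1 = firstRow; kr1 < lastRow; kr1++)
        for (int kr2 = kr1 + 1; kr2 < (int)vdata.size(); kr2++)
            if (isPair(kr1, kr2))
            {
                std::lock_guard<std::mutex> lock(storeMutex);
                pairStore.add(kr1, kr2);
            }
}

void findPairsMulti()
{
    int half = (int)vdata.size() / 2;
    std::thread t1(findPairsRange, 0, half);
    std::thread t2(findPairsRange, half, (int)vdata.size());
    t1.join();
    t2.join();
    pairStore.writeDB(); // flush any remaining pairs
}
Output from a test run: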
>matcher --rows 100000 --trans 10000 --seed 571 --multi
unit tests passed
Found 6238872 pairs in 100000 records
First 2 pairs found:
4 4 13 18 18
4 4 1 10 7
4 4 13 18 18
4 4 11 3 1
raven::set::cRunWatch code timing profile
Calls Mean (secs) Total Scope
1 29.6909 29.6909 findPairs