Tags: python, python-3.x, numpy, set, n-gram

Fastest way to compute n-gram overlap matrix in Python


I have a large corpus of documents that I want to merge if they have a notable n-gram overlap (in my case, I'm considering bigrams). Consider the list of sets:

corpus = [{'example', 'bigram'}, {'another', 'example'}, {'some', 'outlier'}]

I have the following code to compute the similarity matrix:

import numpy as np

# Pairwise similarity matrix; only the upper triangle (i < j) is filled,
# the lower triangle is left at 0 because the measure is symmetric.
sim_matrix = np.zeros((len(corpus), len(corpus)))
for i in range(len(corpus)):
    for j in range(i+1, len(corpus)):
        # overlap coefficient of the two n-gram sets (defined below)
        sim_matrix[i][j] = get_ngram_overlap(corpus[i], corpus[j])
        
# every set fully overlaps with itself
np.fill_diagonal(sim_matrix, 1)

Where the get_ngram_overlap function is defined as follows:

def get_ngram_overlap(ngrams_s1, ngrams_s2):
    """Overlap coefficient of two n-gram sets.

    Returns |s1 & s2| / min(|s1|, |s2|), or 0 when either set is empty.
    """
    smaller = min(len(ngrams_s1), len(ngrams_s2))
    if not smaller:
        return 0
    return len(ngrams_s1 & ngrams_s2) / smaller

The result is an MxM matrix, where M is the number of documents (sets of n-grams) in my corpus

print(sim_matrix)
>> [[1.  0.5  0.]
   [0.  1.   0.]
   [0.  0.   1.]]

The problem is, when the M grows large, the code is really slow and this becomes really computationally expensive, because of the necessity to compare all unique pairs. Are there ways to compute it more efficiently? Like using an optimized distance matrix computation with a custom similarity metric that would work for sets of strings. Any help is appreciated!


Solution

  • You can make the computation faster by first building an index and running get_ngram_overlap() only on the combinations that share at least one word:

    import numpy as np
    from itertools import combinations
    
    # make the items in corpus frozensets (to make them hashable)
    # frozensets, unlike plain sets, can be used as dict keys below
    corpus = [frozenset({'example', 'bigram'}), frozenset({'another', 'example'}), frozenset({'some', 'outlier'})]
    
    def get_ngram_overlap(ngrams_s1, ngrams_s2):
        """Overlap coefficient: |intersection| / size of the smaller set (0 if either is empty)."""
        smallest = min(len(ngrams_s1), len(ngrams_s2))
        return len(ngrams_s1 & ngrams_s2) / smallest if smallest else 0
    
    
    # index: word -> list of bigram sets containing that word (in corpus order)
    # nums:  bigram set -> list of its positions in corpus (handles duplicate sets)
    index, nums = {}, {}
    for i, b in enumerate(corpus):
        for word in b:
            index.setdefault(word, []).append(b)
        nums.setdefault(b, []).append(i)
    
    # Score only pairs of sets that share at least one word. The lists in
    # `index` preserve corpus order, so combinations() always yields a pair
    # in the same (earlier, later) order; a pair sharing two words is simply
    # recomputed under the second word and overwrites the identical value.
    index2 = {}
    for k, v in index.items():
        for c in combinations(v, 2):
            index2[c] = get_ngram_overlap(*c)
    
    sim_matrix = np.zeros((len(corpus), len(corpus)))
    
    # Scatter each pair's score into the upper triangle (x < y) of the matrix.
    for a, b in index2:
        for x in nums[a]:
            for y in nums[b]:
                sim_matrix[(x, y) if x < y else (y, x)] = index2[(a, b)]
    
    np.fill_diagonal(sim_matrix, 1)
    print(sim_matrix)
    

    Prints:

    [[1.  0.5 0. ]
     [0.  1.  0. ]
     [0.  0.  1. ]]
    

    A benchmark with 10k bigrams drawn from 500 distinct words:

    import random
    import numpy as np
    from timeit import timeit
    from itertools import combinations
    
    
    # Fix the RNG so the benchmark corpus is reproducible.
    random.seed(123)
    # 10k random bigrams over a 500-word vocabulary, stored as frozensets so
    # they are hashable. Note: when both randints coincide the frozenset
    # collapses to a single element.
    corpus = [frozenset({f'word{random.randint(1, 500)}', f'word{random.randint(1, 500)}'}) for _ in range(10_000)]
    
    def get_ngram_overlap(ngrams_s1, ngrams_s2):
        """Return |s1 & s2| / min(|s1|, |s2|); 0 when either set is empty."""
        limit = min(len(ngrams_s1), len(ngrams_s2))
        if limit == 0:
            return 0
        shared = ngrams_s1 & ngrams_s2
        return len(shared) / limit
    
    
    def fn1():
        """Baseline: brute-force overlap for every unique pair (upper triangle)."""
        size = len(corpus)
        sim_matrix = np.zeros((size, size))
        for i in range(size):
            for j in range(i + 1, size):
                sim_matrix[i, j] = get_ngram_overlap(corpus[i], corpus[j])
        # diagonal: each set fully overlaps itself
        np.fill_diagonal(sim_matrix, 1)
        return sim_matrix
    
    def fn2():
        """Index-based version: score only pairs of sets that share a word.

        Relies on dict insertion order: the lists in `index` preserve corpus
        order, so combinations() always emits a pair as (earlier, later), and
        a pair sharing two words just overwrites the identical score.
        """
        # index: word -> bigram sets containing it; nums: set -> corpus indices
        index, nums = {}, {}
        for i, b in enumerate(corpus):
            for word in b:
                index.setdefault(word, []).append(b)
            nums.setdefault(b, []).append(i)
    
        # overlap for every candidate pair (pairs with at least one common word)
        index2 = {}
        for k, v in index.items():
            for c in combinations(v, 2):
                index2[c] = get_ngram_overlap(*c)
    
        sim_matrix = np.zeros((len(corpus), len(corpus)))
    
        # scatter scores into the upper triangle; nums expands duplicate sets
        for a, b in index2:
            for x in nums[a]:
                for y in nums[b]:
                    sim_matrix[(x, y) if x < y else (y, x)] = index2[(a, b)]
    
        np.fill_diagonal(sim_matrix, 1)
        return sim_matrix
    
    # Sanity check: both implementations must produce the identical matrix.
    assert np.array_equal(fn1(), fn2())
    
    # Single-run wall-clock timing of each implementation.
    t1 = timeit(fn1, number=1)
    t2 = timeit(fn2, number=1)
    
    print(t1)
    print(t2)
    

    Prints on my machine (Python 3.10/AMD Ryzen 5700x):

    19.253960204077885
    0.37714757514186203