Tags: python, string, algorithm

Algorithm to extract the common part of all strings in a list


My goal is to classify and show the types of errors in log files. I've solved the part of clustering them into buckets, but now I want to show the user the error string that is common across all the errors in each bucket. Right now I'm showing the user the first string of the bucket, but it would be nicer to show a glob expression or a regex that matches all the strings in the bucket: one that keeps the common parts of the strings and substitutes the differing parts with something that matches them, maximizing the number of characters that match.

For example, if I have these error strings in a bucket:

"Error: [MODULE_FOO] File foo has 5 unsolved dependencies and 4 errors."
"Error: [MODULE_BLA] Files bar and yaz have 123 unsolved dependencies."
"Error: [MODULE_123] File baz has 45 unsolved dependencies and 3 warnings."

The common glob string that matches all of them would be: "Error: [MODULE_*] File* * ha* * unsolved dependencies*."

This example is just to illustrate how the different parts of the strings are converted to asterisks and the common parts are kept as they are. Don't assume this is the actual structure of the errors.
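
In case it helps to see what I mean by "matches all the strings": fnmatch wouldn't work directly on such a glob, because it would treat the "[MODULE_*]" brackets as a character class, so I check candidate globs by converting them to a regex instead. The glob_to_regex helper below is just something I sketched for that, not a library function:

import re

def glob_to_regex(glob: str) -> re.Pattern[str]:
    # Everything except "*" is matched literally; each "*" becomes ".*".
    return re.compile(".*".join(re.escape(part) for part in glob.split("*")))

errors = [
    "Error: [MODULE_FOO] File foo has 5 unsolved dependencies and 4 errors.",
    "Error: [MODULE_BLA] Files bar and yaz have 123 unsolved dependencies.",
    "Error: [MODULE_123] File baz has 45 unsolved dependencies and 3 warnings.",
]
pattern = glob_to_regex("Error: [MODULE_*] File* * ha* * unsolved dependencies*.")
assert all(pattern.fullmatch(error) for error in errors)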

I'm working in Python, but if there's no library that can help me, any language-agnostic pseudo-code is also welcome.

To cluster the strings I'm using difflib. Here's the code. I'm not sure it's relevant to the question, but I wonder whether I could use more of difflib's functionality to help build this algorithm for extracting the common parts of the strings.

import difflib

def cluster_strings(strings: list[str], threshold: float = 0.9) -> list[list[str]]:
    strings = list(strings)  # Work on a copy so the caller's list isn't mutated.
    clusters = []
    cluster_num = 0
    while len(strings) > 0:
        # Each remaining string seeds a new cluster.
        string = strings.pop()
        clusters.append([string])
        indexes_to_remove = []
        for idx, string_to_match in enumerate(strings):
            if difflib.SequenceMatcher(None, string, string_to_match).quick_ratio() > threshold:
                clusters[cluster_num].append(string_to_match)
                indexes_to_remove.append(idx)
        # Keep only the strings that haven't been assigned to a cluster yet.
        strings = [string for idx, string in enumerate(strings) if idx not in indexes_to_remove]
        cluster_num += 1
    return clusters
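
For illustration, this is roughly how I call it (the "Warning" line and the lower threshold are made up for this example; quick_ratio only compares character counts, so the threshold usually needs tuning per log format):

log_lines = [
    "Error: [MODULE_FOO] File foo has 5 unsolved dependencies and 4 errors.",
    "Error: [MODULE_BLA] Files bar and yaz have 123 unsolved dependencies.",
    "Error: [MODULE_123] File baz has 45 unsolved dependencies and 3 warnings.",
    "Warning: disk usage is at 91%.",
]
for bucket in cluster_strings(log_lines, threshold=0.6):
    print(bucket)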

Solution

  • I found a way of doing it with the help of the difflib library.

    This is the "dirty" way, which is not 100% technically correct. It fails to get the best answer in corner cases like ["ab", "ba", "cb", "bc"], where the result is "*" instead of "*b*". For most real-world use cases this is probably good enough.

    import difflib
    
    def globify_strings(strings: list[str]) -> str:
        if len(strings) == 0:
            return ""
        result = strings[0]
        if len(strings) == 1:
            return result
        for string in strings[1:]:
            # Reverse the opcodes so that indexes don't change as the operations are applied.
            for tag, i1, i2, _j1, _j2 in reversed(difflib.SequenceMatcher(None, result, string).get_opcodes()):
                if tag != "equal":
                    result = result[:i1] + "*" + result[i2:]
        return result
    

    The result using the example as input: "Error: [MODULE_*] File* * ha* * unsolved dependencies*."

    This is a slower version that tries to be more exhaustive and gets rid of bad answers for those corner cases. It calls the other function multiple times, each time swapping the first element in the list for a different one, going through all of them.

    import difflib
    
    def globify_strings_exhaustive(strings: list[str]) -> str:
        if len(strings) == 0:
            return ""
        result = strings[0]
        if len(strings) == 1:
            return result
        globs = set()
        for i, string in enumerate(strings):
            globs.add(globify_strings([string] + strings[:i] + strings[i + 1 :]))
        return sorted(globs, key=len, reverse=True)[0]
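
    As a quick check of the corner case mentioned above (the expected results in the comments are the ones described there):

    corner_case = ["ab", "ba", "cb", "bc"]
    print(globify_strings(corner_case))             # "*"
    print(globify_strings_exhaustive(corner_case))  # "*b*"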
    

    Here is a version that removes duplicates first and then caches results. I tested this with ~1000 strings and it reduced the execution time from ~17 seconds to ~0.20 seconds.

    import copy
    import difflib
    
    _sequence_matcher_cache: dict[tuple[str, str], str] = {}
    
    def _globify_strings_internal_cached(strings: list[str]) -> str:
        result = strings[0]
        for string in strings[1:]:
            if (result, string) not in _sequence_matcher_cache:
                result_prev = copy.copy(result)  # Keep the pre-merge result; it is the cache key for this pair.
                for tag, i1, i2, _j1, _j2 in reversed(difflib.SequenceMatcher(None, result, string).get_opcodes()):
                    if tag != "equal":
                        result = result[:i1] + "*" + result[i2:]
                _sequence_matcher_cache[(result_prev, string)] = result
            else:
                result = _sequence_matcher_cache[(result, string)]
        return result
    
    def globify_strings_exhaustive(strings: list[str]) -> str:
        strings = list(set(strings))  # Remove duplicates.
        if len(strings) == 0:
            return ""
        if len(strings) == 1:
            return strings[0]
        results = []
        for i, string in enumerate(strings):
            cmp_strings = [string] + strings[:i] + strings[i + 1 :]
            results.append(_globify_strings_internal_cached(cmp_strings))
        _sequence_matcher_cache.clear()
        return max(results, key=len)
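
    To tie this back to the original question, the clustering and the glob extraction can be combined per bucket. summarize_errors is just a wrapper name made up for this sketch; it assumes cluster_strings from the question and globify_strings_exhaustive from above are both in scope:

    def summarize_errors(log_lines: list[str], threshold: float = 0.9) -> list[str]:
        # One glob per bucket of similar log lines.
        return [
            globify_strings_exhaustive(bucket)
            for bucket in cluster_strings(log_lines, threshold)
        ]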