c list performance defragmentation

Efficient list compacting


Suppose you have a list of unsigned ints. Some elements are equal to 0 and you want to push them to the back. Currently I use this code (list is a pointer to an array of unsigned ints of size n):

for (i = 0; i < n; ++i) 
{    
    if (list[i])
        continue;
    int j;
    for (j = i + 1; j < n && !list[j]; ++j);
    int z;
    for (z = j + 1; z < n && list[z]; ++z);
    if (j == n)
        break;
    memmove(&(list[i]), &(list[j]), sizeof(unsigned int) * (z - j));
    int s = z - j + i;
    for(j = s; j < z; ++j) 
        list[j] = 0;
    i = s - 1;
} 

Can you think of a more efficient way to perform this task?

The snippet is purely illustrative; in the production code, each element of list is a 64-byte struct.

EDIT: I'll post my solution. Many thanks to Jonathan Leffler.

void RemoveDeadParticles(int * list, int * n)
{
    int i, j = *n - 1;
    for (; j >= 0 && list[j] == 0; --j);
    for (i = 0; i <= j; ++i)
    {   
        if (list[i])
            continue;
        memcpy(&(list[i]), &(list[j]), sizeof(int));
        list[j] = 0;
        for (; j >= 0 && list[j] == 0; --j);
        if (i == j)
            break;
    }   

    *n = j + 1;   /* j indexes the last live element, or is -1 if none remain */
}
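
Since the production elements are 64-byte structs rather than ints, the same swap-with-the-tail idea can be sketched for a struct array. This is only an illustration under stated assumptions: the `Particle` type, its `alive` field, and the function name `remove_dead_particles` are hypothetical, because the real struct layout is not shown here.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical 64-byte element; the real struct layout is not shown in the question. */
typedef struct Particle
{
    uint32_t alive;         /* assumed liveness flag: 0 means dead */
    uint8_t  payload[60];   /* stand-in for the remaining fields */
} Particle;

/*
 * Move every live particle to the front of the array and return the
 * number of live particles. Order is not preserved. Slots at or beyond
 * the returned count hold stale data and should be treated as unused.
 */
size_t remove_dead_particles(Particle *list, size_t n)
{
    size_t i = 0;
    while (i < n)
    {
        if (list[i].alive)
            i++;                    /* live: keep it and advance */
        else
            list[i] = list[--n];    /* dead: pull in the tail element and shrink n */
    }
    return n;
}
```

Each iteration either advances i or shrinks n, so the loop body runs at most once per element. A dead tail element may be copied forward and then discarded on the next iteration, which costs one extra 64-byte copy but keeps the loop simple.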

Solution

  • The code below implements the linear algorithm I outlined in a comment:

    There's a straight linear O(N) algorithm if you're careful; yours is O(N^2). Given that order is irrelevant, every time you encounter a zero going forwards through the array, you swap it with the last element that might not be a zero. That's one pass through the array. Care is required on the boundary conditions.

    Care was required; the acid test of list3[] in the test code caused grief until I got the boundary conditions right. Note that a list of size 0 or 1 is already in the correct order.

    #include <stdio.h>
    #define DIM(x)  (sizeof(x)/sizeof(*(x)))
    
    extern void shunt_zeroes(int *list, size_t n);
    
    void shunt_zeroes(int *list, size_t n)
    {
        if (n > 1)
        {
            size_t tail = n;
            for (size_t i = 0; i < tail - 1; i++)
            {
                if (list[i] == 0)
                {
                    while (--tail > i + 1 && list[tail] == 0)
                        ;
                    if (tail > i)
                    {
                        int t = list[i];
                        list[i] = list[tail];
                        list[tail] = t;
                    }
                }
            }
        }
    }
    
    static void dump_list(const char *tag, int *list, size_t n)
    {
        printf("%-8s", tag);
        for (size_t i = 0; i < n; i++)
        {
            printf("%d ", list[i]);
        }
        putchar('\n');
        fflush(0);
    }
    
    static void test_list(int *list, size_t n)
    {
        dump_list("Before:", list, n);
        shunt_zeroes(list, n);
        dump_list("After:", list, n);
    }
    
    int main(void)
    {
        int list1[] = { 1, 0, 2, 0, 3, 0, 4, 0, 5 };
        int list2[] = { 1, 2, 2, 0, 3, 0, 4, 0, 0 };
        int list3[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0 };
        int list4[] = { 0, 1 };
        int list5[] = { 0, 0 };
        int list6[] = { 0 };
        test_list(list1, DIM(list1));
        test_list(list2, DIM(list2));
        test_list(list3, DIM(list3));
        test_list(list4, DIM(list4));
        test_list(list5, DIM(list5));
        test_list(list6, DIM(list6));
    }
    

    Example run:

    $ shuntzeroes
    Before: 1 0 2 0 3 0 4 0 5 
    After:  1 5 2 4 3 0 0 0 0 
    Before: 1 2 2 0 3 0 4 0 0 
    After:  1 2 2 4 3 0 0 0 0 
    Before: 0 0 0 0 0 0 0 0 0 
    After:  0 0 0 0 0 0 0 0 0 
    Before: 0 1 
    After:  1 0 
    Before: 0 0 
    After:  0 0 
    Before: 0 
    After:  0 
    $
    

    Complexity of code

    I've asserted that the original code in the question and in the answer by UmNyobe is O(N^2) but that this is O(N). However, there is a loop inside a loop in all three cases; why is this answer linear when the others are O(N^2)?

    Good question!

    The difference is that the inner loop in my code scans backwards over the array, finding a non-zero value to swap with the zero that was just found. In doing so, it reduces the work to be done by the outer loop. So, the i index scans forward, once, and the tail index scans backwards once, until the two meet in the middle. By contrast, in the original code, the inner loops start at the current index and work forwards to the end each time, which leads to the quadratic behaviour.
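
    The difference can be made concrete by counting loop steps. The sketch below uses two simplified stand-ins, not the exact functions timed in this answer: `rescan_steps` restarts its search for the next non-zero at the current zero after every move, and `converge_steps` moves a forward index and a tail index toward each other. Both names and the step counters are hypothetical and exist only for illustration.

```c
#include <stddef.h>

/* Forward rescan: after each move, the search for the next non-zero
 * value starts again at the current zero. Returns the number of index
 * advances performed. */
size_t rescan_steps(int *list, size_t n)
{
    size_t steps = 0;
    size_t i = 0;
    while (i < n)
    {
        while (i < n && list[i] != 0) { i++; steps++; }   /* find next zero */
        size_t j = i;
        while (j < n && list[j] == 0) { j++; steps++; }   /* find next non-zero */
        if (j >= n)
            break;                                        /* only zeros remain */
        list[i] = list[j];
        list[j] = 0;
    }
    return steps;
}

/* Converging indexes: every iteration either advances i or retreats
 * tail, so the loop body runs exactly n times. */
size_t converge_steps(int *list, size_t n)
{
    size_t steps = 0;
    size_t i = 0;
    size_t tail = n;
    while (i < tail)
    {
        steps++;
        if (list[i] != 0)
        {
            i++;
        }
        else
        {
            tail--;
            list[i] = list[tail];   /* may copy a zero; it is re-examined next time */
            list[tail] = 0;
        }
    }
    return steps;
}
```

    On an alternating value/zero list, the gap of zeros in front of the next non-zero grows by one after each move in the rescanning version, so its step count grows roughly with the square of n, while the converging version always takes exactly n steps.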


    Demonstration of complexity

    Both UmNyobe and Argeman have asserted that the code in UmNyobe's answer is linear, O(N), and not quadratic, O(N^2), as I asserted in comments to the answer. Given two counter-views, I wrote a program to check my assertions.

    Here is the result of a test that amply demonstrates this. The code described by "timer.h" is my platform-neutral timing interface; its code can be made available on request (see my profile). The test was performed on a MacBook Pro with a 2.3 GHz Intel Core i7, Mac OS X 10.7.5, GCC 4.7.1.

    The only changes I made to UmNyobe's code were to change the array indexes from int to size_t so that the external function interface was the same as mine, and for internal consistency.

    The test code includes a warmup exercise to show that the functions produce equivalent answers; UmNyobe's answer preserves order in the array and mine does not. I've omitted that information from the timing data.
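
    If the order of the non-zero values does matter, a linear pass is still possible. The following is a minimal sketch, assuming a plain int array; `compact_stable` is a hypothetical helper and not one of the functions timed here. It keeps a write index that trails the read index and then zero-fills the tail.

```c
#include <stddef.h>

/* Stable compaction: non-zero values keep their original relative
 * order, zeros end up at the back. Returns the number of non-zero
 * values. */
size_t compact_stable(int *list, size_t n)
{
    size_t dst = 0;
    for (size_t src = 0; src < n; src++)
    {
        if (list[src] != 0)
            list[dst++] = list[src];   /* dst <= src, so nothing unread is overwritten */
    }
    size_t live = dst;
    while (dst < n)
        list[dst++] = 0;               /* clear the vacated tail */
    return live;
}
```

    This touches each element at most twice, so it is O(N), but it may write every live element, whereas the swap-with-the-tail approach writes only once per zero found in the live region.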

    $ make on2
    gcc -O3 -g -I/Users/jleffler/inc -std=c99 -Wall -Wextra -L/Users/jleffler/lib/64 on2.c -ljl -o on2
    $
    

    Timing

    Set 1: on an older version of the test harness without UmNyobe's amended algorithm.

    shunt_zeroes:        100    0.000001
    shunt_zeroes:       1000    0.000005
    shunt_zeroes:      10000    0.000020
    shunt_zeroes:     100000    0.000181
    shunt_zeroes:    1000000    0.001468
    pushbackzeros:       100    0.000001
    pushbackzeros:      1000    0.000086
    pushbackzeros:     10000    0.007003
    pushbackzeros:    100000    0.624870
    pushbackzeros:   1000000   46.928721
    shunt_zeroes:        100    0.000000
    shunt_zeroes:       1000    0.000002
    shunt_zeroes:      10000    0.000011
    shunt_zeroes:     100000    0.000113
    shunt_zeroes:    1000000    0.000923
    pushbackzeros:       100    0.000001
    pushbackzeros:      1000    0.000097
    pushbackzeros:     10000    0.007077
    pushbackzeros:    100000    0.628327
    pushbackzeros:   1000000   41.512151
    

    There was at most a very light background load on the machine; I'd suspended the Boinc calculations that I normally have running in the background, for example. The detailed timing isn't as stable as I'd like, but the conclusion is clear.

    Set 2: With UmNyobe's amended algorithm

    Also including Patrik's before and after algorithms, and Wildplasser's algorithm (see source below); test program renamed from on2 to timezeromoves.

    $ ./timezeromoves -c -m 100000 -n 1
    shunt_zeroes: (Jonathan)
    shunt_zeroes:        100    0.000001
    shunt_zeroes:       1000    0.000003
    shunt_zeroes:      10000    0.000018
    shunt_zeroes:     100000    0.000155
    RemoveDead: (Patrik)
    RemoveDead:          100    0.000001
    RemoveDead:         1000    0.000004
    RemoveDead:        10000    0.000018
    RemoveDead:       100000    0.000159
    pushbackzeros2: (UmNyobe)
    pushbackzeros2:      100    0.000001
    pushbackzeros2:     1000    0.000005
    pushbackzeros2:    10000    0.000031
    pushbackzeros2:   100000    0.000449
    list_compact: (Wildplasser)
    list_compact:        100    0.000004
    list_compact:       1000    0.000005
    list_compact:      10000    0.000036
    list_compact:     100000    0.000385
    shufflezeroes: (Patrik)
    shufflezeroes:       100    0.000003
    shufflezeroes:      1000    0.000069
    shufflezeroes:     10000    0.006685
    shufflezeroes:    100000    0.504551
    pushbackzeros: (UmNyobe)
    pushbackzeros:       100    0.000003
    pushbackzeros:      1000    0.000126
    pushbackzeros:     10000    0.011719
    pushbackzeros:    100000    0.480458
    $
    

    This shows that UmNyobe's amended algorithm is O(N), as are the other solutions. The original code is shown to be O(N^2), as was UmNyobe's original algorithm.

    Source

    This is the amended test program (renamed to timezeromoves.c). The algorithm implementations are at the top. The test harness is after the comment 'Test Harness'. The command can do the checks or the timing or both (default); it does two iterations by default; it goes up to a size of one million entries by default. You can use the -c flag to omit checking, the -t flag to omit timing, the -n flag to specify the number of iterations, and the -m flag to specify the maximum size. Be cautious about going above one million; you'll probably run into issues with the VLA (variable length array) blowing the stack. It would be easy to modify the code to use malloc() and free() instead; it doesn't seem necessary, though.

    #include <string.h>
    
    #define MAX(x, y)   (((x) > (y)) ? (x) : (y))
    
    extern void shunt_zeroes(int *list, size_t n);
    extern void pushbackzeros(int *list, size_t n);
    extern void pushbackzeros2(int *list, size_t n);
    extern void shufflezeroes(int *list, size_t n);
    extern void RemoveDead(int *list, size_t n);
    extern void list_compact(int *arr, size_t cnt);
    
    void list_compact(int *arr, size_t cnt)
    {
        size_t dst,src,pos;
    
        /* Skip leading filled area; find start of blanks */
        for (pos=0; pos < cnt; pos++) {
            if ( !arr[pos] ) break;
        }
        if (pos == cnt) return;
    
        for(dst= pos; ++pos < cnt; ) { 
            /* Skip blanks; find start of filled area */
            if ( !arr[pos] ) continue;
    
            /* Find end of filled area */
            for(src = pos; ++pos < cnt; ) {
                if ( !arr[pos] ) break;
            }   
            if (pos > src) {
                memmove(arr+dst, arr+src, (pos-src) * sizeof arr[0] );
                dst += pos-src;
            }   
        }
    }
    
    /* Cannot change j to size_t safely; algorithm relies on it going negative */
    void RemoveDead(int *list, size_t n)
    {
        int i, j = n - 1;
        for (; j >= 0 && list[j] == 0; --j)
            ;
        for (i = 0; i <= j; ++i)
        {   
            if (list[i])
                continue;
            memcpy(&(list[i]), &(list[j]), sizeof(int));
            list[j] = 0;
            for (; j >= 0 && list[j] == 0; --j);
            if (i == j)
                break;
        }   
    }
    
    void shufflezeroes(int *list, size_t n)
    {
        for (size_t i = 0; i < n; ++i) 
        {    
            if (list[i])
                continue;
            size_t j;
            for (j = i + 1; j < n && !list[j]; ++j)
                ;
            size_t z;
            for (z = j + 1; z < n && list[z]; ++z)
                ;
            if (j == n)
                break;
            memmove(&(list[i]), &(list[j]), sizeof(int) * (z - j));
            size_t s = z - j + i;
            for(j = s; j < z; ++j) 
                list[j] = 0;
            i = s - 1;
        } 
    }
    
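    /* Index of the first zero at or after start, or n if there is none */
    static size_t nextZero(int* list, size_t start, size_t n){
       size_t i = start;
       while(i < n && list[i])
            i++;
       return i;
    }
    
    /* Index of the first non-zero at or after start, or n if there is none */
    static size_t nextNonZero(int* list, size_t start, size_t n){
       size_t i = start;
       while(i < n && !list[i])
            i++;
       return i;
    }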
    
    void pushbackzeros(int* list, size_t n){
        size_t i = 0;
        size_t j = 0;
        while(i < n && j < n){
             i = nextZero(list,i, n);
             j = nextNonZero(list,i, n);
             if(i >= n || j >=n)
                 return;
             list[i] = list[j];
             list[j] = 0;
        }
    }
    
    /* Amended algorithm */
    void pushbackzeros2(int* list, size_t n){
        size_t i = 0;
        size_t j = 0;
        while(i < n && j < n){
             i = nextZero(list, i, n);
             j = nextNonZero(list, MAX(i,j), n);
             if(i >= n || j >=n)
                 return;
             list[i] = list[j];
             list[j] = 0;
        }
    }
    
    void shunt_zeroes(int *list, size_t n)
    {
        if (n > 1)
        {
            size_t tail = n;
            for (size_t i = 0; i < tail - 1; i++)
            {
                if (list[i] == 0)
                {
                    while (--tail > i + 1 && list[tail] == 0)
                        ;
                    if (tail > i)
                    {
                        int t = list[i];
                        list[i] = list[tail];
                        list[tail] = t;
                    }
                }
            }
        }
    }
    
    /* Test Harness */
    
    #include <unistd.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include "timer.h"
    
    #define DIM(x)      (sizeof(x)/sizeof(*(x)))
    
    typedef void (*Shunter)(int *list, size_t n);
    
    typedef struct FUT      /* FUT = Function Under Test */
    {
        Shunter function;
        const char *name;
        const char *author;
    } FUT;
    
    static int tflag = 1;   /* timing */
    static int cflag = 1;   /* checking */
    static size_t maxsize = 1000000;
    
    static void dump_list(const char *tag, int *list, size_t n)
    {
        printf("%-8s", tag);
        for (size_t i = 0; i < n; i++)
        {
            printf("%d ", list[i]);
        }
        putchar('\n');
        fflush(0);
    }
    
    static void test_list(int *list, size_t n, Shunter s)
    {
        dump_list("Before:", list, n);
        (*s)(list, n);
        dump_list("After:", list, n);
    }
    
    static void list_of_tests(const FUT *f)
    {
        int list1[] = { 1, 0, 2, 0, 3, 0, 4, 0, 5 };
        int list2[] = { 1, 2, 2, 0, 3, 0, 4, 0, 0 };
        int list3[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0 };
        int list4[] = { 0, 1 };
        int list5[] = { 0, 0 };
        int list6[] = { 0 };
    
        test_list(list1, DIM(list1), f->function);
        test_list(list2, DIM(list2), f->function);
        test_list(list3, DIM(list3), f->function);
        test_list(list4, DIM(list4), f->function);
        test_list(list5, DIM(list5), f->function);
        test_list(list6, DIM(list6), f->function);
    }
    
    static void test_timer(int *list, size_t n, const FUT *f)
    {
        Clock t;
        clk_init(&t);
        clk_start(&t);
        f->function(list, n);
        clk_stop(&t);
        char buffer[32];
        printf("%-15s  %7zu  %10s\n", f->name, n, clk_elapsed_us(&t, buffer, sizeof(buffer)));
        fflush(0);
    }
    
    static void gen_test(size_t n, const FUT *f)
    {
        int list[n];
        /* Initialize all n entries (n is even here); odd indexes are zero */
        for (size_t i = 0; i < n/2; i++)
        {
            list[2*i+0] = i;
            list[2*i+1] = 0;
        }   
        test_timer(list, n, f);
    }
    
    static void timed_run(const FUT *f)
    {
        printf("%s (%s)\n", f->name, f->author);
        if (cflag)
            list_of_tests(f);
        if (tflag)
        {
            for (size_t n = 100; n <= maxsize; n *= 10)
                gen_test(n, f);
        }
    }
    
    static const char optstr[] = "cm:n:t";
    static const char usestr[] = "[-ct][-m maxsize][-n iterations]";
    
    int main(int argc, char **argv)
    {
        FUT functions[] =
        {
            { shunt_zeroes,   "shunt_zeroes:",   "Jonathan"    },   /* O(N) */
            { RemoveDead,     "RemoveDead:",     "Patrik"      },   /* O(N) */
            { pushbackzeros2, "pushbackzeros2:", "UmNyobe"     },   /* O(N) */
            { list_compact,   "list_compact:",   "Wildplasser" },   /* O(N) */
            { shufflezeroes,  "shufflezeroes:",  "Patrik"      },   /* O(N^2) */
            { pushbackzeros,  "pushbackzeros:",  "UmNyobe"     },   /* O(N^2) */
        };
        enum { NUM_FUNCTIONS = sizeof(functions)/sizeof(functions[0]) };
        int opt;
        int itercount = 2;
    
        while ((opt = getopt(argc, argv, optstr)) != -1)
        {
            switch (opt)
            {
            case 'c':
                cflag = 0;
                break;
            case 't':
                tflag = 0;
                break;
            case 'n':
                itercount = atoi(optarg);
                break;
            case 'm':
                maxsize = strtoull(optarg, 0, 0);
                break;
            default:
                fprintf(stderr, "Usage: %s %s\n", argv[0], usestr);
                return(EXIT_FAILURE);
            }
        }
    
        for (int i = 0; i < itercount; i++)
        {
            for (int j = 0; j < NUM_FUNCTIONS; j++)
                timed_run(&functions[j]);
            if (tflag == 0)
                break;
            cflag = 0;  /* Don't check on subsequent iterations */
        }
    
        return 0;
    }
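
    The note above about replacing the VLA with malloc() and free() could be sketched as follows. `make_test_list` is a hypothetical helper that is not part of the test program, and its fill pattern (a positive value at even indexes, zero at odd indexes) is an assumption chosen for illustration.

```c
#include <stdlib.h>

/* Hypothetical heap-based replacement for the VLA in gen_test().
 * Returns a list of n ints with non-zero values at even indexes and
 * zeros at odd indexes, or NULL if allocation fails. The caller owns
 * the memory and must free() it. */
int *make_test_list(size_t n)
{
    int *list = malloc(n * sizeof *list);
    if (list == NULL)
        return NULL;
    for (size_t i = 0; i < n; i++)
        list[i] = (i % 2 == 0) ? (int)(i / 2 + 1) : 0;
    return list;
}
```

    With something like this, gen_test() could obtain the list from the helper, pass it to test_timer(), and call free() afterwards, which removes the stack-size limit on the -m option.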