c multiprocessing execution-time

Same function takes more time to execute in child


SHORT: My problem is that the following function, which sums the contents of a given array over a given range, takes longer to execute when called from a child process than when called from the parent for the same task. By "same" I mean similar values: after some tests, the child takes roughly 40% longer.

LONG: I'm studying computer science and I'm working on the final project of a course. The task is to sum all the cells of an int array using n processes that share the workload, then compare that against a single calculation made by the parent. The point is to demonstrate that, with big arrays, multiple processes can reduce the execution time. However, the sum of all the children's times is always greater than the parent's, even with 50 million elements.

Below are the function and the struct I use to pass results.

typedef struct cResults{
    double time;
    int result;
}cResult;

cResult arraySum(int start, int length, int* dataset)
{
    /*Time unit to calculate the computation time*/
    clock_t tic, toc = 0;
    cResult chunk = {.result = 0, .time =0};
    
    tic = clock(); // Start measure time

    for (int i = start; i < start + length; i++)
    {
        chunk.result += dataset[i];
    }

    toc = clock();  // Stop measure time
    chunk.time = ((double) (toc - tic))/ CLOCKS_PER_SEC;

    printf("B: start at: %d, length: %d, sum: %d, in: %f\n", start, length, chunk.result, chunk.time); //! DEBUG

    return chunk; 
}

WHAT I'VE TRIED SO FAR:

  1. Since the array is dynamically allocated, I thought memory access could be the bottleneck. However, this question (Malloc returns same address in parent and child process after fork) removed my doubts: even though the array is heap-allocated, parent and child do not share it; the child works on its own copy.
  2. I've double-checked that the parent sums the elapsed times reported by the children correctly and only once, and then added the printf() statement so I could read and sum all the results manually in the terminal. Again, everything checks out.
  3. I've tried moving the parent's function call from before to after all the children were done, with no change. Then I tried making the parent sleep() right after fork() (counterproductive for the purpose of the project, but just to make sure) to avoid contention for resources.
  4. The random numbers in the array are generated reproducibly from a seed, so I've tried identical datasets, which of course give almost identical outputs. The times change slightly from run to run, but the single execution is still faster.
  5. Finally, I've tried forking a single child and making it compute the same range as the parent (the whole array). The child is on average 45% slower.
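Point 1 can be checked in isolation with a minimal sketch like the one below. It is not part of the project: the function name is hypothetical, and it only assumes a POSIX system where fork() gives the child a private (copy-on-write) view of the heap. The child overwrites a heap value, and the parent then reads its own copy back.

```c
#include <assert.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>

/* Hypothetical helper: returns the value the parent sees in a heap cell
   after a forked child has overwritten its own copy, or -1 on error. */
static int value_seen_by_parent_after_child_write(void)
{
    int *cell = malloc(sizeof *cell);
    if (cell == NULL)
        return -1;

    *cell = 42; /* Value set before the fork */

    pid_t pid = fork();
    if (pid == -1)
    {
        free(cell);
        return -1;
    }

    if (pid == 0)
    {
        /* Child: same virtual address, but the write lands on a private page */
        *cell = 99;
        _exit(*cell == 99 ? 0 : 1);
    }

    /* Parent: wait for the child, then read its own copy */
    int status = 0;
    if (waitpid(pid, &status, 0) == -1)
    {
        free(cell);
        return -1;
    }
    assert(WIFEXITED(status) && WEXITSTATUS(status) == 0);

    int seen = *cell;
    free(cell);
    return seen;
}
```

If the two processes shared the array, the parent would read 99; with separate copies it still reads the value written before the fork.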

Surely I'm missing something simple, but I've run out of ideas... Please be patient, I'm teaching myself as best I can.


UPDATE 1:

Since I've been asked for a full program, I've refactored mine. Because the project required a single source file, I've removed everything unrelated to this issue, so it should be lighter. Unfortunately, the scaffolding that handles the pipe communication is a bit complex: it served other purposes that have been removed, yet it is still essential to make this work. I hope that won't be a problem.

#include <stdio.h> 
#include <stdlib.h>    
#include <stdbool.h>  
#include <limits.h>     
#include <string.h>    
#include <time.h>      
#include <unistd.h>     
#include <sys/types.h>  
#include <sys/wait.h>   

/* ----- CONSTANTS ----- */

#define MIN 5           // Minimum number of processes and threads
#define MAX 10          // Maximum number of processes and threads
#define MSGSIZE 4       // Size of pipe messages

/* =================
    *PIPE MESSAGES*

sum = Sum from the next given start to the next given roof.
end = End process
tmr = Return the computation time measured by the process

================= */ 

/// @brief Struct containing the process id (pid), the result and its pipe file descriptors
struct process{
    pid_t pid;
    long int result;
    bool done;
    double time;
    int fd[2];
};

/// @brief Struct that contains the result of the computation
typedef struct cResults{
    double time;
    long int  result;
}cResult;

/// @brief W = write, R = read
enum PIPECONTROLLER{
    R = 0,  
    W = 1  
};

/* ----- PROTOTYPES ----- */
cResult arraySum(int start, int length, int* dataset);

int main()
{   
    /* =================
        *USER DEFINED*
    ================= */ 
    int dataLength = 50000000; // Set the length of the dataset
    int nProcess = 1;          // How many child process do you want
    unsigned int seed = 1;     // Change the randomization seed of the data

    // System
    int* data;
    cResult chunk; // Used for storing temporary computational values

    // Task related
    int taskLenght;
    int remainder;

    // Pipe related
    char pipeMSG[MSGSIZE];
    int msgCheck = 1;

    /// @brief Processes dashboard
    // Slot nProcess + 1 is reserved for the parent, so MAX + 2 entries are needed
    struct process processes[MAX + 2] = { {.pid = 0, .result = 0, .done = false, .time = 0} };

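    data = malloc(sizeof(int) * dataLength);

    /*Large datasets may not fit in memory*/
    if (data == NULL)
    {
        printf("Failed to allocate the dataset\n");
        return 1;
    }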

    srand(seed);

    for (int i = 0; i < dataLength; i++)
    {
        /*Random population between 0-100*/
        data[i] = rand() % 100;
    }

    chunk = arraySum(0, dataLength, data);

    processes[nProcess + 1].result = chunk.result;
    processes[nProcess + 1].time = chunk.time;

    printf("\nCHECK SUM: %ld\n", chunk.result);// ! Debug

    #pragma region "START PROCESSES"

    /*Calculate how to split the workload among the processes*/
    taskLenght = dataLength / nProcess;
    remainder = dataLength % nProcess;

    pid_t myPid = 0;
    int startPoint = 0;

    processes[nProcess + 1 ].pid = getpid();

    /*Open child to parent pipe*/
    if (pipe(processes[nProcess + 1 ].fd) == -1)
    {
        printf("Failed to open pipe on parent\n");
        return 1;
    }
    
    for (int i = 0; i < nProcess; i++)
    {
        /*Open new parent to child pipe*/
        if (pipe(processes[i].fd) == -1)
        {
            printf("Failed to open pipe on parent\n");
            return 1;
        }

        myPid = fork();        

        switch (myPid)
        {
            case -1: // Error on fork
                printf("An error occured while forking the %d process.\n", i);
                return 1;
            break;

            case 0: // Child case
                /*Record pid in the dashboard*/
                processes[i].pid = getpid();

                /*Handle the pipes descriptors*/
                close(processes[i].fd[W]);
                close(processes[nProcess + 1 ].fd[R]);

                i = nProcess;
            break;

            default: // Parent case
                /* Record the process pid in the dashboard and advance the starting point for the next one*/
                processes[i].pid = myPid;
                startPoint += taskLenght; 

                /*Handle the pipes descriptors*/
                close(processes[i].fd[R]);
            break;
        }  
    }

    /*=========== CHILD PROCESS HANDLER ===========*/
    if(myPid == 0)
    {
        int myID;
        bool keepAlive = true;

        for(myID = 0; myID < nProcess; myID++)
        {
            if (processes[myID].pid == getpid())
            {
                break;
            }
        }

        /*Calculate first iteration of the sum*/
        cResult temp = arraySum(startPoint, taskLenght, data);
        chunk.result = temp.result;
        chunk.time = temp.time;

        while(keepAlive)
        {            
            /*Communicate the id of the process*/
            if (write(processes[nProcess + 1 ].fd[W], &myID, sizeof(int)) < 0)
            {
                printf("An error occured from the process %d while sending message to parent\n", getpid());

                return 1;
            }

            /*Communicate the result of the operation*/
            if (write(processes[nProcess + 1 ].fd[W], &chunk.result, sizeof(int)) < 0)
            {
                printf("An error occured from the process %d while sending message to parent\n", getpid());

                return 1;
            }

            /*Communicate the time elapsed for the operation*/
            if (write(processes[nProcess + 1 ].fd[W], &chunk.time, sizeof(double)) < 0)
            {
                printf("An error occured from the process %d while sending message to parent\n", getpid());

                return 1;
            }
            
            /*Waits for further instruction*/
            msgCheck = read(processes[myID].fd[R], pipeMSG, MSGSIZE);

            if(msgCheck < 0)
            {
                printf("An error occured from the process %d while reading message from parent\n", getpid());

                return 1;
            } 
            
            /*Sum command*/
            if(!strcmp(pipeMSG, "sum"))
            {
                msgCheck = read(processes[myID].fd[R], &startPoint, sizeof(int));

                if(msgCheck < 0)
                {
                    printf("An error occured from the process %d while reading message from parent\n", getpid());

                    return 1;
                }

                msgCheck = read(processes[myID].fd[R], &taskLenght, sizeof(int));

                if(msgCheck < 0)
                {
                    printf("An error occured from the process %d while reading message from parent\n", getpid());

                    return 1;
                }

                /*Calculate second iteration for the remaining part*/
                temp = arraySum(startPoint, taskLenght, data);
                chunk.result += temp.result;
                chunk.time += temp.time;
            }
            
            /*Close command*/
            if(!strcmp(pipeMSG, "end"))
            {
                keepAlive = false;
            }
        }

        free(data);
        close(processes[myID].fd[R]);

        exit(0);
    }

    /*=========== PARENT PROCESS HANDLER ===========*/
    if(myPid != 0)
    {
        /*Close descriptor for writing on main pipe.*/
        close(processes[nProcess + 1 ].fd[W]);

        int targetProcess = nProcess + 1; // Target self
        bool onGoing = true;

        chunk.result = 0;
        chunk.time = 0;

        while(onGoing)
        {   
            /*Listen from processes if someone ended the task*/
            msgCheck = read(processes[nProcess + 1 ].fd[R], &targetProcess, sizeof(int));

            if(msgCheck < 0)
            {
                printf("An error occured from the process %d while reading message from parent\n", getpid());

                return 1;
            } 
            
            /*Get result from child process*/
            msgCheck = read(processes[nProcess + 1 ].fd[R], &processes[targetProcess].result, sizeof(int));

            if(msgCheck < 0)
            {
                printf("An error occured from the process %d while reading message from parent\n", getpid());

                return 1;
            }

            /*Get elapsed time from child process*/
            msgCheck = read(processes[nProcess + 1 ].fd[R], &processes[targetProcess].time, sizeof(double));

            if(msgCheck < 0)
            {
                printf("An error occured from the process %d while reading message from parent\n", getpid());

                return 1;
            }

            processes[targetProcess].done = true;

            /*Check if remainder to start new task*/
            if(remainder != 0)
            {
                startPoint = taskLenght * nProcess;

                processes[targetProcess].done = false;

                if (write(processes[targetProcess].fd[W], "sum", MSGSIZE) < 0)
                {
                    printf("An error occured from the process %d while sending message to parent\n", getpid());

                    return 1;
                }

                if (write(processes[targetProcess].fd[W], &startPoint, sizeof(int)) < 0)
                {
                    printf("An error occured from the process %d while sending message to parent\n", getpid());

                    return 1;
                }

                if (write(processes[targetProcess].fd[W], &remainder, sizeof(int)) < 0)
                {
                    printf("An error occured from the process %d while sending message to parent\n", getpid());

                    return 1;
                }

                remainder = 0; //Avoid looping task
            }
            
            /*Check for pending response and process final result*/
            for (int i = 0; i < nProcess; i++)
            {
                if(processes[i].done)
                {
                    chunk.result += processes[i].result;
                    chunk.time += processes[i].time;

                    onGoing = false;
                    continue;
                }
                
                onGoing = true;

                /*Reset total calculations*/
                chunk.result = 0;
                chunk.time = 0;

                break;
            }
            
            /*Reset to self target*/
            targetProcess = nProcess + 1;
        }

        printf("Parent calculated: %ld in = %fs\n", processes[nProcess + 1].result, processes[nProcess + 1].time); //! Debug
        printf("Processes calculated: %ld in = %fs\n", chunk.result, chunk.time); //! Debug
    }
}

cResult arraySum(int start, int length, int* dataset)
{
    /*Time unit to calculate the computation time*/
    clock_t tic, toc = 0;
    cResult chunk = {.result = 0, .time =0};
    
    tic = clock(); // Start measure time

    for (int i = start; i < start + length; i++)
    {
        chunk.result += dataset[i];
    }

    toc = clock();  // Stop measure time
    chunk.time = ((double) (toc - tic))/ CLOCKS_PER_SEC;
    printf("start at: %d, length: %d, sum: %ld, in: %f\n", start, length, chunk.result, chunk.time); //! Debug
    return chunk; 
}

If you want to try this out, you'll find some variables to play with under USER DEFINED.

I'll share some of my results, with seed = 1.

LENGTH         PARENT (s)   1 CHILD (s)   5 CHILDREN (s)   10 CHILDREN (s)
1'000          0.000001     0.000002      0.000003         0.000006
100'000        0.000085     0.000107      0.000115         0.000120
10'000'000     0.008693     0.015143      0.016120         0.015982
100'000'000    0.089563     0.148095      0.146698         0.149421
500'000'000    0.669474     0.801828      0.744381         0.816883

As you can see, even repeating the whole task in a single child process took nearly twice as long in some cases.

As ArkadiuszDrabczyk pointed out, it could be a scheduling issue. Still, why is the child always the slowest one?


UPDATE 2:

Since concerns about the pipes were raised, I wrote another source file just to rule them out and, of course, the problem remained.

However, I wasn't convinced that the dataset[] array stored on the heap was deep-copied, so after some research I found this: What is copy-on-write?. With this newfound knowledge, I added this otherwise useless function, to be called right before arraySum():

void justCheck(int start, int length, int* dataset)
{
    for (int i = start; i < start + length; i++)
    {
        dataset[i] = dataset[i];
    }

    return;
}

This little snippet managed to level out the difference between the times. At least now they are on the same order of magnitude. Results below:

LENGTH         PARENT (s)   1 CHILD (s)   5 CHILDREN (s)   10 CHILDREN (s)
1'000          0.000002     0.000001      0.000004         0.000003
100'000        0.000099     0.000110      0.000124         0.000121
10'000'000     0.009496     0.008686      0.009316         0.009248
100'000'000    0.090267     0.092168      0.089862         0.093356
500'000'000    -            -             -                -

Unfortunately, this edit raises another problem: with big datasets this function freezes. I know this isn't the right way to force the COW, so... any suggestions?
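For reference, here is one possible variant of the page-touching idea, given only as a sketch. An optimizing compiler is allowed to drop a plain dataset[i] = dataset[i] assignment, so this version goes through a volatile pointer and writes one int per page instead of every element. The name touch_pages is hypothetical, and the page size is assumed to be available via sysconf(_SC_PAGESIZE). It does not reduce the memory cost: every page a process writes to still becomes a private copy.

```c
#include <assert.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical helper: rewrites one int per memory page of data[0..count)
   with its own value, so the content is unchanged but each page is written. */
static void touch_pages(int *data, size_t count)
{
    long page = sysconf(_SC_PAGESIZE); /* Page size in bytes */
    assert(page > 0);

    size_t step = (size_t)page / sizeof(int); /* Ints per page */
    if (step == 0)
        step = 1;

    /* volatile: the read and the write below cannot be optimized away */
    volatile int *p = data;

    for (size_t i = 0; i < count; i += step)
        p[i] = p[i];

    /* The array may not be page-aligned, so also touch the last element */
    if (count > 0)
        p[count - 1] = p[count - 1];
}
```

A child would call touch_pages(dataset + start, length) on its own slice before arraySum(); this is a design choice for the experiment, not a requirement of fork().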


Solution

  • SHORT: It turns out that I was using the wrong approach to prove the thesis of the project. clock() reports the processor time used by the calling process, so summing the clock times of all the processes and comparing the total to the single-process time is like trying to prove that baking 4 loaves of bread simultaneously takes less time than baking them one after another by adding up each loaf's oven time instead of looking at my watch. I should have measured wall time instead of clock time.

    LONG: That said, regarding why the same function takes more time when called by a single child than by the parent: as I said in my second update, I managed to bring the time down to a more plausible value by forcing the copy-on-write of the heap pages in use in the child before calling the actual calculating function. After several more experiments, the difference turned out to be random: sometimes the child is faster, sometimes the two are almost equal, and sometimes the child is slower. So I think it depends on how task scheduling works, which I have no control over and don't know how to influence.

    Regarding the other problem I mentioned:

    "Unfortunately, this edit raises another problem: with big datasets this function freezes. I know this isn't the right way to force the COW, so... any suggestions?"

    Well, the point is that forcing COW in every process gives each of them its own copy of the data. According to Algorithms, 4th Edition by Robert Sedgewick and Kevin Wayne, about 256 million int values fit in 1 GB of memory (4 bytes each), so the problem is the massive amount of memory my program devours. Checking with my system monitor, I confirmed that putting 500 million values into my array takes almost 2 GB of RAM (500 million × 4 bytes = 2 × 10^9 bytes), and after the forks that amount is multiplied by up to nProcess + 1.

    I thought answering these questions would help posterity.
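The wall-time measurement mentioned in the SHORT part could look roughly like the sketch below. It is not the project's code: the names wall_seconds, sum_range and parallel_sum are hypothetical, the single shared pipe replaces the original message protocol, and it assumes a POSIX system that provides clock_gettime() with CLOCK_MONOTONIC. The parent reads the clock once before the first fork() and once after the last child has been reaped, so the elapsed time covers the children running concurrently instead of adding their CPU times together.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>

/* Seconds on a monotonic wall clock (clock() would count CPU time instead) */
static double wall_seconds(void)
{
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (double)ts.tv_sec + (double)ts.tv_nsec / 1e9;
}

/* Sum of data[start .. start + length) */
static long sum_range(const int *data, int start, int length)
{
    long total = 0;
    for (int i = start; i < start + length; i++)
        total += data[i];
    return total;
}

/* Forks n children, each summing one slice, and stores in *elapsed the wall
   time from before the first fork to after the last wait. Returns the total,
   or -1 if the pipe or a fork failed. */
static long parallel_sum(const int *data, int length, int n, double *elapsed)
{
    assert(n > 0 && length >= 0 && elapsed != NULL);

    int fd[2];
    if (pipe(fd) == -1)
        return -1;

    int chunk = length / n;
    int failed = 0;
    double t0 = wall_seconds();

    for (int i = 0; i < n; i++)
    {
        int start = i * chunk;
        int len = (i == n - 1) ? length - start : chunk; /* Last child takes the remainder */

        pid_t pid = fork();
        if (pid == -1)
        {
            failed = 1;
            break;
        }
        if (pid == 0)
        {
            close(fd[0]);
            long partial = sum_range(data, start, len);
            /* A write this small is atomic on a pipe */
            ssize_t sent = write(fd[1], &partial, sizeof partial);
            _exit(sent == (ssize_t)sizeof partial ? 0 : 1);
        }
    }

    close(fd[1]);

    /* Collect partial sums until every child has closed its write end */
    long total = 0, partial;
    while (read(fd[0], &partial, sizeof partial) == (ssize_t)sizeof partial)
        total += partial;
    close(fd[0]);

    while (wait(NULL) > 0) { /* Reap all children */ }

    *elapsed = wall_seconds() - t0;
    return failed ? -1 : total;
}
```

Timing a plain sum_range(data, 0, length) call with the same wall_seconds() helper gives the single-process figure to compare against. Note that the elapsed time here also includes the cost of fork() and of the pipe traffic, which is part of what the parallel version really pays.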