arrays c multiprocessing pipe fork

Passing arrays from child process to parent process using pipes


I am trying to fill a large array in which half of the values are calculated by a child process and the other half by the parent process. I would like the parent to receive the child's half and merge it with its own, so that I end up with one final array containing the results from both processes. I want to use pipes only. My current code uses a "dummy" array to hold the child's results. This array is sent to the parent through a pipe so that its values can be combined with the parent's own results in Test->iterations. The array lives in a structure called Test, defined as:

typedef struct{
int *iterations;
int height;
int width;
int start;
int end;
} Test;

Since this is a 2D array, the size of iterations is Test->width * Test->height * sizeof(int). I am also sending the array using chunked read and write functions since it is very large. The function that spawns the two processes, and the main that calls it, look like this:

int main(){
Test t;

t.height=1000;
t.width=1000;

if ((t->iterations = malloc(t->width * t->height * sizeof(int))) == NULL) {
    perror("Cannot allocate memory (iterations)");
    exit(EXIT_FAILURE);
}

processes(Test *);

}

void processes(Test *){
int p2c[2], c2p[2];
int i,j,k;
int half;


int *dummy = malloc(Test->width*Test->height*sizeof(int));
    if(dummy==NULL){
        perror("Malloc Error");
        exit(EXIT_FAILURE);
    }

for (i = 0; i < Test->height; i++) {
    for (j = 0; j < Test->width; j++) {
        dummy[i * Test->width + j] = 0.0;
    }
}

    
half=Test->height >> 1;

pipe(p2c);
pipe(c2p);

if(fork()==0){
    
    Test->start=0;
    Test->end=half;

    Compute(Test);

    for(i=Test->start;i<Test->end;i++){
        for(j=0;j<Test->width;j++){

        dummy[i*Test->width+j]=Test->iterations[i*Test->width+j];
            
        }
    }
    
    chwrite(c2p[WRITE], (char*)dummy, Test->width * half * sizeof(int), 2048);
    
    printf("Sending Iterations\n");

    close(p2c[READ]);
    close(c2p[WRITE]);
    exit(EXIT_SUCCESS);
    
}

else{

    

    Test->start=half;
    Test->end=Test->height;
    
    Compute(Test);
    sleep(1);

    chread(c2p[READ], (char*)dummy, Test->width * half * sizeof(int), 2048);
    
    printf("Reading Iterations\n");

    wait(NULL);

    
    for(i=0;i<Test->height;i++){
        for(j=0;j<Test->width;j++){
        Test->iterations[i*Test->width+j]=dummy[i*Test->width+j];
            
        }
    }

However, when I do this, the iterations array is full of zeros. What is the problem and how can I fix it?

My chunk read and write functions are as follows:

void chwrite(int fd, char *buf, int count, int chunksize){

int numChunks = (int)(count/chunksize);

int remsize = count-numChunks*chunksize; 

int i, j;

for(i=0,j=0; i<numChunks; j+=chunksize,i++){

    write(fd, &(buf[j]), chunksize);
    
}

write(fd, &(buf[j]),remsize);

}

void chread(int fd, char *buf, int count, int chunksize){

int numChunks = (int)(count/chunksize);
int remsize=count-numChunks*chunksize;

int i,j;

for(i=0,j=0; i<numChunks; j+=numChunks, i++){
    read(fd, &(buf[j]),chunksize);

}

read(fd, &(buf[j]) ,remsize);

}

Solution

  • The chwrite and chread functions do not check for errors or incomplete writes or reads. Also, there is a bug in chread where j is being increased by numChunks instead of by chunksize:

    for(i=0,j=0; i<numChunks; j+=numChunks, i++){
        read(fd, &(buf[j]),chunksize);
    
    }
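
To see how far off the buggy stride ends up, the offsets can be worked through for the sizes in the question. This is only a small sketch: it assumes sizeof(int) is 4, so the child sends 1000 * 500 * 4 = 2,000,000 bytes in chunks of 2048, and the helper name final_offset is made up for illustration.

```c
/* Byte offset at which the trailing "remainder" read starts, given that the
   loop advances j by `stride` bytes after each of the full chunks.
   final_offset is a hypothetical helper, not part of the original code. */
static long final_offset(long count, long chunksize, long stride)
{
    long numChunks = count / chunksize;  /* same integer division as chread */
    return numChunks * stride;
}
```

With a stride of chunksize (2048) the remainder read starts at byte 1,998,848, directly after the last full chunk. With a stride of numChunks (976) it starts at byte 952,576, so each 2048-byte read overlaps the previous one and nothing past about byte 953,728 of the buffer is ever written.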
    

    Here is a possible replacement for chread that checks for errors and allows incomplete reads. It returns the amount read if anything has been read, returns 0 if nothing has been read and either the amount to be read was 0 or an end-of-file condition was encountered, or -1 if nothing was read and an error was encountered:

    int chread(int fd, char *buf, int count, int chunksize){
       int done = 0;
       int rc = count ? -1 : 0;
       while (done < count) {
           if (chunksize > count - done) {
               chunksize = count - done;
           }
           rc = read(fd, &buf[done], chunksize);
           if (rc <= 0) {
               break;
           }
           done += rc;
       }
       return done ? done : rc;
    }
    

    Here is a replacement chwrite function. It is similar to chread except that there is no check for an end-of-file condition. (An end-of-file condition doesn't make sense on write, and a properly functioning write call should not return 0 unless called with a count of 0.):

    int chwrite(int fd, const char *buf, int count, int chunksize){
       int done = 0;
       int rc = count ? -1 : 0;
       while (done < count) {
           if (chunksize > count - done) {
               chunksize = count - done;
           }
           rc = write(fd, &buf[done], chunksize);
           if (rc < 0) {
               break;
           }
           done += rc;
       }
       return done ? done : rc;
    }
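
Because both replacement functions report how many bytes were actually transferred, the caller can detect a short transfer instead of silently using a half-filled buffer. The following is a minimal sketch of a complete child-to-parent round trip that checks those results. The helper names read_full, write_full and demo_transfer are hypothetical, the child's values (2 * i) are placeholder data standing in for Compute, and interrupted calls (EINTR) are not retried.

```c
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Keep reading until `count` bytes arrived, EOF was hit, or an error occurred.
   Returns the number of bytes read, or -1 on error. (Hypothetical helper.) */
static ssize_t read_full(int fd, void *buf, size_t count)
{
    size_t done = 0;
    while (done < count) {
        ssize_t rc = read(fd, (char *)buf + done, count - done);
        if (rc < 0) return -1;
        if (rc == 0) break;              /* writer closed the pipe: EOF */
        done += (size_t)rc;
    }
    return (ssize_t)done;
}

/* Keep writing until all `count` bytes were accepted; -1 on error. */
static ssize_t write_full(int fd, const void *buf, size_t count)
{
    size_t done = 0;
    while (done < count) {
        ssize_t rc = write(fd, (const char *)buf + done, count - done);
        if (rc <= 0) return -1;
        done += (size_t)rc;
    }
    return (ssize_t)done;
}

/* Child fills n ints with placeholder values and sends them;
   the parent receives them into `out`. Returns 0 on success, -1 otherwise. */
static int demo_transfer(int *out, size_t n)
{
    int c2p[2];
    size_t bytes = n * sizeof *out;
    if (pipe(c2p) < 0) return -1;

    pid_t pid = fork();
    if (pid < 0) { close(c2p[0]); close(c2p[1]); return -1; }

    if (pid == 0) {                      /* child: writer */
        close(c2p[0]);                   /* unused read end */
        int *vals = malloc(bytes);
        if (vals == NULL) _exit(EXIT_FAILURE);
        for (size_t i = 0; i < n; i++) vals[i] = (int)(2 * i);
        int ok = write_full(c2p[1], vals, bytes) == (ssize_t)bytes;
        free(vals);
        close(c2p[1]);
        _exit(ok ? EXIT_SUCCESS : EXIT_FAILURE);
    }

    close(c2p[1]);                       /* parent: close unused write end */
    ssize_t got = read_full(c2p[0], out, bytes);
    close(c2p[0]);

    int status;
    if (waitpid(pid, &status, 0) < 0) return -1;
    if (got != (ssize_t)bytes) return -1;   /* short transfer or error */
    return (WIFEXITED(status) && WEXITSTATUS(status) == EXIT_SUCCESS) ? 0 : -1;
}
```

Two points about the ordering: read blocks until data is available, so no sleep is needed before reading, and the parent drains the pipe before calling waitpid so that a child blocked on a full pipe is able to finish.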
    

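    There is also a problem in the parent process. Only the first half of dummy is filled from the pipe, so the second half still holds the zeros it was initialized with, and the following loop overwrites the parent's own results in the second half of the array with those zeros: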

        for(i=0;i<Test->height;i++){
            for(j=0;j<Test->width;j++){
            Test->iterations[i*Test->width+j]=dummy[i*Test->width+j];
                
            }
        }
    

    (Note: OP's incomplete code has a variable t of type Test, so the use of a pointer called Test is very probably incorrect and shouldn't even compile, but let's go with it for now. OP's incomplete code has other errors, such as the function call processes(Test *); and the function definition void processes(Test *){ … }.)

    The outer for loop condition should be i<half or i<Test->start:

        for(i=0;i<Test->start;i++){
            for(j=0;j<Test->width;j++){
            Test->iterations[i*Test->width+j]=dummy[i*Test->width+j];
                
            }
        }
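
Since the child's rows sit contiguously at the start of both arrays, the corrected loop amounts to a single block copy. A minimal sketch, assuming row-major layout as in the question; merge_child_rows is a hypothetical helper name:

```c
#include <string.h>

/* Copy only the child's rows [0, rows) from src into dst.
   Rows at index >= rows in dst (the parent's own results) are left untouched. */
static void merge_child_rows(int *dst, const int *src, int width, int rows)
{
    memcpy(dst, src, (size_t)width * (size_t)rows * sizeof *dst);
}
```

In the parent this would be called as merge_child_rows(Test->iterations, dummy, Test->width, half). Reading from the pipe straight into Test->iterations would probably make the dummy buffer unnecessary in the parent altogether, since the received bytes land in exactly those rows.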