cfileoptimizationparallel-processingopenmp

Is it possible to read a file in parallel by extending this function?


I developed this function in C to read a file consisting of one word per line, like a standard wordlist. The function has already been optimized to a reasonable extent, but I would like to know if there is a way to parallelize the file reading process using OpenMP. I have tried various approaches, but I couldn’t find a working solution.

The idea I had was to divide the task among threads so that each thread uses a private array to store the words it reads, and then later merge these words into the output array (used in the main function) in parallel. However, I wasn’t able to implement this approach successfully. Is there a possible solution to my problem? Here's the code of the function:

    int file_read(const char *filename, unsigned char (*output)[MAX_WORD_LENGTH]) {
            int fd = open(filename , O_RDONLY);
            if ( fd < 0 ){
                printf("Errore nella lettura del file\n");
                perror("fd < 0");
                exit(1);
            }
            // dimensione  file in byte
            off_t file_size = lseek(fd, 0, SEEK_END);
            lseek(fd, 0, SEEK_SET);

            // Mappiamo il file in memoria
            char *file_data = (char *) mmap(NULL, file_size, PROT_READ, MAP_PRIVATE, fd, 0);
            //mmap funzione che permette di mappare in memoria il file
            //NULL significa che decide l'os dove metterlo
            //file_size quanto è grande, calcolato prima con lseek
            //PROT_READ dice che il file è solo in lettura
            //MAP_PRIVATE privata al processo, alternativamente MAP_SHARED
            //fd file descriptor
            //0 dove iniziare a leggere il file, 0 = inizio
            if (file_data == MAP_FAILED) {
                perror("Errore nella mappatura del file");
                close(fd);
                exit(1);
            }

            close(fd);

            int num_words = 0;
            int i=0, temp_len=0;
            char tempChar;
            while ( i < file_size && num_words < MAX_WORDS ){
                tempChar = file_data[i];
                if(tempChar=='\n'){
                    if(temp_len <= MAX_WORD_LENGTH){
                        if(temp_len > 0){
                            output[num_words][temp_len]='\0';
                            temp_len = 0;
                            num_words++;
                        }
                    }
                    else{
                        printf("Parola troppo grande");
                        exit(1);
                    }
                }
                else if(tempChar!='\r'){
                    output[num_words][temp_len++]=tempChar;
                }
                i++;
            }

            //rilascio la memoria su cui era inserito il file
            munmap(file_data, file_size);

            return num_words;

        }

P.S. This code is designed to read a large amount of data, such as one million words and the max lenght of a word is 56 in my case. Thanks.


Solution

  • I tried your code after creating an input file of 10^6 lines with words between 1 and 55 characters. And I compared to a version using using fopen and getline(), without any mmap. The file is on a low-end SSD. When the file is already in cache, your code is 30% faster (0.09 sec. vs 0.13 s), but when the file is not in cache, your code is more than 2x slower (0.33 sec. vs 0.15 sec.).

    With the following code using OpenMP I get about a 30% speed-up (with 4 threads) when the file is in cache. When the file is not in cache there's no significant speed up.

    Disclaimer: I didn't check the correctness of the results!

     int file_read_omp(const char *filename, unsigned char (*output)[MAX_WORD_LENGTH]) {
            int fd = open(filename , O_RDONLY);
            if ( fd < 0 ){
                printf("Errore nella lettura del file\n");
                perror("fd < 0");
                exit(1);
            }
            // dimensione  file in byte
            off_t file_size = lseek(fd, 0, SEEK_END);
            lseek(fd, 0, SEEK_SET);
    
            // Mappiamo il file in memoria
            char *file_data = (char *) mmap(NULL, file_size, PROT_READ, MAP_PRIVATE, fd, 0);
            if (file_data == MAP_FAILED) {
                perror("Errore nella mappatura del file");
                close(fd);
                exit(1);
            }
    
            close(fd);
    
            int num_words = 0;
            int nt;
            #pragma omp parallel
            #pragma omp single
            nt = omp_get_num_threads();
            
            size_t istart[nt+1];
            istart[nt] = file_size;
            int iwstart[nt];
            iwstart[0] = 0;
            #pragma omp parallel reduction(+:num_words)
            {
                int it = omp_get_thread_num();
                istart[it] = (file_size*it)/nt;
                if (it > 0) {
                    while (file_data[istart[it]-1] != '\n') istart[it]++;
                }
                
                #pragma omp barrier    
                
                unsigned char* output_local = malloc(MAX_WORDS*MAX_WORD_LENGTH);
                
                int temp_len=0, i=istart[it];
                char tempChar;
                while ( i < istart[it+1] && num_words < MAX_WORDS ){
                    tempChar = file_data[i];
                    if(tempChar=='\n'){
                        if(temp_len <= MAX_WORD_LENGTH){
                            if(temp_len > 0){
                                output_local[num_words*MAX_WORD_LENGTH+temp_len]='\0';
                                temp_len = 0;
                                num_words++;
                            }
                        }
                        else{
                            printf("Parola troppo grande");
                            exit(1);
                        }
                    }
                    else if(tempChar!='\r'){
                        output_local[num_words*MAX_WORD_LENGTH+temp_len]=tempChar;
                        temp_len++;
                    }
                    i++;
                }
                
                iwstart[it] = 0;
                #pragma omp barrier    
                
                #pragma omp critical    
                for (int j=it+1; j<nt; j++) iwstart[j] += num_words;
                
                #pragma omp barrier    
                
                for (int iw=0; iw<num_words; iw++) {
                    strcpy(output[iwstart[it]+iw],output_local+iw*MAX_WORD_LENGTH);
                }
                free(output_local);
            }
                
            //rilascio la memoria su cui era inserito il file
            munmap(file_data, file_size);
            
            return num_words;
    
        }