
Callbacks in AIO asynchronous I/O


I have found discussions online about using callbacks with POSIX AIO (asynchronous I/O), but they have left me confused. The example below, taken from a site about Linux AIO, uses AIO to read the contents of a file. My problem is this: code that actually processes the contents of that file must, at some point, block until the read has completed, yet this code has no such blocking point at all. I was expecting to see some kind of call analogous to pthread_mutex_lock in pthread programming. I suppose I could put in a dummy loop after the aio_read() call that would block execution until the read is completed. But that puts me right back at the simplest way of blocking execution, and then I don't see what is gained by all the coding overhead of setting up a callback. I am obviously missing something. Could someone tell me what it is?

Here is the code. (BTW, the original is in C++; I have adapted it to C.)

#include <stdio.h>
#include <stdlib.h>
#include <strings.h>
#include <aio.h>
//#include <bits/stdc++.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <signal.h>

const int BUFSIZE = 1024;




void aio_completion_handler(sigval_t sigval)
{
    struct aiocb *req;

    req = (struct aiocb *)sigval.sival_ptr; //Pay attention here.
    /*Check again if the asynchrony is complete?*/
    if (aio_error(req) == 0)
    {
        ssize_t ret = aio_return(req);
        printf("ret == %zd\n", ret);
        /* The buffer is not NUL-terminated, so print exactly ret bytes. */
        printf("%.*s\n", (int)ret, (char *)req->aio_buf);
    }
    close(req->aio_fildes);
    free((void *)req->aio_buf);
    while (1)
    {
        printf("The callback function is being executed...\n");
        sleep(1);
    }
}



int main(void)
{
    struct aiocb my_aiocb;
    int fd = open("file.txt", O_RDONLY);
    if (fd < 0)
    {
        perror("open");
        return 1;
    }
    bzero((char *)&my_aiocb, sizeof(my_aiocb));

    my_aiocb.aio_buf = malloc(BUFSIZE);
    if (!my_aiocb.aio_buf)
    {
        perror("my_aiocb.aio_buf");
        return 1;
    }

    my_aiocb.aio_fildes = fd;
    my_aiocb.aio_nbytes = BUFSIZE;
    my_aiocb.aio_offset = 0;

    //Fill in callback information
    /*
    Using SIGEV_THREAD to request a thread callback function as a notification method
    */
    my_aiocb.aio_sigevent.sigev_notify = SIGEV_THREAD;
    my_aiocb.aio_sigevent.sigev_notify_function = aio_completion_handler;
    my_aiocb.aio_sigevent.sigev_notify_attributes = NULL;
    /*
    The context to be transmitted is loaded into the handler (in this case, a reference to the aiocb request itself).
    In this handler, we simply refer to the arrived sigval pointer and use the AIO function to verify that the request has been completed.
    */
    my_aiocb.aio_sigevent.sigev_value.sival_ptr = &my_aiocb;

    int ret = aio_read(&my_aiocb);
    if (ret < 0)
    {
        perror("aio_read");
        return 1;
    }
    /*    <---- A real code would process the data read from the file.
     *          So execution needs to be blocked until it is clear that the
     *          read is complete.  Right here I could put in:
     *          while (aio_error(&my_aiocb) == EINPROGRESS) {}
     *          But is there some other way involving a callback?
     *          If not, what has creating a callback done for me?
     */
    //The calling process continues to execute
    while (1)
    {
        printf("The main thread continues to execute...\n");
        sleep(1);
    }
    return 0;
}

Solution

    "I was expecting to see some kind of call analogous to pthread_mutex_lock in pthread programming."

    You are thinking in the right direction. POSIX AIO runs a SIGEV_THREAD completion handler in a separate POSIX thread once the operation completes, whether it succeeded or failed. So use the POSIX thread synchronization primitives: a mutex and a condition variable.

    "I suppose I could put in a dummy loop after the aio_read() call that would block execution until the read is completed."

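    To wait for an asynchronous operation to complete, there is the aio_suspend function. And yes, it should be called in a loop of the same kind, because it can return before the operation has finished (for example, when it is interrupted by a signal):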

    const struct aiocb *list[1] = { &my_aiocb };
    
    while (aio_error (&my_aiocb) == EINPROGRESS)
        aio_suspend (list, 1, NULL);
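
    For a request that has no callback at all, that loop is the whole synchronization story. Below is a minimal sketch of such a read, assuming glibc on Linux; the helper name read_and_wait is made up for illustration, and SIGEV_NONE is used so that no handler or signal is involved:

```c
#include <aio.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: read up to count bytes from the start of path,
 * waiting with aio_suspend instead of a callback.
 * Returns the number of bytes read, or -1 with errno set. */
static ssize_t read_and_wait(const char *path, void *buf, size_t count)
{
    struct aiocb cb;
    const struct aiocb *list[1] = { &cb };
    ssize_t n;
    int error;
    int fd = open(path, O_RDONLY);

    if (fd == -1)
        return -1;

    memset(&cb, 0, sizeof cb);
    cb.aio_fildes = fd;
    cb.aio_buf    = buf;
    cb.aio_nbytes = count;
    cb.aio_offset = 0;
    cb.aio_sigevent.sigev_notify = SIGEV_NONE;  /* no callback, no signal */

    if (aio_read(&cb) != 0) {
        error = errno;
        close(fd);
        errno = error;
        return -1;
    }

    /* Sleep until the request finishes instead of spinning; loop because
     * aio_suspend may return early, e.g. when interrupted by a signal. */
    while ((error = aio_error(&cb)) == EINPROGRESS)
        aio_suspend(list, 1, NULL);

    n = aio_return(&cb);  /* collect the result exactly once per request */
    close(fd);

    if (error != 0) {
        errno = error;
        return -1;
    }
    return n;
}
```

    Unlike the empty while loop from the question, this sleeps inside aio_suspend rather than burning CPU while the read is in flight.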
    

    But when you use a callback function, waiting like this is a waste of resources and is in itself incorrect: if you need a callback, then what you have to synchronize with is the completion of the callback, not the completion of the asynchronous operation that triggers it.

    So I'll reiterate: use the POSIX thread synchronization primitives to synchronize POSIX threads. Usage example:

    #include <errno.h>
    #include <stdio.h>
    #include <string.h>
    
    #include <aio.h>
    #include <fcntl.h>
    #include <pthread.h>
    #include <unistd.h>
    
    typedef void aio_cb (union sigval u);
    
    static
    int aio_pread (struct aiocb *o, int fd, void *data, size_t count, off_t offset,
               aio_cb cb, void *cookie)
    {
        o->aio_fildes = fd;
        o->aio_buf    = data;
        o->aio_nbytes = count;
        o->aio_offset = offset;
    
        o->aio_sigevent.sigev_value.sival_ptr   = cookie;
        o->aio_sigevent.sigev_notify            = SIGEV_THREAD;
        o->aio_sigevent.sigev_notify_function   = cb;
        o->aio_sigevent.sigev_notify_attributes = NULL;
    
        return aio_read (o);
    }
    
    struct work {
        pthread_mutex_t lock;
        pthread_cond_t  cond;
        int done;
        struct aiocb cb;
    };
    
    static void work_cont (union sigval u)
    {
        struct work *o = u.sival_ptr;
        const int error = aio_error (&o->cb);
    
        if (error != 0)
            fprintf (stderr, "E: aio read: %s\n", strerror (error));
        else
            fprintf (stderr, "D: got %zd bytes\n", aio_return (&o->cb));
    
        close (o->cb.aio_fildes);  /* or do some other work with result */
    
        fprintf (stderr, "D: Signal that we are done our work\n");
    
        pthread_mutex_lock (&o->lock);
        o->done = 1;
        pthread_cond_signal (&o->cond);
        pthread_mutex_unlock (&o->lock);
    }
    
    static void work_wait (struct work *o)
    {
        pthread_mutex_lock (&o->lock);
    
        while (!o->done)
            pthread_cond_wait (&o->cond, &o->lock);
    
        fprintf (stderr, "D: Got done signal\n");
    
        pthread_mutex_unlock (&o->lock);
    }
    
    int main (int argc, char *argv[])
    {
        int fd;
        struct work o = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };
        char line[80];
    
        if (argc != 2) {
            fprintf (stderr, "usage:\n\taio-cb-test <file-to-read>\n");
            return 1;
        }
    
        if ((fd = open (argv[1], O_RDONLY)) == -1) {
            perror (argv[1]);
            return 1;
        }
    
        if (aio_pread (&o.cb, fd, line, sizeof (line), 0, work_cont, &o) != 0) {
            perror ("E: aio read");
            goto no_read;
        }
    
        work_wait (&o);
        return 0;
    no_read:
        close (fd);
        return 1;
    }
    

    Let's build and run the test (on older glibc versions you may need to add -lrt -pthread to the cc command):

    $ cc aio-cb-test.c -o aio-cb-test
    $ ./aio-cb-test aio-cb-test.c 
    D: got 80 bytes
    D: Signal that we are done our work
    D: Got done signal
    $ 
    

    Note that the mutex here protects only the done flag (the marker that our work has finished). Access to the aiocb is protected by the operation protocol instead: once an asynchronous operation has been started successfully, ownership of the aiocb passes to the AIO subsystem, and when the operation completes, ownership passes to the callback function.