casynchronouslustre

Need test to determine if asynchronous I/O is actually happening in C code


I am new to asynchronous I/O. I need to get it working in some C and Fortran programs on a Linux system. I managed to write a little C test code (included below) that reads asynchronously from two files. The code compiled and ran. What I am wondering, though, is whether I am truly getting asynchronous I/O, or is the I/O really serial? The lustre file system I am dealing with is a bit outdated and it is not clear that it actually supports asynchronous I/O, and no one seems to have a definite answer. So I am wondering are there some timing statements or any kind of output I can add to the code to determine whether it is functioning in a truly asynchronous manner. I'm betting I'll need much larger files than what I am dealing with to do a meaningful test. No idea what else I need.

The code is:

#include <stdio.h>
#include <stdlib.h>
/*  for "open()" ... */
#include <fcntl.h>
/* for "bzero()" ... */
#include<strings.h>
/* for asynch I/O ... */
#include <aio.h>
/* for EINPROGRESS ... */
#include <errno.h>
/* for "usleep()" ... */
#include <unistd.h>

#define BUFSIZE 1024


int main() {

        int fd0, fd1, readstat0, readstat1;
        struct aiocb *ioobjs[2];

        ioobjs[0] = malloc(sizeof(struct aiocb));
        ioobjs[1] = malloc(sizeof(struct aiocb));

        fd0 = open("file.txt", O_RDONLY);
        if (fd0 < 0) perror("open");
        fd1 = open("otherfile.txt", O_RDONLY);
        if (fd1 < 0) perror("open");

        bzero((char *)ioobjs[0], sizeof(struct aiocb));
        bzero((char *)ioobjs[1], sizeof(struct aiocb));

        ioobjs[0]->aio_buf = malloc(BUFSIZE+1);
        if (!ioobjs[0]->aio_buf) perror("malloc 0");
        ioobjs[1]->aio_buf = malloc(BUFSIZE+1);
        if (!ioobjs[1]->aio_buf) perror("malloc 0");

        ioobjs[0]->aio_fildes = fd0;
        ioobjs[0]->aio_nbytes = BUFSIZE;
        ioobjs[0]->aio_offset = 0;
        /* Don't forget this!  With list I/O, there is no
         * particular function call to make.  You have to
         * tell what you want to do via this member of
         * your aiocb struct:
         */
        ioobjs[0]->aio_lio_opcode = LIO_READ;
        ioobjs[1]->aio_fildes = fd1;
        ioobjs[1]->aio_nbytes = BUFSIZE;
        ioobjs[1]->aio_offset = 0;
        ioobjs[1]->aio_lio_opcode = LIO_READ;

        readstat0 = aio_read(ioobjs[0]);
        if (readstat0 < 0) perror("reading 0");
        readstat1 = aio_read(ioobjs[1]);
        if (readstat1 < 0) perror("reading 1");

        lio_listio(LIO_NOWAIT, ioobjs, 2, NULL);


        /* don't completely understand.  gives system time to
         * "wrap things up".  without this, one of the outputs
         * below (maybe both) will have no output to give.
         */
        usleep(100);

        if ((readstat0 = aio_return( ioobjs[0] )) > 0) {
                printf(">>>\n");
                printf("%s\n", (char *)(ioobjs[0]->aio_buf));
                printf("<<<\n");
        } else {
                perror("return");
        }
        if ((readstat1 = aio_return( ioobjs[1] )) > 0) {
                printf(">>>\n");
                printf("%s\n", (char *)(ioobjs[1]->aio_buf));
                printf("<<<\n");
        } else {
                perror("return");
        }


}

Solution

  • From man aio, note that aio_* is wholly a glibc [userspace] implementation.

    So, as mentioned, it has some limitations.

    The way to see what's going on, timewise, is to have an event log with timestamps.

    The naive approach is to just use [debug] printf calls. But, for precision time measurements, the overhead of printf can disrupt the real/actual timing. That is, we don't measure "the system under test", but, rather, "the system under test + the timing/benchmark overhead".

    One way is to run your program under strace with appropriate timestamp options. The strace log will have information about the syscalls used. But, because aio is implemented in userspace, it may not be able to drill down to a fine enough grain. And, strace itself can impose an overhead.

    Another way is to create a trace/event log mechanism and instrument your code. Basically, it implements a fixed length ring queue of "trace elements". So, the trace data is stored in memory, so it's very fast.

    A standard utility that can help with this is dtrace. I've not done this myself, as I've preferred to "roll my own". See below for some actual code I've used.

    Then, instrument your code with (e.g.):

    evtadd(TRACE_HELLO,"hello world");
    

    Where TRACE_* is from an enum you define as you wish.


    Anyway, here's some event trace code I've used in the past. The event struct can be extended to add whatever extra data you wish to store at an event point.

    #define _GNU_SOURCE
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <time.h>
    #include <sys/syscall.h>
    #include <pthread.h>
    
    typedef long long tsc_t;
    typedef unsigned long long u32;
    typedef unsigned long long u64;
    
    // trace buffer element
    typedef struct {
        tsc_t evt_tsc;
        pid_t evt_tid;
        const char *evt_name;
        u64 evt_xid;
    } evtelem_t;
    
    // trace buffer indexes
    typedef struct {
        u32 ring_enq;
        u32 ring_deq;
    } evtring_t;
    
    // trace buffer control
    typedef struct {
        evtring_t que_ring;
        u32 que_max;
        evtelem_t *que_base;
        pthread_mutex_t que_lock;
    } evtctl_t;
    
    evtctl_t evtctl;
    
    // advance queue index [wrap if necessary]
    #define EVTINC(_qidx) \
        do { \
            _qidx += 1; \
            if (_qidx >= evtctl.que_max) \
                _qidx = 0; \
        } while (0)
    
    // tscget -- get timestamp
    tsc_t
    tscget(void)
    {
        struct timespec ts;
        static tsc_t tsczero = 0;
        tsc_t tsc;
    
        clock_gettime(CLOCK_MONOTONIC,&ts);
    
        tsc = ts.tv_sec;
        tsc *= 1000000000;
        tsc += ts.tv_nsec;
    
        if (tsczero == 0)
            tsczero = tsc;
    
        tsc -= tsczero;
    
        return tsc;
    }
    
    // tscsec -- convert timestamp to fractional seconds
    double
    tscsec(tsc_t tsc)
    {
        double sec;
    
        sec = tsc;
        sec /= 1e9;
    
        return sec;
    }
    
    // evtptr -- point to trace element
    evtelem_t *
    evtptr(u32 idx)
    {
    
        return &evtctl.que_base[idx];
    }
    
    // evtinit -- initialize trace
    void
    evtinit(u32 qmax)
    {
    
        evtctl.que_base = calloc(qmax,sizeof(evtelem_t));
        evtctl.que_max = qmax;
    
        evtctl.que_ring.ring_deq = 0;
        evtctl.que_ring.ring_enq = 0;
    
        pthread_mutex_init(&evtctl.que_lock,NULL);
    }
    
    // evtnew -- locate new event slot
    evtelem_t *
    evtnew(void)
    {
        evtring_t *nring;
        evtelem_t *evt;
    
        pthread_mutex_lock(&evtctl.que_lock);
    
        nring = &evtctl.que_ring;
    
        evt = evtptr(nring->ring_enq);
    
        // advance enqueue pointer [wrap if necessary]
        EVTINC(nring->ring_enq);
    
        // if queue full advance dequeue pointer to maintain space
        if (nring->ring_enq == nring->ring_deq)
            EVTINC(nring->ring_deq);
    
        pthread_mutex_unlock(&evtctl.que_lock);
    
        return evt;
    }
    
    // evtadd -- add trace element
    evtelem_t *
    evtadd(u64 xid,const char *name)
    {
        tsc_t tsc;
        evtelem_t *evt;
    
        tsc = tscget();
    
        evt = evtnew();
        evt->evt_tsc = tsc;
        evt->evt_xid = xid;
        evt->evt_name = name;
        evt->evt_tid = syscall(SYS_gettid);
    
        return evt;
    }
    
    // _evtdump -- dump queue element
    void
    _evtdump(evtelem_t *evt,tsc_t *tscptr,FILE *xfinfo)
    {
        tsc_t tscprev;
        tsc_t tscnow;
        double elap;
        double delta;
    
        // get timestamp for this entry
        tscnow = evt->evt_tsc;
    
        tscprev = *tscptr;
        if (tscprev == 0)
            tscprev = tscnow;
    
        // get time delta from previous entry
        tscprev = tscnow - tscprev;
        delta = tscsec(tscprev);
    
        // get elapsed time from start
        elap = tscsec(tscnow);
    
        fprintf(xfinfo,"%.9f/%.9f %8d %s [%llu]\n",
            elap,delta,evt->evt_tid,evt->evt_name,evt->evt_xid);
    
        *tscptr = evt->evt_tsc;
    }
    
    // evtdump -- dump the queue
    void
    evtdump(const char *file)
    {
        FILE *evtxf;
        evtring_t ring;
        evtelem_t *evt;
        tsc_t tscprev;
    
        evtxf = fopen(file,"w");
        if (evtxf == NULL) {
            perror(file);
            exit(1);
        }
    
        ring = evtctl.que_ring;
    
        // initialize previous timestamp
        tscprev = 0;
    
        while (ring.ring_enq != ring.ring_deq) {
            evt = evtptr(ring.ring_deq);
            EVTINC(ring.ring_deq);
            _evtdump(evt,&tscprev,evtxf);
        }
    
        fclose(evtxf);
    }