I am new to asynchronous I/O. I need to get it working in some C and Fortran programs on a Linux system. I managed to write a little C test program (included below) that reads asynchronously from two files. The code compiles and runs. What I am wondering, though, is whether I am truly getting asynchronous I/O, or whether the I/O is really serial. The Lustre file system I am dealing with is a bit outdated, it is not clear that it actually supports asynchronous I/O, and no one seems to have a definite answer. So I am wondering whether there are some timing statements, or any other kind of output, that I can add to the code to determine whether it is functioning in a truly asynchronous manner. I'm betting I'll need much larger files than the ones I am dealing with to do a meaningful test. I have no idea what else I need.
The code is:
#include <stdio.h>
#include <stdlib.h>
/* for "open()" ... */
#include <fcntl.h>
/* for "bzero()" ... */
#include<strings.h>
/* for asynch I/O ... */
#include <aio.h>
/* for EINPROGRESS ... */
#include <errno.h>
/* for "usleep()" ... */
#include <unistd.h>
#define BUFSIZE 1024
/*
 * Read up to BUFSIZE bytes from the start of two files using POSIX
 * asynchronous I/O and print what was read.
 *
 * Both reads are submitted exactly once, as one lio_listio() batch. The
 * program then blocks in aio_suspend() until each request reports a final
 * status through aio_error(), instead of sleeping for a fixed time and
 * hoping the reads have finished.
 *
 * Returns EXIT_SUCCESS when both reads completed, EXIT_FAILURE otherwise.
 */
int main(void) {
    static const char *const malloc_tags[2] = { "malloc 0", "malloc 1" };
    static const char *const read_tags[2] = { "reading 0", "reading 1" };
    const char *paths[2] = { "file.txt", "otherfile.txt" };
    struct aiocb *ioobjs[2] = { NULL, NULL };
    char *bufs[2] = { NULL, NULL };
    int fds[2] = { -1, -1 };
    int failed = 0;
    int i;

    for (i = 0; i < 2; i++) {
        fds[i] = open(paths[i], O_RDONLY);
        if (fds[i] < 0) {
            perror("open");
            failed = 1;
            goto cleanup;
        }

        /* calloc() hands back a zeroed control block, so no bzero() */
        ioobjs[i] = calloc(1, sizeof *ioobjs[i]);
        if (!ioobjs[i]) {
            perror("calloc");
            failed = 1;
            goto cleanup;
        }

        /* one spare byte so the data can be NUL-terminated for printf */
        bufs[i] = malloc(BUFSIZE + 1);
        if (!bufs[i]) {
            perror(malloc_tags[i]);
            failed = 1;
            goto cleanup;
        }

        ioobjs[i]->aio_buf = bufs[i];
        ioobjs[i]->aio_fildes = fds[i];
        ioobjs[i]->aio_nbytes = BUFSIZE;
        ioobjs[i]->aio_offset = 0;
        /* Don't forget this! With list I/O, there is no
         * particular function call to make. You have to
         * tell what you want to do via this member of
         * your aiocb struct:
         */
        ioobjs[i]->aio_lio_opcode = LIO_READ;
    }

    /* Submit both reads in one call. Do NOT also call aio_read() on the
     * same control blocks: that would queue every request twice.
     */
    if (lio_listio(LIO_NOWAIT, ioobjs, 2, NULL) < 0) {
        perror("lio_listio");
        failed = 1;
        /* Some requests may still have been queued, so fall through and
         * let the per-request status below decide what happened to each.
         * NOTE(review): this relies on the implementation recording an
         * error status in requests it could not queue -- confirm on
         * non-glibc systems.
         */
    }

    for (i = 0; i < 2; i++) {
        const struct aiocb *pending[1] = { ioobjs[i] };
        ssize_t got;
        int err;

        /* wait until this request has left the "in progress" state */
        while ((err = aio_error(ioobjs[i])) == EINPROGRESS) {
            if (aio_suspend(pending, 1, NULL) < 0 && errno != EINTR) {
                perror("aio_suspend");
                /* the request may still be running, so its buffer must
                 * not be freed; leave everything to process teardown */
                return EXIT_FAILURE;
            }
        }

        if (err != 0) {
            /* aio_error() returns the error code instead of setting errno */
            if (err > 0)
                errno = err;
            perror(read_tags[i]);
            failed = 1;
            continue;
        }

        /* aio_return() may be called only once per completed request */
        got = aio_return(ioobjs[i]);
        if (got < 0) {
            perror("return");
            failed = 1;
            continue;
        }

        /* the read does not terminate the data; do it before using %s */
        bufs[i][got] = '\0';
        printf(">>>\n");
        printf("%s\n", bufs[i]);
        printf("<<<\n");
    }

cleanup:
    for (i = 0; i < 2; i++) {
        free(bufs[i]);
        free(ioobjs[i]);
        if (fds[i] >= 0)
            close(fds[i]);
    }
    return failed ? EXIT_FAILURE : EXIT_SUCCESS;
}
From `man aio`, note that `aio_*` is wholly a glibc [userspace] implementation.
So, as mentioned, it has some limitations.
The way to see what's going on, timewise, is to have an event log with timestamps.
The naive approach is to just use [debug] `printf` calls. But, for precision time measurements, the overhead of `printf` can disrupt the real/actual timing. That is, we don't measure "the system under test", but, rather, "the system under test + the timing/benchmark overhead".
One way is to run your program under `strace` with appropriate timestamp options. The `strace` log will have information about the syscalls used. But, because `aio` is implemented in userspace, it may not be able to drill down to a fine enough grain. And `strace` itself can impose an overhead.
Another way is to create a trace/event log mechanism and instrument your code. Basically, it implements a fixed length ring queue of "trace elements". So, the trace data is stored in memory, so it's very fast.
A standard utility that can help with this is `dtrace`. I've not done this myself, as I've preferred to "roll my own". See below for some actual code I've used.
Then, instrument your code with (e.g.):
evtadd(TRACE_HELLO,"hello world");
Where `TRACE_*` is from an `enum` you define as you wish.
Anyway, here's some event trace code I've used in the past. The event struct can be extended to add whatever extra data you wish to store at an event point.
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <sys/syscall.h>
#include <pthread.h>
// scalar types used by the trace code
typedef long long tsc_t;            // signed nanosecond timestamp
typedef unsigned int u32;           // queue index/count (was declared 64 bits wide despite its name)
typedef unsigned long long u64;     // caller-supplied event id
// trace buffer element -- one recorded event, filled in by evtadd()
typedef struct {
tsc_t evt_tsc; // timestamp of the event, as returned by tscget()
pid_t evt_tid; // id of the recording thread (SYS_gettid)
const char *evt_name; // event name; the pointer is stored, the string is not copied
u64 evt_xid; // caller-supplied id passed to evtadd()
} evtelem_t;
// trace buffer indexes -- both are indexes into the element array and wrap
// at the queue size (see EVTINC)
typedef struct {
u32 ring_enq; // slot that the next call to evtnew() will hand out
u32 ring_deq; // oldest retained element; evtdump() starts reading here
} evtring_t;
// trace buffer control -- all state of the trace queue
typedef struct {
evtring_t que_ring; // live enqueue/dequeue indexes
u32 que_max; // number of elements in que_base (set by evtinit)
evtelem_t *que_base; // element array allocated by evtinit()
pthread_mutex_t que_lock; // held by evtnew() while it updates que_ring
} evtctl_t;
// the single trace queue used by every function in this file
evtctl_t evtctl;
// advance queue index [wrap if necessary]
//
// _qidx must be a modifiable lvalue with no side effects: it is evaluated
// more than once. The index wraps to 0 on reaching evtctl.que_max.
#define EVTINC(_qidx) \
    do { \
        (_qidx) += 1; \
        if ((_qidx) >= evtctl.que_max) \
            (_qidx) = 0; \
    } while (0)
// tscget -- get timestamp
tsc_t
tscget(void)
{
struct timespec ts;
static tsc_t tsczero = 0;
tsc_t tsc;
clock_gettime(CLOCK_MONOTONIC,&ts);
tsc = ts.tv_sec;
tsc *= 1000000000;
tsc += ts.tv_nsec;
if (tsczero == 0)
tsczero = tsc;
tsc -= tsczero;
return tsc;
}
// tscsec -- convert timestamp to fractional seconds
//
// tsc is a nanosecond count; the result is the same interval in seconds
double
tscsec(tsc_t tsc)
{
    return (double) tsc / 1e9;
}
// evtptr -- point to trace element
//
// returns the address of slot idx in the global trace array; idx is not
// range-checked
evtelem_t *
evtptr(u32 idx)
{
    evtelem_t *base = evtctl.que_base;

    return base + idx;
}
// evtinit -- initialize trace
//
// allocates a zeroed array of qmax trace elements, resets both ring indexes
// and initializes the queue lock. must be called before evtadd/evtnew.
//
// qmax below 2 is raised to 2: evtnew() advances the dequeue index whenever
// the enqueue index catches up with it, so one slot is always given up, and
// with qmax == 0 evtnew() would write past a zero-length array.
//
// on allocation failure prints a message and exits, the same way evtdump()
// treats a failed fopen()
void
evtinit(u32 qmax)
{
    if (qmax < 2)
        qmax = 2;

    evtctl.que_base = calloc(qmax,sizeof(evtelem_t));
    if (evtctl.que_base == NULL) {
        perror("evtinit");
        exit(1);
    }

    evtctl.que_max = qmax;
    evtctl.que_ring.ring_deq = 0;
    evtctl.que_ring.ring_enq = 0;

    pthread_mutex_init(&evtctl.que_lock,NULL);
}
// evtnew -- locate new event slot
//
// hands out the slot at the enqueue index and advances that index, all under
// the queue lock. when the ring becomes full the oldest entry is dropped by
// advancing the dequeue index. the returned slot is filled in by the caller
// after the lock has been released.
evtelem_t *
evtnew(void)
{
    evtring_t *ring = &evtctl.que_ring;
    evtelem_t *slot;

    pthread_mutex_lock(&evtctl.que_lock);

    // claim the current enqueue slot, then step past it
    slot = evtptr(ring->ring_enq);
    EVTINC(ring->ring_enq);

    // enqueue caught up with dequeue: give up the oldest entry
    if (ring->ring_deq == ring->ring_enq) {
        EVTINC(ring->ring_deq);
    }

    pthread_mutex_unlock(&evtctl.que_lock);

    return slot;
}
// evtadd -- add trace element
//
// records one event: xid and name are stored as given (name is not copied),
// together with the current timestamp and the calling thread's id.
// returns the trace element that was filled in.
evtelem_t *
evtadd(u64 xid,const char *name)
{
    // the timestamp is taken before the slot is acquired
    const tsc_t stamp = tscget();
    evtelem_t *rec = evtnew();

    rec->evt_tsc = stamp;
    rec->evt_xid = xid;
    rec->evt_name = name;
    rec->evt_tid = syscall(SYS_gettid);

    return rec;
}
// _evtdump -- dump queue element
//
// writes one line for evt to xfinfo: elapsed seconds since the trace zero
// point, seconds since the previously dumped element, thread id, name and
// xid. *tscptr carries the previous element's timestamp between calls and is
// updated to this element's timestamp; a value of 0 means "no previous
// element" and yields a delta of zero.
void
_evtdump(evtelem_t *evt,tsc_t *tscptr,FILE *xfinfo)
{
    const tsc_t now = evt->evt_tsc;
    const tsc_t prev = (*tscptr != 0) ? *tscptr : now;
    const double elapsed = tscsec(now);
    const double step = tscsec(now - prev);

    fprintf(xfinfo,"%.9f/%.9f %8d %s [%llu]\n",
        elapsed,step,evt->evt_tid,evt->evt_name,evt->evt_xid);

    // remember this timestamp for the next element's delta
    *tscptr = evt->evt_tsc;
}
// evtdump -- dump the queue
//
// writes every retained trace element, oldest first, to the named file
// (created or truncated). the walk uses a private copy of the ring indexes,
// so the live queue is left untouched. exits with status 1 if the file
// cannot be opened.
void
evtdump(const char *file)
{
    evtring_t snap;
    tsc_t last;
    FILE *out;

    out = fopen(file,"w");
    if (out == NULL) {
        perror(file);
        exit(1);
    }

    // copy of the indexes to iterate with
    snap = evtctl.que_ring;

    // 0 tells _evtdump there is no previous element yet
    last = 0;

    for (;;) {
        if (snap.ring_deq == snap.ring_enq)
            break;

        _evtdump(evtptr(snap.ring_deq),&last,out);
        EVTINC(snap.ring_deq);
    }

    fclose(out);
}