12

I am testing kernel asynchronous io functions (not posix aio) and am trying to figure out how it works. The code below is a complete program where I simply write an array repeatedly to a file opened using O_DIRECT. I get an error in the callback function "write missed bytes expect 1024 got 0" (see the fprintf statement in work_done()).

For those not familiar with kernel aio, the code below does the following:

  1. Init some structs
  2. Prepare aio (io_prep_pwrite)
  3. Submit io requests (io_submit)
  4. Check for event completion (io_getevents)
  5. Call a callback function to see if everything went ok.

I get an error at step 5. If I do not open the file using O_DIRECT, things work fine, but it beats the purpose of having async writes. Can someone tell me what I am doing wrong? Is this the correct usage of kernel aio, for example, is my use of callbacks correct? Are there any restrictions on the usage of O_DIRECT?

I compile using 'gcc -Wall test.c -laio'

Thanks in advance.

/* 
 * File:   myaiocp.c
 * Author: kmehta
 *
 * Created on July 11, 2011, 12:50 PM
 *
 *
 * Testing kernel aio. 
 * Program creates a 2D matrix and writes it multiple times to create a file of desired size. 
 * Writes are performed using kernel aio functions (io_prep_pwrite, io_submit, etc.)
 */
#define _GNU_SOURCE
#define _XOPEN_SOURCE 600

#include <stdio.h>
#include <stdlib.h>
#include <getopt.h>
#include <pthread.h>
#include <fcntl.h>
#include <string.h>
#include <sys/uio.h>
#include <sys/time.h>
#include <omp.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <libaio.h>

char ** buf;
long seg_size;
int seg_rows;
double total_size;
char * filename;
static int wait_count = 0;

void io_task();
void cleanup();
void allocate_2D_matrix(int[]);
int file_open(char *);
void wr_done(io_context_t ctx, struct iocb* iocb, long res, long res2);

int main(int argc, char **argv) {
    total_size  = 1048576;      //1MB
    seg_size    = 1024;         //1kB
    seg_rows    = 1024;
    filename    = "aio.out";

    int dims[] = {seg_rows, seg_size};
    allocate_2D_matrix(dims);   //Creates 2D matrix

    io_task();
    cleanup();

    return 0;
}

/*
 * Create a 2D matrix
 */
void allocate_2D_matrix(int dims[2]) {
    int i;
    char *data;

    //create the matrix
    data = (char *) calloc(1, dims[0] * dims[1] * sizeof (char));
    if (data == NULL) {
        printf("\nCould not allocate memory for matrix.\n");
        exit(1);
    }

    buf = (char **) malloc(dims[0] * sizeof (char *));
    if (buf == NULL) {
        printf("\nCould not allocate memory for matrix.\n");
        exit(1);
    }

    for (i = 0; i < dims[0]; i++) {
        buf[i] = &(data[i * dims[1]]);
    }
}

static void io_error(const char *func, int rc)
{
    if (rc == -ENOSYS)
        fprintf(stderr, "AIO not in this kernel\n");
    else if (rc < 0)
        fprintf(stderr, "%s: %s\n", func, strerror(-rc));
    else
        fprintf(stderr, "%s: error %d\n", func, rc);

    exit(1);
}

/*
 * Callback function
 */
static void work_done(io_context_t ctx, struct iocb *iocb, long res, long res2)
{

    if (res2 != 0) {
        io_error("aio write", res2);
      }

      if (res != iocb->u.c.nbytes) {
            fprintf(stderr, "write missed bytes expect %lu got %ld\n",
                  iocb->u.c.nbytes, res2);
            exit(1);
      }
      wait_count --;
      printf("%d ", wait_count);
}

/*
 * Wait routine. Get events and call the callback function work_done()
 */
int io_wait_run(io_context_t ctx, long iter)
{
      struct io_event events[iter];
      struct io_event *ep;
      int ret, n;

      /*
       * get up to aio_maxio events at a time.
       */
      ret = n = io_getevents(ctx, iter, iter, events, NULL);
      printf("got %d events\n", n);
      /*
       * Call the callback functions for each event.
       */
      for (ep = events ; n-- > 0 ; ep++) {
            io_callback_t cb = (io_callback_t)ep->data ; struct iocb *iocb = ep->obj ; cb(ctx, iocb, ep->res, ep->res2);
      }
      return ret;
}

void io_task() {
    long offset = 0;
    int bufIndex = 0;

    //Open file
    int fd = file_open(filename);

    //Initialize structures
    long i; 
    long iter = total_size / seg_size;  //No. of iterations to reach desired file size (total_size)
    io_context_t myctx;
    if(0 != io_queue_init(iter, &myctx))
    {
        perror("Could not initialize io queue");
        exit(EXIT_FAILURE);
    }
    struct iocb * ioq[iter];

    //loop through iter times to reach desired file size
    for (i = 0; i < iter; i++) {
        struct iocb *io = (struct iocb*) malloc(sizeof (struct iocb));
        io_prep_pwrite(io, fd, buf[bufIndex], seg_size, offset);
        io_set_callback(io, work_done);
        ioq[i] = io;

        offset += seg_size;
        bufIndex ++;
        if (bufIndex > seg_rows - 1)    //If entire matrix written, start again from index 0
            bufIndex = 0;
    }

    printf("done preparing. Now submitting..\n");
    if(iter != io_submit(myctx, iter, ioq))
    {
        perror("Failure on submit");
        exit(EXIT_FAILURE);
    }

    printf("now awaiting completion..\n");
    wait_count = iter;
    int res;

    while (wait_count) {
        res = io_wait_run(myctx, iter);
        if (res < 0)
            io_error("io_wait_run", res);
    }

    close(fd);
}

void cleanup() {
    free(buf[0]);
    free(buf);
}

int file_open(char *filename) {
    int fd;
    if (-1 == (fd = open(filename, O_DIRECT | O_CREAT | O_WRONLY | O_TRUNC, 0666))) {
        printf("\nError opening file. \n");
        exit(-1);
    }

    return fd;
}
Paul Sonier
  • 38,903
  • 3
  • 77
  • 117
Korizon
  • 3,677
  • 7
  • 37
  • 52

1 Answers1

10

First of all, good job using libaio instead of POSIX aio.

Are there any restrictions on the usage of O_DIRECT ?

I'm not 100% sure this is the real problem, but O_DIRECT has some requirements (quoting mostly from TLPI):

  • The data buffer being transferred must be aligned on a memory boundary that is a multiple of the block size (use posix_memalign)
  • The file or device offset at which data transfer commences must be a multiple of the block size
  • The length of the data to be transferred must be a multiple of the block size

At a glance, I can see you are not taking aby precautions to align memory in allocate_2D_matrix.

If I do not open the file using O_DIRECT, things work fine, but it beats the purpose of having async writes.

This happens not to be the case. Asynchronous I/O works well without O_DIRECT (for instance think of the number of system calls slashed).

cnicutar
  • 178,505
  • 25
  • 365
  • 392
  • @cnicutar Great. That fixed it. A couple of questions though: 1. Why did you suggest to use libaio instead of posix aio? I dont have much experience with posix aio, so I dont know. 2. Why did you say async I/O works well w/o O_DIRECT? How does it reduce the no. of system calls? In fact I thought O_DIRECT is more efficient since it bypasses kernel buffering. – Korizon Aug 02 '11 at 23:25
  • 2
    `mmap` is probably a better way to obtain 4k-aligned memory than `posix_memalign`. The latter will surely have to waste 4k at the beginning of the allocation for the few bytes of bookkeeping it needs, because it allocates on page granularity (assuming you're allocating enough that `posix_memalign` services the request via `mmap` and not `brk`). – R.. GitHub STOP HELPING ICE Aug 02 '11 at 23:29
  • 2
    But according to Robert Love's Linux System Programming book, Linux supports aio on regular files only if opened with O_DIRECT. – ovais.tariq Nov 07 '12 at 10:05
  • I'm just starting to look into linux async io. I'm unfamiliar with the the callback functionality, but the way the sample here uses it seems quite odd. Usually you supply a callback that some subsystem will use to call you when it's done with the task you asked it to do. The sample here appears to be calling io_getevents() to get the list of events which have completed and then calling the callback for each. That doesn't sound correct. In that case there is no need for the callback. –  Aug 28 '16 at 01:46
  • Warning: [`libaio`/Linux Kernel AIO can exhibit blocking behaviour at **io_submit()** time if you don't use `O_DIRECT`](https://stackoverflow.com/a/46377629/2732969) and thus *silently* behave in a *synchronous* manner! – Anon Apr 25 '21 at 01:28