I/O completion ports made easy

I described the basics of I/O completion ports in my last post, but there is still the question of what the easiest way to use them is. Here I’ll show a callback-based application design that I’ve found makes a fully asynchronous program really simple to write.

I touched briefly on attaching our own context data to the OVERLAPPED structure we pass along with I/O operations. It’s this same idea that I’ll expand on here. This time, we define a generic structure to use with all our operations, and show how our threads will handle it while dequeuing packets:

struct io_context
{
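  // must be the first member: the dequeuing code casts the
  // OVERLAPPED pointer back to an io_context pointer.
  OVERLAPPED ovl;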
  void (*on_completion)(DWORD error, DWORD transferred,
                        struct io_context *ctx);
};

OVERLAPPED *ovl;
ULONG_PTR completionkey;
DWORD transferred;

BOOL ret = GetQueuedCompletionStatus(iocp, &transferred,
   &completionkey, &ovl, INFINITE);

if(ret)
{
  struct io_context *ctx = (struct io_context*)ovl;
  ctx->on_completion(ERROR_SUCCESS, transferred, ctx);
}
else if(ovl)
{
  DWORD err = GetLastError();

  struct io_context *ctx = (struct io_context*)ovl;
  ctx->on_completion(err, transferred, ctx);
}
else
{
  // no packet was dequeued (ovl is NULL): the call itself
  // failed, so error out.
}

With this, all our I/O operations will have a callback associated with them. When a completion packet is dequeued, the thread fetches the error information, if any, and runs the callback. Having every I/O operation use a single callback mechanism greatly simplifies the design of the entire program.
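
To make the three outcomes easier to see in isolation, here is a minimal sketch that folds the dequeue handling into one function. It is an illustration, not code from the real program: FAKE_OVERLAPPED is a stand-in so the sketch builds without the Windows headers, and dispatch_packet, demo_op and demo_finished are hypothetical names.

```c
#include <stddef.h>

// Stand-in for the Win32 OVERLAPPED structure so the sketch builds
// without <windows.h>; its contents do not matter here.
typedef struct { void *reserved[4]; } FAKE_OVERLAPPED;

struct io_context
{
  FAKE_OVERLAPPED ovl; // first member, so the cast below is valid.
  void (*on_completion)(unsigned long error, unsigned long transferred,
                        struct io_context *ctx);
};

// Hypothetical helper: handles one dequeued packet. `ok`, `ovl` and
// `transferred` mirror what GetQueuedCompletionStatus produced, and
// `last_error` is the GetLastError() value captured right after it.
// Returns 0 if a callback ran, -1 if there was no operation to report.
int dispatch_packet(int ok, FAKE_OVERLAPPED *ovl,
                    unsigned long transferred, unsigned long last_error)
{
  struct io_context *ctx;

  if(!ovl)
  {
    return -1;
  }

  ctx = (struct io_context*)ovl;
  ctx->on_completion(ok ? 0 : last_error, transferred, ctx);
  return 0;
}

// Example operation that records what its callback was told.
struct demo_op
{
  struct io_context io; // first member, so io_context* casts back.
  unsigned long error;
  unsigned long bytes;
  int calls;
};

void demo_finished(unsigned long error, unsigned long transferred,
                   struct io_context *ctx)
{
  struct demo_op *op = (struct demo_op*)ctx;

  op->error = error;
  op->bytes = transferred;
  op->calls++;
}
```

A worker thread would call something like dispatch_packet once per dequeued packet. The callback always receives an error code, so success and failure travel down the same path.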

Let’s say our app is reading a file and sending out its contents. We also want it to prefetch the next buffer so we can start sending right away. Here’s our connection context:

struct connection_context
{
  HANDLE file;
  SOCKET sock;

  WSABUF readbuf;
  WSABUF sendbuf;

  struct io_context readctx;
  struct io_context sendctx;
};

A structure like this is nice because initiating an I/O operation needs no separate allocation for its context. Note that we need two io_context members because we’re doing a read and a send concurrently.

Now the code to use it:

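#define BUFFER_SIZE 4096

// forward declarations for functions used before they are defined.
void read_finished(DWORD error, DWORD transferred,
                   struct io_context *ioctx);
void begin_send(struct connection_context *ctx);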

void begin_read(struct connection_context *ctx)
{
  if(ctx->readbuf.buf)
  {
    // only begin a read if one isn't already running.
    return;
  }

  ctx->readbuf.buf = malloc(BUFFER_SIZE);
  ctx->readbuf.len = 0;

  // zero out io_context structure.
  memset(&ctx->readctx, 0, sizeof(ctx->readctx));

  // set completion callback.
  ctx->readctx.on_completion = read_finished;

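  // NOTE: for a file handle the read position comes from
  // ovl.Offset/OffsetHigh; tracking it is left out here.
  if(!ReadFile(ctx->file, ctx->readbuf.buf, BUFFER_SIZE,
               NULL, &ctx->readctx.ovl) &&
     GetLastError() != ERROR_IO_PENDING)
  {
    // the read could not be started; handle error.
  }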
}

void read_finished(DWORD error, DWORD transferred,
                   struct io_context *ioctx)
{
  // get our connection context.
  struct connection_context *ctx =
     (struct connection_context*)((char*)ioctx -
        offsetof(struct connection_context, readctx));

  if(error == ERROR_HANDLE_EOF ||
     (error == ERROR_SUCCESS && !transferred))
  {
    // reached end of file, close out connection.
    free(ctx->readbuf.buf);
    ctx->readbuf.buf = NULL;
    return;
  }

  if(error != ERROR_SUCCESS)
  {
    // handle error.
    return;
  }

  // send out however much we read from the file.

  ctx->readbuf.len = transferred;

  begin_send(ctx);
}

This gives us a very obvious chain of events: read_finished is called when a read completes. Since we only get an io_context structure in our callback, we need to adjust the pointer to get our full connection_context.
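
The pointer adjustment can be wrapped up so each callback doesn't repeat the arithmetic. This is a minimal sketch of that idea: CONTAINER_OF, from_readctx and from_sendctx are hypothetical names, and the structures are trimmed-down stand-ins (plain ints instead of HANDLE, SOCKET and OVERLAPPED) so it builds anywhere.

```c
#include <stddef.h>

// Hypothetical helper macro: given a pointer to `member` inside a
// `type`, step back by the member's offset to reach the enclosing
// structure.
#define CONTAINER_OF(ptr, type, member) \
  ((type*)((char*)(ptr) - offsetof(type, member)))

// Stand-in: the real io_context holds an OVERLAPPED and a callback.
struct io_context
{
  int placeholder;
};

struct connection_context
{
  int file; // stand-in for HANDLE.
  int sock; // stand-in for SOCKET.

  struct io_context readctx;
  struct io_context sendctx;
};

// Recover the connection from whichever member the callback was given.
struct connection_context *from_readctx(struct io_context *ioctx)
{
  return CONTAINER_OF(ioctx, struct connection_context, readctx);
}

struct connection_context *from_sendctx(struct io_context *ioctx)
{
  return CONTAINER_OF(ioctx, struct connection_context, sendctx);
}
```

Each callback has to name the member it was registered with: read_finished adjusts by the offset of readctx, send_finished by the offset of sendctx. Mixing them up would yield a pointer into the middle of the structure.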

Sending is easy too:

void begin_send(struct connection_context *ctx)
{
  if(ctx->sendbuf.buf)
  {
    // only begin a send if one
    // isn't already running.
    return;
  }

  if(!ctx->readbuf.len)
  {
    // only begin a send if the
    // read buffer has something.
    return;
  }

  // switch buffers.
  ctx->sendbuf = ctx->readbuf;

  // clear read buffer.
  ctx->readbuf.buf = NULL;
  ctx->readbuf.len = 0;

  // zero out io_context structure.
  memset(&ctx->sendctx, 0, sizeof(ctx->sendctx));

  // set completion callback.
  ctx->sendctx.on_completion = send_finished;

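  if(WSASend(ctx->sock, &ctx->sendbuf, 1, NULL, 0,
             &ctx->sendctx.ovl, NULL) == SOCKET_ERROR &&
     WSAGetLastError() != WSA_IO_PENDING)
  {
    // the send could not be started; handle error.
  }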

  // start reading next buffer.
  begin_read(ctx);
}

void send_finished(DWORD error, DWORD transferred,
                   struct io_context *ioctx)
{
  // get our connection context.
  struct connection_context *ctx =
     (struct connection_context*)((char*)ioctx -
        offsetof(struct connection_context, sendctx));

  if(error != ERROR_SUCCESS)
  {
    // handle error.
    return;
  }

  // success, clear send buffer and start next send.

  free(ctx->sendbuf.buf);
  ctx->sendbuf.buf = NULL;

  begin_send(ctx);
}

Pretty much more of the same. Again, for brevity I’m leaving out some error-checking code and assuming the buffer gets sent out in full. I’m also assuming a single-threaded design: the socket and file functions themselves are thread-safe, so there is nothing to worry about there, but the buffer management code here would need some extra locking since it could be run concurrently. But the idea should be clear.
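
If you don't want to assume the buffer goes out in full, one option is to keep the original allocation pointer separately and advance a window over the unsent bytes after each completion. This is a rough sketch under that assumption, not part of the code above: struct buf is a stand-in for WSABUF, and send_state and advance_send are hypothetical names.

```c
#include <stddef.h>

// Stand-in for WSABUF (same member order: length, then pointer).
struct buf
{
  unsigned long len;
  char *buf;
};

// Hypothetical bookkeeping for one send that may complete in pieces.
struct send_state
{
  char *base;           // pointer from malloc, kept for free().
  struct buf remaining; // window over the bytes not yet sent.
};

// Consume `transferred` bytes from the front of the window.
// Returns 1 if bytes remain and another send should be issued,
// 0 if the whole buffer has gone out.
int advance_send(struct send_state *s, unsigned long transferred)
{
  if(transferred > s->remaining.len)
  {
    // never step past the end of the buffer.
    transferred = s->remaining.len;
  }

  s->remaining.buf += transferred;
  s->remaining.len -= transferred;

  return s->remaining.len != 0;
}
```

In a send callback this would run before the free: while it returns 1, issue another send over the remaining window; once it returns 0, free base and move on to the next buffer.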

Update: this subject is continued in Tips for efficient I/O.
