Limiting asynchrony with task parallelism

When I started rewriting this website, I wanted to make good use of my multi-core CPU. The site is generated from hundreds of pages using XSL transforms and plenty of pre-processing in C#, so there's a lot of parallelism to be had.

I began by using the TPL's data parallelism features, mainly Parallel.ForEach and Parallel.Invoke. These are super easy to use, and they made an immediate, huge difference.
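As a rough illustration (the page list and the TransformPage helper are stand-ins for the real build code, not what the site actually uses), the data-parallel version looked something like this:

string[] pages = Directory.GetFiles("content", "*.xml");

// Each page is independent, so let the TPL partition the work across cores.
Parallel.ForEach(pages, page =>
{
    TransformPage(page);
});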

Then the Visual Studio 11 developer preview came out, and I felt compelled to make use of its new async features. This meant ditching the Parallel methods altogether and writing for task parallelism.

There are still parts of the .NET Framework which don't support async, and XML is one of them. Because I'm reading relatively small documents, I was able to work around these limitations by asynchronously filling a MemoryStream from a file and feeding the MemoryStream to the XML classes:

Task<FileStream> OpenReadAsync(string fileName)
{
    // Open the file on a pool thread; the final "true" asks for an
    // asynchronous (overlapped) FileStream with a 4 KB buffer.
    return Task.Factory.StartNew(state =>
        new FileStream((string)state, FileMode.Open, FileAccess.Read,
                       FileShare.Read, 4096, true), fileName);
}

async Task<XmlReader> CreateXmlReader(string fileName,
                                      XmlReaderSettings settings = null)
{
    MemoryStream ms = new MemoryStream();

    // Copy the whole file into memory asynchronously, then hand the
    // buffered stream to the synchronous XML APIs.
    using (FileStream fs = await OpenReadAsync(fileName))
    {
        await fs.CopyToAsync(ms);
    }

    ms.Position = 0;
    return XmlReader.Create(ms, settings, fileName);
}

But I had one more problem to solve. For efficiency, Parallel.ForEach partitions its items into ranges which are operated on concurrently. A side effect I was relying on was that only a limited number of I/O operations could happen at once. In my new code I'm simply launching all these tasks at once rather than partitioning, and that absolutely killed performance: potentially hundreds of concurrent I/Os caused my disk to seek like crazy.
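To make that concrete, the naive version looked roughly like this (GeneratePagesAsync and ProcessPageAsync are hypothetical stand-ins for the real per-page work):

async Task GeneratePagesAsync(string[] pages)
{
    // Every page kicks off its I/O immediately; with hundreds of pages,
    // that means hundreds of reads competing for the disk at once.
    List<Task> tasks = new List<Task>();

    foreach (string page in pages)
    {
        tasks.Add(ProcessPageAsync(page));
    }

    await Task.WhenAll(tasks);
}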

What I ended up doing here was creating a ticket system that allows only a limited number of I/Os to happen concurrently: essentially a safe, task-based semaphore.

sealed class AsyncLimiter
{
    public AsyncLimiter(int max);
    public Task<IDisposable> Lock();
}

The full implementation is available in Subversion and under a 2-clause BSD license. Using it is very simple:

AsyncLimiter limiter = new AsyncLimiter(4);

async Task<FileStream> OpenReadAsync(string fileName)
{
    // Wait for a ticket before touching the disk.
    using (IDisposable limiterlock = await limiter.Lock())
    {
        return await Task.Factory.StartNew(state =>
            new FileStream((string)state, FileMode.Open, FileAccess.Read,
                           FileShare.Read, 4096, true), fileName);
    }
}

async Task<XmlReader> CreateXmlReader(string fileName,
                                      XmlReaderSettings settings = null)
{
    MemoryStream ms = new MemoryStream();

    using (FileStream fs = await OpenReadAsync(fileName))
    // A ticket also gates the copy, so only a few reads hit the disk at once.
    using (IDisposable limiterlock = await limiter.Lock())
    {
        await fs.CopyToAsync(ms);
    }

    ms.Position = 0;
    return XmlReader.Create(ms, settings, fileName);
}

When the lock gets disposed, it lets the next operation in line proceed. This was simple to implement efficiently using Interlocked methods and a ConcurrentQueue.
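The actual implementation is in the repository; as a rough sketch of the idea (simplified, and with a hypothetical nested Ticket class standing in for the disposable), it looks something like this:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

sealed class AsyncLimiter
{
    readonly int max;
    readonly ConcurrentQueue<TaskCompletionSource<IDisposable>> waiters =
        new ConcurrentQueue<TaskCompletionSource<IDisposable>>();
    int count;

    public AsyncLimiter(int max)
    {
        this.max = max;
    }

    public Task<IDisposable> Lock()
    {
        // Fast path: still under the limit, so hand out a ticket right away.
        if (Interlocked.Increment(ref count) <= max)
        {
            return Task.FromResult<IDisposable>(new Ticket(this));
        }

        // Over the limit: queue up and wait for a ticket to be released.
        var waiter = new TaskCompletionSource<IDisposable>();
        waiters.Enqueue(waiter);
        return waiter.Task;
    }

    void Release()
    {
        // If the count is still at or above the limit after decrementing,
        // a waiter is queued (or about to be); pass the ticket straight to it.
        if (Interlocked.Decrement(ref count) >= max)
        {
            TaskCompletionSource<IDisposable> waiter;
            while (!waiters.TryDequeue(out waiter)) { /* enqueue in progress */ }
            waiter.SetResult(new Ticket(this));
        }
    }

    sealed class Ticket : IDisposable
    {
        AsyncLimiter owner;

        public Ticket(AsyncLimiter owner)
        {
            this.owner = owner;
        }

        public void Dispose()
        {
            // Guard against double-dispose handing out two extra tickets.
            AsyncLimiter o = Interlocked.Exchange(ref owner, null);
            if (o != null)
                o.Release();
        }
    }
}

One detail the sketch glosses over: SetResult runs the waiter's continuation synchronously on the releasing thread, which is worth keeping in mind.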

Some operations—file opening and existence testing, directory creation, etc.—have no asynchronous analog. For these there is no good solution, so I simply wrapped them in a task as in the OpenReadAsync example above. They're rare enough that it hasn't been a problem.
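For instance, an existence check can be pushed onto the thread pool in the same way (the ExistsAsync name here is just for illustration):

Task<bool> ExistsAsync(string fileName)
{
    // File.Exists has no async counterpart, so run it on a pool thread.
    return Task.Factory.StartNew(state => File.Exists((string)state), fileName);
}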

The end result? Actually about 50% better performance than using the Parallel methods. When all the files are in cache, I'm able to generate this entire website from scratch in about 0.7 seconds.

Posted on February 04, 2012 in Asynchronous, Coding, Scalability
