Batching up memory-intensive parallel tasks
Elton Stoneman

It’s surprising how easily you can bring down a .NET app.

We’ll be starting the new year with a bulk load of data into the system we’re building, and now we’re load testing and verifying everything gets processed as expected. We’re fortunate that we have a snapshot of the expected data load, which contains around 70,000 incoming messages.

We rely heavily on a separate system providing static data to enrich each message and we want to be sure that system has data for all the entities we’ll be pushing through. So I put together a quick tool to verify that. It’s simple enough – read a message from the incoming queue, get the entities in the message, call the static data API for each entity, and save the results (OK, not found, exception etc.).

Incoming messages are XML which can be anywhere from 20KB to 2MB in size; to get the entity IDs we need to run a handful of XPath queries, and then we usually call the API twice for each message. The tool needed to be fast enough that we could run it in an environment, find and fix any data issues and then re-run without impacting the timescales for our actual load tests.
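To make the enrichment step concrete, here's a minimal sketch of the shape of the Load() method (it needs System.Xml.Linq, System.Xml.XPath and System.Linq). The actual XPath queries, the static data API client and the results store are specific to our system, so the names used here - the "//Entity/Id" query, _staticDataClient, GetEntity and _results - are placeholders, not the real code:

private void Load(string xml) 
{ 
    // parsing the whole document up front is the memory-hungry part 
    var doc = XDocument.Parse(xml); 

    // the real tool runs a handful of XPath queries; this single query is a placeholder 
    var entityIds = doc.XPathSelectElements("//Entity/Id") 
                       .Select(e => e.Value) 
                       .Distinct(); 

    foreach (var entityId in entityIds) 
    { 
        // _staticDataClient and _results are hypothetical stand-ins for the static data 
        // API client and the store of results (OK, not found, exception etc.) 
        var result = _staticDataClient.GetEntity(entityId); 
        _results.Add(entityId, result); 
    } 
}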

The first version of the tool was single-threaded:

while (reader.Read()) 
{ 
    var xml = reader.GetString(0); 
    Load(xml); 
}

It had a low memory profile but took 5 minutes to process the first 1,000 messages, so it needed speeding up. Next I started a task from the factory for each message:

while (reader.Read()) 
{ 
    var xml = reader.GetString(0); 
    Task.Factory.StartNew(() => Load(xml)); 
}

The TPL has a scheduler which decides how many tasks to run in parallel and how many to leave in the queue, based on CPU usage - but it doesn’t take memory usage into account. The Load() method uses an XDocument to run the XPath queries, which is quite memory hungry. With this version the TPL was pushing as many tasks through as the CPU could handle, which didn’t give the garbage collector time to run and clean up the XDocument resources between tasks. The tool processed 1,000 messages in 30 seconds, and then crashed with an out of memory exception.

The final approach was to group the tasks into batches, let each batch finish, then call GC.Collect() before starting on the next batch. I wrapped that up into a reusable TaskBatcher class:

public class TaskBatcher : IDisposable 
{ 
    //…   

    public void Add(Action action) 
    { 
        if (_batchIndex == _batchSize) 
        { 
            // batch is full: wait for it to drain, then reclaim memory before the next batch 
            Task.WaitAll(_tasks.ToArray()); 
            GC.Collect(); 
            _batchIndex = 0; 
            _tasks = new List<Task>(); 
        } 
        _tasks.Add(Task.Factory.StartNew(action)); 
        _batchIndex++; 
    }
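
The fields and the constructor are elided with the "//…" above. Filling in the obvious shape they would have - a batch size, a task list and a counter; the exact code is in the gist linked at the end - it's roughly:

    // assumed fields and constructor: the names match the Add() method above, 
    // but this is a sketch rather than the original source 
    private readonly int _batchSize; 
    private List<Task> _tasks = new List<Task>(); 
    private int _batchIndex; 

    public TaskBatcher(int batchSize) 
    { 
        _batchSize = batchSize; 
    }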

And used the task batcher like this, to process the messages in batches of 1,000:

using (var batcher = new TaskBatcher(1000)) 
{ 
    while (reader.Read()) 
    { 
        var xml = reader.GetString(0); 
        batcher.Add(() => Load(xml)); 
    } 
}

The batcher starts a new task with the TPL factory for each action until it hits the batch limit, then waits for the whole batch to complete (and runs a garbage collection) before starting the next batch. TaskBatcher implements IDisposable, so when it goes out of scope there's a final WaitAll() to ensure all remaining tasks have completed:

public void Dispose() 
{ 
    if (_batchIndex > 0) 
    { 
        Task.WaitAll(_tasks.ToArray()); 
    } 
}

The full code is available as a gist: TaskBatcher, for batching up memory-intensive parallel tasks. With that in place the tool processed all 70,000 messages in just over two minutes, spiking memory towards the end of each batch and then calming down after the GC.Collect() call.

Posted on Monday, December 9, 2013 4:44 PM | Code Snippet, .NET 4.0, github

