Cache Liveness Testing with Java’s Scheduled Thread Pool Executor


Perhaps needless to say, unchecked generics were not the only thing I wanted to clean up in LingPipe. Reading the section on threading in Effective Java convinced me to use the java.util.concurrent libraries for execution rather than managing my own threads. It also reminded me that unless reads and writes of a variable are both synchronized on the same lock, or the variable is declared volatile, changes made by one thread are not guaranteed to be visible to other threads. There aren’t any member variables changing here, but there are values in arrays that change.
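The point about arrays is easy to trip over: declaring an array field volatile covers only the array reference, not its elements. As a general Java illustration (not a description of FastCache’s internals), the sketch below uses the standard java.util.concurrent.atomic.AtomicReferenceArray, whose get and set have volatile semantics for each element. The class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

public class ArrayVisibilityDemo {

    // A volatile array field makes only the array *reference* volatile;
    // plain reads and writes of its elements get no cross-thread visibility
    // guarantee. AtomicReferenceArray gives each element volatile semantics.
    static String publishAndRead() {
        AtomicReferenceArray<String> slots = new AtomicReferenceArray<>(8);
        Thread writer = new Thread(() -> slots.set(3, "cached"));
        writer.start();
        // Busy-wait until the writer's element update becomes visible.
        // With a plain String[] element, nothing guarantees this loop ends.
        while (slots.get(3) == null) {
            Thread.yield();
        }
        String seen = slots.get(3);
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(publishAndRead());  // prints "cached"
    }
}
```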

So I wondered about the liveness of LingPipe’s util.FastCache thread-safe cache. The existing tests checked only safety (that the cache never returns a wrong value), not liveness. What if writes weren’t visible across threads, so that every thread had to recreate its own cache entries?
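To make that failure mode concrete, here is a sketch of a hypothetical cache that is perfectly safe but not live in this sense: a ThreadLocal hands every thread its own private HashMap, so no thread ever sees another thread’s entries. PerThreadCacheDemo and countHits are made-up names for illustration, and the lookup pattern mirrors the one used in the test below.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class PerThreadCacheDemo {

    // Runs numThreads threads that each look up keys 0..numEntries-1 once,
    // against a hypothetical "cache" in which every thread silently gets its
    // own private HashMap. Returns the total number of hits.
    static int countHits(int numThreads, int numEntries) {
        ThreadLocal<Map<Integer, Integer>> cache =
            ThreadLocal.withInitial(HashMap::new);
        AtomicInteger hits = new AtomicInteger();
        Runnable task = () -> {
            Map<Integer, Integer> mine = cache.get();
            for (int j = 0; j < numEntries; ++j) {
                if (mine.get(j) != null) {
                    // Never happens: each key is looked up once per thread,
                    // and only this thread ever writes to 'mine'.
                    hits.incrementAndGet();
                } else {
                    mine.put(j, j / 2);  // every thread recreates every entry
                }
            }
        };
        Thread[] threads = new Thread[numThreads];
        for (int i = 0; i < numThreads; ++i) {
            threads[i] = new Thread(task);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return hits.get();
    }

    public static void main(String[] args) {
        System.out.println("hits=" + countHits(16, 64));  // prints hits=0
    }
}
```

Zero hits is exactly the symptom a liveness test should catch; a genuinely shared cache run through the same pattern ought to report plenty of hits.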

First, a batch of imports:

import com.aliasi.util.FastCache;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;

Next comes the definition of a java.lang.Runnable, instances of which will run simultaneously in multiple threads. The first part just sets up the member variables, which hold the shared cache, the number of entries to look up, and this instance’s count of cache hits:

    static class CacheTest implements Runnable {
        final FastCache<Integer,Integer> mCache;  // shared by all threads
        final int mNumEntries;                    // keys 0..mNumEntries-1 are looked up
        int mHits = 0;                            // hits seen by this instance's thread
        CacheTest(FastCache<Integer,Integer> cache,
                  int numEntries) {
            mCache = cache;
            mNumEntries = numEntries;
        }

An instance of FastCache<A,B> implements java.util.Map<A,B>; in this case, the cache maps integers to integers. In our HMM decoder implementation, the cache maps strings (tokens) to arrays of doubles (emission probabilities indexed by tag/category ID).

The meat is in the run() method:

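        public void run() {
            Random random = new Random();
            for (int j = 0; j < mNumEntries; ++j) {
                // Sleep 0-4 ms so the threads interleave instead of the
                // first one filling the cache on its own.
                try {
                    Thread.sleep(random.nextInt(5));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                if (mCache.get(j) != null) {
                    synchronized (this) {
                        ++mHits;  // hit: some other thread put key j
                    }
                } else {
                    mCache.put(j, j/2);  // miss: add the entry ourselves
                }
            }
        }
    }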

For each key j from 0 to mNumEntries - 1, it looks j up in the cache: on a hit it increments the hit count, and on a miss it adds the entry mapping j to j/2. Before each lookup it sleeps a random 0 to 4 milliseconds; without the sleep, the first thread populates the cache before the others even get going!
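Because run() calls get and then put as two separate steps, two threads can both miss on the same key and both put it, so some misses come from such races rather than from expiration. For contrast only, the sketch below uses the standard java.util.concurrent.ConcurrentHashMap, whose computeIfAbsent applies the mapping function atomically and at most once per key. It is unbounded, so it is not a drop-in replacement for an expiring cache like FastCache; CheckThenPutDemo and countComputations are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CheckThenPutDemo {

    // numThreads tasks each look up keys 0..numEntries-1 via computeIfAbsent;
    // returns how many times a value was actually computed.
    static int countComputations(int numThreads, int numEntries) {
        ConcurrentHashMap<Integer, Integer> cache = new ConcurrentHashMap<>();
        AtomicInteger computations = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        for (int i = 0; i < numThreads; ++i) {
            executor.execute(() -> {
                for (int j = 0; j < numEntries; ++j) {
                    // Lookup and insertion happen as one atomic step, so two
                    // threads can never both compute the same key.
                    cache.computeIfAbsent(j, k -> {
                        computations.incrementAndGet();
                        return k / 2;
                    });
                }
            });
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return computations.get();
    }

    public static void main(String[] args) {
        System.out.println(countComputations(16, 64));  // prints 64
    }
}
```

However the threads interleave, countComputations(16, 64) returns 64, one computation per key.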

The test harness itself runs under JUnit 3.8 (I haven’t yet moved over to the annotation-based JUnit 4; that’s coming up, too). Here’s the test method:

    public void testMulti() throws InterruptedException {
        int numThreads = 16;
        int numEntries = 64;
        // Cache size is half the number of distinct keys, forcing expiration.
        FastCache<Integer,Integer> cache
            = new FastCache<Integer,Integer>(numEntries/2, 0.75);
        CacheTest[] cacheTests = new CacheTest[numThreads];
        for (int i = 0; i < numThreads; ++i)
            cacheTests[i] = new CacheTest(cache,numEntries);
        // Fewer pool threads than tasks, so some tasks wait for a free thread.
        ExecutorService executor
            = new ScheduledThreadPoolExecutor((numThreads*3)/4);
        for (CacheTest testCache : cacheTests)
            executor.execute(testCache);
        executor.shutdown();  // accept no new tasks; queued ones still run
        assertTrue("tasks did not finish in time",
                   executor.awaitTermination(30,TimeUnit.SECONDS));
        // Any entry that survived expiration must hold the right value.
        for (int j = 0; j < numEntries; ++j) {
            Integer val = cache.get(j);
            if (val == null) continue;
            assertEquals(Integer.valueOf(j/2), val);
        }
        int sumHits = 0;
        for (int i = 0; i < numThreads; ++i)
            sumHits += cacheTests[i].mHits;
        int misses = numThreads * numEntries - sumHits;
        System.out.println("hits=" + sumHits
                           + " misses=" + misses
                           + " numEntries=" + numEntries);
    }

It’s straightforward: set up the cache and an array of CacheTest instances that share it, create the executor and hand it every instance, shut the executor down and wait for all of its tasks to finish, check that every entry still in the cache has the right value, and finally total the hits and print them. The parameters are chosen so that a cache of size 32 (half the 64 distinct keys) with load factor 0.75 is guaranteed to have to expire entries.
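For readers without LingPipe at hand, here is a self-contained sketch of the same harness. It assumes a stand-in for FastCache: a LinkedHashMap in access order that evicts its least-recently-used entry past a fixed size, wrapped in Collections.synchronizedMap. That eviction policy is an assumption for illustration and need not match FastCache’s. Hits are tallied with an AtomicInteger rather than a synchronized per-task field; LivenessHarness, boundedCache, and run are hypothetical names.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LivenessHarness {

    // Hypothetical stand-in for FastCache: a synchronized map that evicts its
    // least-recently-used entry once it holds more than maxSize entries.
    static <K, V> Map<K, V> boundedCache(final int maxSize) {
        return Collections.synchronizedMap(new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxSize;
            }
        });
    }

    // Runs numThreads tasks, each looking up keys 0..numEntries-1 once in a
    // shared bounded cache; returns {hits, misses}.
    static int[] run(int numThreads, int numEntries) {
        Map<Integer, Integer> cache = boundedCache(numEntries / 2);
        AtomicInteger hits = new AtomicInteger();
        ExecutorService executor = new ScheduledThreadPoolExecutor((numThreads * 3) / 4);
        for (int i = 0; i < numThreads; ++i) {
            executor.execute(() -> {
                Random random = new Random();
                for (int j = 0; j < numEntries; ++j) {
                    try {
                        Thread.sleep(random.nextInt(5));  // let threads interleave
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    if (cache.get(j) != null) {
                        hits.incrementAndGet();  // entry put by another thread
                    } else {
                        cache.put(j, j / 2);
                    }
                }
            });
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // Safety check: every entry that survived eviction holds the right value.
        for (int j = 0; j < numEntries; ++j) {
            Integer val = cache.get(j);
            if (val != null && val.intValue() != j / 2) {
                throw new AssertionError("wrong value for key " + j);
            }
        }
        int h = hits.get();
        return new int[] { h, numThreads * numEntries - h };
    }

    public static void main(String[] args) {
        int[] counts = run(16, 64);
        System.out.println("hits=" + counts[0] + " misses=" + counts[1]);
    }
}
```

Since no task is actually scheduled for later, Executors.newFixedThreadPool would serve just as well as ScheduledThreadPoolExecutor here. Counts vary from run to run and need not match the FastCache figures, so the sketch checks values and returns the tallies instead of asserting particular numbers.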

Here’s what running the FastCache test produces on my dual quad-core Xeon workstation running 64-bit Windows Vista:

% lingpipetrunk>ant -Dtest.class=util.FastCache test1
...
    [junit] Running com.aliasi.test.unit.util.FastCacheTest
    [junit] Testsuite: com.aliasi.test.unit.util.FastCacheTest
    [junit] Tests run: 5, Failures: 0, Errors: 0, Time elapsed: 1.095 sec
    [junit] Output:
    [junit] hits=819 misses=205 numEntries=64

BUILD SUCCESSFUL
Total time: 2 seconds
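The printed counts can be sanity-checked with a little arithmetic. With $T = 16$ threads each looking up $N = 64$ keys exactly once, and a key being put only right after a lookup of it has missed, no key can be found before someone has missed on it, so every key contributes at least one miss:

$$
\begin{aligned}
\text{hits} + \text{misses} &= T \cdot N = 16 \cdot 64 = 1024 = 819 + 205,\\
\text{misses} &\ge N = 64 \quad\Longrightarrow\quad \text{hits} \le 1024 - 64 = 960,\\
\text{hit rate} &= 819 / 1024 \approx 0.80.
\end{aligned}
$$

The $205 - 64 = 141$ misses beyond that minimum come either from entries that had already been expired or from two threads missing on the same key before either had put it.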

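Overall, I’m happy with the way this works: 819 of the 1,024 lookups were hits, and since a thread never looks up the same key twice, every one of those hits was on an entry put there by another thread.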

Of course, if any of you thread gurus out there want to help make it better, I’m all ears; I’m still a relative novice.
