Use the Java concurrency API to speed up time-consuming tasks
Until recently, true concurrency has been  impossible on most computers marketed to consumers. Most have been  one-processor models capable of executing only a single thread in any given  time slice. Operating systems simulate doing many things at once by rapidly  time-slicing between threads, a practice known as temporal multithreading. 
  True concurrency, also known as simultaneous  multithreading (SMT), occurs when multiple threads execute instructions during  the same clock cycle. While in the past, SMT has been possible only on  high-end, multiple-processor systems, the advent of inexpensive multiple-core  processors is bringing SMT-capable systems to average consumers. The laptop you  pick up at Best Buy this holiday season might very well sport a Centrino Duo  capable of simultaneously executing two threads. Intel recently announced a new  line of quad-core processors. Soon, single-processor, single-core systems will  be considered relics. 
  These new systems give you, the high-performance Java developer, a  tremendous opportunity for speeding up your programs. However, they won't  inherit the advantages of SMT automatically; you must adapt your algorithms so  the time-consuming sections are performed by multiple threads designed to run  simultaneously. The addition of high-level concurrency utilities to Java 5  greatly eases this process. Using two important classes from the package java.util.concurrent, I'll demonstrate how to speed up time-consuming  tasks by having them use the optimal number of threads for the systems on which  they are executed. 
  A simple approach
  The basic approach is rather simple:
- Develop a single-threaded, sequential, robust, and clearly organized version of your algorithm. If you're reading this article to speed up something that exists, you may already have this step covered.
- Identify the subtasks. Examine the algorithm to identify the discrete stages. This shouldn't be difficult if you performed Step 1 adequately. Each subtask probably has its own method, or at least a clearly-delineated block of code. If you have difficulty identifying the subtasks, you probably need to understand the algorithm better or reorganize your code. In that case, return to Step 1.
- Benchmark the algorithm. In other words, time the identified subtasks to determine the fraction of time consumed by each. This should be a trivial matter of adding some timing and output statements.
- Delegate the most time-consuming subtasks to a thread pool. Now we come to the most difficult part: reorganizing your code so the slowest subtasks are performed concurrently by multiple subtask threads. Luckily, two utility classes in java.util.concurrent greatly simplify the process.
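As a rough illustration of Step 3, the sketch below accumulates wall-clock time per subtask and converts the totals into percentages. The class name SubtaskTimer and the two placeholder subtasks are hypothetical and are not part of the article's download; only the timing pattern matters.

```java
class SubtaskTimer {

    // Converts accumulated per-subtask times into percentages of the total.
    static double[] percentages(long[] elapsedMs) {
        long total = 0;
        for (long ms : elapsedMs) {
            total += ms;
        }
        double[] pct = new double[elapsedMs.length];
        for (int i = 0; i < pct.length; i++) {
            pct[i] = (total == 0) ? 0.0 : 100.0 * elapsedMs[i] / total;
        }
        return pct;
    }

    // Hypothetical stand-in for an expensive subtask.
    static double heavySubtask(int n) {
        double sum = 0;
        for (int i = 1; i <= n; i++) {
            sum += Math.sqrt(i);
        }
        return sum;
    }

    // Hypothetical stand-in for a cheap subtask.
    static double lightSubtask(int n) {
        return n * 0.5;
    }

    public static void main(String[] args) {
        long[] elapsedMs = new long[2];
        double sink = 0; // keeps the results live so the work is not optimized away

        for (int pass = 0; pass < 20; pass++) {
            long t = System.currentTimeMillis();
            sink += heavySubtask(2000000);
            elapsedMs[0] += System.currentTimeMillis() - t;

            t = System.currentTimeMillis();
            sink += lightSubtask(pass);
            elapsedMs[1] += System.currentTimeMillis() - t;
        }

        double[] pct = percentages(elapsedMs);
        System.out.println("heavy: " + pct[0] + "%  light: " + pct[1]
                + "%  (sink=" + sink + ")");
    }
}
```

The exact percentages vary from machine to machine; what matters is which subtask dominates.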
Introduction to K-means
  For an example, I'll show you how to adapt  K-means clustering to SMT. Don't panic if you've never heard of K-means. It's a  real-world clustering algorithm that is easy to understand, and it benefits  enormously from SMT. K-means is the algorithm on which I cut my SMT teeth. 
  My employer became frustrated when our main product took hours and hours  to cluster a particularly large dataset. Overhearing him mutter in the elevator  about needing to convert K-means from Java to C was all the motivation I  needed. I more than doubled the product's speed by simple refactoring and  optimization. Then, knowing that the processing server was a quad-CPU Sun, I  decided to make key sections concurrent. Since this was before Java 5, I used  utilities included with version 1.0.3 of the Colt libraries. (These utilities,  written by Doug Lea, have since been integrated into java.util.concurrent.) The SMT adaptation achieved a further  tripling in speed. My total efforts sped up K-means eight-fold and made the  boss so happy, we avoided the C conversion nightmare. 
  Before going into K-means further, I  recommend that you download the files kmeans.jar and kmeans_src.jar from Resources. Only the key sections of code will  be included in this article, so you'll want to browse the full listings. 
  Extract the sources from the jar using the command jar xvf kmeans_src.jar. It contains three versions of K-means  in the files: BasicKMeans.java, BenchmarkedKMeans.java, and ConcurrentKMeans.java. These files correspond to the results of  Steps 1, 3, and 4 of our basic approach. 
  K-means splits a large set of coordinates  into groups called clusters. More formally:
- K-means partitions N coordinates into K clusters, where N and K are positive integers, and K is less than N. The number of clusters K is an input parameter to K-means. K-means doesn't determine what K should be.
- A coordinate is a series of numbers. All have the same length M. The version of K-means presented in this article stores them in a two-dimensional N by M array of doubles.
- A cluster consists of a center and a list of the indices of the coordinates that belong to it. The center, also of length M, is simply the average of the member coordinates.
- At the conclusion of K-means, every coordinate belongs to one and only one cluster.
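To make the definition of a center concrete, here is a minimal sketch that averages the member coordinates dimension by dimension. The class and method names are hypothetical; the article's own code keeps this logic in its cluster classes.

```java
class ClusterCenter {

    // Averages the member coordinates, dimension by dimension.
    // coordinates is the N by M array; memberIndices lists the rows in the cluster.
    static double[] center(double[][] coordinates, int[] memberIndices) {
        int m = coordinates[0].length;
        double[] center = new double[m];
        for (int idx : memberIndices) {
            for (int d = 0; d < m; d++) {
                center[d] += coordinates[idx][d];
            }
        }
        for (int d = 0; d < m; d++) {
            center[d] /= memberIndices.length;
        }
        return center;
    }

    public static void main(String[] args) {
        double[][] coordinates = { {0, 0}, {2, 4}, {10, 10} };
        double[] c = center(coordinates, new int[] {0, 1});
        System.out.println(c[0] + ", " + c[1]);
    }
}
```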
K-means partitions the coordinates into clusters through the following iterative sequence:
- Cluster initialization. K clusters must somehow be initialized. Although K-means may do this in several different ways, the version included in this article randomly picks K of the coordinates to serve as the initial cluster centers. This is done in the method initCenters().
- Distance computation. To determine which cluster a coordinate should be assigned to, K-means compares the distances between the coordinate and each of the cluster centers. To minimize the number of times distances must be computed, this version keeps all N × K distances stored in a two-dimensional array of doubles. This step computes them all in the method computeDistances().
- Initial cluster assignment. Each coordinate is assigned to the nearest cluster by the method makeAssignments().
- Cluster center computation. Now K-means enters an iteration loop. The first step in the loop is to compute the cluster centers changed by the last call to makeAssignments().
- Distance computation. K-means calls computeDistances() again, but this time, it only computes distances to clusters changed by the last call to makeAssignments().
- Cluster assignment. Coordinates are again assigned to clusters by makeAssignments(). This time, the number of moves—the number of coordinates switching clusters—is noted. K-means loops back to Step 4 unless at least one of two stopping criteria is met: the number of moves is 0 or the iteration counter reaches an upper limit. If either is met, K-means exits.
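The six steps above can be condensed into a small, self-contained sketch. It is not the article's BasicKMeans: to stay short and deterministic, it assumes the first K coordinates serve as the initial centers (instead of a random pick), recomputes every distance on each pass (no distance cache or update flags), and compares squared distances. All names are hypothetical.

```java
import java.util.Arrays;

class TinyKMeans {

    // Squared Euclidean distance; sufficient for nearest-center comparisons.
    static double distance2(double[] a, double[] b) {
        double sum = 0;
        for (int d = 0; d < a.length; d++) {
            double diff = a[d] - b[d];
            sum += diff * diff;
        }
        return sum;
    }

    // Assigns each coordinate to its nearest center; returns the number of moves.
    static int makeAssignments(double[][] coords, double[][] centers, int[] assignment) {
        int moves = 0;
        for (int i = 0; i < coords.length; i++) {
            int best = 0;
            double bestDist = distance2(coords[i], centers[0]);
            for (int c = 1; c < centers.length; c++) {
                double d = distance2(coords[i], centers[c]);
                if (d < bestDist) {
                    bestDist = d;
                    best = c;
                }
            }
            if (assignment[i] != best) {
                assignment[i] = best;
                moves++;
            }
        }
        return moves;
    }

    // Recomputes each center as the average of its members.
    // A cluster with no members keeps its previous center.
    static void computeCenters(double[][] coords, double[][] centers, int[] assignment) {
        int m = coords[0].length;
        double[][] sums = new double[centers.length][m];
        int[] counts = new int[centers.length];
        for (int i = 0; i < coords.length; i++) {
            counts[assignment[i]]++;
            for (int d = 0; d < m; d++) {
                sums[assignment[i]][d] += coords[i][d];
            }
        }
        for (int c = 0; c < centers.length; c++) {
            if (counts[c] > 0) {
                for (int d = 0; d < m; d++) {
                    centers[c][d] = sums[c][d] / counts[c];
                }
            }
        }
    }

    // Returns the cluster index of every coordinate.
    static int[] cluster(double[][] coords, int k, int maxIterations) {
        // Assumption of this sketch: the first k coordinates are the initial centers.
        double[][] centers = new double[k][];
        for (int c = 0; c < k; c++) {
            centers[c] = coords[c].clone();
        }
        int[] assignment = new int[coords.length];
        Arrays.fill(assignment, -1);
        makeAssignments(coords, centers, assignment); // initial assignment

        int moves;
        int it = 0;
        do {
            computeCenters(coords, centers, assignment);
            moves = makeAssignments(coords, centers, assignment);
            it++;
        } while (moves > 0 && it < maxIterations);
        return assignment;
    }

    public static void main(String[] args) {
        double[][] coords = { {0, 0}, {0, 1}, {10, 10}, {10, 11}, {1, 0} };
        System.out.println(Arrays.toString(cluster(coords, 2, 500)));
    }
}
```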
Try running K-means from the jar you downloaded using the command java -Xms200m -Xmx200m -jar kmeans.jar. If necessary, spell out the path to java.exe, version 5 or later. The two virtual machine options set the minimum and maximum heap sizes. Increase or lower these as  appropriate for your system. Set them to the same value so expansions of the  heap do not affect the timing results. 
    
    User interface for testing K-means. 
  The figure above shows the graphical user interface. Three text fields let  you specify the number of coordinates, clusters, and the seed for the random  number generator. (An explicit seed permits repeatable results between runs.)  The combo box lets you select one of three implementations of K-means: BasicKMeans, BenchmarkedKMeans, or ConcurrentKMeans. When the last is selected, another  text field is enabled, letting you enter the number of threads used in the  SMT-adapted steps. This field is initialized to the optimal number of threads  for your system, which is  conveniently obtained by Runtime.getRuntime().availableProcessors(). If you see 1 in this field when you  first bring up the program, your system is unfortunately not SMT-capable. You  can still run the example, but you won't observe a speedup when you increase  the number of threads. 
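A minimal sketch of how such a default might be obtained follows. The class name is hypothetical; the only library call is Runtime.availableProcessors(), which always returns at least 1.

```java
class DefaultThreadCount {

    // Number of processors the virtual machine reports as available.
    static int defaultThreadCount() {
        return Runtime.getRuntime().availableProcessors();
    }

    public static void main(String[] args) {
        int n = defaultThreadCount();
        System.out.println("Suggested number of subtask threads: " + n);
        if (n == 1) {
            System.out.println("Only one processor is visible; extra threads will not help.");
        }
    }
}
```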
  The Run KMeans button executes the selected  version. As it executes, you will see real-time results in the text area.  You'll probably notice the absence of input fields for coordinate length and  maximum number of iterations. The test program always uses coordinates of  length 100 randomly generated using a Gaussian distribution. The iteration  limit is always 500. 
  Identifying the subtasks of K-means
  Following the steps of our basic approach, let's now identify our  algorithm's subtasks, which we can see in BasicKMeans's run() method: 
  Listing 1. The run method of BasicKMeans 
  
public void run() {
    try {
        // Note the start time.
        long startTime = System.currentTimeMillis();

        postKMeansMessage("K-Means clustering started");

        // Randomly initialize the cluster centers creating the
        // array mProtoClusters.
        initCenters();

        postKMeansMessage("... centers initialized");

        // Perform the initial computation of distances.
        computeDistances();

        // Make the initial cluster assignments.
        makeAssignments();

        // Number of moves in the iteration and the iteration counter.
        int moves = 0, it = 0;

        // Main Loop:
        //
        // Two stopping criteria:
        // - no moves in makeAssignments
        //   (moves == 0)
        // OR
        // - the maximum number of iterations has been reached
        //   (it == mMaxIterations)
        //
        do {
            // Compute the centers of the clusters that need updating.
            computeCenters();

            // Compute the stored distances between the updated clusters and the
            // coordinates.
            computeDistances();

            // Make this iteration's assignments.
            moves = makeAssignments();

            it++;

            postKMeansMessage("... iteration " + it + " moves = " + moves);
        } while (moves > 0 && it < mMaxIterations);

        // Transform the array of ProtoClusters to an array
        // of the simpler class Cluster.
        mClusters = generateFinalClusters();

        long executionTime = System.currentTimeMillis() - startTime;

        postKMeansComplete(mClusters, executionTime);

    } catch (Throwable t) {

        postKMeansError(t);

    } finally {
        // Clean up temporary data structures used during the algorithm.
        cleanup();
    }
}
                     
  Even though I described six steps in the K-means algorithm, it has only  four subtasks, since computeDistances() and makeAssignments() each handle two steps. We may think of  K-means, therefore, as consisting of the following subtasks: 
- Cluster initialization: Performed by initCenters()
- Distance computation: Performed by computeDistances()
- Cluster assignment: Performed by makeAssignments()
- Cluster center computation: Performed by computeCenters()
Benchmarking K-means
  Now that the subtasks have been identified, it's time to go to the benchmarking phase. BenchMarkedKMeans.java is the version to use for benchmarking. It is nothing but BasicKMeans.java with the addition of a few statements for timing the subtasks. Listing 2 shows its computeDistances() method, which is the same as BasicKMeans's method, except for the two statements that update the object variable mComputeDistancesMS upon every call to the method. Similar timing statements in the other subtask methods increment other timing variables.
  Listing 2. The computeDistances() method of BenchMarkedKMeans
  
/**
 * Compute distances between coordinates and cluster centers,
 * storing them in the distance cache. Only distances that
 * need to be computed are computed. This is determined by
 * distance update flags in the protocluster objects.
 */
private void computeDistances() throws InsufficientMemoryException {
    // Mark the time the method was entered.
    long t = System.currentTimeMillis();

    int numCoords = mCoordinates.length;
    int numClusters = mProtoClusters.length;

    if (mDistanceCache == null) { // Distance cache instantiated on first call.
        // Explicit garbage collection to reduce likelihood of insufficient
        // memory.
        System.gc();
        // Ensure there is enough memory available for the distances.
        // Throw an exception if not.
        long memRequired = 8L * numCoords * numClusters;
        if (Runtime.getRuntime().freeMemory() < memRequired) {
            throw new InsufficientMemoryException();
        }
        // Instantiate an array to hold the distances between coordinates
        // and cluster centers.
        mDistanceCache = new double[numCoords][numClusters];
    }

    for (int coord = 0; coord < numCoords; coord++) {
        // Update the distances between the coordinate and all
        // clusters currently in contention with update flags set.
        for (int clust = 0; clust < numClusters; clust++) {
            ProtoCluster cluster = mProtoClusters[clust];
            if (cluster.getConsiderForAssignment() && cluster.needsUpdate()) {
                mDistanceCache[coord][clust] =
                        distance(mCoordinates[coord], cluster.getCenter());
            }
        }
    }

    // Increment the timing variable by the number of ms taken by the method
    // call.
    mComputeDistancesMS += (System.currentTimeMillis() - t);
}
                     
  Run BenchMarkedKMeans with the default numbers of  coordinates and clusters, 25,000 and 300, respectively, and you'll see the  timing results in the text area. On my dual-processor Xeon system, 93 percent  of the time is eaten up by distance computation, with cluster assignment coming  in a distant second of 5 percent. Cluster initialization doesn't even register,  which is not surprising, since it's performed only once and involves no  extensive calculations. You might be a little surprised that cluster center  computation consumes so little time, considering that it is done every  iteration and contains numerous averaging operations. The lesson here: never  skip the benchmarking step! If I had assumed computeCenters() took a significant part of the  processing time, I might have expended a great deal of time and energy adapting  it to SMT to attain a trivial gain at most. 
Adapting K-means to SMT
Now it is time for the nitty-gritty third phase of SMT adaptation. If you haven't done so already, take a break to study the code in BasicKMeans.java so you'll fully understand the listings that follow. This file is rigorously commented, as are the other files that accompany it.
Based upon the benchmarks, computeDistances() and makeAssignments() are adapted to SMT in the final version of K-means, found in ConcurrentKMeans.java. Before jumping into the code, I should comment on two very important utility classes from java.util.concurrent: ThreadPoolExecutor and CyclicBarrier.
  As you can probably guess from the name, ThreadPoolExecutor contains a thread pool. It implements the Executor interface, which mandates the single method public void execute(Runnable command). An Executor executes a Runnable by calling its run method in some manner. A ThreadPoolExecutor executes a Runnable on one of the threads it keeps in an internal pool. When a Runnable is submitted to its execute method, it hands the Runnable to an idle thread in the pool or, if every thread is busy, holds it in a queue until one becomes free.
Introduction to ThreadPoolExecutor and CyclicBarrier
ThreadPoolExecutor may be configured with either a variable or a fixed number of threads. The threads may be configured to time out if not given a task for a sufficient length of time. For the purposes of the SMT adaptation described here, a simple fixed-size pool with threads that do not time out is desired. Such a pool is obtained using the simple factory method in the Executors class: public static ExecutorService newFixedThreadPool(int nThreads). Because ExecutorService extends Executor, the returned pool can be held in a variable of type Executor.
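The following sketch shows the factory method in use. It submits one Runnable to a two-thread fixed pool and confirms that the task ran on a thread other than the caller's. The class name and the CountDownLatch used to wait for the task are choices made for this example only; the article's code coordinates with a CyclicBarrier instead.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecuteOnPool {

    // Returns true if the submitted task ran on a pool thread, not the caller.
    static boolean ranOnPoolThread() {
        final Thread caller = Thread.currentThread();
        final Thread[] runner = new Thread[1];
        final CountDownLatch done = new CountDownLatch(1);

        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            pool.execute(new Runnable() {
                public void run() {
                    runner[0] = Thread.currentThread();
                    done.countDown();
                }
            });
            done.await(); // wait until the task has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown(); // pool threads do not time out, so shut down explicitly
        }
        return runner[0] != null && runner[0] != caller;
    }

    public static void main(String[] args) {
        System.out.println("Ran on a pool thread: " + ranOnPoolThread());
    }
}
```

Note the explicit shutdown(): threads in a fixed pool stay alive until the pool is shut down, which can keep the virtual machine from exiting.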
  The other class, CyclicBarrier, is needed for coordination. When an SMT-adapted step is  being performed by the pool's threads, the controlling thread needs to know  when they all have finished. The next step's success depends upon what the  threads in the pool have done. A CyclicBarrier instance informs the controlling thread when an  SMT-adapted step is complete, so the controlling thread knows when to proceed. 
  An appropriate CyclicBarrier is instantiated using the constructor: public CyclicBarrier(int parties, Runnable  barrierAction). The argument  parties is the number of threads that the barrier will  coordinate. In this case, this is the number of threads in the thread pool. The  barrierAction is a Runnable  for the barrier to execute whenever it is reached. Reaching the barrier means  that all of its parties—the subtask threads in the thread pool—have informed  the barrier that they are finished. These threads inform the barrier of their  completion by calling the barrier's await()  method. The barrier tracks the number of calls to await(), and when that number becomes equal to parties, the barrier executes barrierAction. 
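Here is a minimal sketch of that constructor at work, assuming a pool with exactly as many threads as the barrier has parties (fewer threads would leave the barrier waiting forever). Each task does nothing but call await(); the barrier action counts its own invocations and releases the controlling thread through a CountDownLatch, which stands in for the wait/notify mechanism used in the article's code. All names are hypothetical.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierDemo {

    // Runs one round with the given number of parties and returns how
    // many times the barrier action was executed.
    static int runOnce(int parties) {
        final AtomicInteger actionRuns = new AtomicInteger();
        final CountDownLatch finished = new CountDownLatch(1);

        final CyclicBarrier barrier = new CyclicBarrier(parties, new Runnable() {
            public void run() {
                // Executed once, after all parties have called await().
                actionRuns.incrementAndGet();
                finished.countDown();
            }
        });

        ExecutorService pool = Executors.newFixedThreadPool(parties);
        try {
            for (int i = 0; i < parties; i++) {
                pool.execute(new Runnable() {
                    public void run() {
                        try {
                            // A real worker would do its share of the work first.
                            barrier.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } catch (BrokenBarrierException e) {
                            // Another party failed; nothing more to do in this sketch.
                        }
                    }
                });
            }
            finished.await(); // controlling thread waits for the barrier action
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
        return actionRuns.get();
    }

    public static void main(String[] args) {
        System.out.println("Barrier action runs: " + runOnce(4));
    }
}
```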
  In ConcurrentKMeans, the controlling thread waits while the thread pool's  threads are doing one of the subtasks. The execution of the barrier action  breaks the controlling thread out of the wait, allowing K-means to continue. 
  Now let's plunge into the code. Compare the run  method of ConcurrentKMeans in Listing 3 below, with that of BasicKMeans in Listing 1. The only significant difference is the  instantiation of an object variable of type SubtaskManager. This object manages the SMT-adapted subtasks of the  concurrent version of K-means. 
  Listing 3. The run method of ConcurrentKMeans 
  
public void run() {
    try {
        // Note the start time.
        long startTime = System.currentTimeMillis();

        postKMeansMessage("K-Means clustering started");

        // Randomly initialize the cluster centers.
        initCenters();

        postKMeansMessage("... centers initialized");

        // Instantiate the subtask manager.
        mSubtaskManager = new SubtaskManager(mThreadCount);

        // Post a message about the state of concurrent subprocessing.
        if (mThreadCount > 1) {
            postKMeansMessage("... concurrent processing mode with "
                    + mThreadCount + " subtask threads");
        } else {
            postKMeansMessage("... non-concurrent processing mode");
        }

        // Perform the initial computation of distances.
        computeDistances();

        // Make the initial cluster assignments.
        makeAssignments();

        // Number of moves in the iteration and the iteration counter.
        int moves = 0, it = 0;

        // Main Loop:
        //
        // Two stopping criteria:
        // - no moves in makeAssignments
        //   (moves == 0)
        // OR
        // - the maximum number of iterations has been reached
        //   (it == mMaxIterations)
        //
        do {
            // Compute the centers of the clusters that need updating.
            computeCenters();

            // Compute the stored distances between the updated clusters and the
            // coordinates.
            computeDistances();

            // Make this iteration's assignments.
            moves = makeAssignments();

            it++;

            postKMeansMessage("... iteration " + it + " moves = " + moves);
        } while (moves > 0 && it < mMaxIterations);

        // Transform the array of ProtoClusters to an array
        // of the simpler class Cluster.
        mClusters = generateFinalClusters();

        long executionTime = System.currentTimeMillis() - startTime;

        postKMeansComplete(mClusters, executionTime);

    } catch (Throwable t) {

        postKMeansError(t);

    } finally {
        // Clean up temporary data structures used during the algorithm.
        cleanup();
    }
}
                     
  Compare the version of the computeDistances() from Listing 2 with the ConcurrentKMeans version, shown in Listing 4. The former uses an outer for  loop to iterate over the coordinates, computing the distances to changed  clusters in the inner loop. This for loop has been replaced in ConcurrentKMeans by a call to mSubtaskManager.computeDistances(). Similarly, ConcurrentKMeans delegates the bulk of the work of the method makeAssignments() to the SubtaskManager. 
  Listing 4. computeDistances() from ConcurrentKMeans 
  
/**
 * Compute distances between coordinates and cluster centers,
 * storing them in the distance cache. Only distances that
 * need to be computed are computed. This is determined by
 * distance update flags in the protocluster objects.
 */
private void computeDistances() throws InsufficientMemoryException {
    if (mDistanceCache == null) { // Distance cache instantiated on first call.
        int numCoords = mCoordinates.length;
        int numClusters = mProtoClusters.length;
        // Explicit garbage collection to reduce likelihood of insufficient
        // memory.
        System.gc();
        // Ensure there is enough memory available for the distances.
        // Throw an exception if not.
        long memRequired = 8L * numCoords * numClusters;
        if (Runtime.getRuntime().freeMemory() < memRequired) {
            throw new InsufficientMemoryException();
        }
        // Instantiate an array to hold the distances between coordinates
        // and cluster centers.
        mDistanceCache = new double[numCoords][numClusters];
    }
    // Bulk of the work is delegated to the
    // SubtaskManager.
    mSubtaskManager.computeDistances();
}
Managing the subtasks
SubtaskManager is a non-static class nested within ConcurrentKMeans to manage the SMT-adapted steps. It has to be non-static  so that it can access the object fields of the enclosing ConcurrentKMeans. Nested in SubtaskManager is yet  another class, a non-static Runnable called Worker.  While only one instance of SubtaskManager is created, the number of Worker instances is equal to the number of subtask threads in the  thread pool. The Workers are the Runnables  submitted to the ThreadPoolExecutor to do the grunt work. 
  Look at the SubtaskManager code shown in Listing 5 between Lines 6 and 31. The  integer field mDoing, having possible values DOING_NOTHING, COMPUTING_DISTANCES, and MAKING_ASSIGNMENTS, is used to track the current subtask. The Boolean mWorking is a flag set to indicate when at least one of the Worker objects is executing. The field mExecutor is declared not as a ThreadPoolExecutor, but as the more general type Executor. You'll understand why when you read the explanation of  the constructor. The CyclicBarrier is assigned to the reference field mBarrier. Finally, references to the Worker  instances are placed in the array mWorkers. 
  The constructor, beginning on Line 39, takes the  number of threads to use in the thread pool. This must, of course, be at least  1, or it will throw a much deserved IllegalArgumentException. Line 52 reduces numThreads  to the number of coordinates, in case someone attempts something ridiculous, such  as clustering 100 coordinates with 200 threads. Lines 55-79 instantiate the Worker objects, dividing up the coordinates among them as evenly  as possible and ensuring every coordinate is covered. The for loop beginning on  Line 68 exists simply to apportion any leftover coordinates, since the number  of coordinates may not be evenly divisible by the number of threads. 
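The apportioning logic is easy to isolate and check on its own. The sketch below pulls it into a hypothetical static helper, coordsPerWorker(), which mirrors the constructor's arithmetic; the extra guard against an empty coordinate set is an addition made for this example.

```java
import java.util.Arrays;

class CoordinatePartition {

    // Returns the number of coordinates each worker handles. The first
    // "leftover" workers each receive one extra coordinate.
    static int[] coordsPerWorker(int coordCount, int numThreads) {
        if (numThreads <= 0) {
            throw new IllegalArgumentException("number of threads <= 0: " + numThreads);
        }
        if (coordCount <= 0) {
            // Guard added for this sketch; avoids a division by zero below.
            throw new IllegalArgumentException("number of coordinates <= 0: " + coordCount);
        }
        // No point in having more workers than coordinates.
        if (numThreads > coordCount) {
            numThreads = coordCount;
        }
        int[] counts = new int[numThreads];
        Arrays.fill(counts, coordCount / numThreads);
        int leftOvers = coordCount - numThreads * counts[0];
        for (int i = 0; i < leftOvers; i++) {
            counts[i]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(coordsPerWorker(10, 3)));
        System.out.println(coordsPerWorker(100, 200).length + " workers for 100 coordinates");
    }
}
```

For 10 coordinates and 3 threads the base amount is 3 with 1 left over, so the first worker takes 4 coordinates and the other two take 3 each.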
  On Line 81, things get interesting. If numThreads equals 1, Line 85 sets mExecutor  to an anonymous inner implementation that calls its Runnable directly. In other words, when only one subtask thread is  requested, there is no thread pool. Execution of the single Worker is done on the controlling thread. Also, the CyclicBarrier is not set, because it is unnecessary. 
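A stripped-down version of such a direct executor is sketched below. It omits the interrupt check found in the article's listing, and the class name is hypothetical; the point is only that execute() returns after the Runnable has run on the calling thread.

```java
import java.util.concurrent.Executor;

class DirectExecutorDemo {

    // Returns true if the Runnable was executed on the calling thread.
    static boolean runsOnCallingThread() {
        final Thread[] runner = new Thread[1];

        // An Executor that simply calls run() itself; no pool is involved.
        Executor direct = new Executor() {
            public void execute(Runnable runnable) {
                runnable.run();
            }
        };

        direct.execute(new Runnable() {
            public void run() {
                runner[0] = Thread.currentThread();
            }
        });

        // execute() has already returned, so the task is complete here.
        return runner[0] == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println("Ran on calling thread: " + runsOnCallingThread());
    }
}
```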
  The code within the else clause between Lines 95  and 111 configures the SubtaskManager for the concurrent case. First, mBarrier is instantiated and given a barrier action that calls the SubtaskManager method workersDone(). Then, mExecutor  is set to a fixed size ThreadPoolExecutor. 
  To understand how SubtaskManager accomplishes a subtask, look at its methods makeAssignments() and computeDistances() on Lines 119 and 130, respectively. Both simply set the flag mDoing and call the method work(). This method, beginning on Line 141, returns a boolean to indicate success of the subtask. It begins by initializing an ok flag to false on Line 142. Then it sets the flag mWorking. Inside the try-catch-finally block, on Line 149, mBarrier is reset if it is non-null. (Remember, the barrier is null if using only one thread.) Strictly speaking, a CyclicBarrier rearms itself each time it trips, so reset() is not required for reuse; calling it here also clears any broken state left behind by a failed subtask. The for loop beginning on Line 152 submits each of the Worker objects to the executor. If mExecutor is a ThreadPoolExecutor, as it is when the number of subtask threads is more than one, the Worker objects are executed not on the controlling thread running work(), but on threads in the pool. However, if the number of subtask threads is one, the single Worker executes on the controlling thread.
  Immediately after the for loop, if mBarrier is not null, the method waitOnWorkers() blocks the controlling thread in wait mode until the method workersDone() causes it to unblock. Recall that workersDone() is called by the barrier action Runnable when the barrier is reached. Then the ok flag is set to true if mBarrier.isBroken() returns false. A CyclicBarrier reports a broken state if a thread waiting at it is interrupted or times out, if the barrier is reset while threads are waiting, or if the barrier action throws an exception.
  If, on the other hand, mBarrier is null after the for loop, the one Worker is executed on the calling thread and there is no reason  to block. In that case, the ok flag is set to true. 
  The catch clause traps a RejectedExecutionException, an exception that ThreadPoolExecutor's execute method may throw when it cannot accept a task: for example, when both the pool and its work queue are saturated, or when the thread pool was shut down before the request was submitted. Because of the code's structure, neither of these cases can happen; however, to conform to good coding practice, the exception is trapped. (The impossible has a way of happening in software, doesn't it?) Before returning ok, the method ensures mWorking is set back to false in the finally clause.
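The shutdown case is easy to reproduce in isolation. In this sketch (hypothetical class name), a task is submitted to a fixed pool that has already been shut down, and the pool's default rejection policy throws RejectedExecutionException.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

class RejectedDemo {

    // Returns true if execute() rejects a task submitted after shutdown().
    static boolean rejectedAfterShutdown() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.shutdown();
        try {
            pool.execute(new Runnable() {
                public void run() {
                    // Never reached: the pool no longer accepts tasks.
                }
            });
            return false;
        } catch (RejectedExecutionException ree) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("Rejected after shutdown: " + rejectedAfterShutdown());
    }
}
```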
  The blocking and unblocking functions of SubtaskManager are completed by the methods waitOnWorkers() and workersDone(). Look at workersDone() first, which begins on Line 200. It sets mWorking to false, then calls notifyAll() to break the controlling thread out of its wait. Now look at waitOnWorkers() on Line 180. This method calls wait() inside a while loop that exits when mWorking is false. 
  I learned the hard way that waitOnWorkers() should check mWorking  before calling wait() the first time, because it is possible for all  of the workers to finish before the controlling thread even enters waitOnWorkers(). As the workers finish, they call the barrier's await() method, which eventually triggers the barrier action that  calls workersDone(). If they finish their work so quickly that this  happens before the controlling thread enters waitOnWorkers(), the thread gets stuck if it calls wait(). Thus, it ascertains that mWorking  is true first. This flag can only be true if workersDone() did not precede the controlling thread's call to waitOnWorkers(). 
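The pattern is the classic guarded wait. The sketch below reduces it to a hypothetical class with a single flag, and exercises both orderings: the signal arriving before the wait begins, and the signal arriving from another thread afterward. Because the flag is tested inside a synchronized method before wait() is called, neither ordering hangs.

```java
class GuardedWait {

    private boolean working = true;

    // Called when the work is finished; wakes any waiting thread.
    synchronized void workersDone() {
        working = false;
        notifyAll();
    }

    // Blocks only while the flag says work is still in progress.
    synchronized void waitOnWorkers() throws InterruptedException {
        while (working) {
            wait();
        }
    }

    // The signal arrives before the wait: the loop body is never entered.
    static boolean completesWhenSignalledFirst() {
        GuardedWait g = new GuardedWait();
        g.workersDone();
        try {
            g.waitOnWorkers();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

    // The signal arrives from another thread, possibly after the wait begins.
    static boolean completesWhenSignalledLater() {
        final GuardedWait g = new GuardedWait();
        Thread worker = new Thread(new Runnable() {
            public void run() {
                g.workersDone();
            }
        });
        worker.start();
        try {
            g.waitOnWorkers();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(completesWhenSignalledFirst() + " " + completesWhenSignalledLater());
    }
}
```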
  Now look at the code for Worker beginning on Line 234. As stated before, this class does  the grunt work. It has integer object variables mStartCoord and mNumCoords to define the range of coordinates over which  an instance operates. Worker's run  method (see Line 268) contains a switch-case statement keyed on the SubtaskManager field mDoing. When mDoing  is equal to COMPUTING_DISTANCES, the method calls workerComputeDistances(). If mDoing  is equal to MAKING_ASSIGNMENTS, the method calls workerMakeAssignments(). After returning from one of these methods, the  Worker calls mBarrier's  await() method if it is non-null. When all Workers have called await(),  the barrier action calls workersDone() to unblock SubtaskManager's controlling thread. 
  If you look at the Worker methods workerComputeDistances() and workerMakeAssignments(), you'll see that they resemble the methods computeDistances() and makeAssignments() in BasicKMeans. The chief difference is that the Worker methods loop through a subset of the coordinates instead  of all of them. 
  Keeping track of the number of  cluster-assignment moves is a little more complicated than before. Each Worker must keep a separate tally in the field mMoves. The SubtaskManager method numberOfMoves() returns their sum. ConcurrentKMeans.makeAssignments() calls mSubtaskManager.makeAssignments(), then returns the result of mSubtaskManager.numberOfMoves(). 
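The per-worker tally can be sketched as follows. This is not the article's Worker class: the nearest-cluster indices are assumed to be precomputed in an array, the coordinates are split by simple integer arithmetic, and a CountDownLatch stands in for the CyclicBarrier. Each worker writes only its own moves field, so no locking is needed; the controlling thread sums the fields after all workers have finished.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class MoveTally {

    // Hypothetical worker: reassigns its own slice and counts its own moves.
    static class Worker implements Runnable {
        private final int[] assignment;
        private final int[] nearest;
        private final int start;
        private final int end;
        private final CountDownLatch done;
        int moves; // written only by this worker, read after the latch opens

        Worker(int[] assignment, int[] nearest, int start, int end, CountDownLatch done) {
            this.assignment = assignment;
            this.nearest = nearest;
            this.start = start;
            this.end = end;
            this.done = done;
        }

        public void run() {
            moves = 0;
            for (int i = start; i < end; i++) {
                if (assignment[i] != nearest[i]) {
                    assignment[i] = nearest[i];
                    moves++;
                }
            }
            done.countDown();
        }
    }

    // Updates assignment in place and returns the total number of moves.
    static int makeAssignments(int[] assignment, int[] nearest, int numThreads) {
        CountDownLatch done = new CountDownLatch(numThreads);
        Worker[] workers = new Worker[numThreads];
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        try {
            for (int w = 0; w < numThreads; w++) {
                int start = w * assignment.length / numThreads;
                int end = (w + 1) * assignment.length / numThreads;
                workers[w] = new Worker(assignment, nearest, start, end, done);
                pool.execute(workers[w]);
            }
            done.await(); // all workers have finished their slices
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
        int total = 0;
        for (Worker worker : workers) {
            total += worker.moves;
        }
        return total;
    }

    public static void main(String[] args) {
        int[] assignment = {0, 0, 0, 0, 0};
        int[] nearest = {0, 1, 1, 0, 2};
        System.out.println("moves = " + makeAssignments(assignment, nearest, 2));
    }
}
```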
  Listing 5. The nested class SubtaskManager 
  
001 /**
002  * The class which manages the SMT-adapted subtasks.
003  */
004 private class SubtaskManager {
005
006     // Codes used to identify what step is being done.
007     static final int DOING_NOTHING = 0;
008     static final int COMPUTING_DISTANCES = 1;
009     static final int MAKING_ASSIGNMENTS = 2;
010
011     // What the object is currently doing. Set to one of the
012     // three codes above.
013     private int mDoing = DOING_NOTHING;
014
015     // True if at least one of the Workers is doing something.
016     private boolean mWorking;
017
018     // The executor that runs the Workers.
019     // When in multiple-processor mode, this is a ThreadPoolExecutor
020     // with a fixed number of threads. In single-processor mode, it's
021     // a simple implementation that calls the single worker's run
022     // method directly.
023     private Executor mExecutor;
024
025     // A Barrier to wait on multiple Workers to finish up the current task.
026     // In single-processor mode, there is no need for a barrier, so it
027     // is not set.
028     private CyclicBarrier mBarrier;
029
030     // The worker objects which implement Runnable.
031     private Worker[] mWorkers;
032
033     /**
034      * Constructor
035      *
036      * @param numThreads the number of worker threads to be used for
037      * the subtasks.
038      */
039     SubtaskManager(int numThreads) {
040
041         if (numThreads <= 0) {
042             throw new IllegalArgumentException("number of threads <= 0: "
043                     + numThreads);
044         }
045
046         int coordCount = mCoordinates.length;
047
048         // There would be no point in having more workers than
049         // coordinates, since some of the workers would have nothing
050         // to do.
051         if (numThreads > coordCount) {
052             numThreads = coordCount;
053         }
054
055         // Create the workers.
056         mWorkers = new Worker[numThreads];
057
058         // To hold the number of coordinates for each worker.
059         int[] coordsPerWorker = new int[numThreads];
060
061         // Initialize with the base amounts.
062         Arrays.fill(coordsPerWorker, coordCount/numThreads);
063
064         // There may be some leftovers, since coordCount may not be
065         // evenly divisible by numThreads. Add a coordinate to each
066         // worker in turn until all are covered.
067         int leftOvers = coordCount - numThreads * coordsPerWorker[0];
068         for (int i = 0; i < leftOvers; i++) {
069             coordsPerWorker[i]++;
070         }
071
072         int startCoord = 0;
073         // Instantiate the workers.
074         for (int i = 0; i < numThreads; i++) {
075             // Each worker needs to know its starting coordinate and the
076             // number of coordinates it handles.
077             mWorkers[i] = new Worker(startCoord, coordsPerWorker[i]);
078             startCoord += coordsPerWorker[i];
079         }
080
081         if (numThreads == 1) { // Single-processor mode.
082
083             // Create a simple executor that directly calls the single
084             // worker's run method. Do not set the barrier.
085             mExecutor = new Executor() {
086                 public void execute(Runnable runnable) {
087                     if (!Thread.interrupted()) {
088                         runnable.run();
089                     } else {
090                         throw new RejectedExecutionException();
091                     }
092                 }
093             };
094
095         } else { // Multiple-processor mode.
096
097             // Need the barrier to notify the controlling thread when the
098             // Workers are done.
099             mBarrier = new CyclicBarrier(numThreads, new Runnable() {
100                 public void run() {
101                     // Method called after all workers have called await() on the
102                     // barrier. The call to workersDone()
103                     // unblocks the controlling thread.
104                     workersDone();
105                 }
106             });
107
108             // Set the executor to a fixed thread pool with
109             // threads that do not time out.
110             mExecutor = Executors.newFixedThreadPool(numThreads);
111         }
112     }
113
114     /**
115      * Make the cluster assignments.
116      *
117      * @return true if nothing went wrong.
118      */
119     boolean makeAssignments() {
120         mDoing = MAKING_ASSIGNMENTS;
121         return work();
122     }
123
124     /**
125      * Compute the distances between the coordinates and those centers
126      * with update flags set.
127      *
128      * @return true if nothing went wrong.
129      */
130     boolean computeDistances() {
131         mDoing = COMPUTING_DISTANCES;
132         return work();
133     }
134
135     /**
136      * Perform the current subtask, waiting until all the workers
137      * finish their part of the current task before returning.
138      *
139      * @return true if the subtask succeeded.
140      */
141     private boolean work() {
142         boolean ok = false;
143         // Set the working flag to true.
144         mWorking = true;
145         try {
146             if (mBarrier != null) {
147                 // Resets the barrier so it can be reused if
148                 // this is not the first call to this method.
149                 mBarrier.reset();
150             }
151             // Now execute the run methods on the Workers.
152             for (int i = 0; i < mWorkers.length; i++) {
153                 mExecutor.execute(mWorkers[i]);
154             }
155             if (mBarrier != null) {
156                 // Block until the workers are done. The barrier
157                 // triggers the unblocking.
158                 waitOnWorkers();
159                 // If the isBroken() method of the barrier returns false,
160                 // no problems.
161                 ok = !mBarrier.isBroken();
162             } else {
163                 // No barrier, so the run() method of a single worker
164                 // was called directly and everything must have worked
165                 // if we made it here.
166                 ok = true;
167             }
168         } catch (RejectedExecutionException ree) {
169             // Possibly thrown by the executor.
170         } finally {
171             mWorking = false;
172         }
173         return ok;
174     }
175
176 /**
177 * Called from work() to put the controlling thread into
178 * wait mode until the barrier calls workersDone().
179 */
180     private synchronized void waitOnWorkers() {
181 // It is possible for the workers to have finished so quickly that
182 // workersDone() has already been called. Since workersDone() sets
183 // mWorking to false, check this flag before going into wait mode.
184 // Not doing so could result in hanging the SubtaskManager.
185         while (mWorking) {
186             try {
187 // Blocks until workersDone() is called.
188 wait();
189             } catch (InterruptedException ie) {
190 // mBarrier.isBroken() will return true.
191 break;
192 }
193 }
194 }
195
196 /**
197 * Notifies the controlling thread that it can come out of
198 * wait mode.
199 */
200     private synchronized void workersDone() {
201 // If this gets called before waitOnWorkers(), setting this
202 // to false prevents waitOnWorkers() from entering a
203 // permanent wait.
204 mWorking = false;
205 notifyAll();
206 }
207
208 /**
209 * Shutdown the thread pool when k-means is finished.
210 */
211     void shutdown() {
212         if (mExecutor instanceof ThreadPoolExecutor) {
213 // This terminates the threads in the thread pool.
214 ((ThreadPoolExecutor) mExecutor).shutdownNow();
215 }
216 }
217
218 /**
219 * Returns the number of cluster assignment changes made in the
220 * previous call to makeAssignments().
221 */
222     int numberOfMoves() {
223 // Sum the contributions from the workers.
224 int moves = 0;
225         for (int i=0; i<mWorkers.length; i++) {
226 moves += mWorkers[i].numberOfMoves();
227 }
228 return moves;
229 }
230
231 /**
232 * The class which does the hard work of the subtasks.
233 */
234     private class Worker implements Runnable {
235
236 // Defines range of coordinates to cover.
237 private int mStartCoord, mNumCoords;
238
239 // Number of moves made by this worker in the last call
240 // to workerMakeAssignments(). The SubtaskManager totals up
241 // this value from all the workers in numberOfMoves().
242 private int mMoves;
243
244 /**
245 * Constructor
246 *
247 * @param startCoord index of the first coordinate covered by
248 * this Worker.
249 * @param numCoords the number of coordinates covered.
250 */
251         Worker(int startCoord, int numCoords) {
252 mStartCoord = startCoord;
253 mNumCoords = numCoords;
254 }
255
256 /**
257 * Returns the number of moves this worker made in the last
258 * execution of workerMakeAssignments()
259 */
260         int numberOfMoves() {
261 return mMoves;
262 }
263
264 /**
265 * The run method. It accesses the SubtaskManager field mDoing
266 * to determine what subtask to perform.
267 */
268         public void run() {
269
270             try {
271                 switch (mDoing) {
272 case COMPUTING_DISTANCES:
273 workerComputeDistances();
274 break;
275 case MAKING_ASSIGNMENTS:
276 workerMakeAssignments();
277 break;
278 }
279             } finally {
280 // If there's a barrier, call its await() method. To
281 // ensure that it gets done, it's placed in the finally clause.
282                 if (mBarrier != null) {
283                     try {
284 mBarrier.await();
285 // barrier.isBroken() will return true if either of
286 // these exceptions happens, so the SubtaskManager
287 // will detect the problem.
288                     } catch (InterruptedException ex) {
289                     } catch (BrokenBarrierException ex) {
290 }
291 }
292 }
293
294 }
295
296 /**
297 * Compute the distances for the covered coordinates
298 * to the updated centers.
299 */
300         private void workerComputeDistances() {
301 int lim = mStartCoord + mNumCoords;
302             for (int i = mStartCoord; i < lim; i++) {
303 int numClusters = mProtoClusters.length;
304                 for (int c = 0; c < numClusters; c++) {
305 ProtoCluster cluster = mProtoClusters[c];
306 if (cluster.getConsiderForAssignment() &&
307                         cluster.needsUpdate()) {
308 mDistanceCache[i][c] = distance(mCoordinates[i],
309 cluster.getCenter());
310 }
311 }
312 }
313 }
314
315 /**
316 * Assign each covered coordinate to the nearest cluster.
317 */
318         private void workerMakeAssignments() {
319 mMoves = 0;
320 int lim = mStartCoord + mNumCoords;
321             for (int i = mStartCoord; i < lim; i++) {
322 int c = nearestCluster(i);
323 mProtoClusters[c].add(i);
324                 if (mClusterAssignments[i] != c) {
325 mClusterAssignments[i] = c;
326 mMoves++;
327 }
328 }
329 }
330
331 }
332 }
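The coordination pattern in the listing above (divide the coordinates among workers, run them on a fixed thread pool, and let a CyclicBarrier action tell the controlling thread when everyone is done) can be sketched in isolation. This is a minimal sketch, not the article's code: the class BarrierSketch and its methods partition() and parallelSum() are hypothetical names, the workload is a plain array sum, and a CountDownLatch stands in for the listing's wait()/notifyAll() handshake.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; not part of the article's download.
class BarrierSketch {

    // Splits 'count' items among 'workers' as evenly as possible; the
    // first (count % workers) workers each take one extra item.
    static int[] partition(int count, int workers) {
        int[] sizes = new int[workers];
        for (int i = 0; i < workers; i++) {
            sizes[i] = count / workers + (i < count % workers ? 1 : 0);
        }
        return sizes;
    }

    // Sums data[] with numThreads workers, each covering a contiguous range.
    static long parallelSum(final int[] data, int numThreads) {
        final long[] partial = new long[numThreads];
        // Assumption: a latch replaces the listing's wait()/notifyAll() pair.
        final CountDownLatch done = new CountDownLatch(1);
        // The barrier action runs once, after every worker has called await().
        final CyclicBarrier barrier = new CyclicBarrier(numThreads, new Runnable() {
            public void run() {
                done.countDown();
            }
        });
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        try {
            int[] sizes = partition(data.length, numThreads);
            int start = 0;
            for (int w = 0; w < numThreads; w++) {
                final int id = w;
                final int from = start;
                final int to = start + sizes[w];
                start = to;
                pool.execute(new Runnable() {
                    public void run() {
                        try {
                            long sum = 0;
                            for (int i = from; i < to; i++) {
                                sum += data[i];
                            }
                            partial[id] = sum;
                        } finally {
                            // Always arrive at the barrier, even on failure.
                            try {
                                barrier.await();
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            } catch (BrokenBarrierException e) {
                                // A real manager would report this; ignored here.
                            }
                        }
                    }
                });
            }
            done.await(); // Block until the barrier action has fired.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        long total = 0;
        for (long p : partial) {
            total += p;
        }
        return total;
    }
}
```

Calling BarrierSketch.parallelSum(data, Runtime.getRuntime().availableProcessors()) sizes the pool to the machine. Unlike the listing, this sketch builds a fresh barrier and pool per call, which keeps it short but forfeits the reuse that SubtaskManager gets from reset().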
Synchronization gotchas
At some point while reading this, you've probably wondered which synchronization issues might crop up in SMT adaptation. I encountered two with ConcurrentKMeans, both in the nested class ProtoCluster, which K-means uses to track intermediate clustering results. It was clear that something was wrong, because ConcurrentKMeans gave results different from BasicKMeans even though it was using the same N, K, and random seed. I ran it several times, occasionally getting an ArrayIndexOutOfBoundsException originating from the ProtoCluster method add(int ndx). Then the obvious dawned on me: multiple threads were calling an unsynchronized method. (Doh!) The exception happened because one worker thread attempted to add a coordinate while another thread was expanding the array holding the coordinate indices. Simply adding the synchronized modifier to the add(int ndx) method definition fixed the problem.
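To make the first fix concrete, here is a minimal sketch of a growable index list whose add(int ndx) method is synchronized. The class MembershipList, its fields, and the fillConcurrently() helper are hypothetical stand-ins; the real ProtoCluster in the download may be organized differently.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the membership bookkeeping in ProtoCluster.
class MembershipList {
    private int[] mMembers = new int[4];
    private int mSize;

    // synchronized makes "grow if full, then append" one atomic step.
    // Without it, one thread can index into the old array while another
    // is replacing it, or two threads can claim the same slot.
    synchronized void add(int ndx) {
        if (mSize == mMembers.length) {
            int[] bigger = new int[mMembers.length * 2];
            System.arraycopy(mMembers, 0, bigger, 0, mSize);
            mMembers = bigger;
        }
        mMembers[mSize++] = ndx;
    }

    synchronized int size() {
        return mSize;
    }

    // Has several threads add indices to one list and returns its final size.
    static int fillConcurrently(int threads, final int perThread) {
        final MembershipList list = new MembershipList();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int base = t * perThread;
            pool.execute(new Runnable() {
                public void run() {
                    for (int i = 0; i < perThread; i++) {
                        list.add(base + i);
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return list.size();
    }
}
```

With the modifier in place, every call to add() is accounted for, so four threads adding 10,000 indices each leave exactly 40,000 entries. The order of those entries still depends on thread scheduling.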
The second problem was more subtle. I was disappointed with the speedup when running with four threads on my dual-processor Xeon system: it was only 30 percent faster than the single-threaded version. Stepping through the code in debug mode, I finally noticed that computeDistances() computed nearly all of the distances each time it was called. This method is supposed to compute only the distances to clusters whose update flag is set to true. It was doing more work than necessary because ProtoCluster's setUpdateFlag() method was spuriously setting the flag regardless of whether the cluster had actually changed. It turned out that the two arrays in ProtoCluster, mPreviousMembership and mCurrentMembership, which setUpdateFlag() compares element by element, were in mismatched order. Even though they contained the same indices, their elements were out of order because the order in which multiple threads add them is undefined. In single-threaded mode, this did not happen: the one thread always added the coordinates in order. To mend the problem, I just needed to add a single line sorting mCurrentMembership before comparing its elements to the other array.
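The idea behind the second fix, making the element-by-element comparison independent of insertion order, might look like the sketch below. The class MembershipCompare and the method membershipChanged() are hypothetical; the article's code sorts mCurrentMembership in place inside setUpdateFlag(), whereas this version sorts copies of both arrays so that it makes no assumption about the order of either input.

```java
import java.util.Arrays;

// Hypothetical helper illustrating an order-insensitive membership check.
class MembershipCompare {

    // Returns true if the two arrays hold different sets of indices,
    // which is when a cluster's update flag should be set.
    static boolean membershipChanged(int[] previous, int[] current) {
        if (previous.length != current.length) {
            return true;
        }
        // Sort copies so the callers' arrays are left untouched.
        int[] sortedPrevious = previous.clone();
        int[] sortedCurrent = current.clone();
        Arrays.sort(sortedPrevious);
        Arrays.sort(sortedCurrent);
        return !Arrays.equals(sortedPrevious, sortedCurrent);
    }
}
```

Sorting costs O(n log n) per cluster, which is small next to the distance computations it prevents when the update flag correctly stays false.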
SMT speedup results
Now for the fun part: the results. The table below shows the results I obtained running the test program on my Dell workstation with two hyper-threaded Xeon processors. The percentages shown in parentheses are reductions in processing time relative to BasicKMeans. As you can see, ConcurrentKMeans gains significantly in speed when run with either two or four threads. Since my system is capable of executing four threads simultaneously, the largest time reduction, 64 percent, comes with four threads; that is almost a tripling of the speed. I also tested the program on a Panasonic laptop with a Centrino Duo processor. Running it with two threads gave a time reduction of 47 percent!
Results on Dell PC with Two Xeon Processors
| Number of coordinates (N) | Number of clusters (K) | BasicKMeans processing time | ConcurrentKMeans, 1 thread | ConcurrentKMeans, 2 threads (time reduction) | ConcurrentKMeans, 4 threads (time reduction) |
| --- | --- | --- | --- | --- | --- |
| 25,000 | 300 | 88,300 ms | 88,252 ms | 48,595 ms (45 percent) | 31,610 ms (64 percent) |
| 10,000 | 100 | 8,907 ms | 8,922 ms | 5,000 ms (44 percent) | 3,328 ms (63 percent) |
| 5,000 | 50 | 1,813 ms | 1,812 ms | 1,031 ms (43 percent) | 719 ms (60 percent) |
| 1,000 | 10 | 78 ms | 78 ms | 62 ms (21 percent) | 63 ms (19 percent) |
I included the single-thread column for ConcurrentKMeans to demonstrate that the SMT adaptation did not incur a performance penalty in single-threaded mode. This is an important point, since algorithms that you adapt to SMT almost certainly still need to execute well on SMT-incapable systems.
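The relationship between a time reduction percentage and a speedup factor is easy to mix up, so here is the arithmetic behind the table as a small sketch. The class SpeedupMath and its method names are hypothetical; the timings in the usage note come from the 25,000-coordinate row above.

```java
// Hypothetical helper for the arithmetic behind the results table.
class SpeedupMath {

    // Percentage of the baseline time that was saved.
    static double timeReductionPercent(double baselineMs, double concurrentMs) {
        return 100.0 * (1.0 - concurrentMs / baselineMs);
    }

    // How many times faster the concurrent run was than the baseline.
    static double speedupFactor(double baselineMs, double concurrentMs) {
        return baselineMs / concurrentMs;
    }
}
```

For the four-thread run, timeReductionPercent(88300, 31610) is about 64 and speedupFactor(88300, 31610) is about 2.8, which is where "almost a tripling" comes from. A 50 percent reduction would correspond to a factor of exactly 2.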
Conclusion
This article should convince you that adapting your time-consuming tasks for concurrency can be well worth the effort. In fact, it may be imperative if your programs are to compete successfully on the platforms of the future. If you get in the habit of programming your tasks for concurrency now, as they are redeployed on newer and newer systems capable of running increasing numbers of threads at once, your programs will scream ahead of the single-threaded products of your competitors.
Author Bio
Randall Scarberry is a senior research scientist and software engineer at the Department of Energy’s Pacific Northwest National Laboratory in Richland, Washington. He specializes in high-performance Java applications to pattern recognition and text analysis. When not coming up with ways to speed up tricky algorithms, he enjoys hiking the numerous trails in the Cascade Mountains.
Resources
- Download the sample code that  accompanies this article:
 http://www.javaworld.com/javaworld/jw-11-2006/thread/jw-11-thread.zip
- For information on the Colt libraries:
 http://dsd.lbl.gov/~hoschek/colt/
- For more information about the K-means  algorithm:
 http://en.wikipedia.org/wiki/K-means
- Visit the concurrency section of the  Java Tutorial:
 http://java.sun.com/docs/books/tutorial/essential/concurrency/index.html
- For more on multicore processing, read Humphrey Sheil's JavaWorld series: "Is Your Code Ready for the Next Wave in Commodity Computing?"
- Part 1: Prepare yourself for multicore  processing (July 2006):
 http://www.javaworld.com/javaworld/jw-07-2006/jw-0710-multicore.html
- Part 2: Measure the speed-up (or  slow-down!) delivered by parallel hardware for your application:
 http://www.javaworld.com/javaworld/jw-11-2006/jw-1106-multicore.html
- For more articles on improving the  performance of your Java applications, browse through the articles in JavaWorld's Performance Tuning Research Center:
 http://www.javaworld.com/javaworld/jw-11-2006/jw-1106-multicore.html

