
Use the Java concurrency API to speed up time-consuming tasks

Until recently, true concurrency has been impossible on most computers marketed to consumers. Most have been one-processor models capable of executing only a single thread in any given time slice. Operating systems simulate doing many things at once by rapidly time-slicing between threads, a practice known as temporal multithreading.
True concurrency, also known as simultaneous multithreading (SMT), occurs when multiple threads execute instructions during the same clock cycle. While in the past, SMT has been possible only on high-end, multiple-processor systems, the advent of inexpensive multiple-core processors is bringing SMT-capable systems to average consumers. The laptop you pick up at Best Buy this holiday season might very well sport a Centrino Duo capable of simultaneously executing two threads. Intel recently announced a new line of quad-core processors. Soon, single-processor, single-core systems will be considered relics.
These new systems give you, the high-performance Java developer, a tremendous opportunity for speeding up your programs. However, they won't inherit the advantages of SMT automatically; you must adapt your algorithms so the time-consuming sections are performed by multiple threads designed to run simultaneously. The addition of high-level concurrency utilities to Java 5 greatly eases this process. Using two important classes from the package java.util.concurrent, I'll demonstrate how to speed up time-consuming tasks by having them use the optimal number of threads for the systems on which they are executed.
A simple approach
The basic approach is rather simple:

  • Develop a single-threaded, sequential, robust, and clearly organized version of your algorithm. If you're reading this article to speed up something that exists, you may already have this step covered.
  • Identify the subtasks. Examine the algorithm to identify the discrete stages. This shouldn't be difficult if you performed Step 1 adequately. Each subtask probably has its own method, or at least a clearly delineated block of code. If you have difficulty identifying the subtasks, you probably need to understand the algorithm better or reorganize your code. In that case, return to Step 1.
  • Benchmark the algorithm. In other words, time the identified subtasks to determine the fraction of time consumed by each. This should be a trivial matter of adding some timing and output statements.
  • Delegate the most time-consuming subtasks to a thread pool. Now we come to the most difficult part: reorganizing your code so the slowest subtasks are performed concurrently by multiple subtask threads. Luckily, two utility classes in java.util.concurrent greatly simplify the process.

Introduction to K-means
For an example, I'll show you how to adapt K-means clustering to SMT. Don't panic if you've never heard of K-means. It's a real-world clustering algorithm that is easy to understand, and it benefits enormously from SMT. K-means is the algorithm on which I cut my SMT teeth.
My employer became frustrated when our main product took hours and hours to cluster a particularly large dataset. Overhearing him mutter in the elevator about needing to convert K-means from Java to C was all the motivation I needed. I more than doubled the product's speed by simple refactoring and optimization. Then, knowing that the processing server was a quad-CPU Sun, I decided to make key sections concurrent. Since this was before Java 5, I used utilities included with version 1.0.3 of the Colt libraries. (These utilities, written by Doug Lea, have since been integrated into java.util.concurrent.) The SMT adaptation achieved a further tripling in speed. My total efforts sped up K-means eight-fold and made the boss so happy, we avoided the C conversion nightmare.
Before going into K-means further, I recommend that you download the files kmeans.jar and kmeans_src.jar from Resources. Only the key sections of code will be included in this article, so you'll want to browse the full listings.
Extract the sources from the jar using the command jar xvf kmeans_src.jar. It contains three versions of K-means in the files: BasicKMeans.java, BenchmarkedKMeans.java, and ConcurrentKMeans.java. These files correspond to the results of Steps 1, 3, and 4 of our basic approach.
K-means splits a large set of coordinates into groups called clusters. More formally:

  • K-means partitions N coordinates into K clusters, where N and K are positive integers, and K is less than N. The number of clusters K is an input parameter to K-means. K-means doesn't determine what K should be.
  • A coordinate is a series of numbers. All have the same length M. The version of K-means presented in this article stores them in a two-dimensional N by M array of doubles.
  • A cluster consists of a center and a list of the indices of the coordinates that belong to it. The center, also of length M, is simply the average of the member coordinates.
  • At the conclusion of K-means, every coordinate belongs to one and only one cluster.
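To make the definition of a cluster center concrete, here is a minimal sketch of averaging the member coordinates dimension by dimension. The names CenterSketch and centerOf are illustrative only and are not taken from the downloadable sources.

```java
import java.util.Arrays;

// Illustrative sketch only: CenterSketch and centerOf are hypothetical names.
class CenterSketch {

    // Averages the member coordinates, dimension by dimension.
    // coordinates is the N-by-M array; members holds the indices of the
    // coordinates that belong to the cluster.
    static double[] centerOf(double[][] coordinates, int[] members) {
        if (members.length == 0) {
            throw new IllegalArgumentException("cluster has no members");
        }
        int m = coordinates[0].length;
        double[] center = new double[m];
        for (int index : members) {
            for (int d = 0; d < m; d++) {
                center[d] += coordinates[index][d];
            }
        }
        for (int d = 0; d < m; d++) {
            center[d] /= members.length;
        }
        return center;
    }

    public static void main(String[] args) {
        double[][] coords = { {0.0, 0.0}, {2.0, 4.0}, {10.0, 10.0} };
        // A cluster made of the first two coordinates.
        System.out.println(Arrays.toString(centerOf(coords, new int[] {0, 1})));
        // prints [1.0, 2.0]
    }
}
```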

K-means partitions the coordinates into clusters through the following iterative sequence:

  • Cluster initialization. K clusters must somehow be initialized. Although K-means may do this in several different ways, the version included in this article randomly picks K of the coordinates to serve as the initial cluster centers. This is done in the method initCenters().
  • Distance computation. To determine which cluster a coordinate should be assigned to, K-means compares the distance between the coordinate and each of the cluster centers. To minimize the number of times distances must be computed, this version keeps all N × K distances stored in a two-dimensional array of doubles. This step computes them all in the method computeDistances().
  • Initial cluster assignment. Each coordinate is assigned to the nearest cluster by the method makeAssignments().
  • Cluster center computation. Now K-means enters an iteration loop. The first step in the loop is to compute the cluster centers changed by the last call to makeAssignments().
  • Distance computation. K-means calls computeDistances() again, but this time, it only computes distances to clusters changed by the last call to makeAssignments().
  • Cluster assignment. Coordinates are again assigned to clusters by makeAssignments(). This time, the number of moves—the number of coordinates switching clusters—is noted. K-means loops back to Step 4 unless at least one of two stopping criteria is met: the number of moves is 0 or the iteration counter reaches an upper limit. If either is met, K-means exits.
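The sequence above can be sketched compactly as follows. This is not the article's BasicKMeans: as simplifying assumptions, the first K coordinates serve as the initial centers (so results are repeatable without a seed), and every distance is recomputed on each pass rather than only those to changed clusters. The class name KMeansSketch is hypothetical.

```java
import java.util.Arrays;

// Illustrative sketch only; a simplified stand-in for the steps listed above.
class KMeansSketch {

    // Euclidean distance between two coordinates of equal length.
    static double distance(double[] a, double[] b) {
        double sum = 0.0;
        for (int d = 0; d < a.length; d++) {
            double diff = a[d] - b[d];
            sum += diff * diff;
        }
        return Math.sqrt(sum);
    }

    // Fills the N-by-K distance cache.
    static void computeDistances(double[][] coords, double[][] centers, double[][] dist) {
        for (int i = 0; i < coords.length; i++) {
            for (int c = 0; c < centers.length; c++) {
                dist[i][c] = distance(coords[i], centers[c]);
            }
        }
    }

    // Assigns each coordinate to its nearest center; returns the number of moves.
    static int makeAssignments(double[][] dist, int[] assignment) {
        int moves = 0;
        for (int i = 0; i < dist.length; i++) {
            int nearest = 0;
            for (int c = 1; c < dist[i].length; c++) {
                if (dist[i][c] < dist[i][nearest]) {
                    nearest = c;
                }
            }
            if (nearest != assignment[i]) {
                assignment[i] = nearest;
                moves++;
            }
        }
        return moves;
    }

    // Recomputes each center as the average of its current members.
    static void computeCenters(double[][] coords, int[] assignment, double[][] centers) {
        int m = coords[0].length;
        for (int c = 0; c < centers.length; c++) {
            double[] sum = new double[m];
            int count = 0;
            for (int i = 0; i < coords.length; i++) {
                if (assignment[i] == c) {
                    for (int d = 0; d < m; d++) {
                        sum[d] += coords[i][d];
                    }
                    count++;
                }
            }
            if (count > 0) { // an empty cluster keeps its previous center
                for (int d = 0; d < m; d++) {
                    centers[c][d] = sum[d] / count;
                }
            }
        }
    }

    // Returns, for each coordinate, the index of the cluster it ends up in.
    static int[] cluster(double[][] coords, int k, int maxIterations) {
        double[][] centers = new double[k][];
        for (int c = 0; c < k; c++) {
            centers[c] = coords[c].clone();      // assumption: first k coordinates
        }
        double[][] dist = new double[coords.length][k];
        int[] assignment = new int[coords.length];
        Arrays.fill(assignment, -1);
        computeDistances(coords, centers, dist); // initial distances
        makeAssignments(dist, assignment);       // initial assignment
        int moves, it = 0;
        do {
            computeCenters(coords, assignment, centers);
            computeDistances(coords, centers, dist);
            moves = makeAssignments(dist, assignment);
            it++;
        } while (moves > 0 && it < maxIterations);
        return assignment;
    }

    public static void main(String[] args) {
        double[][] coords = { {0.0}, {1.0}, {10.0}, {11.0} };
        System.out.println(Arrays.toString(cluster(coords, 2, 500)));
        // prints [0, 0, 1, 1]
    }
}
```

Keeping the three subtasks in separate methods, as the article's version does, is what later makes it possible to time them individually and to hand the expensive ones to worker threads.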

Try running K-means from the jar you downloaded using the command java -Xms200m -Xmx200m -jar kmeans.jar. If necessary, spell out the path to java.exe, version 5 or later. The two virtual machine options set the minimum and maximum heap sizes. Increase or lower these as appropriate for your system. Set them to the same value so expansions of the heap do not affect the timing results.

User interface for testing K-means.
The figure above shows the graphical user interface. Three text fields let you specify the number of coordinates, clusters, and the seed for the random number generator. (An explicit seed permits repeatable results between runs.) The combo box lets you select one of three implementations of K-means: BasicKMeans, BenchmarkedKMeans, or ConcurrentKMeans. When the last is selected, another text field is enabled, letting you enter the number of threads used in the SMT-adapted steps. This field is initialized to the optimal number of threads for your system, which is conveniently obtained by Runtime.getRuntime().availableProcessors(). If you see 1 in this field when you first bring up the program, your system is unfortunately not SMT-capable. You can still run the example, but you won't observe a speedup when you increase the number of threads.
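As a small illustration of how such a default might be derived, the sketch below queries availableProcessors() and clamps the result so it never exceeds the number of work items. The class ThreadCountSketch and the method defaultThreadCount are hypothetical and not part of the test program. Note that availableProcessors() reports logical processors as seen by the JVM, which can differ from the number of physical chips.

```java
// Illustrative sketch: ThreadCountSketch and defaultThreadCount are hypothetical names.
class ThreadCountSketch {

    // Uses one thread per available processor, but never more threads
    // than work items and never fewer than one.
    static int defaultThreadCount(int workItems) {
        int processors = Runtime.getRuntime().availableProcessors();
        return Math.max(1, Math.min(processors, workItems));
    }

    public static void main(String[] args) {
        System.out.println("Available processors: "
                + Runtime.getRuntime().availableProcessors());
        System.out.println("Threads for 25,000 coordinates: "
                + defaultThreadCount(25000));
    }
}
```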
The Run KMeans button executes the selected version. As it executes, you will see real-time results in the text area. You'll probably notice the absence of input fields for coordinate length and maximum number of iterations. The test program always uses coordinates of length 100 randomly generated using a Gaussian distribution. The iteration limit is always 500.
Identifying the subtasks of K-means
Following the steps of our basic approach, let's now identify our algorithm's subtasks, which we can see in BasicKMeans's run() method:
Listing 1. The run method of BasicKMeans

public void run() {

  try {

    // Note the start time.
    long startTime = System.currentTimeMillis();

    postKMeansMessage("K-Means clustering started");

    // Randomly initialize the cluster centers, creating the
    // array mProtoClusters.
    initCenters();
    postKMeansMessage("... centers initialized");

    // Perform the initial computation of distances.
    computeDistances();

    // Make the initial cluster assignments.
    makeAssignments();

    // Number of moves in the iteration and the iteration counter.
    int moves = 0, it = 0;

    // Main loop. Two stopping criteria:
    // - no moves in makeAssignments
    //   (moves == 0)
    // OR
    // - the maximum number of iterations has been reached
    //   (it == mMaxIterations)
    do {

      // Compute the centers of the clusters that need updating.
      computeCenters();

      // Compute the stored distances between the updated clusters and the
      // coordinates.
      computeDistances();

      // Make this iteration's assignments.
      moves = makeAssignments();

      it++;
      postKMeansMessage("... iteration " + it + " moves = " + moves);

    } while (moves > 0 && it < mMaxIterations);

    // Transform the array of ProtoClusters to an array
    // of the simpler class Cluster.
    mClusters = generateFinalClusters();

    long executionTime = System.currentTimeMillis() - startTime;

    postKMeansComplete(mClusters, executionTime);

  } catch (Throwable t) {
    // Error reporting is not shown in this excerpt; see BasicKMeans.java.
  } finally {
    // Clean up temporary data structures used during the algorithm.
    // (The cleanup call is not shown in this excerpt.)
  }
}

Even though I described six steps in the K-means algorithm, it has only four subtasks, since computeDistances() and makeAssignments() each handle two steps. We may think of K-means, therefore, as consisting of the following subtasks:

  • Cluster initialization: Performed by initCenters()
  • Distance computation: Performed by computeDistances()
  • Cluster assignment: Performed by makeAssignments()
  • Cluster center computation: Performed by computeCenters()

Benchmarking K-means
Now that the subtasks have been identified, it's time to go to the benchmarking phase. BenchMarkedKMeans.java is the version to use for benchmarking. It is nothing but BasicKMeans.java with the addition of a few statements for timing the subtasks. Listing 2 shows its computeDistances() method, which is the same as BasicKMeans's method, except for the two statements that update the object variable mComputeDistancesMS upon every call to the method. Similar timing statements in the other subtask methods increment other timing variables.
Listing 2. The computeDistances() method of BenchMarkedKMeans

/**
 * Compute distances between coordinates and cluster centers,
 * storing them in the distance cache.  Only distances that
 * need to be computed are computed.  This is determined by
 * distance update flags in the protocluster objects.
 */
private void computeDistances() throws InsufficientMemoryException {

  // Mark the time the method was entered.
  long t = System.currentTimeMillis();

  int numCoords = mCoordinates.length;
  int numClusters = mProtoClusters.length;

  if (mDistanceCache == null) { // Distance cache instantiated on first call.
    // Explicit garbage collection to reduce likelihood of insufficient
    // memory.
    System.gc();
    // Ensure there is enough memory available for the distances.
    // Throw an exception if not.
    long memRequired = 8L * numCoords * numClusters;
    if (Runtime.getRuntime().freeMemory() < memRequired) {
      throw new InsufficientMemoryException();
    }
    // Instantiate an array to hold the distances between coordinates
    // and cluster centers.
    mDistanceCache = new double[numCoords][numClusters];
  }

  for (int coord = 0; coord < numCoords; coord++) {
    // Update the distances between the coordinate and all
    // clusters currently in contention with update flags set.
    for (int clust = 0; clust < numClusters; clust++) {
      ProtoCluster cluster = mProtoClusters[clust];
      if (cluster.getConsiderForAssignment() && cluster.needsUpdate()) {
        mDistanceCache[coord][clust] =
            distance(mCoordinates[coord], cluster.getCenter());
      }
    }
  }

  // Increment the timing variable by the number of ms taken by the method
  // call.
  mComputeDistancesMS += (System.currentTimeMillis() - t);
}

Run BenchMarkedKMeans with the default numbers of coordinates and clusters, 25,000 and 300, respectively, and you'll see the timing results in the text area. On my dual-processor Xeon system, 93 percent of the time is eaten up by distance computation, with cluster assignment coming in a distant second at 5 percent. Cluster initialization doesn't even register, which is not surprising, since it's performed only once and involves no extensive calculations. You might be a little surprised that cluster center computation consumes so little time, considering that it is done every iteration and contains numerous averaging operations. The lesson here: never skip the benchmarking step! If I had assumed computeCenters() took a significant part of the processing time, I might have expended a great deal of time and energy adapting it to SMT to attain a trivial gain at most.
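For readers who want to turn accumulated timers such as mComputeDistancesMS into percentages like those quoted above, a minimal sketch follows. The class TimingSketch and its method are hypothetical, and the millisecond values in main are placeholder numbers chosen only to show the arithmetic; they are not measurements.

```java
import java.util.Arrays;

// Illustrative sketch: TimingSketch and percentages are hypothetical names.
class TimingSketch {

    // Converts accumulated per-subtask times into percentages of their total.
    static double[] percentages(long[] elapsedMs) {
        long total = 0;
        for (long ms : elapsedMs) {
            total += ms;
        }
        double[] result = new double[elapsedMs.length];
        if (total == 0) {
            return result; // nothing was timed; report all zeros
        }
        for (int i = 0; i < elapsedMs.length; i++) {
            result[i] = 100.0 * elapsedMs[i] / total;
        }
        return result;
    }

    public static void main(String[] args) {
        // Placeholder values, not real measurements.
        long[] elapsedMs = { 930, 50, 20 };
        System.out.println(Arrays.toString(percentages(elapsedMs)));
        // prints [93.0, 5.0, 2.0]
    }
}
```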

Adapting K-means to SMT

Now it is time for the nitty-gritty third phase of SMT adaptation. If you haven't done so already, take a break to study the code in BasicKMeans.java so you'll fully understand the listings that follow. This file is rigorously commented, as are the other files that accompany it.

Based upon the benchmarks, computeDistances() and makeAssignments() are adapted to SMT in the final version of K-means, found in ConcurrentKMeans.java. Before jumping into the code, I should comment on two very important utility classes from java.util.concurrent: ThreadPoolExecutor and CyclicBarrier.

Introduction to ThreadPoolExecutor and CyclicBarrier

As you probably guess from the name, ThreadPoolExecutor contains a thread pool. It implements the Executor interface, which mandates the single method public void execute(Runnable command). An Executor executes a Runnable by calling its run method in some manner. A ThreadPoolExecutor executes a Runnable on one of the threads it keeps in an internal pool. When a Runnable is submitted to its execute method, it finds an idle thread in the pool and gives it the Runnable to execute.

ThreadPoolExecutor may be configured with either a variable or a fixed number of threads. The threads may be configured to time out if not given a task for a sufficient length of time. For the purposes of SMT-adaptation described here, a simple fixed-size pool with threads that do not time out is desired. Such a pool is obtained using the simple factory method in the Executors class: public static ExecutorService newFixedThreadPool(int nThreads). ExecutorService extends Executor, so the returned pool can be held in a reference of the more general type.
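A minimal sketch of a fixed-size pool in use is shown here. It is unrelated to K-means: it merely sums the integers from 1 to n by giving each pool thread an interleaved slice of the range. The class FixedPoolSketch and the method parallelSum are hypothetical names, and shutdown() with awaitTermination() is used for completion here only because it is the shortest way to wait in a standalone example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch: FixedPoolSketch and parallelSum are hypothetical names.
class FixedPoolSketch {

    // Sums 1..n using one Runnable per pool thread.
    static long parallelSum(final int n, final int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        final AtomicLong total = new AtomicLong();
        for (int t = 0; t < threads; t++) {
            final int slice = t;
            pool.execute(new Runnable() {
                public void run() {
                    long partial = 0;
                    // This slice handles slice+1, slice+1+threads, and so on.
                    for (int i = slice + 1; i <= n; i += threads) {
                        partial += i;
                    }
                    total.addAndGet(partial);
                }
            });
        }
        pool.shutdown(); // accept no new tasks; queued ones still run
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(parallelSum(1000, 4)); // prints 500500
    }
}
```

Shutting the pool down after one batch would be wasteful for an algorithm that submits work on every iteration, which is why a reusable completion signal is needed.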
The other class, CyclicBarrier, is needed for coordination. When an SMT-adapted step is being performed by the pool's threads, the controlling thread needs to know when they all have finished. The next step's success depends upon what the threads in the pool have done. A CyclicBarrier instance informs the controlling thread when an SMT-adapted step is complete, so the controlling thread knows when to proceed.
An appropriate CyclicBarrier is instantiated using the constructor: public CyclicBarrier(int parties, Runnable barrierAction). The argument parties is the number of threads that the barrier will coordinate. In this case, this is the number of threads in the thread pool. The barrierAction is a Runnable for the barrier to execute whenever it is reached. Reaching the barrier means that all of its parties—the subtask threads in the thread pool—have informed the barrier that they are finished. These threads inform the barrier of their completion by calling the barrier's await() method. The barrier tracks the number of calls to await(), and when that number becomes equal to parties, the barrier executes barrierAction.
In ConcurrentKMeans, the controlling thread waits while the thread pool's threads are doing one of the subtasks. The execution of the barrier action breaks the controlling thread out of the wait, allowing K-means to continue.
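The following sketch shows the barrier mechanics in isolation. Each of several threads does a token piece of work and calls await(); the barrier action runs once all have arrived and then releases the controlling thread. As an assumption made for brevity, the controlling thread waits on a CountDownLatch here rather than on the wait()/notifyAll() pair used by ConcurrentKMeans. BarrierSketch and runOnce are hypothetical names.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch: BarrierSketch and runOnce are hypothetical names.
class BarrierSketch {

    // Starts `parties` threads that each finish some work and call await().
    // Returns how many threads had finished when the barrier action ran.
    static int runOnce(final int parties) {
        final AtomicInteger finished = new AtomicInteger();
        final AtomicInteger seenByAction = new AtomicInteger(-1);
        final CountDownLatch actionRan = new CountDownLatch(1);

        final CyclicBarrier barrier = new CyclicBarrier(parties, new Runnable() {
            public void run() {
                // Runs once, after the last party has arrived.
                seenByAction.set(finished.get());
                actionRan.countDown(); // releases the controlling thread
            }
        });

        for (int i = 0; i < parties; i++) {
            new Thread(new Runnable() {
                public void run() {
                    finished.incrementAndGet(); // stand-in for real subtask work
                    try {
                        barrier.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (BrokenBarrierException e) {
                        // Another party failed; nothing more to do in this sketch.
                    }
                }
            }).start();
        }

        try {
            actionRan.await(); // the controlling thread blocks here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return seenByAction.get();
    }

    public static void main(String[] args) {
        System.out.println(runOnce(4)); // prints 4
    }
}
```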
Now let's plunge into the code. Compare the run method of ConcurrentKMeans in Listing 3 below, with that of BasicKMeans in Listing 1. The only significant difference is the instantiation of an object variable of type SubtaskManager. This object manages the SMT-adapted subtasks of the concurrent version of K-means.
Listing 3. The run method of ConcurrentKMeans

 public void run() {
  try {
    // Note the start time.
    long startTime = System.currentTimeMillis();
    postKMeansMessage("K-Means clustering started");
    // Randomly initialize the cluster centers.
    initCenters();
    postKMeansMessage("... centers initialized");
    // Instantiate the subtask manager.
    mSubtaskManager = new SubtaskManager(mThreadCount);
    // Post a message about the state of concurrent subprocessing.
    if (mThreadCount > 1) {
      postKMeansMessage("... concurrent processing mode with "
        + mThreadCount + " subtask threads");
    } else {
      postKMeansMessage("... non-concurrent processing mode");
    }
    // Perform the initial computation of distances.
    computeDistances();
    // Make the initial cluster assignments.
    makeAssignments();
    // Number of moves in the iteration and the iteration counter.
    int moves = 0, it = 0;
    // Main loop. Two stopping criteria:
    // - no moves in makeAssignments
    //   (moves == 0)
    // OR
    // - the maximum number of iterations has been reached
    //   (it == mMaxIterations)
    do {
      // Compute the centers of the clusters that need updating.
      computeCenters();
      // Compute the stored distances between the updated clusters and the
      // coordinates.
      computeDistances();
      // Make this iteration's assignments.
      moves = makeAssignments();
      it++;
      postKMeansMessage("... iteration " + it + " moves = " + moves);
    } while (moves > 0 && it < mMaxIterations);
    // Transform the array of ProtoClusters to an array
    // of the simpler class Cluster.
    mClusters = generateFinalClusters();
    long executionTime = System.currentTimeMillis() - startTime;
    postKMeansComplete(mClusters, executionTime);
  } catch (Throwable t) {
    // Error reporting is not shown in this excerpt; see ConcurrentKMeans.java.
  } finally {
    // Clean up temporary data structures used during the algorithm.
    // (The cleanup call is not shown in this excerpt.)
  }
}

Compare the version of computeDistances() from Listing 2 with the ConcurrentKMeans version, shown in Listing 4. The former uses an outer for loop to iterate over the coordinates, computing the distances to changed clusters in the inner loop. This for loop has been replaced in ConcurrentKMeans by a call to mSubtaskManager.computeDistances(). Similarly, ConcurrentKMeans delegates the bulk of the work of the method makeAssignments() to the SubtaskManager.
Listing 4. computeDistances() from ConcurrentKMeans

 /**
  * Compute distances between coordinates and cluster centers,
  * storing them in the distance cache. Only distances that
  * need to be computed are computed. This is determined by
  * distance update flags in the protocluster objects.
  */
 private void computeDistances() throws InsufficientMemoryException {
   if (mDistanceCache == null) { // Distance cache instantiated on first call.
     int numCoords = mCoordinates.length;
     int numClusters = mProtoClusters.length;
     // Explicit garbage collection to reduce likelihood of insufficient
     // memory.
     System.gc();
     // Ensure there is enough memory available for the distances.
     // Throw an exception if not.
     long memRequired = 8L * numCoords * numClusters;
     if (Runtime.getRuntime().freeMemory() < memRequired) {
       throw new InsufficientMemoryException();
     }
     // Instantiate an array to hold the distances between coordinates
     // and cluster centers.
     mDistanceCache = new double[numCoords][numClusters];
   }
   // Bulk of the work is delegated to the
   // SubtaskManager. (Handling of its boolean result is not shown
   // in this excerpt.)
   mSubtaskManager.computeDistances();
 }


Managing the subtasks

SubtaskManager is a non-static class nested within ConcurrentKMeans to manage the SMT-adapted steps. It has to be non-static so that it can access the object fields of the enclosing ConcurrentKMeans. Nested in SubtaskManager is yet another class, a non-static Runnable called Worker. While only one instance of SubtaskManager is created, the number of Worker instances is equal to the number of subtask threads in the thread pool. The Workers are the Runnables submitted to the ThreadPoolExecutor to do the grunt work.
Look at the SubtaskManager code shown in Listing 5 between Lines 6 and 31. The integer field mDoing, having possible values DOING_NOTHING, COMPUTING_DISTANCES, and MAKING_ASSIGNMENTS, is used to track the current subtask. The Boolean mWorking is a flag set to indicate when at least one of the Worker objects is executing. The field mExecutor is declared not as a ThreadPoolExecutor, but as the more general type Executor. You'll understand why when you read the explanation of the constructor. The CyclicBarrier is assigned to the reference field mBarrier. Finally, references to the Worker instances are placed in the array mWorkers.
The constructor, beginning on Line 39, takes the number of threads to use in the thread pool. This must, of course, be at least 1, or it will throw a much deserved IllegalArgumentException. Line 52 reduces numThreads to the number of coordinates, in case someone attempts something ridiculous, such as clustering 100 coordinates with 200 threads. Lines 55-79 instantiate the Worker objects, dividing up the coordinates among them as evenly as possible and ensuring every coordinate is covered. The for loop beginning on Line 68 exists simply to apportion any leftover coordinates, since the number of coordinates may not be evenly divisible by the number of threads.
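A worked example may help with the apportioning arithmetic. The sketch below applies the same idea as the constructor (a base amount per worker, then one extra coordinate for each of the first few workers) to concrete numbers. The class PartitionSketch and the static method shown are hypothetical; only the arithmetic is taken from Listing 5.

```java
import java.util.Arrays;

// Illustrative sketch: PartitionSketch is a hypothetical name.
class PartitionSketch {

    // Splits coordCount coordinates among at most numThreads workers
    // as evenly as possible. Assumes coordCount is at least 1.
    static int[] coordsPerWorker(int coordCount, int numThreads) {
        if (numThreads <= 0) {
            throw new IllegalArgumentException("number of threads <= 0: " + numThreads);
        }
        if (numThreads > coordCount) {
            numThreads = coordCount; // extra workers would have nothing to do
        }
        int[] counts = new int[numThreads];
        Arrays.fill(counts, coordCount / numThreads);
        // Hand out the remainder one coordinate at a time.
        int leftOvers = coordCount % numThreads;
        for (int i = 0; i < leftOvers; i++) {
            counts[i]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(coordsPerWorker(10, 3)));
        // prints [4, 3, 3]
        System.out.println(coordsPerWorker(100, 200).length);
        // prints 100
    }
}
```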
On Line 81, things get interesting. If numThreads equals 1, Line 85 sets mExecutor to an anonymous inner implementation that calls its Runnable directly. In other words, when only one subtask thread is requested, there is no thread pool. Execution of the single Worker is done on the controlling thread. Also, the CyclicBarrier is not set, because it is unnecessary.
The code within the else clause between Lines 95 and 111 configures the SubtaskManager for the concurrent case. First, mBarrier is instantiated and given a barrier action that calls the SubtaskManager method workersDone(). Then, mExecutor is set to a fixed size ThreadPoolExecutor.
To understand how SubtaskManager accomplishes a subtask, look at its methods makeAssignments() and computeDistances() on Lines 119 and 130, respectively. Both simply set the flag mDoing and call the method work(). This method, beginning on Line 141, returns a boolean to indicate success of the subtask. It begins by initializing an ok flag to false on Line 142. Then it sets the flag mWorking. Entering the try-catch-finally block, on Line 149, mBarrier is reset if it is non-null. (Remember, the barrier is null if using only one thread.) Strictly speaking, a CyclicBarrier re-arms itself each time it trips, so the reset() call is a safeguard that matters mainly if a previous subtask left the barrier in a broken state. The for loop beginning on Line 152 submits each of the Worker objects to the executor. If mExecutor is a ThreadPoolExecutor, as it is when the number of subtask threads is more than one, the Worker objects are executed not on the controlling thread running work(), but on threads in the pool. However, if the number of subtask threads is one, the single Worker executes on the controlling thread.
Immediately after the for loop, if mBarrier is not null, the method waitOnWorkers() blocks the controlling thread in wait mode until the method workersDone() causes it to unblock. Recall that workersDone() is called by the barrier action Runnable when the barrier is reached. Then the ok flag is set to true if mBarrier.isBroken() returns false. The barrier reports a broken state if one of the worker threads is interrupted or throws an exception while running.
If, on the other hand, mBarrier is null after the for loop, the one Worker is executed on the calling thread and there is no reason to block. In that case, the ok flag is set to true.
The catch clause traps a RejectedExecutionException, an exception that ThreadPoolExecutor's execute method throws when it cannot accept a task, either because the pool is saturated (its thread and queue limits have both been reached) or because the pool was shut down before the request was submitted. Because of the code's structure, neither of these cases can happen; however, to conform to good coding practice, the exception is trapped. (The impossible has a way of happening in software, doesn't it?) Before returning ok, the method ensures mWorking is set back to false in the finally clause.
The blocking and unblocking functions of SubtaskManager are completed by the methods waitOnWorkers() and workersDone(). Look at workersDone() first, which begins on Line 200. It sets mWorking to false, then calls notifyAll() to break the controlling thread out of its wait. Now look at waitOnWorkers() on Line 180. This method calls wait() inside a while loop that exits when mWorking is false.
I learned the hard way that waitOnWorkers() should check mWorking before calling wait() the first time, because it is possible for all of the workers to finish before the controlling thread even enters waitOnWorkers(). As the workers finish, they call the barrier's await() method, which eventually triggers the barrier action that calls workersDone(). If they finish their work so quickly that this happens before the controlling thread enters waitOnWorkers(), the thread gets stuck if it calls wait(). Thus, it ascertains that mWorking is true first. This flag can only be true if workersDone() did not precede the controlling thread's call to waitOnWorkers().
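The pattern described here, testing the flag inside a loop before ever calling wait(), can be sketched in isolation as follows. This is modeled on the description above rather than copied from ConcurrentKMeans: the names mWorking, waitOnWorkers(), and workersDone() come from the article, while GuardedWaitSketch, start(), and finishesWhenWorkersAreFirst() are hypothetical.

```java
// Illustrative sketch of a guarded wait; not the article's actual code.
class GuardedWaitSketch {

    private boolean mWorking;

    // Marks the start of a subtask.
    synchronized void start() {
        mWorking = true;
    }

    // Called by the controlling thread. The flag is tested before the first
    // wait(), so a notification that has already happened is not missed.
    // The loop also guards against spurious wakeups.
    synchronized void waitOnWorkers() throws InterruptedException {
        while (mWorking) {
            wait();
        }
    }

    // Called when all workers are finished (by the barrier action in the article).
    synchronized void workersDone() {
        mWorking = false;
        notifyAll();
    }

    // Exercises the troublesome ordering: the workers finish before the
    // controlling thread starts waiting. Returns true if the controlling
    // thread got through waitOnWorkers() instead of blocking.
    static boolean finishesWhenWorkersAreFirst() {
        final GuardedWaitSketch gate = new GuardedWaitSketch();
        gate.start();
        gate.workersDone(); // workers are done before anyone waits
        Thread controller = new Thread(new Runnable() {
            public void run() {
                try {
                    gate.waitOnWorkers();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        controller.start();
        try {
            controller.join(2000); // give up after two seconds
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !controller.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(finishesWhenWorkersAreFirst()); // prints true
    }
}
```

Had waitOnWorkers() called wait() unconditionally, the controlling thread in this scenario would block with no one left to notify it.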
Now look at the code for Worker beginning on Line 234. As stated before, this class does the grunt work. It has integer object variables mStartCoord and mNumCoords to define the range of coordinates over which an instance operates. Worker's run method (see Line 268) contains a switch-case statement keyed on the SubtaskManager field mDoing. When mDoing is equal to COMPUTING_DISTANCES, the method calls workerComputeDistances(). If mDoing is equal to MAKING_ASSIGNMENTS, the method calls workerMakeAssignments(). After returning from one of these methods, the Worker calls mBarrier's await() method if it is non-null. When all Workers have called await(), the barrier action calls workersDone() to unblock SubtaskManager's controlling thread.
If you look at the Worker methods workerComputeDistances() and workerMakeAssignments(), you'll see that they resemble the methods computeDistances() and makeAssignments() in BasicKMeans. The chief difference is that the Worker methods loop through a subset of the coordinates instead of all of them.
Keeping track of the number of cluster-assignment moves is a little more complicated than before. Each Worker must keep a separate tally in the field mMoves. The SubtaskManager method numberOfMoves() returns their sum. ConcurrentKMeans.makeAssignments() calls mSubtaskManager.makeAssignments(), then returns the result of mSubtaskManager.numberOfMoves().
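The per-worker tally can be sketched as follows. Each worker counts moves only within its own slice and writes only to its own counter, so no synchronization is needed on the counters themselves; the sum is read after the workers have finished. As a simplifying assumption, this sketch uses plain threads and join() instead of the pool and barrier, and the class MoveTallySketch with its reassign() method is hypothetical. The names Worker, mMoves, and numberOfMoves() follow the article.

```java
// Illustrative sketch: MoveTallySketch and reassign are hypothetical names.
class MoveTallySketch {

    // A worker owns a contiguous slice of the coordinates and its own tally.
    static class Worker implements Runnable {
        final int start, count;
        final int[] current, nearest;
        int mMoves;

        Worker(int start, int count, int[] current, int[] nearest) {
            this.start = start;
            this.count = count;
            this.current = current;
            this.nearest = nearest;
        }

        public void run() {
            mMoves = 0;
            for (int i = start; i < start + count; i++) {
                if (current[i] != nearest[i]) {
                    current[i] = nearest[i]; // the coordinate switches clusters
                    mMoves++;
                }
            }
        }
    }

    // Sums the separate tallies once all workers are done.
    static int numberOfMoves(Worker[] workers) {
        int moves = 0;
        for (Worker w : workers) {
            moves += w.mMoves;
        }
        return moves;
    }

    // Assumes 1 <= numWorkers <= current.length.
    static int reassign(int[] current, int[] nearest, int numWorkers) {
        Worker[] workers = new Worker[numWorkers];
        Thread[] threads = new Thread[numWorkers];
        int base = current.length / numWorkers;
        int extra = current.length % numWorkers;
        int start = 0;
        for (int i = 0; i < numWorkers; i++) {
            int count = base + (i < extra ? 1 : 0);
            workers[i] = new Worker(start, count, current, nearest);
            threads[i] = new Thread(workers[i]);
            threads[i].start();
            start += count;
        }
        for (Thread t : threads) {
            try {
                t.join(); // after join, the worker's writes are visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return numberOfMoves(workers);
    }

    public static void main(String[] args) {
        int[] current = { 0, 0, 1, 1, 2 };
        int[] nearest = { 0, 1, 1, 0, 2 };
        System.out.println(reassign(current, nearest, 2)); // prints 2
    }
}
```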
Listing 5. The nested class SubtaskManager

 001 /**
002  * The class which manages the SMT-adapted subtasks.
003  */
004 private class SubtaskManager {
006     // Codes used to identify what step is being done.
007     static final int DOING_NOTHING = 0;
008     static final int COMPUTING_DISTANCES = 1;
009     static final int MAKING_ASSIGNMENTS = 2;
011     // What the object is currently doing. Set to one of the 
012     // three codes above.
013     private int mDoing = DOING_NOTHING;
015     // True if at least one of the Workers is doing something.
016     private boolean mWorking;
018     // The executor that runs the Workers.
019     // When in multiple-processor mode, this is a ThreadPoolExecutor 
020     // with a fixed number of threads. In single-processor mode, it's
021     // a simple implementation that calls the single worker's run
022     // method directly.
023     private Executor mExecutor;
025     // A Barrier to wait on multiple Workers to finish up the current task.
026     // In single-processor mode, there is no need for a barrier, so it
027     // is not set.
028     private CyclicBarrier mBarrier;
030     // The worker objects which implement Runnable.
031     private Worker[] mWorkers;
033     /**
034      * Constructor
035      * 
036      * @param numThreads the number of worker threads to be used for
037      *   the subtasks.
038      */
039     SubtaskManager(int numThreads) {
041         if (numThreads <= 0) {
042             throw new IllegalArgumentException("number of threads <= 0: "
043                     + numThreads);
044         }
046         int coordCount = mCoordinates.length;
048         // There would be no point in having more workers than
049         // coordinates, since some of the workers would have nothing
050         // to do.
051         if (numThreads > coordCount) {
052             numThreads = coordCount;
053         }
055         // Create the workers.
056         mWorkers = new Worker[numThreads];
058         // To hold the number of coordinates for each worker.
059         int[] coordsPerWorker = new int[numThreads];
061         // Initialize with the base amounts.  
062         Arrays.fill(coordsPerWorker, coordCount/numThreads);
064         // There may be some leftovers, since coordCount may not be
065         // evenly divisible by numWorkers. Add a coordinate to each
066         // until all are covered.
067         int leftOvers = coordCount - numThreads * coordsPerWorker[0];
068         for (int i = 0; i < leftOvers; i++) {
069             coordsPerWorker[i]++;
070         }
072         int startCoord = 0;
073         // Instantiate the workers.
074         for (int i = 0; i < numThreads; i++) {
075             // Each worker needs to know its starting coordinate and the 
076             // number of coordinates it handles.
077             mWorkers[i] = new Worker(startCoord, coordsPerWorker[i]);
078             startCoord += coordsPerWorker[i];
079         }
081         if (numThreads == 1) { // Single-processor mode.
083             // Create a simple executor that directly calls the single
084             // worker's run method.  Do not set the barrier.
085             mExecutor = new Executor() {
086                 public void execute(Runnable runnable) {
087                     if (!Thread.interrupted()) {
088                         runnable.run();
089                     } else {
090                         throw new RejectedExecutionException();
091                     }
092                 }
093             };
095         } else { // Multiple-processor mode.
097             // Need the barrier to notify the controlling thread when the
098             // Workers are done.
099             mBarrier = new CyclicBarrier(numThreads, new Runnable() {
100                 public void run() {
101                     // Method called after all workers have called await() on the
102                     // barrier.  The call to workersDone() 
103                     // unblocks the controlling thread.
104                     workersDone();
105                 }
106             });
108             // Set the executor to a fixed thread pool with 
109             // threads that do not time out.
110             mExecutor = Executors.newFixedThreadPool(numThreads);
111         }
112     }

    /**
     * Make the cluster assignments.
     *
     * @return true if nothing went wrong.
     */
    boolean makeAssignments() {
        mDoing = MAKING_ASSIGNMENTS;
        return work();
    }

    /**
     * Compute the distances between the coordinates and those centers
     * with update flags set.
     *
     * @return true if nothing went wrong.
     */
    boolean computeDistances() {
        mDoing = COMPUTING_DISTANCES;
        return work();
    }

    /**
     * Perform the current subtask, waiting until all the workers
     * finish their part of the current task before returning.
     *
     * @return true if the subtask succeeded.
     */
    private boolean work() {
        boolean ok = false;
        // Set the working flag to true.
        mWorking = true;
        try {
            if (mBarrier != null) {
                // Resets the barrier so it can be reused if
                // this is not the first call to this method.
                mBarrier.reset();
            }
            // Now execute the run methods on the Workers.
            for (int i = 0; i < mWorkers.length; i++) {
                mExecutor.execute(mWorkers[i]);
            }
            if (mBarrier != null) {
                // Block until the workers are done.  The barrier
                // triggers the unblocking.
                waitOnWorkers();
                // If the isBroken() method of the barrier returns false,
                // no problems.
                ok = !mBarrier.isBroken();
            } else {
                // No barrier, so the run() method of a single worker
                // was called directly and everything must have worked
                // if we made it here.
                ok = true;
            }
        } catch (RejectedExecutionException ree) {
            // Possibly thrown by the executor.
        } finally {
            mWorking = false;
        }
        return ok;
    }

    /**
     * Called from work() to put the controlling thread into
     * wait mode until the barrier calls workersDone().
     */
    private synchronized void waitOnWorkers() {
        // It is possible for the workers to have finished so quickly that
        // workersDone() has already been called.  Since workersDone() sets
        // mWorking to false, check this flag before going into wait mode.
        // Not doing so could result in hanging the SubtaskManager.
        while (mWorking) {
            try {
                // Blocks until workersDone() is called.
                wait();
            } catch (InterruptedException ie) {
                // mBarrier.isBroken() will return true.
                break;
            }
        }
    }

    /**
     * Notifies the controlling thread that it can come out of
     * wait mode.
     */
    private synchronized void workersDone() {
        // If this gets called before waitOnWorkers(), setting this
        // to false prevents waitOnWorkers() from entering a
        // permanent wait.
        mWorking = false;
        notifyAll();
    }

    /**
     * Shutdown the thread pool when k-means is finished.
     */
    void shutdown() {
        if (mExecutor instanceof ThreadPoolExecutor) {
            // This terminates the threads in the thread pool.
            ((ThreadPoolExecutor) mExecutor).shutdownNow();
        }
    }

    /**
     * Returns the number of cluster assignment changes made in the
     * previous call to makeAssignments().
     */
    int numberOfMoves() {
        // Sum the contributions from the workers.
        int moves = 0;
        for (int i = 0; i < mWorkers.length; i++) {
            moves += mWorkers[i].numberOfMoves();
        }
        return moves;
    }

    /**
     * The class which does the hard work of the subtasks.
     */
    private class Worker implements Runnable {

        // Defines range of coordinates to cover.
        private int mStartCoord, mNumCoords;

        // Number of moves made by this worker in the last call
        // to workerMakeAssignments().  The SubtaskManager totals up
        // this value from all the workers in numberOfMoves().
        private int mMoves;

        /**
         * Constructor
         *
         * @param startCoord index of the first coordinate covered by
         *   this Worker.
         * @param numCoords the number of coordinates covered.
         */
        Worker(int startCoord, int numCoords) {
            mStartCoord = startCoord;
            mNumCoords = numCoords;
        }

        /**
         * Returns the number of moves this worker made in the last
         * execution of workerMakeAssignments()
         */
        int numberOfMoves() {
            return mMoves;
        }

        /**
         * The run method.  It accesses the SubtaskManager field mDoing
         * to determine what subtask to perform.
         */
        public void run() {

            try {
                switch (mDoing) {
                case COMPUTING_DISTANCES:
                    workerComputeDistances();
                    break;
                case MAKING_ASSIGNMENTS:
                    workerMakeAssignments();
                    break;
                }
            } finally {
                // If there's a barrier, call its await() method.  To
                // ensure that it gets done, it's placed in the finally clause.
                if (mBarrier != null) {
                    try {
                        mBarrier.await();
                    // barrier.isBroken() will return true if either of
                    // these exceptions happens, so the SubtaskManager
                    // will detect the problem.
                    } catch (InterruptedException ex) {
                    } catch (BrokenBarrierException ex) {
                    }
                }
            }

        }

        /**
         * Compute the distances for the covered coordinates
         * to the updated centers.
         */
        private void workerComputeDistances() {
            int lim = mStartCoord + mNumCoords;
            for (int i = mStartCoord; i < lim; i++) {
                int numClusters = mProtoClusters.length;
                for (int c = 0; c < numClusters; c++) {
                    ProtoCluster cluster = mProtoClusters[c];
                    if (cluster.getConsiderForAssignment() &&
                        cluster.needsUpdate()) {
                        mDistanceCache[i][c] = distance(mCoordinates[i],
                          cluster.getCenter());
                    }
                }
            }
        }

        /**
         * Assign each covered coordinate to the nearest cluster.
         */
        private void workerMakeAssignments() {
            mMoves = 0;
            int lim = mStartCoord + mNumCoords;
            for (int i = mStartCoord; i < lim; i++) {
                int c = nearestCluster(i);
                mProtoClusters[c].add(i);
                if (mClusterAssignments[i] != c) {
                    mClusterAssignments[i] = c;
                    mMoves++;
                }
            }
        }

    }
}
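
The coordination pattern in the listing (a fixed thread pool, a CyclicBarrier whose barrier action releases the controlling thread, and workers that call await() in a finally clause) can be tried in isolation. The following is only a minimal sketch under stated assumptions: the class BarrierSketch and its method parallelSum are hypothetical names that do not appear in the article's source, the work is a trivial array sum rather than K-means, and a CountDownLatch stands in for the waitOnWorkers()/workersDone() pair.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the article's source code.
class BarrierSketch {

    /** Sums data on numThreads pool threads and blocks until all are done. */
    static long parallelSum(final int[] data, final int numThreads) {
        final long[] partial = new long[numThreads];
        // Stand-in for waitOnWorkers()/workersDone(): released by the barrier action.
        final CountDownLatch done = new CountDownLatch(1);
        final CyclicBarrier barrier = new CyclicBarrier(numThreads, new Runnable() {
            public void run() {
                // Runs once, after every worker has called await().
                done.countDown();
            }
        });
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        try {
            int chunk = (data.length + numThreads - 1) / numThreads;
            for (int t = 0; t < numThreads; t++) {
                final int id = t;
                final int start = Math.min(data.length, t * chunk);
                final int end = Math.min(data.length, start + chunk);
                pool.execute(new Runnable() {
                    public void run() {
                        try {
                            long sum = 0;
                            for (int i = start; i < end; i++) {
                                sum += data[i];
                            }
                            partial[id] = sum;
                        } finally {
                            // Always reach the barrier, even if the work throws.
                            try {
                                barrier.await();
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            } catch (BrokenBarrierException e) {
                                // Not handled in this sketch.
                            }
                        }
                    }
                });
            }
            done.await(); // The controlling thread blocks here.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdownNow();
        }
        long total = 0;
        for (long p : partial) {
            total += p;
        }
        return total;
    }
}
```

Calling BarrierSketch.parallelSum(data, 4) splits the array into four slices, one per pool thread. Unlike the listing, this sketch creates and shuts down its pool on every call and does not report a broken barrier, so it illustrates the handshake only.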


Synchronization gotchas

By now you've probably wondered what synchronization issues might come up in an SMT adaptation. I encountered two with ConcurrentKMeans, both in the nested class ProtoCluster, which K-means uses to track intermediate clustering results. It was clear that something was wrong, because ConcurrentKMeans gave results different from BasicKMeans even though it used the same N, K, and random seed. I ran it several times, occasionally getting an ArrayIndexOutOfBoundsException originating from the ProtoCluster method add(int ndx). Then the obvious dawned on me: multiple threads were calling an unsynchronized method. (Doh!) The exception happened because one worker thread attempted to add a coordinate while another thread was expanding the array holding the coordinate indices. Simply adding the synchronized modifier to the add(int ndx) method definition fixed the problem.
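
To make the first fix concrete, here is a minimal sketch of a growable index list with a synchronized add method. It is not the article's ProtoCluster: the class MembershipSketch, its fields, and the helper addFromThreads are hypothetical names chosen for illustration, and the growth policy (doubling) is an assumption.

```java
import java.util.Arrays;

// Hypothetical stand-in for the membership bookkeeping in ProtoCluster.
class MembershipSketch {
    private int[] members = new int[4];
    private int size;

    // Without "synchronized", one thread could write into the array while
    // another is replacing it with a larger copy, or two threads could
    // claim the same slot.
    synchronized void add(int ndx) {
        if (size == members.length) {
            members = Arrays.copyOf(members, size * 2);
        }
        members[size++] = ndx;
    }

    synchronized int size() {
        return size;
    }

    /** Adds addsPerThread indices from each of numThreads threads; returns the final size. */
    static int addFromThreads(int numThreads, final int addsPerThread) {
        final MembershipSketch list = new MembershipSketch();
        Thread[] threads = new Thread[numThreads];
        for (int t = 0; t < numThreads; t++) {
            final int base = t * addsPerThread;
            threads[t] = new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < addsPerThread; i++) {
                        list.add(base + i);
                    }
                }
            });
            threads[t].start();
        }
        for (Thread th : threads) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining", e);
            }
        }
        return list.size();
    }
}
```

With the synchronized modifier in place, MembershipSketch.addFromThreads(4, 10000) always ends with 40,000 entries. Removing the modifier makes lost updates and the occasional ArrayIndexOutOfBoundsException possible, which matches the symptoms described above.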
The second problem was more subtle. I was disappointed with the speedup when running with four threads on my dual-processor Xeon system: it was only 30 percent faster than the single-threaded version. Stepping through the code in debug mode, I finally noticed that computeDistances() computed nearly all of the distances each time it was called. This method is supposed to compute only the distances to clusters whose update flag is set to true. It was doing more work than necessary because ProtoCluster's setUpdateFlag() method was spuriously setting the flag regardless of whether the cluster had actually changed. The two arrays in ProtoCluster that setUpdateFlag() compares element by element, mPreviousMembership and mCurrentMembership, contained the same indices but in different orders, because the order in which multiple threads add them is undefined. In single-threaded mode, this did not happen: the one thread always added the coordinates in order. To mend the problem, I just needed to add a single line sorting mCurrentMembership before comparing its elements to the other array.
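
The second fix amounts to making the comparison independent of insertion order. The sketch below shows the idea with a hypothetical helper, membershipChanged, in a hypothetical class UpdateFlagSketch; neither is taken from the article's code. As a defensive assumption it sorts copies of both arrays, whereas the article sorts only mCurrentMembership in place.

```java
import java.util.Arrays;

// Hypothetical helper illustrating an order-insensitive membership comparison.
class UpdateFlagSketch {

    /** Returns true only if the two arrays hold different sets of indices. */
    static boolean membershipChanged(int[] previous, int[] current) {
        // Sort copies so the callers' arrays are left untouched.
        int[] prevSorted = previous.clone();
        int[] currSorted = current.clone();
        Arrays.sort(prevSorted);
        Arrays.sort(currSorted);
        // Element-by-element comparison is now safe, whatever order
        // the worker threads added the indices in.
        return !Arrays.equals(prevSorted, currSorted);
    }
}
```

For example, a previous membership of {1, 2, 3} and a current membership of {3, 1, 2} no longer count as a change, so the cluster's update flag can stay unset and its distances are not recomputed.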

SMT speedup results

Now for the fun part: the results. The table below shows the results I obtained running the test program on my Dell workstation with two hyper-threaded Xeon processors. The percentages shown in parentheses are reductions in processing time relative to BasicKMeans. As you can see, ConcurrentKMeans gains significantly in speed when run with either two or four threads. Since my system is capable of executing four threads simultaneously, the largest time reduction, 64 percent, comes with four threads; that is almost a tripling of the speed. I also tested the program on a Panasonic laptop with a Centrino Duo processor. Running it with two threads gave a time reduction of 47 percent!
Results on Dell PC with Two Xeon Processors

                                                                            ConcurrentKMeans: Processing times (and time reduction percent)
Number of coordinates | Number of clusters | BasicKMeans processing times | Running 1 thread | Running 2 threads      | Running 4 threads
----------------------+--------------------+------------------------------+------------------+------------------------+-----------------------
                      |                    | 88,300 ms                    | 88,252 ms        | 48,595 ms (45 percent) | 31,610 ms (64 percent)
                      |                    | 8,907 ms                     | 8,922 ms         | 5,000 ms (44 percent)  | 3,328 ms (63 percent)
                      |                    | 1,813 ms                     | 1,812 ms         | 1,031 ms (43 percent)  | 719 ms (60 percent)
                      |                    | 78 ms                        | 78 ms            | 62 ms (21 percent)     | 63 ms (19 percent)


I included the single thread column for concurrent K-means to demonstrate that the SMT adaptation did not incur a performance penalty in single-threaded mode. This is an important point, since algorithms that you adapt to SMT almost certainly still need to execute well on SMT-incapable systems.
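
The percentages in the table are time reductions, which are easy to confuse with speedup factors. The short sketch below shows how the two relate; the class SpeedupSketch and its method names are hypothetical, and the only data used are timings quoted in the table.

```java
// Hypothetical helper relating time reduction to speedup factor.
class SpeedupSketch {

    /** Percentage of the baseline time that was saved. */
    static double timeReductionPercent(double baselineMs, double newMs) {
        return 100.0 * (1.0 - newMs / baselineMs);
    }

    /** How many times faster the new run is than the baseline. */
    static double speedupFactor(double baselineMs, double newMs) {
        return baselineMs / newMs;
    }
}
```

For the first row, timeReductionPercent(88300, 31610) is about 64 and speedupFactor(88300, 31610) is about 2.8, which is why a 64 percent reduction corresponds to almost a tripling of the speed.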


This article should convince you that adapting your time-consuming tasks for concurrency can be well worth the effort. In fact, it may be imperative if your programs are to compete successfully on the platforms of the future. If you get in the habit of programming your tasks for concurrency now, as they are redeployed on newer and newer systems capable of running increasing numbers of threads at once, your programs will scream ahead of the single-threaded products of your competitors.

Author Bio

Randall Scarberry is a senior research scientist and software engineer at the Department of Energy’s Pacific Northwest National Laboratory in Richland, Washington. He specializes in high-performance Java applications to pattern recognition and text analysis. When not coming up with ways to speed up tricky algorithms, he enjoys hiking the numerous trails in the Cascade Mountains.

