jdk1.5——阻塞队列应用案例
- - 行业应用 - ITeye博客1 打印日志: 原来打印16个日志需要16秒时间,现在开启4个线程,让这16个任务在4秒内完成:. 1 将16个任务增加到 阻塞队列中. 2开启4个线程,每次从队列中获取数据. 这样主线程不停的放, 并发来的4个线程不停的取, 你可以理解为并发一次来了4个线程,每个线程取到后内部打印1S操作仍旧不变,.
1 打印日志: 原来打印16个日志需要16秒时间,现在开启4个线程,让这16个任务在4秒内完成:
思路: 0创建容量16的队列 1 将16个任务增加到 阻塞队列中 2开启4个线程,每次从队列中获取数据 这样主线程不停的放, 并发来的4个线程不停的取, 你可以理解为并发一次来了4个线程,每个线程取到后内部打印1S操作仍旧不变, 执行4次,一共耗时4S完成原来16秒不用并发下的操作 主线程放log 和 子线程取log 之间用condtion notEmpty notFull 来实现阻塞 public class Test { public static void main(String[] args){ // 0 创建容量只为1的队列 final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(16); // 2 开启4个线程,每次从队列中获取数据 for(int i=0;i<4;i++){ new Thread(new Runnable(){ @Override public void run() { while(true){ try { String log = queue.take(); parseLog(log); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } // 1 将16个任务在主线程中增加到阻塞队列中 System.out.println("begin:"+(System.currentTimeMillis()/1000)); for(int i=0;i<16;i++){ //这行代码不能改动 final String log = ""+(i+1);//这行代码不能改动 { try { queue.put(log); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } //Test.parseLog(log); } } } //parseLog方法内部的代码不能改动 public static void parseLog(String log){ System.out.println(log+":"+(System.currentTimeMillis()/1000)); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
2 数据有序一个个输出:
package queue; import java.util.concurrent.Semaphore; import java.util.concurrent.SynchronousQueue; /** * 设计10个线程来消费 数据,要求达到 每次消费的数据是有序,并且是一个个的输出 * * 设计点: * 要求是 1 顺序输出 2 每次输出一个 * 1 守门员semaphore设置每次进来只有1个 * 2 使用阻塞队列SynchronousQueue,其特点就是只有在取数据线程来的时候,入数据线程才将数据放进去,类似于Exchanger作用,达到顺序输出效果 * @author zm * */ public class Test { public static void main(String[] args) { // 一个计数信号量。信号量维护了一个许可集,即每次进入的个数, 这里设置为每次只能进1个 final Semaphore semaphore = new Semaphore(1); final SynchronousQueue<String> queue = new SynchronousQueue<String>(); for(int i=0;i<10;i++){// 开启10个消费线程 new Thread(new Runnable(){ @Override public void run() { try { semaphore.acquire(); // 每次线程进来 都c从问守门员semaphore那获得申请 String input = queue.take(); String output = TestDo.doSome(input); System.out.println(Thread.currentThread().getName()+ ":" + output); semaphore.release(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }).start(); } System.out.println("begin:"+(System.currentTimeMillis()/1000)); for(int i=0;i<10;i++){ //这行不能改动 String input = i+""; //这行不能改动 try { queue.put(input); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } //不能改动此TestDo类 class TestDo { public static String doSome(String input){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } String output = input + ":"+ (System.currentTimeMillis() / 1000); return output; } }