java多线程消息队列的实现

标签: java 多线程 消息队列 | 发表时间:2014-10-12 23:38 | 作者:lyongq04
出处:http://www.iteye.com

1、定义一个队列缓存池:
private static List<Queue> queueCache = new LinkedList<Queue>();


2、定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该值时继续进行。
private Integer offerMaxQueue = 2000;


3、定义检出线程,如果队列缓冲池没有消息,那么检出线程会线程等待中
new Thread(){
				public void run(){
					while(true){
						String ip = null;
						try {
							synchronized (queueCache) {
								Integer size = queueCache.size();
								if(size==0){
//队列缓存池没有消息,等待。。。。									queueCache.wait();
								}
								Queue queue = queueCache.remove(0);

								if(isIpLock(queueStr)){//假若这个是一个多应用的分布式系统,那么这个判断应该是分布式锁,这里说的锁不是线程停止,而是跳过该消息,滞后处理
									queueCache.add(queue);该queue重新加入队列缓冲池,滞后处理,
									continue;
								}else{
						;//这里是处理该消息的操作。
								}
								size = queueCache.size();
								if(size<offerMaxQueue&&size>=0){									queueCache.notifyAll();//在队列缓存池不超过最大值的前提下,假若检入正在等待中,那么那么让他们排队检入。
								}
							}
						} catch (Exception e) {
							e.printStackTrace();
						}finally{
							try {//检出该消息队列的锁
								unIpLock(queueStr);
							} catch (Execption e) {//捕获异常,不能让线程挂掉
								e.printStackTrace();
							}	
                                            }
						}
			}.start();




4、检入队列

synchronized (queueCache) {
while(true){
Integer size = queueCache.size();
if(size>=offerMaxQueue){
						try {
							queueCache.wait();
continue;//继续执行等待中的检入任务。
	} catch (InterruptedException e) {
			e.printStackTrace();
                   }
 }//IF

if(size<=offerMaxQueue&&size>0){
	queueCache.notifyAll();
}
break;//检入完毕
}//while
}					


5、锁方法实现
/**
	 * 锁
	 * @param ip
	 * @return
	 * @throws 
	 */
	public Boolean isLock(String queueStr) {
		return this.redisManager.setnx(queueStr+"_lock", "LOCK", 10000)!=1;
	}
	//解锁
	public void unIpLock(String queueStr) {
		if(ip!=null){
			this.redisManager.del(queueStr+"_lock");
//			lock.unlock();
		}
	}


已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [java 多线程 消息队列] 推荐:

java多线程消息队列的实现

- - 编程语言 - ITeye博客
2、定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该值时继续进行. 3、定义检出线程,如果队列缓冲池没有消息,那么检出线程会线程等待中. if(size==0){ //队列缓存池没有消息,等待. if(isIpLock(queueStr)){//假若这个是一个多应用的分布式系统,那么这个判断应该是分布式锁,这里说的锁不是线程停止,而是跳过该消息,滞后处理.

Java Thread多线程

- - CSDN博客推荐文章
Java Thread多线程. Java 多线程例子1 小例子. super("zhuyong");//设置线程的名字,默认为“TestThread”. Java 多线程例子2 前台线程(用户线程) 后台线程(守护线程 ). 1,setDaemon(true)后就是后台线程(守护线程 ),反之就是前台线程(用户线程).

Java多线程之synchronized

- - CSDN博客推荐文章
这里通过三个测试类阐述了synchronized应用的不同场景. 首先是最基本的synchronized Method的使用.  * @see 概述:Java中的每个对象都有一个锁(lock)或者叫做监视器(monitor) .  * @see 说明:当synchronized关键字修饰一个方法时,则该方法为同步方法 .

java多线程总结

- - Java - 编程语言 - ITeye博客
在java中要想实现多线程,有两种手段,一种是继续Thread类,另外一种是实现Runable接口. 对于直接继承Thread的类来说,代码大致框架是:. class 类名 extends Thread{. * @author Rollen-Holt 继承Thread类,直接调用run方法.             System.out.println(name + "运行     " + i);.

Java多线程学习

- - CSDN博客编程语言推荐文章
  线程是一种轻量级的进程,它和进程一样拥有独立的执行控制,由操作系统负责调度,区别在于线程没有独立的存储空间,而是和所属进程中的其它线程共享一个存储空间,这使得线程间的通信远较进程简单. 即多个线程可以同时执行,就像有多条流水线一样,可以同时进行工作,是并发执行的.   程序是由进程组成的,进程是由线程组成的.

Java多线程(二)同步

- - CSDN博客编程语言推荐文章
如果你的java基础较弱,或者不大了解java多线程请先看这篇文章 java多线程(一)线程定义、状态和属性. 同步一直是java多线程的难点,在我们做android开发时也很少应用,但这并不是我们不熟悉同步的理由. 希望这篇文章能使更多的人能够了解并且应用java的同步. 在多线程的应用中,两个或者两个以上的线程需要共享对同一个数据的存取.

[Java] Java 多线程案例分析

- - V2EX
现要从 hbase中导出 2016 年整年的,大约 10w只股票行情数据,数据总量约 100t. 汇总到 hdfs中供需求方使用. 已知数据量规模大概是 100t,那么单台机器处理肯定不是不行的,先不说大多数磁盘都没这么大,即便磁盘有这么大,单台机器处理对于内存和 cpu 要求也很高,所以我们将问题一般化,使用数量有限的低配机器.

Java 多线程内存模型

- - ITeye博客
Java 多线程内存模型.       Java虚拟机规范中试图定义一种Java内存模型(Java Memory Model,JMM)来屏蔽掉各种硬件和操作系统的内存访问差异,以实现让Java程序在各种平台下都能达到一致的并发效果. 在此之前,主流程序怨言(如C/C++等)直接使用物理硬件(或者说操作系统的内存模型),因此,会由于不同的平台上内存模型差异,导致程序在一套平台上并发完成正常,而在另一套平台上并发访问却经常出错,因此经常需要针对不同的平台来编写程序.

Java多线程之wait()和notify()

- - CSDN博客推荐文章
直接看测试代码吧,细节之处,详见注释.  * Java多线程之wait()和notify()的妙用 .  * @see 问题:同时启动两个线程和同时启动四个线程,控制台打印结果是不同的 .  * @see      同时启动两个线程时,控制台会很规律的输出1010101010101010 .  * @see      同时启动四个线程时,控制台起初会规律的输出10101010,一旦某一刻输出一个负数,那么后面的输出就会"一错再错" .

java多线程设计wait/notify机制

- - CSDN博客推荐文章
  当线程A获得了obj锁后,发现条件condition不满足,无法继续下一处理,于是线程A就wait() , 放弃对象锁..   之后在另一线程B中,如果B更改了某些条件,使得线程A的condition条件满足了,就可以唤醒线程A:.   # 调用obj的wait(), notify()方法前,必须获得obj锁,也就是必须写在synchronized(obj) {…} 代码段内.