基于Zookeeper的分布式共享锁

标签: zookeeper 分布 共享 | 发表时间:2015-07-09 10:12 | 作者:
出处:http://www.iteye.com

首先,说说我们的场景,订单服务是做成集群的,当两个以上结点同时收到一个相同订单的创建指令,这时并发就产生了,系统就会重复创建订单。等等......场景。这时,分布式共享锁就闪亮登场了。

 

共享锁在同一个进程中是很容易实现的,但在跨进程或者在不同Server之间就不好实现了。Zookeeper就很容易实现。具体的实现原理官网和其它网站也有翻译,这里就不在赘述了。

 

官网资料: http://zookeeper.apache.org/doc/r3.4.5/recipes.html

中文资料: https://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper

详见Locks章节。

 

原理都知道了,网上一搜索Apache上面已经有提供了,既然已经有轮子了,哪我们也没必要重复造轮子了吧!直接使用 Curator。但是,我们在测试中发现,用于共享锁的结点无法自动回收,除了最末一级的临时结点会在锁释放和session超时的时候能自动回收外,其它结点均无法自动回收。我们的订单一天有好几万,遇到618和双十一的时候每天的订单量超50W,如果结点长期不回收的话,肯定会影响Zookeeper的性能。这时,我们就想到了一句话“自己动手,丰衣足食”。下面直接上代码:

 

首先,创建一个Maven工程,在pom文件里导入下面的包:

	<dependencies>
		<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>3.4.6</version>
		</dependency>
		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-client</artifactId>
			<version>2.8.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-recipes</artifactId>
			<version>2.8.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-framework</artifactId>
			<version>2.8.0</version>
		</dependency>
		<dependency>
			<groupId>commons-beanutils</groupId>
			<artifactId>commons-beanutils</artifactId>
			<version>1.9.2</version>
		</dependency>
		<dependency>
			<groupId>commons-logging</groupId>
			<artifactId>commons-logging</artifactId>
			<version>1.2</version>
		</dependency>
		<dependency>
			<groupId>commons-lang</groupId>
			<artifactId>commons-lang</artifactId>
			<version>2.6</version>
		</dependency>
	</dependencies>

 

LockZookeeperClient接口:

package com.XXX.framework.lock;

import org.apache.curator.framework.CuratorFramework;

/**
 * 
 * description
 * 
 * @author Roadrunners
 * @version 1.0, 2015年7月9日
 */
public interface LockZookeeperClient {
	/**
	 * 
	 * @return
	 */
	CuratorFramework getCuratorFramework();

	/**
	 * 
	 * @return
	 */
	String getBasePath();

	/**
	 * garbage collector
	 * 
	 * @param gcPath
	 */
	void gc(String gcPath);
}

 

LockZookeeperClient接口的实现LockZookeeperClientFactory:

package com.XXX.framework.lock;

import java.util.Date;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentSkipListSet;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * 
 * description
 *  
 * @author Roadrunners
 * @version 1.0, 2015年7月9日
 */
public class LockZookeeperClientFactory implements LockZookeeperClient {
	private static final Log LOG = LogFactory.getLog(LockZookeeperClientFactory.class);

	private boolean hasGc = true;
	private Timer gcTimer;
	private TimerTask gcTimerTask;
	private ConcurrentSkipListSet<String> gcPaths = new ConcurrentSkipListSet<String>();
	private int gcIntervalSecond = 60;

	private CuratorFramework curatorFramework;
	private String zookeeperIpPort = "localhost:2181";
	private int sessionTimeoutMs = 10000;
	private int connectionTimeoutMs = 10000;
	private String basePath = "/locks";

	public void setHasGc(boolean hasGc) {
		this.hasGc = hasGc;
	}

	public void setGcIntervalSecond(int gcIntervalSecond) {
		this.gcIntervalSecond = gcIntervalSecond;
	}

	public void setZookeeperIpPort(String zookeeperIpPort) {
		this.zookeeperIpPort = zookeeperIpPort;
	}

	public void setSessionTimeoutMs(int sessionTimeoutMs) {
		this.sessionTimeoutMs = sessionTimeoutMs;
	}

	public void setConnectionTimeoutMs(int connectionTimeoutMs) {
		this.connectionTimeoutMs = connectionTimeoutMs;
	}

	public void setBasePath(String basePath) {
		basePath = basePath.trim();
		if (basePath.endsWith("/")) {
			basePath = basePath.substring(0, basePath.length() - 1);
		}

		this.basePath = basePath;
	}

	public void init() {
		if(StringUtils.isBlank(zookeeperIpPort)){
			throw new NullPointerException("zookeeperIpPort");
		}

		ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
		curatorFramework = CuratorFrameworkFactory.newClient(zookeeperIpPort.trim(), sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
		curatorFramework.start();
		LOG.info("CuratorFramework initialise succeed.");

		if (hasGc) {
			gc();
		}
	}

	public void destroy() {
		gcPaths.clear();
		gcPaths = null;
		gcStop();
		curatorFramework.close();
		curatorFramework = null;
	}

	@Override
	public void gc(String gcPath) {
		if (hasGc && StringUtils.isNotBlank(gcPath)) {
			gcPaths.add(gcPath.trim());
		}
	}

	@Override
	public CuratorFramework getCuratorFramework() {
		return this.curatorFramework;
	}

	@Override
	public String getBasePath() {
		return this.basePath;
	}

	private synchronized void gc() {
		gcStop();

		try {
			scanningGCNodes();
		} catch (Throwable e) {
			LOG.warn(e);
		}

		gcTimerTask = new TimerTask() {
			@Override
			public void run() {
				doingGc();
			}
		};

		Date begin = new Date();
		begin.setTime(begin.getTime() + (10 * 1000L));
		gcTimer = new Timer("lock-gc", true);
		gcTimer.schedule(gcTimerTask, begin, gcIntervalSecond * 1000L);
	}

	private synchronized void gcStop() {
		if (null != gcTimer) {
			gcTimer.cancel();
			gcTimer = null;
		}

		if (null != gcTimerTask) {
			gcTimerTask.cancel();
			gcTimerTask = null;
		}
	}

	private synchronized void scanningGCNodes() throws Exception {
		if (null == curatorFramework.checkExists().forPath(basePath)) {
			return;
		}

		List<String> paths = curatorFramework.getChildren().forPath(basePath);
		if (CollectionUtils.isEmpty(paths)) {
			gcPaths.add(basePath);
			return;
		}

		for (String path : paths) {
			try{
				String tmpPath = basePath + "/" + path;
				if (null == curatorFramework.checkExists().forPath(tmpPath)) {
					continue;
				}
				
				gcPaths.add(tmpPath);
			} catch(Throwable e){
				LOG.warn("scanning gc nodes error.", e);
			}
		}
	}
	
	private synchronized void doingGc() {
		LOG.debug("GC beginning.");

		if (CollectionUtils.isNotEmpty(gcPaths)) {
			for (String path : gcPaths) {
				try {
					if (null != curatorFramework.checkExists().forPath(path)) {
						if (CollectionUtils.isEmpty(curatorFramework.getChildren().forPath(path))) {
							curatorFramework.delete().forPath(path);
							gcPaths.remove(path);
							LOG.debug("GC " + path);
						}
					} else {
						gcPaths.remove(path);
					}
				} catch (Throwable e) {
					gcPaths.remove(path);
					LOG.warn(e);
				}
			}
		}

		LOG.debug("GC ended.");
	}

}

 

SharedLock共享锁:

package com.XXX.framework.lock.shared;

import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

import com.XXX.framework.lock.LockZookeeperClient;

/**
 * 
 * description
 * 
 * @author Roadrunners
 * @version 1.0, 2015年7月9日
 */
public class SharedLock {
	private InterProcessLock interProcessLock;

	public SharedLock(LockZookeeperClient lockZookeeperClient, String resourceId) {
		super();
		
		if (StringUtils.isBlank(resourceId)) {
			throw new NullPointerException("resourceId");
		}
		String path = lockZookeeperClient.getBasePath();
		path += ("/" + resourceId.trim());

		interProcessLock = new InterProcessMutex(lockZookeeperClient.getCuratorFramework(), path);
		lockZookeeperClient.gc(path);
	}
	
    /**
     * Acquire the mutex - blocking until it's available. Each call to acquire must be balanced by a call
     * to {@link #release()}
     *
     * @throws Exception ZK errors, connection interruptions
     */
	public void acquire() throws Exception {
		interProcessLock.acquire();
	}

    /**
     * Acquire the mutex - blocks until it's available or the given time expires. Each call to acquire that returns true must be balanced by a call
     * to {@link #release()}
     *
     * @param time time to wait
     * @param unit time unit
     * @return true if the mutex was acquired, false if not
     * @throws Exception ZK errors, connection interruptions
     */
	public boolean acquire(long time, TimeUnit unit) throws Exception {
		return interProcessLock.acquire(time, unit);
	}

    /**
     * Perform one release of the mutex.
     *
     * @throws Exception ZK errors, interruptions, current thread does not own the lock
     */
	public void release() throws Exception {
		interProcessLock.release();
	}
	
    /**
     * Returns true if the mutex is acquired by a thread in this JVM
     *
     * @return true/false
     */
	public boolean isAcquiredInThisProcess() {
		return interProcessLock.isAcquiredInThisProcess();
	}
}

 

到此代码已经完成。下面写一个简单的Demo:

		//LockZookeeperClientFactory通常是通过Spring配置注入的,此处是为了Demo的简单明了才这样写的,不建议这样写
		LockZookeeperClientFactory lzc = new LockZookeeperClientFactory();
		lzc.setZookeeperIpPort("10.100.15.1:8900");
		lzc.setBasePath("/locks/sharedLock/");
		lzc.init();

		SharedLock sharedLock = new SharedLock(lzc, "sharedLock1");
		try {
			if (sharedLock.acquire(100, TimeUnit.MILLISECONDS)) {
				System.out.println("sharedLock1 get");
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				sharedLock.release();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

		lzc.destroy();

 

就这样,系统就会每隔一分钟去回收一次没有使用的结点。



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [zookeeper 分布 共享] 推荐:

基于Zookeeper的分布式共享锁

- - ITeye博客
首先,说说我们的场景,订单服务是做成集群的,当两个以上结点同时收到一个相同订单的创建指令,这时并发就产生了,系统就会重复创建订单. 这时,分布式共享锁就闪亮登场了. 共享锁在同一个进程中是很容易实现的,但在跨进程或者在不同Server之间就不好实现了. Zookeeper就很容易实现. 具体的实现原理官网和其它网站也有翻译,这里就不在赘述了.

分布式服务框架:Zookeeper

- - 标点符
Zookeeper是一个高性能,分布式的,开源分布式应用协调服务. 它提供了简单原始的功能,分布式应用可以基于它实现更高级的服务,比如同步,配置管理,集群管理,名空间. 它被设计为易于编程,使用文件系统目录树作为数据模型. 服务端跑在java上,提供java和C的客户端API. Zookeeper是Google的Chubby一个开源的实现,是高有效和可靠的协同工作系统,Zookeeper能够用来leader选举,配置信息维护等,在一个分布式的环境中,需要一个Master实例或存储一些配置信息,确保文件写入的一致性等.

zookeeper( 转)

- - 企业架构 - ITeye博客
转自:http://qindongliang.iteye.com/category/299318. 分布式助手Zookeeper(一). Zookeeper最早是Hadoop的一个子项目,主要为Hadoop生态系统中一些列组件提供统一的分布式协作服务,在2010年10月升级成Apache Software .

分布式集群环境hadoop、hbase、zookeeper搭建(全)

- - CSDN博客云计算推荐文章
集群环境至少需要3个节点(也就是3台服务器设备):1个Master,2个Slave,节点之间局域网连接,可以相互ping通,下面举例说明,配置节点IP分配如下:. 三个节点均使用centos 6.3系统,为了便于维护,集群环境配置项最好使用相同用户名、用户密码、相同hadoop、hbase、zookeeper目录结构.

ZooKeeper-- 管理分布式环境中的数据

- - 互联网 - ITeye博客
1.随着分布式应用的不断深入,需要对集群管理逐步透明化. 监控集群和作业状态;可以充分的利用ZK的独有特性,熟悉程度决定应用高度. 2.Service端具有fast fail特性,非常健壮,无单点,不超过半数Server挂掉不会影响提供服务. 3.zookeeper名字空间由节点znode构成,其组织方式类似于文件系统, 其各个节点相当于目录和文件,通 过路径作为唯一标示.

zookeeper应用场景练习(分布式锁)

- - CSDN博客推荐文章
 在平常的高并发的程序中,为了保证数据的一致性,因此都会用到锁,来对当前的线程进行锁定. 在单机操作中,很好做到,比如可以采用Synchronized、Lock或者其他的读写多来锁定当前的线程. 但是在分布式的系统中,就很难做到这一点. 因此可以采用zookeeper中节点的特性来满足这一点.  1.每个客户端都去zookeeper上创建临时的顺序节点.

分布式配置服务etcd VS 分布式协调服务zookeeper

- - 操作系统 - ITeye博客
etcd是一个高可用的键值存储系统,主要用于共享配置和服务发现. etcd是由CoreOS开发并维护的,灵感来自于 ZooKeeper 和 Doozer,它使用Go语言编写,并通过Raft一致性算法处理日志复制以保证强一致性. Raft是一个来自Stanford的新的一致性算法,适用于分布式系统的日志复制,Raft通过选举的方式来实现一致性,在Raft中,任何一个节点都可能成为Leader.

ZooKeeper监控

- - 淘宝网通用产品团队博客
        在公司内部,有不少应用已经强依赖zookeeper,比如meta和精卫系统,zookeeper的工作状态直接影响它们的正常工作. 目前开源世界中暂没有一个比较成熟的zk-monitor,公司内部的各个zookeeper运行也都是无监控,无报表状态. 目前zookeeper-monitor能做哪些事情,讲到这个,首先来看看哪些因素对zookeeper正常工作比较大的影响:.

zookeeper原理

- - CSDN博客云计算推荐文章
1.为了解决分布式事务性一致的问题. 2.文件系统也是一个树形的文件系统,但比linux系统简单,不区分文件和文件夹,所有的文件统一称为znode. 3.znode的作用:存放数据,但上限是1M ;存放ACL(access control list)访问控制列表,每个znode被创建的时候,都会带有一个ACL,身份验证方式有三种:digest(用户名密码验证),host(主机名验证),ip(ip验证) ,ACL到底有哪些权限呢.

Zookeeper Client简介

- - zzm
直接使用zk的api实现业务功能比较繁琐. 因为要处理session loss,session expire等异常,在发生这些异常后进行重连. 又因为ZK的watcher是一次性的,如果要基于wather实现发布/订阅模式,还要自己包装一下,将一次性订阅包装成持久订阅. 另外如果要使用抽象级别更高的功能,比如分布式锁,leader选举等,还要自己额外做很多事情.