基于Zookeeper的分布式共享锁
- - ITeye博客首先,说说我们的场景,订单服务是做成集群的,当两个以上结点同时收到一个相同订单的创建指令,这时并发就产生了,系统就会重复创建订单. 这时,分布式共享锁就闪亮登场了. 共享锁在同一个进程中是很容易实现的,但在跨进程或者在不同Server之间就不好实现了. Zookeeper就很容易实现. 具体的实现原理官网和其它网站也有翻译,这里就不在赘述了.
首先,说说我们的场景,订单服务是做成集群的,当两个以上结点同时收到一个相同订单的创建指令,这时并发就产生了,系统就会重复创建订单。等等......场景。这时,分布式共享锁就闪亮登场了。
共享锁在同一个进程中是很容易实现的,但在跨进程或者在不同Server之间就不好实现了。Zookeeper就很容易实现。具体的实现原理官网和其它网站也有翻译,这里就不在赘述了。
官网资料: http://zookeeper.apache.org/doc/r3.4.5/recipes.html
中文资料: https://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper
详见Locks章节。
原理都知道了,网上一搜索Apache上面已经有提供了,既然已经有轮子了,哪我们也没必要重复造轮子了吧!直接使用 Curator。但是,我们在测试中发现,用于共享锁的结点无法自动回收,除了最末一级的临时结点会在锁释放和session超时的时候能自动回收外,其它结点均无法自动回收。我们的订单一天有好几万,遇到618和双十一的时候每天的订单量超50W,如果结点长期不回收的话,肯定会影响Zookeeper的性能。这时,我们就想到了一句话“自己动手,丰衣足食”。下面直接上代码:
首先,创建一个Maven工程,在pom文件里导入下面的包:
<dependencies> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>2.8.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.8.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.8.0</version> </dependency> <dependency> <groupId>commons-beanutils</groupId> <artifactId>commons-beanutils</artifactId> <version>1.9.2</version> </dependency> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.2</version> </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.6</version> </dependency> </dependencies>
LockZookeeperClient接口:
package com.XXX.framework.lock; import org.apache.curator.framework.CuratorFramework; /** * * description * * @author Roadrunners * @version 1.0, 2015年7月9日 */ public interface LockZookeeperClient { /** * * @return */ CuratorFramework getCuratorFramework(); /** * * @return */ String getBasePath(); /** * garbage collector * * @param gcPath */ void gc(String gcPath); }
LockZookeeperClient接口的实现LockZookeeperClientFactory:
package com.XXX.framework.lock; import java.util.Date; import java.util.List; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentSkipListSet; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; /** * * description * * @author Roadrunners * @version 1.0, 2015年7月9日 */ public class LockZookeeperClientFactory implements LockZookeeperClient { private static final Log LOG = LogFactory.getLog(LockZookeeperClientFactory.class); private boolean hasGc = true; private Timer gcTimer; private TimerTask gcTimerTask; private ConcurrentSkipListSet<String> gcPaths = new ConcurrentSkipListSet<String>(); private int gcIntervalSecond = 60; private CuratorFramework curatorFramework; private String zookeeperIpPort = "localhost:2181"; private int sessionTimeoutMs = 10000; private int connectionTimeoutMs = 10000; private String basePath = "/locks"; public void setHasGc(boolean hasGc) { this.hasGc = hasGc; } public void setGcIntervalSecond(int gcIntervalSecond) { this.gcIntervalSecond = gcIntervalSecond; } public void setZookeeperIpPort(String zookeeperIpPort) { this.zookeeperIpPort = zookeeperIpPort; } public void setSessionTimeoutMs(int sessionTimeoutMs) { this.sessionTimeoutMs = sessionTimeoutMs; } public void setConnectionTimeoutMs(int connectionTimeoutMs) { this.connectionTimeoutMs = connectionTimeoutMs; } public void setBasePath(String basePath) { basePath = basePath.trim(); if (basePath.endsWith("/")) { basePath = basePath.substring(0, basePath.length() - 1); } this.basePath = basePath; } public void init() { if(StringUtils.isBlank(zookeeperIpPort)){ throw new NullPointerException("zookeeperIpPort"); } ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); curatorFramework = CuratorFrameworkFactory.newClient(zookeeperIpPort.trim(), sessionTimeoutMs, connectionTimeoutMs, retryPolicy); curatorFramework.start(); LOG.info("CuratorFramework initialise succeed."); if (hasGc) { gc(); } } public void destroy() { gcPaths.clear(); gcPaths = null; gcStop(); curatorFramework.close(); curatorFramework = null; } @Override public void gc(String gcPath) { if (hasGc && StringUtils.isNotBlank(gcPath)) { gcPaths.add(gcPath.trim()); } } @Override public CuratorFramework getCuratorFramework() { return this.curatorFramework; } @Override public String getBasePath() { return this.basePath; } private synchronized void gc() { gcStop(); try { scanningGCNodes(); } catch (Throwable e) { LOG.warn(e); } gcTimerTask = new TimerTask() { @Override public void run() { doingGc(); } }; Date begin = new Date(); begin.setTime(begin.getTime() + (10 * 1000L)); gcTimer = new Timer("lock-gc", true); gcTimer.schedule(gcTimerTask, begin, gcIntervalSecond * 1000L); } private synchronized void gcStop() { if (null != gcTimer) { gcTimer.cancel(); gcTimer = null; } if (null != gcTimerTask) { gcTimerTask.cancel(); gcTimerTask = null; } } private synchronized void scanningGCNodes() throws Exception { if (null == curatorFramework.checkExists().forPath(basePath)) { return; } List<String> paths = curatorFramework.getChildren().forPath(basePath); if (CollectionUtils.isEmpty(paths)) { gcPaths.add(basePath); return; } for (String path : paths) { try{ String tmpPath = basePath + "/" + path; if (null == curatorFramework.checkExists().forPath(tmpPath)) { continue; } gcPaths.add(tmpPath); } catch(Throwable e){ LOG.warn("scanning gc nodes error.", e); } } } private synchronized void doingGc() { LOG.debug("GC beginning."); if (CollectionUtils.isNotEmpty(gcPaths)) { for (String path : gcPaths) { try { if (null != curatorFramework.checkExists().forPath(path)) { if (CollectionUtils.isEmpty(curatorFramework.getChildren().forPath(path))) { curatorFramework.delete().forPath(path); gcPaths.remove(path); LOG.debug("GC " + path); } } else { gcPaths.remove(path); } } catch (Throwable e) { gcPaths.remove(path); LOG.warn(e); } } } LOG.debug("GC ended."); } }
SharedLock共享锁:
package com.XXX.framework.lock.shared; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.recipes.locks.InterProcessLock; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import com.XXX.framework.lock.LockZookeeperClient; /** * * description * * @author Roadrunners * @version 1.0, 2015年7月9日 */ public class SharedLock { private InterProcessLock interProcessLock; public SharedLock(LockZookeeperClient lockZookeeperClient, String resourceId) { super(); if (StringUtils.isBlank(resourceId)) { throw new NullPointerException("resourceId"); } String path = lockZookeeperClient.getBasePath(); path += ("/" + resourceId.trim()); interProcessLock = new InterProcessMutex(lockZookeeperClient.getCuratorFramework(), path); lockZookeeperClient.gc(path); } /** * Acquire the mutex - blocking until it's available. Each call to acquire must be balanced by a call * to {@link #release()} * * @throws Exception ZK errors, connection interruptions */ public void acquire() throws Exception { interProcessLock.acquire(); } /** * Acquire the mutex - blocks until it's available or the given time expires. Each call to acquire that returns true must be balanced by a call * to {@link #release()} * * @param time time to wait * @param unit time unit * @return true if the mutex was acquired, false if not * @throws Exception ZK errors, connection interruptions */ public boolean acquire(long time, TimeUnit unit) throws Exception { return interProcessLock.acquire(time, unit); } /** * Perform one release of the mutex. * * @throws Exception ZK errors, interruptions, current thread does not own the lock */ public void release() throws Exception { interProcessLock.release(); } /** * Returns true if the mutex is acquired by a thread in this JVM * * @return true/false */ public boolean isAcquiredInThisProcess() { return interProcessLock.isAcquiredInThisProcess(); } }
到此代码已经完成。下面写一个简单的Demo:
//LockZookeeperClientFactory通常是通过Spring配置注入的,此处是为了Demo的简单明了才这样写的,不建议这样写 LockZookeeperClientFactory lzc = new LockZookeeperClientFactory(); lzc.setZookeeperIpPort("10.100.15.1:8900"); lzc.setBasePath("/locks/sharedLock/"); lzc.init(); SharedLock sharedLock = new SharedLock(lzc, "sharedLock1"); try { if (sharedLock.acquire(100, TimeUnit.MILLISECONDS)) { System.out.println("sharedLock1 get"); } } catch (Exception e) { e.printStackTrace(); } finally { try { sharedLock.release(); } catch (Exception e) { e.printStackTrace(); } } lzc.destroy();
就这样,系统就会每隔一分钟去回收一次没有使用的结点。