zookeeper应用场景练习(分布式锁)

标签: zookeeper 应用 分布式锁 | 发表时间:2016-02-03 11:20 | 作者:ZHOUCHAOQIANG
出处:http://blog.csdn.net

 

 在平常的高并发的程序中,为了保证数据的一致性,因此都会用到锁,来对当前的线程进行锁定。在单机操作中,很好做到,比如可以采用Synchronized、Lock或者其他的读写多来锁定当前的线程。但是在分布式的系统中,就很难做到这一点。因此可以采用zookeeper中节点的特性来满足这一点。大致实现的思路如下。

 1.每个客户端都去zookeeper上创建临时的顺序节点

 2.客户端判断当前自己创建的节点是不是最小的

 3.如果是的话,就获得了执行当前任务的锁

 4.如果不是的话,就找到比自己小的节点,然后进行监听,如果被删除的话,就可以获得锁


 上面就是大致的实现思路,下面我们来通过代码来实现一下。

 

package com.test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistributedLock {

	private String lockName;
	private final int timeOut = 3000;
	private final String root = "/locks";
	private String myZnode;// 代表当前节点信息
	private String waitZnode;
	private static Logger logger = LoggerFactory
			.getLogger(DistributedLock.class);
	private CuratorFramework client;
	private CountDownLatch latch = new CountDownLatch(1);

	public DistributedLock(String connectString, String lockName) {
		this.lockName = lockName;
		client = CuratorFrameworkFactory.builder().connectionTimeoutMs(timeOut)
				.connectString(connectString)
				.retryPolicy(new RetryNTimes(3, 3000)).build();
		ConnectionStateListener listener = new ConnectionStateListener() {

			public void stateChanged(CuratorFramework client,
					ConnectionState newState) {
				if (newState == ConnectionState.CONNECTED) {
					logger.info("连接成功了");
					latch.countDown();
				}
			}
		};

		client.getConnectionStateListenable().addListener(listener);
		client.start();
		try {
			latch.await();
			createRoot();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

	}

	/**
	 * @Title: 创建根节点root
	 * @Description: TODO
	 * @param
	 * @return void
	 * @throws
	 */
	private void createRoot() {
		try {
			Stat stat = client.checkExists().forPath(root);
			if (stat != null) {
				logger.info("root has already exists");
			} else {
				// 创建跟节点
				client.create().creatingParentsIfNeeded().forPath(root);

			}
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	public void getLocks() {

		try {
			myZnode = client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
					.forPath(root + "/" + lockName);
			logger.info(myZnode + "has created");
			// 取出所有的子节点,然后找出比自己小的节点,进行监听的设置
			List<String> subNodes = client.getChildren().forPath(root);
			// 取出所有带有lockname的节点信息
			List<String> lockObjNodes = new ArrayList<String>();
			for (String node : subNodes) {
				if (node.contains(lockName)) {
					lockObjNodes.add(node);
				}
			}
			// 对当前节点进行排序
			Collections.sort(lockObjNodes);
			// 判断当前的节点是不是最小的节点
			if (myZnode.equals(root + "/" + lockObjNodes.get(0))) {
				doAction();
			} else {
				// 找到比自己节点大一的节点进行监听
				String subMyZone = myZnode
						.substring(myZnode.lastIndexOf("/") + 1);
				waitZnode = lockObjNodes.get(Collections.binarySearch(
						lockObjNodes, subMyZone) - 1);
				// 对节点进行监听
				Stat stat = client.checkExists()
						.usingWatcher(deleteNodeWatcher).forPath("/"+waitZnode);
				if (stat != null) {
					System.out.println(Thread.currentThread().getName()
							+ "处于等待状态");
				} else {
					doAction();
				}
			}
		} catch (Exception e) {
			logger.error(e.getMessage());
		}
	}

	// 删除节点的事件监听
	CuratorWatcher deleteNodeWatcher = new CuratorWatcher() {

		public void process(WatchedEvent event) throws Exception {

			if (event.getType() == EventType.NodeDeleted) {
				doAction();
			}
		}
	};

	private void doAction() {
		System.out.println(Thread.currentThread().getName() + "开始执行");
		client.close();
	}
}


  下面来测试一下

 

/**     
 * @FileName: TestCurrentZk.java   
 * @Package:com.test   
 * @Description: TODO  
 * @author: LUCKY    
 * @date:2016年2月2日 下午11:36:04   
 * @version V1.0     
 */
package com.test;

/**
 * @ClassName: TestCurrentZk
 * @Description: TODO
 * @author: LUCKY
 * @date:2016年2月2日 下午11:36:04
 */
public class TestCurrentZk {

	public static void main(String[] args) throws Exception {
		Thread threads[] = new Thread[10];
		for (int i = 0; i < threads.length; i++) {
			threads[i] = new Thread(new Runnable() {
				public void run() {
					ClientTest clientTest = new ClientTest(
							"100.66.162.36:2181", "locknametest");
					clientTest.getLocks();
				}
			});

			threads[i].start();

		}
		Thread.sleep(Integer.MAX_VALUE);
	}
}


作者:ZHOUCHAOQIANG 发表于2016/2/3 11:20:26 原文链接
阅读:2 评论:0 查看评论

相关 [zookeeper 应用 分布式锁] 推荐:

zookeeper应用场景练习(分布式锁)

- - CSDN博客推荐文章
 在平常的高并发的程序中,为了保证数据的一致性,因此都会用到锁,来对当前的线程进行锁定. 在单机操作中,很好做到,比如可以采用Synchronized、Lock或者其他的读写多来锁定当前的线程. 但是在分布式的系统中,就很难做到这一点. 因此可以采用zookeeper中节点的特性来满足这一点.  1.每个客户端都去zookeeper上创建临时的顺序节点.

zookeeper应用场景

- - CSDN博客推荐文章
zookeeper采用了fast paxos算法,该算法比paxosa算法好的地方是解决了几个proposer交替发出提案,导致没有一个提案被批准的活锁问题. 为什么需要zookeeper. 如果我们有很多服务程序需要有一些配置信息,可以保存在zookeeper的对应的znode中. zookeeper保证多个服务器同时对znode里面信息的修改是一致的.

Zookeeper研究和应用

- medal - 搜索技术博客-淘宝
zookeeper是一个开源分布式的服务,它提供了分布式协作,分布式同步,配置管理等功能. 其实现的功能与google的chubby基本一致.zookeeper的官方网站已经写了一篇非常经典的概述性文章,请大家参阅:ZooKeeper: A Distributed Coordination Service for Distributed Applications.

[转]ZooKeeper典型应用场景一览

- - 企业架构 - ITeye博客
转载自:http://www.coder4.com/archives/3856. ZooKeeper是一个高可用的分布式数据管理与系统协调框架. 基于对Paxos算法的实现,使该框架保证了分布式环境中数据的强一致性,也正是 基于这样的特性,使得ZooKeeper解决很多分布式问题. 网上对ZK的应用场景也有不少介绍,本文将结合作者身边的项目例子,系统地对ZK的应用场景 进行一个分门归类的介绍.

ZooKeeper 典型的应用场景

- - CSDN博客推荐文章
下面详细介绍这些典型的应用场景,也就是 Zookeeper 到底能帮我们解决那些问题. 统一命名服务(Name Service). 分布式应用中,通常需要有一套完整的命名规则,既能够产生唯一的名称又便于人识别和记住,通常情况下用树形的名称结构是一个理想的选择,树形的名称结构是一个有层次的目录结构,既对人友好又不会重复.

zookeeper( 转)

- - 企业架构 - ITeye博客
转自:http://qindongliang.iteye.com/category/299318. 分布式助手Zookeeper(一). Zookeeper最早是Hadoop的一个子项目,主要为Hadoop生态系统中一些列组件提供统一的分布式协作服务,在2010年10月升级成Apache Software .

在 python 中使用 zookeeper 管理你的应用集群

- Ken - python.cn(jobs, news)
Zookeeper 分布式服务框架是 Apache Hadoop 的一个子项目,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等. python中有一个zkpython的包,是基于zookeeper的c-client开发的,所以安装的时候需要先安装zookeeper的c客户端.

zookeeper应用场景练习(数据发布/订阅)

- - CSDN博客综合推荐文章
 前面几篇博客大致讲解了一下有关zookeeper的概念知识,下面结合前面的几篇博客来讲解一下zookeeper的使用场景.  所谓的配置中心,就是发布者把数据发送到zookeeper的一个或者一系列的节点上,供订阅者进行订阅. 从而达到动态获取数据的目的,能够实现配置信息的集中式管理和数据的动态更新.

ZooKeeper监控

- - 淘宝网通用产品团队博客
        在公司内部,有不少应用已经强依赖zookeeper,比如meta和精卫系统,zookeeper的工作状态直接影响它们的正常工作. 目前开源世界中暂没有一个比较成熟的zk-monitor,公司内部的各个zookeeper运行也都是无监控,无报表状态. 目前zookeeper-monitor能做哪些事情,讲到这个,首先来看看哪些因素对zookeeper正常工作比较大的影响:.

zookeeper原理

- - CSDN博客云计算推荐文章
1.为了解决分布式事务性一致的问题. 2.文件系统也是一个树形的文件系统,但比linux系统简单,不区分文件和文件夹,所有的文件统一称为znode. 3.znode的作用:存放数据,但上限是1M ;存放ACL(access control list)访问控制列表,每个znode被创建的时候,都会带有一个ACL,身份验证方式有三种:digest(用户名密码验证),host(主机名验证),ip(ip验证) ,ACL到底有哪些权限呢.