zookeeper__leader选举

标签: zookeeper leader 选举 | 发表时间:2014-11-09 21:47 | 作者:lvdccyb
出处:http://www.iteye.com

 

ZooKeeper进行领导者选举是比较容易的。

 

伪代码表示:

zkclient:

<1>判定是否存在/zxeample/leader路径

<2>如果不存在,那么试图创建一个会话znode(Ephemeral Path)(path = /zxeample/leader,data=client id)

 

<2.1>创建成功,标识自己是leader

<2.2>创建不成功(包括异常)转向<1>

<3>如果存在path=/zxeample/leader,标识自己是slave,(可能需要与leader进行通信)

<4>如果自己是slave,那么监控该znode的data change事件。(用于当leader挂了,事件通知模型,就会产生事件触发通知,从而进行新的选举领导者)

 

基于java开源org.I0Itec.zkclient库实现,更简单。kafka也是基于这个实现leader选举的,不过是scala写的。

 

测试方法:

(1)启动ZooKeeper server

(2)启动zkCli

 (3)启动程序,

构建10个线程,每个线程都是一个ZkClient,

(4)然后在zkCli中,使用命令rmr /zxexample/leader

 

总结:尚有2个不如人意之处.创建znode有冲突,因为存在多个client同时创建,单只有一个成功,其余失败(逻辑正确),但是会打印很多异常。第二,线程是用sleep,因此,其实是一直在循环,即轮询,而没有消息驱动的方式。

 

 

package zkexam;

import java.security.SecureRandom;
import java.util.concurrent.Callable;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

/**
 * choose a server as a Leader(Master),while other servers are slaves.
 * 
 * @author Free
 *
 */
public class ServerElect {
	SecureRandom rand = new SecureRandom();

	public ServerElect() {

	}

	public static class Leader {
		ZkClient leader;

		// byte[] data;
		public ZkClient getClient() {
			return leader;
		}

		public void setClient(ZkClient leaderClient) {
			this.leader = leaderClient;
		}
	}

	Leader selectLeader(ZkClient... client) {
		if (client == null || client.length < 0) {
			throw new IllegalArgumentException(
					"no zookeeper client need to be selected as leader.");

		}
		Leader leader = new Leader();
		do {
			int i = rand.nextInt() % (client.length);
			try {
				client[i].createEphemeral("/zxexample/leader", "I am leader "
						+ i);
				leader.setClient(client[i]);
				for (int j = 0; j < client.length && j != i; j++) {

				}
				break;
			} catch (Exception e) {
				e.printStackTrace();
			}
		} while (true);
		return leader;
	}

	public class MyWatcher<T> implements Watcher {
		Callable<T> callback;

		MyWatcher(Callable<T> c) {
			callback = c;
		}

		@Override
		public void process(WatchedEvent event) {
			org.apache.zookeeper.Watcher.Event.EventType eventType = event
					.getType();
			switch (eventType) {
			case NodeDeleted:
				try {
					callback.call();
				} catch (Exception e) {
					e.printStackTrace();
				}
				break;
			default:
				break;
			}
		}

	}

	public static class LeaderChangeListener implements IZkDataListener {
		ZkClient client;

		public LeaderChangeListener(ZkClient client_) {
			client = client_;
		}

		/**
		 * Called when the leader information stored in zookeeper has changed. Record the new leader in memory
		 * 
		 * @throws Exception
		 *             On any error.
		 */
		public void handleDataChange(String dataPath, Object data) {

			System.out.println("a new leader is elected.");
		}

		@Override
		public void handleDataDeleted(String dataPath) throws Exception {
			System.out.println(dataPath + ":data is deleted.");
		}
	}

	public static class zkClientThread extends Thread {
		final static String path = "/zxexample/leader";
		ZkClient client;
		long maxMsToWaitUntilConnected;
		volatile boolean isFirstTime = true;
		volatile boolean isLeader;
		String data;

		// Watcher watcher;
		public zkClientThread(ZkClient client_, String name) {
			super(name);
			client = client_;

		}

		public void start() {
			super.start();
		}

		public void tryLeader() {
			try {
				data = getName();
				if (!client.exists(path)) {
					try {
						client.createEphemeral(path, data);
					} catch (ZkNoNodeException e) {
						String parentDir = path.substring(0,
								path.lastIndexOf('/'));
						if (parentDir.length() != 0) {
							client.createPersistent(parentDir, true);
						}
						client.createEphemeral(path, data);
					}
					isLeader = true;
					System.out.println("I am leader :" + getName());
				}
			} catch (Exception e) {
				e.printStackTrace();
				isFirstTime = true;
				isLeader = false;
			}
		}

		public void run() {
			while (true) {
				if (client.exists(path)) {
					if (isFirstTime) {
						Object obj = client.readData(path);
						if (obj == null || !obj.toString().equals(getName())) {
							tryLeader();
						} else {
							// client.subscribeDataChanges(path,
							// new LeaderChangeListener(client));
							// wait leader ,and communication to leader;
							client.watchForData(path);
						}
						isFirstTime = false;
					}
				} else {
					tryLeader();
				}
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					break;
				}
			}
		}
	}

	public static void main(String args[]) {
		int curClientCount = 10;
		ZkClient[] client = new ZkClient[curClientCount];
		zkClientThread[] zkThreads = new zkClientThread[curClientCount];
		for (int i = 0; i < curClientCount; i++) {

			client[i] = new ZkClient("127.0.0.1:2181", 218100);
			zkThreads[i] = new zkClientThread(client[i], "zk-" + i);
		}
		for (int i = 0; i < zkThreads.length; i++) {
			zkThreads[i].start();
		}
	}
}

 

 

 

I am leader :zk-6
I am leader :zk-5
I am leader :zk-6
org.I0Itec.zkclient.exception.ZkNodeExistsException: org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /zxexample/leader
	at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:55)
	at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
	at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304)
	at org.I0Itec.zkclient.ZkClient.createEphemeral(ZkClient.java:328)
	at zkexam.ServerElect$zkClientThread.tryLeader(ServerElect.java:141)
	at zkexam.ServerElect$zkClientThread.run(ServerElect.java:169)
Caused by: org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /zxexample/leader
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:119)
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
	at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:783)
	at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:87)
	at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:308)
	at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:304)
	at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
	... 4 more
I am leader :zk-3

 

 



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [zookeeper leader 选举] 推荐:

图解zookeeper FastLeader选举算法

- - C++博客-首页原创精华区
zookeeper配置为集群模式时,在启动或异常情况时会选举出一个实例作为Leader. 其默认选举算法为 FastLeaderElection. 不知道zookeeper的可以考虑这样一个问题:某个服务可以配置为多个实例共同构成一个集群对外提供服务. 其每一个实例本地都存有冗余数据,每一个实例都可以直接对外提供读写服务.

zookeeper( 转)

- - 企业架构 - ITeye博客
转自:http://qindongliang.iteye.com/category/299318. 分布式助手Zookeeper(一). Zookeeper最早是Hadoop的一个子项目,主要为Hadoop生态系统中一些列组件提供统一的分布式协作服务,在2010年10月升级成Apache Software .

ZooKeeper监控

- - 淘宝网通用产品团队博客
        在公司内部,有不少应用已经强依赖zookeeper,比如meta和精卫系统,zookeeper的工作状态直接影响它们的正常工作. 目前开源世界中暂没有一个比较成熟的zk-monitor,公司内部的各个zookeeper运行也都是无监控,无报表状态. 目前zookeeper-monitor能做哪些事情,讲到这个,首先来看看哪些因素对zookeeper正常工作比较大的影响:.

zookeeper原理

- - CSDN博客云计算推荐文章
1.为了解决分布式事务性一致的问题. 2.文件系统也是一个树形的文件系统,但比linux系统简单,不区分文件和文件夹,所有的文件统一称为znode. 3.znode的作用:存放数据,但上限是1M ;存放ACL(access control list)访问控制列表,每个znode被创建的时候,都会带有一个ACL,身份验证方式有三种:digest(用户名密码验证),host(主机名验证),ip(ip验证) ,ACL到底有哪些权限呢.

Zookeeper Client简介

- - zzm
直接使用zk的api实现业务功能比较繁琐. 因为要处理session loss,session expire等异常,在发生这些异常后进行重连. 又因为ZK的watcher是一次性的,如果要基于wather实现发布/订阅模式,还要自己包装一下,将一次性订阅包装成持久订阅. 另外如果要使用抽象级别更高的功能,比如分布式锁,leader选举等,还要自己额外做很多事情.

zookeeper 理论

- - zzm
引用官方的说法:“Zookeeper是一个高性能,分布式的,开源分布式应用协调服务. 它提供了简单原始的功能,分布式应用可以基于它实现更高级 的服务,比如同步,配置管理,集群管理,名空间. 它被设计为易于编程,使用文件系统目录树作为数据模型. 服务端跑在java上,提供java和C的客户端 API”.

ZooKeeper 入门

- - 企业架构 - ITeye博客
ZooKeeper是一个高可用的分布式数据管理与系统协调框架. 基于对Paxos算法的实现,使该框架保证了分布式环境中数据的强一致性,也正是基于这样的特性,使得ZooKeeper解决很多分布式问题. 网上对ZK的应用场景也有不少介绍,本文将结合作者身边的项目例子,系统地对ZK的应用场景进行一个分门归类的介绍.

zookeeper场景

- - 企业架构 - ITeye博客
发布与订阅模型,即所谓的配置中心,顾名思义就是发布者将数据发布到ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新. 例如全局的配置信息,服务式服务框架的服务地址列表等就非常适合使用. 应用中用到的一些配置信息放到ZK上进行集中管理. 这类场景通常是这样:应用在启动的时候会主动来获取一次配置,同时,在节点上注册一个Watcher,这样一来,以后每次配置有更新的时候,都会实时通知到订阅的客户端,从来达到获取最新配置信息的目的.

Zookeeper的Session

- - 行业应用 - ITeye博客
介绍一下基于zookeeper的一些API的编程. 在此之前,我们先来熟悉一下相关知识:. Zookeeper的Session:. (1)客户端和server间采用长连接. (2)连接建立后,server产生session ID(64位)返还给客户端. (3)客户端定期发送ping包来检查和保持和server的连接.

Paxos与zookeeper

- - 互联网 - ITeye博客
1,什么是Paxos算法. Paxos算法是分布式计算领域中一个非常重要的算法,主要解决分布式系统如何就某个值(决议)达成一致的问题. 一个典型的场景是分布式数据库的一致问题:如果分布式数据库的各个节点初始状态一致,又能执行相同的操作序列,那么最后能达到一个一致的状态. 但是如何保证在每个节点上执行相同的命令序列呢.