[Original] Curator Service Discovery
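A service discovery system provides the following mechanisms:
Services register their availability
Clients locate a single instance of a particular service
Clients are notified when the instances of a service change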
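A service instance is represented by the class ServiceInstance. A ServiceInstance has a name, an id, an address, a port and/or an SSL port, and an optional user-defined payload. ServiceInstances are serialized and stored on the ZooKeeper server in the following layout: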
base path
|_______ service A name
|__________ instance 1 id --> (serialized ServiceInstance)
|__________ instance 2 id --> (serialized ServiceInstance)
|__________ ...
|_______ service B name
|__________ instance 1 id --> (serialized ServiceInstance)
|__________ instance 2 id --> (serialized ServiceInstance)
|__________ ...
|_______ ...
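To make the layout concrete, here is a minimal in-memory sketch of the same tree: service name, then instance id, then the serialized instance. The class RegistryLayoutSketch and its methods are hypothetical names made up for illustration; they are not part of Curator, and no ZooKeeper connection is involved. Keeping the service name after its last instance is removed is an assumption of the sketch, chosen because the sample output at the end of this post lists service names that have no instances.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the ZooKeeper tree; not part of Curator.
class RegistryLayoutSketch {
    private final String basePath;
    // service name -> (instance id -> serialized ServiceInstance)
    private final Map<String, Map<String, String>> services = new TreeMap<>();

    RegistryLayoutSketch(String basePath) {
        this.basePath = basePath;
    }

    // Stores one serialized instance and returns the node path it would occupy.
    String register(String serviceName, String instanceId, String serialized) {
        services.computeIfAbsent(serviceName, k -> new TreeMap<>()).put(instanceId, serialized);
        return basePath + "/" + serviceName + "/" + instanceId;
    }

    // Removes one instance but keeps the service name entry (assumption of this sketch).
    void unregister(String serviceName, String instanceId) {
        Map<String, String> instances = services.get(serviceName);
        if (instances != null) {
            instances.remove(instanceId);
        }
    }

    Set<String> serviceNames() {
        return Collections.unmodifiableSet(services.keySet());
    }

    Set<String> instanceIds(String serviceName) {
        Map<String, String> instances = services.get(serviceName);
        return instances == null
                ? Collections.<String>emptySet()
                : Collections.unmodifiableSet(instances.keySet());
    }

    public static void main(String[] args) {
        RegistryLayoutSketch registry = new RegistryLayoutSketch("/base");
        // prints /base/service-a/id-1
        System.out.println(registry.register("service-a", "id-1", "{}"));
        registry.register("service-a", "id-2", "{}");
        registry.unregister("service-a", "id-1");
        System.out.println(registry.instanceIds("service-a"));
    }
}
```

In the real system the "serialized" string would be the JSON produced by the instance serializer, and removal would happen on the ZooKeeper side rather than through an explicit call.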
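The main abstraction is ServiceProvider. It wraps discovery for one named service together with a provider strategy, that is, a scheme for picking one instance out of the set registered for that service. Three strategies are bundled: Round Robin, Random, and Sticky (always selects the same instance).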
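The three strategies are easy to picture with a few lines of plain Java. The sketch below is only an illustration of the selection logic, assuming a non-empty list of instances; the names ProviderStrategySketch, Strategy, and Sticky are made up here and are not Curator's own classes, whose implementations may differ in detail.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative selection strategies; hypothetical names, not Curator's classes.
class ProviderStrategySketch {
    interface Strategy<T> {
        // Picks one element; the list is assumed to be non-empty.
        T pick(List<T> instances);
    }

    // Round Robin: walk through the list in order, wrapping around at the end.
    static <T> Strategy<T> roundRobin() {
        AtomicInteger next = new AtomicInteger();
        return instances -> instances.get(Math.floorMod(next.getAndIncrement(), instances.size()));
    }

    // Random: pick a uniformly random element on every call.
    static <T> Strategy<T> random(Random rng) {
        return instances -> instances.get(rng.nextInt(instances.size()));
    }

    // Sticky: keep returning the same element while it is still in the list.
    static final class Sticky<T> implements Strategy<T> {
        private T current;

        @Override
        public T pick(List<T> instances) {
            if (current == null || !instances.contains(current)) {
                current = instances.get(0);
            }
            return current;
        }
    }

    public static void main(String[] args) {
        List<String> instances = Arrays.asList("a", "b", "c");
        Strategy<String> rr = roundRobin();
        // prints a b c a
        System.out.println(rr.pick(instances) + " " + rr.pick(instances) + " "
                + rr.pick(instances) + " " + rr.pick(instances));
        Strategy<String> sticky = new Sticky<>();
        // prints a a
        System.out.println(sticky.pick(instances) + " " + sticky.pick(instances));
    }
}
```

Round Robin spreads calls evenly, Random needs no shared counter, and Sticky suits clients that benefit from talking to the same instance until it goes away.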
A ServiceProvider must be started by calling start(), and close() must be called when you are done with it. The central method of ServiceProvider is defined as follows:
public ServiceInstance<T> getInstance()
throws Exception
Return an instance for a single use. IMPORTANT: users should not hold on to the instance
returned. A fresh instance should always be retrieved.
Returns: the instance to use
To allocate a ServiceProvider you need a ServiceDiscovery, which is created by a ServiceDiscoveryBuilder.
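We now build a simple program that registers services. It consists of three main classes.
DistributedService: the payload class of a registered service; it carries the service address and a description of the service.
DistributedServer: the server side of the distributed service; it creates the service instance and registers it.
DistributedDiscovery: the discovery class; it initializes service discovery and adds, removes, and lists services.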
Definition of the DistributedService class:
import org.codehaus.jackson.map.annotate.JsonRootName;
/**
* @author: [email protected]
*/
@JsonRootName("services")
public class DistributedService {
private String address ;
private String info;
public DistributedService(){
this("", "");
}
public DistributedService(String address, String info){
this.address = address;
this.info = info;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public String getInfo() {
return info;
}
public void setInfo(String info) {
this.info = info;
}
}
Definition of the DistributedServer class:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.UriSpec;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import java.io.Closeable;
import java.io.IOException;
/**
* @author: [email protected]
*/
public class DistributedServer implements Closeable {
private final ServiceDiscovery<DistributedService> serviceDiscovery;
private final ServiceInstance<DistributedService> thisInstance;
DistributedServer(CuratorFramework client,
String path,
String serviceName,
String address,
String info ) throws Exception {
// in a real application, you'd have a convention of some kind for the URI layout
UriSpec uriSpec = new UriSpec("{scheme}://zookeeper.com:{port}");
thisInstance = ServiceInstance.<DistributedService>builder()
.name(serviceName)
.payload(new DistributedService(address, info))
.port((int) (65535 * Math.random())) // in a real application, you'd use a common port
.uriSpec(uriSpec)
.build();
// if you mark your payload class with @JsonRootName the provided JsonInstanceSerializer will work
JsonInstanceSerializer<DistributedService> serializer = new
JsonInstanceSerializer<DistributedService>(DistributedService.class);
serviceDiscovery = ServiceDiscoveryBuilder.builder(DistributedService.class)
.client(client)
.basePath(path)
.serializer(serializer)
.thisInstance(thisInstance)
.build();
}
public ServiceInstance<DistributedService> getThisInstance() {
return thisInstance;
}
public void start() throws Exception {
serviceDiscovery.start();
}
@Override
public void close() throws IOException {
serviceDiscovery.close();
}
}
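The UriSpec template "{scheme}://zookeeper.com:{port}" used above is filled in per instance when the URI is built. As a rough picture of what such an expansion does, here is a plain-Java sketch. UriTemplateSketch and expand are hypothetical names and this is not Curator's implementation. The value "http" for scheme is taken from the sample output further down, presumably because no SSL port was set on the instance.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical template expansion, only to illustrate the idea behind UriSpec.
class UriTemplateSketch {
    private static final Pattern VARIABLE = Pattern.compile("\\{(\\w+)\\}");

    // Replaces every {name} with its value; unknown names are left untouched.
    static String expand(String template, Map<String, Object> variables) {
        Matcher matcher = VARIABLE.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            Object value = variables.get(matcher.group(1));
            String replacement = (value == null) ? matcher.group() : value.toString();
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> variables = new LinkedHashMap<>();
        variables.put("scheme", "http"); // assumed: no SSL port on the instance
        variables.put("port", 42118);    // one of the random ports from the sample run
        // prints http://zookeeper.com:42118
        System.out.println(expand("{scheme}://zookeeper.com:{port}", variables));
    }
}
```

Leaving unknown variables untouched is a choice made for this sketch; it makes a misspelled placeholder visible in the result instead of silently dropping it.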
Definition of the DistributedDiscovery class:
import com.google.common.collect.Lists;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import util.ZooKeeperConf;
import java.util.Collection;
import java.util.List;
/**
* @author [email protected]
*/
public class DistributedDiscovery {
private List<DistributedServer> servers = Lists.newArrayList();
private static final String PATH = "/discovery/distributed_services";
private CuratorFramework client = null;
private ServiceDiscovery<DistributedService> serviceDiscovery = null;
public DistributedDiscovery(){
init();
}
public void init(){
client = CuratorFrameworkFactory.newClient(ZooKeeperConf.CLUSTER_NODES,
new ExponentialBackoffRetry(1000, 3));
client.start();
JsonInstanceSerializer<DistributedService> serializer = new
JsonInstanceSerializer<DistributedService>(DistributedService.class);
serviceDiscovery = ServiceDiscoveryBuilder.builder(DistributedService.class)
.client(client)
.basePath(PATH)
.serializer(serializer)
.build();
try {
serviceDiscovery.start();
} catch (Exception e) {
System.out.println("serviceDiscovery.start() failed with an exception: " +
e.getMessage());
e.printStackTrace();
}
}
public void addService(String serviceName,
String address,
String info) throws Exception {
DistributedServer server = new DistributedServer(client, PATH, serviceName,address, info);
servers.add(server);
server.start();
System.out.println(serviceName + " added");
}
public void listServices() throws Exception {
Collection<String> serviceNames = serviceDiscovery.queryForNames();
System.out.println(serviceNames.size() + " type(s)");
for (String serviceName : serviceNames) {
Collection<ServiceInstance<DistributedService>> instances = serviceDiscovery.queryForInstances(serviceName);
System.out.println(serviceName);
for(ServiceInstance<DistributedService> instance: instances){
outputInstance(instance);
}
}
}
private static void outputInstance(ServiceInstance<DistributedService> instance) {
System.out.println("\t address: " + instance.getPayload().getAddress()+ ", info: " +
instance.getPayload().getInfo()+ ": " + instance.buildUriSpec());
}
public List<DistributedServer> getServers() {
return servers;
}
}
Below is the test class ServicesMain:
import java.lang.management.ManagementFactory;
import java.util.concurrent.Callable;
/**
* @author [email protected]
*/
public class ServicesMain {
public static void main(String[] args) throws Exception {
DistributedDiscovery dd = new DistributedDiscovery();
String name = ManagementFactory.getRuntimeMXBean().getName();
System.out.printf("getRuntimeMXBean mame: %s", name);
int index = name.indexOf('@');
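// use the pid part of the runtime name as the first service name
dd.addService(name.substring(0, index), "192.168.11.2:8089", "cluster node 1");
// use the full runtime name (pid@host) as the second service name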
dd.addService(name, "192.168.11.3:8089", "cluster node 2");
dd.listServices();
// Callable<Boolean > callable = new Callable<Boolean>(){
// @Override
// public Boolean call() throws Exception{
// System.out.println();
// boolean isStop = true;
// while(isStop){
// //wait 10 seconds
// Thread.sleep(10000);
// isStop = false;
// }
// return true;
// }
// };
// callable.call();
Thread.sleep(10000);
dd.listServices();
}
}
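ServicesMain relies on the runtime name having the form pid@host and calls substring with the result of indexOf directly. That format is common but not guaranteed, so a slightly more defensive helper might look like the sketch below. RuntimeNameSketch and pidPart are hypothetical names introduced here for illustration.

```java
import java.lang.management.ManagementFactory;

// Hypothetical helper: extract the part before '@' without assuming it exists.
class RuntimeNameSketch {
    // Returns the text before '@', or the whole string when there is no '@'.
    static String pidPart(String runtimeName) {
        int at = runtimeName.indexOf('@');
        return at < 0 ? runtimeName : runtimeName.substring(0, at);
    }

    public static void main(String[] args) {
        String name = ManagementFactory.getRuntimeMXBean().getName();
        System.out.println(pidPart(name));
        // prints 8040
        System.out.println(pidPart("8040@JOHNLAU"));
    }
}
```

With this helper, a runtime name without '@' no longer makes substring throw; the whole name is used as the service name instead.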
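Run the code above several times. Each run prints every service that is currently registered, including those left by other runs. Once a registered service disappears, fewer services are discovered.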
getRuntimeMXBean mame: 8040@JOHNLAU--localhost: 172.24.219.99
8040 added
--localhost: 172.24.219.99
8040@JOHNLAU added
10 type(s)
test-b
test-a
2620
address: 192.168.11.2:8089, info: cluster node 1: http://zookeeper.com:42118
8040@JOHNLAU
address: 192.168.11.3:8089, info: cluster node 2: http://zookeeper.com:17009
7896
10004@JOHNLAU
2620@JOHNLAU
address: 192.168.11.3:8089, info: cluster node 2: http://zookeeper.com:58955
10004
7896@JOHNLAU
8040
address: 192.168.11.2:8089, info: cluster node 1: http://zookeeper.com:53458
10 type(s)
test-b
test-a
2620
address: 192.168.11.2:8089, info: cluster node 1: http://zookeeper.com:42118
8040@JOHNLAU
address: 192.168.11.3:8089, info: cluster node 2: http://zookeeper.com:17009
7896
10004@JOHNLAU
2620@JOHNLAU
address: 192.168.11.3:8089, info: cluster node 2: http://zookeeper.com:58955
10004
7896@JOHNLAU
8040
address: 192.168.11.2:8089, info: cluster node 1: http://zookeeper.com:53458