Dubbo不能优雅停机,导致停止服务的时候,业务掉单

标签: dubbo 服务 业务 | 发表时间:2015-11-03 12:57 | 作者:frankfan915
出处:http://www.iteye.com

Dubbo 优雅停机修改方案

 

1.       服务端不能优雅停机的原因:

NettyServer在构造函数中会调用

ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))

方法将handler进行包装,包装成MultiMessageHandler的一个对象。在下面红色代码中会判断handler是否是WrappedChannelHandler对象,只有是的时候才会对executor对象复值。因为MultiMessageHandler对象不是WrappedChannelHandler的子类,所以executor为空。

当NettyServer的close方法被调用的时候,会调用ExecutorUtil.gracefulShutdown方法,gracefulShutdown方法只有executor不为空时才会等待线程池关闭

 

public abstract class AbstractServer extends AbstractEndpoint implements Server {

   public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {

        super(url, handler);

        localAddress = getUrl().toInetSocketAddress();

        String host = url.getParameter(Constants.ANYHOST_KEY, false)

                        || NetUtils.isInvalidLocalHost(getUrl().getHost())

                        ? NetUtils.ANYHOST : getUrl().getHost();

        bindAddress = new InetSocketAddress(host, getUrl().getPort());

        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);

        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);

        try {

            doOpen();

            if (logger.isInfoEnabled()) {

                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());

            }

        } catch (Throwable t) {

            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()

                                        + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);

        }

        if (handler instanceof WrappedChannelHandler ){

            executor = ((WrappedChannelHandler)handler).getExecutor();

        }

    }

  public void close(int timeout) {

        ExecutorUtil.gracefulShutdown(executor ,timeout);

        close();

    }

}

 

 

2.  服务端改动:

修改了AbstractServer类的红色部分

 public abstract class AbstractServer extends AbstractEndpoint implements Server {

 

    private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);

 

    private InetSocketAddress              localAddress;

 

    private InetSocketAddress              bindAddress;

 

    private int                            accepts;

 

    private int                            idleTimeout = 600; //600 seconds

 

    protected static final String SERVER_THREAD_POOL_NAME  ="DubboServerHandler";

 

    ExecutorService executor;

 

    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {

        super(url, handler);

        localAddress = getUrl().toInetSocketAddress();

        String host = url.getParameter(Constants.ANYHOST_KEY, false)

                || NetUtils.isInvalidLocalHost(getUrl().getHost())

                ? NetUtils.ANYHOST : getUrl().getHost();

        bindAddress = new InetSocketAddress(host, getUrl().getPort());

        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);

        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);

        try {

            doOpen();

            if (logger.isInfoEnabled()) {

                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());

            }

        } catch (Throwable t) {

            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()

                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);

        }

        try {

            recycleToGetExecutor(handler);

        } catch (Exception e) {

            logger.error("recycle to get executor failed");

        }

    }

 

    private void recycleToGetExecutor(ChannelHandler handler) throws NoSuchFieldException, IllegalAccessException {

        if(handler == null)

        {

            return;

        }

 

        if (handler instanceof WrappedChannelHandler ){

            executor = ((WrappedChannelHandler)handler).getExecutor();

        }else if(handler instanceof AbstractChannelHandlerDelegate){

            Field field = AbstractChannelHandlerDelegate.class.getDeclaredField("handler");

            field.setAccessible(true);

            recycleToGetExecutor((ChannelHandler) field.get(handler));

        }

    }

 

    protected abstract void doOpen() throws Throwable;

 

    protected abstract void doClose() throws Throwable;

 

    public void reset(URL url) {

        if (url == null) {

            return;

        }

        try {

            if (url.hasParameter(Constants.ACCEPTS_KEY)) {

                int a = url.getParameter(Constants.ACCEPTS_KEY, 0);

                if (a > 0) {

                    this.accepts = a;

                }

            }

        } catch (Throwable t) {

            logger.error(t.getMessage(), t);

        }

        try {

            if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) {

                int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0);

                if (t > 0) {

                    this.idleTimeout = t;

                }

            }

        } catch (Throwable t) {

            logger.error(t.getMessage(), t);

        }

        try {

            if (url.hasParameter(Constants.THREADS_KEY)

                    && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {

                ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;

                int threads = url.getParameter(Constants.THREADS_KEY, 0);

                int max = threadPoolExecutor.getMaximumPoolSize();

                int core = threadPoolExecutor.getCorePoolSize();

                if (threads > 0 && (threads != max || threads != core)) {

                    if (threads < core) {

                        threadPoolExecutor.setCorePoolSize(threads);

                        if (core == max) {

                            threadPoolExecutor.setMaximumPoolSize(threads);

                        }

                    } else {

                        threadPoolExecutor.setMaximumPoolSize(threads);

                        if (core == max) {

                            threadPoolExecutor.setCorePoolSize(threads);

                        }

                    }

                }

            }

        } catch (Throwable t) {

            logger.error(t.getMessage(), t);

        }

        super.setUrl(getUrl().addParameters(url.getParameters()));

    }

 

    public void send(Object message, boolean sent) throws RemotingException {

        Collection<Channel> channels = getChannels();

        for (Channel channel : channels) {

            if (channel.isConnected()) {

                channel.send(message, sent);

            }

        }

    }

 

    public void close() {

        if (logger.isInfoEnabled()) {

            logger.info("Close " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());

        }

        ExecutorUtil.shutdownNow(executor ,100);

        try {

            super.close();

        } catch (Throwable e) {

            logger.warn(e.getMessage(), e);

        }

        try {

            doClose();

        } catch (Throwable e) {

            logger.warn(e.getMessage(), e);

        }

    }

 

    public void close(int timeout) {

        ExecutorUtil.gracefulShutdown(executor ,timeout);

        close();

    }

 

    public InetSocketAddress getLocalAddress() {

        return localAddress;

    }

 

    public InetSocketAddress getBindAddress() {

        return bindAddress;

    }

 

    public int getAccepts() {

        return accepts;

    }

 

    public int getIdleTimeout() {

        return idleTimeout;

    }

 

    @Override

    public void connected(Channel ch) throws RemotingException {

        Collection<Channel> channels = getChannels();

        if (accepts > 0 && channels.size() > accepts) {

            logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);

            ch.close();

            return;

        }

        super.connected(ch);

    }

 

    @Override

    public void disconnected(Channel ch) throws RemotingException {

        Collection<Channel> channels = getChannels();

        if (channels.size() == 0){

            logger.warn("All clients has discontected from " + ch.getLocalAddress() + ". You can graceful shutdown now.");

        }

        super.disconnected(ch);

    }

 

}

 

3.    客户端不能优雅停机的原因:

DubboInvoker停机的时候直接调用了client.close()方法应该调用client.close(long timeout)方法

public class DubboInvoker<T> extends AbstractInvoker<T> {

public void destroy() {

        //防止client被关闭多次.在connect per jvm的情况下,client.close方法会调用计数器-1,当计数器小于等于0的情况下,才真正关闭

        if (super.isDestroyed()){

            return ;

        } else {

            //dubbo check ,避免多次关闭

            destroyLock.lock();

            try{

                if (super.isDestroyed()){

                    return ;

                }

                super.destroy();

                if (invokers != null){

                    invokers.remove(this);

                }

                for (ExchangeClient client : clients) {

                    try {

                        client.close();

                    } catch (Throwable t) {

                        logger.warn(t.getMessage(), t);

                    }

                }

               

            }finally {

                destroyLock.unlock();

            }

        }

}

}

 

HeaderExchangeChannel类中判断HeaderExchangeChannel.this是否在DefaultFuture中。实际上在处理请求的时候是将HeaderExchangeChannel对象中channel成员变量放到DefaultFuture中的

final class HeaderExchangeChannel implements ExchangeChannel {

  public ResponseFuture request(Object request, int timeout) throws RemotingException {

        if (closed) {

            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");

        }

        // create request.

        Request req = new Request();

        req.setVersion("2.0.0");

        req.setTwoWay(true);

        req.setData(request);

        DefaultFuture future = new DefaultFuture(channel, req, timeout);

        try{

            channel.send(req);

        }catch (RemotingException e) {

            future.cancel();

            throw e;

        }

        return future;

}

 

public void close(int timeout) {

        if (closed) {

            return;

        }

        closed = true;

        if (timeout > 0) {

            long start = System.currentTimeMillis();

            while (DefaultFuture.hasFuture(HeaderExchangeChannel.this)

                    && System.currentTimeMillis() - start < timeout) {

                try {

                    Thread.sleep(10);

                } catch (InterruptedException e) {

                    logger.warn(e.getMessage(), e);

                }

            }

        }

        close();

    }

}

 

4.    客户端的修改

 

修改下面高量代码:

public class DubboInvoker<T> extends AbstractInvoker<T> {

    public void destroy() {

        //防止client被关闭多次.在connect per jvm的情况下,client.close方法会调用计数器-1,当计数器小于等于0的情况下,才真正关闭

        if (super.isDestroyed()){

            return ;

        } else {

            //dubbo check ,避免多次关闭

            destroyLock.lock();

            try{

                if (super.isDestroyed()){

                    return ;

                }

                super.destroy();

                if (invokers != null){

                    invokers.remove(this);

                }

                for (ExchangeClient client : clients) {

                    try {

                        logger.info("start to shutdown:" + getServerShutdownTimeout());

                        client.close(getServerShutdownTimeout());

                    } catch (Throwable t) {

                        logger.warn(t.getMessage(), t);

                    }

                }

 

            }finally {

                destroyLock.unlock();

            }

        }

    }

 

    protected static int getServerShutdownTimeout() {

        int timeout = Constants.DEFAULT_SERVER_SHUTDOWN_TIMEOUT;

        String value = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY);

        if (value != null && value.length() > 0) {

            try{

                timeout = Integer.parseInt(value);

            }catch (Exception e) {

            }

        } else {

            value = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_SECONDS_KEY);

            if (value != null && value.length() > 0) {

                try{

                    timeout = Integer.parseInt(value) * 1000;

                }catch (Exception e) {

                }

            }

        }

 

        return timeout;

    }

}

 

final class HeaderExchangeChannel implements ExchangeChannel {

     // graceful close

    public void close(int timeout) {

        if (closed) {

            return;

        }

        closed = true;

        if (timeout > 0) {

            long start = System.currentTimeMillis();

            while (DefaultFuture.hasFuture(channel)

                    && System.currentTimeMillis() - start < timeout) {

                try {

                    Thread.sleep(10);

                } catch (InterruptedException e) {

                    logger.warn(e.getMessage(), e);

                }

            }

        }

        close();

    }

}

 

 

注意服务端和客户端都需要加上配置dubbo.service.shutdown.wait=60000 ,设定停机等待时间



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [dubbo 服务 业务] 推荐:

dubbo服务降级实现dubbo-plus/circuitbreaker at master · dubboclub/dubbo-plus · GitHub

- -
向注册中心写入动态配置覆盖规则:(通过由监控中心或治理中心的页面完成). 表示消费方对该服务的方法调用都直接返回null值,不发起远程调用. 屏蔽不重要服务不可用时对调用方的影响. 表示消费方对该服务的方法调用在失败后,再返回null值,不抛异常. 容忍不重要服务不稳定时对调用方的影响. Dubbo支持服务降级,并且支持当服务出现异常的时候进行服务降级处理,但是存在一下几个缺陷.

Dubbo不能优雅停机,导致停止服务的时候,业务掉单

- - 开源软件 - ITeye博客
1.       服务端不能优雅停机的原因:. NettyServer在构造函数中会调用. 方法将handler进行包装,包装成MultiMessageHandler的一个对象. 在下面红色代码中会判断handler是否是WrappedChannelHandler对象,只有是的时候才会对executor对象复值.

谈Dubbo服务框架

- - 人月神话的BLOG
Dubbo 是阿里巴巴公司开源的一个高性能优秀的服务框架,使得应用可通过高性能的 RPC 实现服务的输出和输入功能,可以和 Spring框架无缝集成. 它最大的特点是按照分层的方式来架构,使用这种方式可以使各个层之间解耦合(或者最大限度地松耦合). 从服务模型的角度来看,Dubbo采用的是一种非常简单的模型,要么是提供方提供服务,要么是消费方消费服务,所以基于这一点可以抽象出服务提供方(Provider)和服务消费方(Consumer)两个角色.

dubbo服务化实施整理

- - 企业架构 - ITeye博客
随着快的业务的快速发展,我们逐步按照业务垂直划分,抽象出基础服务层. 基础业务的服务为上游业务的灵活发展提供支持. 服务应用本身无状态化,可以随着系统的负荷灵活伸缩来提供服务能. 服务的稳定性,可用性达到99%. dubbo来作为服务化中间件,dubbo作为一个RPC框架,大致的原理如下图. Registry: 注册中心;和服务的消费者,和服务提供者都建立长连接.

dubbo服务telnet命令 - 秦鹏飞

- - 博客园_首页
dubbo服务发布之后,我们可以利用telnet命令进行调试、管理. Dubbo2.0.5以上版本服务提供端口支持telnet命令,下面我以 Windows为例抛砖引玉一下:.     测试对应IP和端口下的dubbo服务是否连通,cmd命令如下.     正常情况下,进入telnet窗口,键入回车进入dubbo命令模式.

dubbo服务治理(一)降级

- - 企业架构 - ITeye博客
在线网站一般都会有服务器压力剧增的时候,比如说网上商城的促销,这个时候常用的手段就是服务降级,根据当前业务情况及流量对一些服务和页面有策略的降级,以此缓解了服务器资源压力,以保证核心任务的正常运行,同时也保证了部分甚至大部分客户得到正确响应. 页面拒绝服务:页面提示由于服务繁忙此服务暂停. 跳转到varnish或nginx的一个静态页面.

阿里巴巴Dubbo分布式服务框架已开源

- tangfl - ITeye论坛最新精华讨论帖
Serving services with invocations everyday, Dubbo becomes the key part of Alibaba's SOA solution and has been deployed to the whole alibaba.com family:.

Dubbo:来自于阿里巴巴的分布式服务框架

- - 标点符
Dubbo是阿里巴巴SOA服务化治理方案的核心框架,每天为2,000+个服务提供3,000,000,000+次访问量支持,并被广泛应用于阿里巴巴集团的各成员站点. Dubbo是一个阿里巴巴开源出来的一个分布式服务框架,致力于提供高性能和透明化的RPC远程服务调用方案,以及SOA服务治理方案. 远程通讯: 提供对多种基于长连接的NIO框架抽象封装,包括多种线程模型,序列化,以及“请求-响应”模式的信息交换方式.

浅谈SOA面向服务化编程架构(dubbo)

- - ITeye博客
并非淘宝系的技术啦,淘宝系的分布式服务治理框架式HSF啦 ,只闻其声,不能见其物. 而dubbo是阿里开源的一个SOA服务治理解决方案,dubbo本身 集成了监控中心,注册中心,负载集群...等等. 代码和整体的框架还是很优雅滴呀. github地址 https://github.com/alibaba/dubbo.

阿里巴巴分布式服务框架-Dubbo问与答

- - Arccode's blog
Dubbo是阿里巴巴内部的SOA服务化治理方案的核心框架,每天为2000+ 个服务提供3,000,000,000+ 次访问量支持,并被广泛应用于阿里巴巴集团的各成员站点. Dubbo自2011年开源后,已被许多非阿里系公司使用. 项目主页: http://dubbo.io/Home-zh.htm.