国信证券 RocketMQ + Spring Boot 在配置热加载的实践

标签: dev | 发表时间:2019-12-08 00:00 | 作者:
出处:http://itindex.net/relian

在进行微服务架构研发的过程中,不少研发人员都提出配置热加载的需求,一方面是为了提升研发效率,开发人员修改配置不需要重启服务;另一方面是为了线上紧急事件,可以快速恢复,例如线上数据库 down 了,可以紧急启动降级开关,通过配置热加载动态生效,降低处理事件的时间。于是我采用 rocketmq 消息队列,实现一套解耦的配置热加载机制。这里我将思路简单介绍下,希望大家能通过这篇文章,实现自己的配置热加载。

1、设计方案


  1. 配置中心修改配置后,将修改的配置发送到 rocketmq。

  2. 微服务启动时,监控配置修改的topic,发现自身配置已修改,则发起配置热加载。

  3. 服务需要设置热加载开关,配置误修改可能带来风险。

1.1 RocketMQ消息设计

topic tag body
confCenter confModify 服务名称和配置内容

1.2 采用spring的event机制处理消息消息消费

RocketmqEvent 代码如下:

  public class RocketmqEvent extends ApplicationEvent {   
    private static final long serialVersionUID = -4468405250074063206L;
    private DefaultMQPushConsumer consumer;
    private List<MessageExt> msgs;

    public RocketmqEvent(List<MessageExt> msgs, DefaultMQPushConsumer consumer) throws Exception {
        super(msgs);
        this.consumer = consumer;
        this.setMsgs(msgs);
    }

    public String getMsg(int idx) {
        try {
            return new String(getMsgs().get(idx).getBody(), "utf-8");
        } catch (UnsupportedEncodingException e) {
            return null;
        }
    }

    public String getMsg(int idx,String code) {
        try {
            return new String(getMsgs().get(idx).getBody(), code);
        } catch (UnsupportedEncodingException e) {
            return null;
        }
    }

    public DefaultMQPushConsumer getConsumer() {
        return consumer;
    }

    public void setConsumer(DefaultMQPushConsumer consumer) {
        this.consumer = consumer;
    }

    public MessageExt getMessageExt(int idx) {
        return getMsgs().get(idx);
    }


    public String getTopic(int idx) {
        return getMsgs().get(idx).getTopic();
    }


    public String getTag(int idx) {
        return getMsgs().get(idx).getTags();
    }


    public byte[] getBody(int idx) {
        return getMsgs().get(idx).getBody();
    }


    public String getKeys(int idx) {
        return getMsgs().get(idx).getKeys();
    }

    public List<MessageExt> getMsgs() {
        return msgs;
    }

    public void setMsgs(List<MessageExt> msgs) {
        this.msgs = msgs;
    }
}

1.3 RocketMQ的message的Event处理

  @Autowired   
ApplicationEventPublisher publisher 
publisher.publishEvent(new RocketmqEvent(msgs, consumer));

2、spring配置热加载


微服务端收到消息后,采用spring的RefreshScope机制进行配置热加载
1、需要注入 RefreshScope 和 ConfigurableWebApplicationContext 两个对象。

  ConfigurableWebApplicationContext applicationContext;\\采用ApplicationContextAware机制进行注入   
@Autowired
private RefreshScope refreshScope = null;

2、解析 RocketMQ 的 message,然后进行配置刷新。

  @EventListener   
    public void envListener(RocketmqEvent event) {
        event.getMsgs().forEach(msg -> {
            String body = new String(msg.getBody());
            JSONObject json = JSON.parseObject(body);
            MutablePropertySources target = applicationContext.getEnvironment().getPropertySources();//获取Sources
            for (PropertySource<?> source : target) {
                if ("defaultProperties".equals(source.getName())) {//对默认配置进行刷新
                    @SuppressWarnings("unchecked")
                    Map<String, String> map = (Map<String, String>) source.getSource();
                    Map<String, String> after = JSONObject.parseObject(json.toJSONString(),
                            new TypeReference<Map<String, String>>() {
                            });
                    map.putAll(after);
                    PropertiesContent.refleshAll(after);
                    this.refreshScope.refreshAll();
                    return;
                }
            }
        });
    }

最后,这里只是简单介绍了配置热加载的思路,当然还有很多细节需要思考,如果有兴趣的可以一起交流交流。

作者简介:邓启翔,国信证券 zebra 微服务技术负责人,具有多年网络框架和分布式系统架构设计及研发经验,对消息中间件具有深刻理解。曾就职于平安、汇添富基金,资深架构师。

相关 [国信证券 rocketmq spring] 推荐:

国信证券 RocketMQ + Spring Boot 在配置热加载的实践

- - IT瘾-dev
在进行微服务架构研发的过程中,不少研发人员都提出配置热加载的需求,一方面是为了提升研发效率,开发人员修改配置不需要重启服务;另一方面是为了线上紧急事件,可以快速恢复,例如线上数据库 down 了,可以紧急启动降级开关,通过配置热加载动态生效,降低处理事件的时间. 于是我采用 rocketmq 消息队列,实现一套解耦的配置热加载机制.

项目中单元测试容易出现的普遍问题归纳(Junit/Spring/Spring-test/Dubbo/RocketMQ/JAVA)

- - 编程语言 - ITeye博客
   最近公司要求项目在使用maven构建的时候不能跳过test的生命周期,也就是通过mvn test命令需要将整个项目运行起来. 因为之前项目组的成员都是在eclipse中去执行的unit test,在maven对所有模块构建的都是直接-Dmaven.test.skip=true的方式直接跳过UT的.

RocketMQ与Kafka对比(18项差异)

- - 开源软件 - ITeye博客
为了方便大家选型,整理一份RocketMQ与Kafka的对比文档,文中如有错误之处,欢迎来函指正. RocketMQ支持异步实时刷盘,同步刷盘,同步Replication,异步Replication. Kafka使用异步刷盘方式,异步Replication. 总结:RocketMQ的同步刷盘在单机可靠性上比Kafka更高,不会因为操作系统Crash,导致数据丢失.

Springcloud + RocketMQ 解决分布式事务

- - 掘金架构
分布式事务有哪些实现方式. 随着互联网时代的高速发展,分布式成了大型系统的标配,这是时代发展的选择. 大型分布式系统不是每个公司和开发人员都能够涉及的领域,因为大型系统后面都 隐藏着众多代名词:复杂,昂贵,高科技,人才云集,大战略. 大部分领头互联网公司甚至依托自己的分布式经验逐步建立自己的体系,并使用这套体系搭建自己的平台对内,甚至对外提供服务, 就像现在众多的云平台提供的服务,甚至有些大战略提出促进发展:大中台小前台、大炮台支援单兵作战等等.

分布式开放消息系统(RocketMQ)的原理与实践

- - 编程语言 - ITeye博客
备注:1.如果您此前未接触过RocketMQ,请先阅读附录部分,以便了解RocketMQ的整体架构和相关术语2.文中的MQServer与Broker表示同一概念.   分布式消息系统作为实现分布式系统可扩展、可伸缩性的关键组件,需要具有高吞吐量、高可用等特点. 而谈到消息系统的设计,就回避不了两个问题:  .

RocketMQ 在使用上的一些排坑和优化

- - 掘金架构
RocketMQ 在我们的项目中使用非常广泛,在使用的过程中,也遇到了很多的问题. 比如没有多环境的隔离,在多个版本同时开发送测的情况下,互相干扰严重. RocketMQ 的投递可能会失败,导致丢失消息. 另外开源版本的 RocketMQ 不支持任意时间精度的延时消息,仅支持特定的 level. 在使用的过程中,我们做了一些针对性的优化,整理出了这篇文章.

消息幂等(去重)通用解决方案,RocketMQ

- - 薛定谔的风口猪
消息中间件是分布式系统常用的组件,无论是异步化、解耦、削峰等都有广泛的应用价值. 我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢失,即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件最基本的特性之一,也就是我们常说的“AT LEAST ONCE”,即消息至少会被“成功消费一遍”.

netty无缝切换rabbitmq、activemq、rocketmq实现聊天室单聊、群聊功能

- - SegmentFault 最新的文章
netty的pipeline处理链上的handler:需要IdleStateHandler心跳检测channel是否有效,以及处理登录认证的UserAuthHandler和消息处理MessageHandler. //将多个消息转换成单一的消息对象. //支持异步发送大的码流,一般用于发送文件流. //检测链路是否读空闲,配合心跳handler检测channel是否正常.

Spring详解

- - CSDN博客架构设计推荐文章
Spring是一个开源的控制反转(Inversion of Control ,IoC)和面向切面(AOP)的容器框架.它的主要目的是简化企业开发.. PersonDaoBean 是在应用内部创建及维护的. 所谓控制反转就是应用本身不负责依赖对象的创建及维护,依赖对象的创建及维护是由外部容器负责的.

Spring定时

- - 行业应用 - ITeye博客
spring的定时任务配置分为三个步骤:. . . . . .