基于Calcite的分布式多数据源查询

标签: dev | 发表时间:2022-01-25 00:00 | 作者:
出处:http://itindex.net/relian

在本文中,我们将实践 GBase8sMySQL的跨数据源联合查询,案例中 MySQL数据源中存放商品信息, GBase8s数据源中存放订单信息。整体架构如下

好了,我们开始吧。

环境准备

GBase8s

安装镜像 docker pull liaosnet/gbase8s启动容器 docker run -itd -p 19088:9088 liaosnet/gbase8s容器基本信息:

  JDBC JAR:/home/gbase/gbasedbtjdbc_3.3.0_2.jar   
类名:com.gbasedbt.jdbc.Driver
URL:jdbc:gbasedbt-sqli://IPADDR:19088/testdb:GBASEDBTSERVER=gbase01;DB_LOCALE=zh_CN.utf8;CLIENT_LOCALE=zh_CN.utf8;IFX_LOCK_MODE_WAIT=30;
用户:gbasedbt
密码:GBase123
其中:IPADDR为docker所在机器的IP地址,同时需要放通19088端口。

MySQL

安装镜像 docker pull liaosnet/gbase8s启动容器 docker run -p 3306:3306 --name mysql -e MYSQL_ROOT_PASSWORD=dafei1288 -d mysql

数据准备

GBase8s

  CREATE TABLE order_table (   
 oid INTEGER NOT NULL,
 iid INTEGER,
 icount INTEGER,
 PRIMARY KEY (oid) CONSTRAINT order_table_pk
);

INSERT INTO order_table (oid, iid, icount) VALUES(1, 1, 10);
INSERT INTO order_table (oid, iid, icount) VALUES(2, 3, 30);

MySQL

  create table item   
(
    i_id    int auto_increment
        primary key,
    catalog varchar(20) null,
    pname   varchar(20) null,
    price   float       null,
    constraint item_i_id_uindex
        unique (i_id)
);

INSERT INTO test.item (i_id, catalog, pname, price) VALUES (1, '游戏', '大航海时代IV', 300);
INSERT INTO test.item (i_id, catalog, pname, price) VALUES (2, '游戏', '马车8', 300);
INSERT INTO test.item (i_id, catalog, pname, price) VALUES (3, '食品', '青椒豆腐乳西瓜', 20);

工程准备

创建 maven工程,目录如下图所示:

添加依赖

  <?xml version="1.0" encoding="UTF-8"?>   
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>calcite_multi_database</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.calcite</groupId>
            <artifactId>calcite-core</artifactId>
            <version>1.29.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.9</version>
        </dependency>
        <dependency>
            <groupId>gbase</groupId>
            <artifactId>gbasedbt</artifactId>
            <version>330</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/libs/gbasedbtjdbc_3.3.0_2_36477d.jar</systemPath>
        </dependency>
    </dependencies>
</project>

添加数据源配置文件 multiSource.json

  {   
  "defaultSchema": "gbasedbt",
  "schemas": [
    {
      "factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
      "name": "mysql",
      "operand": {
        "jdbcDriver": "com.mysql.cj.jdbc.Driver",
        "jdbcUrl": "jdbc:mysql://localhost:3306/test",
        "jdbcUser": "root",
        "jdbcPassword": "dafei1288"
      },
      "type": "custom"
    },
    {
      "factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
      "name": "gbasedbt",
      "operand": {
        "jdbcDriver": "com.gbasedbt.jdbc.Driver",
        "jdbcUrl": "jdbc:gbasedbt-sqli://localhost:19088/testdb:GBASEDBTSERVER=gbase01;DB_LOCALE=zh_CN.utf8;CLIENT_LOCALE=zh_CN.utf8;IFX_LOCK_MODE_WAIT=30;",
        "jdbcUser": "gbasedbt",
        "jdbcPassword": "GBase123"
      },
      "type": "custom"
    }
  ],
  "version": "1.0"
}

创建执行程序 MultiSource

使用上面的配置文件,创建 Calcite Jdbc链接

  String filepath = "E:\\working\\GBase\\writting\\calcite_multi_database_select\\calcite_multi_database\\src\\main\\resources\\multiSource.json";   
Properties config = new Properties();
config.put("model",filepath);
config.put("lex", "MYSQL");

这里 config.put("lex", "MYSQL");用于解析外层 SQL,所以必须保留。使用查询语句 SELECT o.oid,o.iid,o.icount,i.catalog,i.pname,i.price FROM gbasedbt.order_table AS o join mysql.item AS i on o.iid = i.i_id进行数据查询。

除了执行结果,其实我们也会对执行的逻辑计划感兴趣,那么我们来看看如何将该 SQL的逻辑计划打印出来

  public static void printLogicPlan(String modelPath , String sql) throws Exception{   

        String modelJsonStr = Files.readAllLines(Paths.get(modelPath)).stream().collect(Collectors.joining("\n"));
        HashMap map = new Gson().fromJson(modelJsonStr, HashMap.class);
        List<Map> schemas = (List<Map>) map.get("schemas");

        SchemaPlus rootSchema = Frameworks.createRootSchema(true);
        Schema gbasedbt = JdbcSchema.create(rootSchema, "gbasedbt" , (Map<String,Object>)schemas.get(1).get("operand"));
        Schema mysql = JdbcSchema.create(rootSchema, "mysql" , (Map<String,Object>)schemas.get(0).get("operand"));
        rootSchema.add("gbasedbt",gbasedbt);
        rootSchema.add("mysql",mysql);

        SqlParser.Config insensitiveParser = SqlParser.configBuilder()
                .setCaseSensitive(false)
                .build();

        FrameworkConfig config = Frameworks.newConfigBuilder()
                .parserConfig(insensitiveParser)
                .defaultSchema(rootSchema)
                .build();

        Planner planner = Frameworks.getPlanner(config);
        SqlNode sqlNode = planner.parse(sql);
        SqlNode sqlNodeValidated = planner.validate(sqlNode);
        RelRoot relRoot = planner.rel(sqlNodeValidated);
        RelNode relNode = relRoot.project();

        System.out.println(sqlNode.toSqlString(MysqlSqlDialect.DEFAULT));
        System.out.println();
        System.out.println(relNode.explain());
}

下面是逻辑计划打印的结果,我们不难看出,这里是使用了2个全表扫描,然后再通过 Join算子,然后进行 project算子的。其实这个执行不能说效率很高吧,只能说非常慢,如果想做优化,我们以后再开一篇文章。

  LogicalProject(OID=[$0], IID=[$1], ICOUNT=[$2], CATALOG=[$4], PNAME=[$5], PRICE=[$6])   
  LogicalJoin(condition=[=($1, $3)], joinType=[inner])
    JdbcTableScan(table=[[gbasedbt, order_table]])
    JdbcTableScan(table=[[mysql, item]])

SQL查询结果如下

  oid iid icount catalog pname price    
1 1 10 游戏 大航海时代IV 300.0 
2 3 30 食品 青椒豆腐乳西瓜 20.0 

执行截图

完整代码清单:

  package wang.datahub;   

import com.google.gson.Gson;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.externalize.RelWriterImpl;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.dialect.MysqlSqlDialect;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;


import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class MultiSource {
    public static void main(String[] args)  throws Exception{
        String filepath = "E:\\working\\GBase\\writting\\calcite_multi_database_select\\calcite_multi_database\\src\\main\\resources\\multiSource.json";
        Properties config = new Properties();
        config.put("model",filepath);
        config.put("lex", "MYSQL");
        String sql =
                "SELECT o.oid,o.iid,o.icount,i.catalog,i.pname,i.price FROM gbasedbt.order_table AS o join mysql.item AS i on o.iid = i.i_id";

        try (Connection con = DriverManager.getConnection("jdbc:calcite:", config)) {
            try (Statement stmt = con.createStatement()) {
                try (ResultSet rs = stmt.executeQuery(sql)) {
                    //打印逻辑计划
                    printLogicPlan(filepath,sql);
                    //打印查询结果
                    printRs(rs);
                }
            }
        }

    }

    public static void printRs(ResultSet rs) throws Exception {
        ResultSetMetaData rsmd = rs.getMetaData();
        int count = rsmd.getColumnCount();

        for(int i = 1; i <= count; i++){
            System.out.print(rsmd.getColumnName(i)+"\t");
        }
        System.out.println();

        while(rs.next()){
            for(int i = 1; i <= count; i++){
                System.out.print(rs.getString(i)+"\t");
            }
            System.out.println();
        }
    }

    public static void printLogicPlan(String modelPath , String sql) throws Exception{

        String modelJsonStr = Files.readAllLines(Paths.get(modelPath)).stream().collect(Collectors.joining("\n"));
        HashMap map = new Gson().fromJson(modelJsonStr, HashMap.class);
        List<Map> schemas = (List<Map>) map.get("schemas");

        SchemaPlus rootSchema = Frameworks.createRootSchema(true);
        Schema gbasedbt = JdbcSchema.create(rootSchema, "gbasedbt" , (Map<String,Object>)schemas.get(1).get("operand"));
        Schema mysql = JdbcSchema.create(rootSchema, "mysql" , (Map<String,Object>)schemas.get(0).get("operand"));
        rootSchema.add("gbasedbt",gbasedbt);
        rootSchema.add("mysql",mysql);

        SqlParser.Config insensitiveParser = SqlParser.configBuilder()
                .setCaseSensitive(false)
                .build();

        FrameworkConfig config = Frameworks.newConfigBuilder()
                .parserConfig(insensitiveParser)
                .defaultSchema(rootSchema)
                .build();

        Planner planner = Frameworks.getPlanner(config);
        SqlNode sqlNode = planner.parse(sql);
        SqlNode sqlNodeValidated = planner.validate(sqlNode);
        RelRoot relRoot = planner.rel(sqlNodeValidated);
        RelNode relNode = relRoot.project();

        System.out.println(sqlNode.toSqlString(MysqlSqlDialect.DEFAULT));
        System.out.println();
        System.out.println(relNode.explain());
    }
}

好了,现在我们初步完成了基于 GBase8s的跨数据源查询工作,下一篇文章我们来说说查询优化。提前祝广大读者朋友,新春快乐,虎虎生威 ...

相关 [calcite 分布 数据] 推荐:

基于Calcite的分布式多数据源查询

- - IT瘾-dev
在本文中,我们将实践 GBase8s和 MySQL的跨数据源联合查询,案例中 MySQL数据源中存放商品信息, GBase8s数据源中存放订单信息. 安装镜像 docker pull liaosnet/gbase8s启动容器 docker run -itd -p 19088:9088 liaosnet/gbase8s容器基本信息:.

多IDC的数据分布设计(二)

- crystal - Tim[后端技术]
在前文《多IDC的数据分布设计(一)》中介绍了多IDC数据一致性的几种实现原理,遗憾的是,目前虽然有不少分布式产品,但几乎都没有开源的产品专门针对IDC来优化. 本文从实践的角度分析各种方法优缺点. 背景资料 Latency差异. Jeff Dean提到不同数据访问方式latency差异. 这个数据对于我们设计多IDC数据访问策略具有关键的指导作用,我们可以用这个数据来衡量数据架构来如何设计才能满足高并发低延迟的目标.

NoSQL数据库的分布式算法

- - NoSQLFan
本文英文原文发表于知名技术博客《 Highly Scalable Blog》,对NoSQL数据库中的 分布式算法和思想进行了详细的讲解. 文章很长,由@ 可观 进行翻译投稿. 英文原文:《 Distributed Algorithms in NoSQL Databases》. 译文地址:《 NoSQL数据库的分布式算法》.

当数据库遇到分布式

- - DockOne.io
数据库通常有着完善的事务支持,但是局限于单机的存储和性能,于是就出现了各种分布式解决方案. 最近读了《Designing Data-Intensive Applications》这本书,所以做一个总结,供大家做个参考,有什么不对的请大家指正,一起讨论. 数据模型可以说软件开发中最重要的部分,因为影响着我们的思考方式、解题思路以及代码的编写方式.

谈Elasticsearch下分布式存储的数据分布

- - IT瘾-geek
  对于一个分布式存储系统来说,数据是分散存储在多个节点上的. 如何让数据均衡的分布在不同节点上,来保证其高可用性. 所谓均衡,是指系统中每个节点的负载是均匀的,并且在发现有不均匀的情况或者有节点增加/删除时,能及时进行调整,保持均匀状态. 本文将探讨Elasticsearch的数据分布方法,文中所述的背景是Elasticsearch 5.5.

颠覆大数据分析之Spark弹性分布式数据集

- - 并发编程网 - ifeve.com
颠覆大数据分析之Spark弹性数据集. 译者:黄经业     购书. Spark中迭代式机器学习算法的数据流可以通过图2.3来进行理解. 将它和图2.1中Hadoop MR的迭代式机器学习的数据流比较一下. MR中每次迭代都会涉及HDFS的读写,而在Spark中则要简单得多. 它仅需从HDFS到Spark中的分布式共享对象空间的一次读入——从HDFS文件中创建RDD.

Google Spanner原理- 全球级的分布式数据库

- - 我自然
Google Spanner简介. Spanner 是Google的全球级的分布式数据库 (Globally-Distributed Database). Spanner的扩展性达到了令人咋舌的全球级,可以扩展到数百万的机器,数已百计的数据中心,上万亿的行. 更给力的是,除了夸张的扩展性之外,他还能同时通过同步复制和多版本来满足外部一致性,可用性也是很好的.

Sensei:分布式, 实时, 半结构化数据库

- - ITeye博客
在未出现开源搜索引擎以前, Doug Cutting整了个Lucene, 随后Yonik Seeley写了一个Solr, 在2010年 Shay Banon发布了ElasticSearch, 大概在两年前, 我们迎来了Sensei, 最近他们发布了1.0版本, 下面通过 @sematext对LinkedIn的搜索架构师John Wang的一个采访.

关于分布式系统的数据一致性问题

- - 互联网 - ITeye博客
现在先抛出问题,假设有一个主数据中心在北京M,然后有成都A,上海B两个地方数据中心,现在的问题是,假设成都上海各自的数据中心有记录变更,需要先同步到主数据中心,主数据中心更新完成之后,在把最新的数据分发到上海,成都的地方数据中心A,地方数据中心更新数据,保持和主数据中心一致性(数据库结构完全一致).