Apache Kudu入门

Apache Kudu是由Cloudera开源的存储引擎,可以同时提供低延迟的随机读写和高效的数据分析能力。
Kudu支持水平扩展,使用Raft协议进行一致性保证,并且与Cloudera Impala和Apache Spark等当前流
行的大数据查询和分析工具结合紧密。本文将为您介绍Kudu的一些基本概念和架构以及在企业中的应用,使您
对Kudu有一个较为全面的了解。

  1. 为什么需要Kudu ?

提起大数据存储,我们能想到的技术有很多,比如HDFS,以及在HDFS上的列式存储技术Parquet,ORC,还有以KV形式存储半结构化数据的HBase和Cassandra等。既然有了如此多的存储技术,Cloudera公司为什么要开发出一款全新的存储引擎Kudu呢?事实上,当前的这些存储技术都存在着一定的局限性。对于会被用来进行分析的静态数据集来说,使用Parquet或者ORC存储是一种明智的选择。但是目前的列式存储技术都不能更新数据,而且随机读写性能较差。我们可以使用高效随机读写的HBase、Cassandra等数据库来代替。所以现在的企业中,既要实现实时读写数据,又要对数据进行分析分析,就需要使用两套系统,并且让他们同步,维护成本太高,我们可以用一套系统来搞定这两种需求,这种系统就是Kudu。Kudu是介于HBase和HDFS之间的存储系统,,在读写性能上不如HDFS,在实时读取写入性能上不如HBase。

  1. Kudu的架构

与HDFS和HBase相似,Kudu使用单个的Master节点,用来管理集群的元数据,并且使用任意数量的
Tablet Server 节点用来存储数据。为了保证主节点高何用,可以部署多个Master节点来提高容错性。
如下图:
架构节点总结:

  1. Matser:负责管理表元数据
  2. Tablet Server:负责存储表数据。

数据是如何存储的:

  1. Table:是数据库中用来存储数据的对象,是有结构的数据集合。kudu中的表具有schema(比如

分区的规则和副本机制)和全局有序的primary key 。

  1. Tablet

Apache Kudu是由Cloudera开源的存储引擎,可以同时提供低延迟的随机读写和高效的数据分析能力。
Kudu支持水平扩展,使用Raft协议进行一致性保证,并且与Cloudera Impala和Apache Spark等当前流
行的大数据查询和分析工具结合紧密。本文将为您介绍Kudu的一些基本概念和架构以及在企业中的应用,使您
对Kudu有一个较为全面的了解。
北京市昌平区建材城西路金燕龙办公楼一层 电话:400-618-9090

  1. kudu中一个table如果数据量很大,会被水平分成多个被称之为 tablet 片段。一个 tablet 是

一张 table连续的片段,比如tablet1存储都是pk001-pk100 的数据,tablet2存储都是pk101-
pk200 的数据。

  1. 为了保证每个 tablet 数据存储的安全性,我们为其设置多个副本,每个副本都存储在 Tablet

Server中。

  1. tablet 的多个副本会设置主从关系,由一个leader tablet 和多个 follower tablet 组成。 在写

数据的时候,客户端只跟leader tablet所在的Tablet Server交互。

  1. Tablet的安装

安装依赖
yum install autoconf automake cyrus-sasl-devel cyrus-sasl-gssapi cyrus-sasl-plain flex gcc gcc-c++ gdb git java-1.8.0-openjdk-devel krb5-server krb5-workstation libtool make openssl-devel patch pkgconfig redhat-lsb-core rsyncunzip vim-common which -y

GitHub下载kudu-1.8.0源码包
cd /data
git clone https://github.com/apache/kudu
cd kudu
build-support/enable_devtoolset.sh

下载程序依赖
mkdir thirdparty/src/
cd thirdparty/src/
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfront ... elease-1.8.0.tar.gz
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
9335b81317a6451d5a37c5dc7ec088eecbf68c82.tar.gz
wget http://d3dr9sfxru4sde.cloudfr...
87a592e8aa04497764c533acd6e887618ca7b8a8.tar.gz
wget http://d3dr9sfxru4sde.cloudfr...
7a179d1ac2e08a5cc1622bec900d1e0452776713.tar.gz
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
42148a6df6986a257ab21c80f8eca2e54544ac4d.tar.gz
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfront ... iwyu-0.9.src.tar.gz
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
9eac2058b70615519b2c4d8c6bdbfca1bd079e39.tar.gz
wget http://d3dr9sfxru4sde.cloudfr...
47a55825ca3b35eab1ca22b7ab82b9544e32a9af.tar.gz
wget http://d3dr9sfxru4sde.cloudfr...
824860bb76893d163efbcff330734b9f62eecb17.tar.gz
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
498021fa15186aee8b282d3c032fbd2cede6bec4-stripped.tar.gz
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
2c9a927a9e87cba0e4c0f34fc0b55887c6636927-bin.tar.gz

编译程序依赖并构建kudu安装配置
cd ../../
thirdparty/build-if-necessary.sh # 这个命令会把上面下载的依赖全部安装执行Kudu
的./configura

创建编译后的安装目录
mkdir build/release -p
cd build/release/
../../build-support/enable_devtoolset.sh

编译kudu并安装
../../thirdparty/installed/common/bin/cmake DCMAKE_BUILD_TYPE=release ../..
make -j4
make DESTDIR=/data/kudu/build/release/kudu install

对lib文件做软链接
ln -s /data/kudu/build/release/kudu/usr/local/include/* /usr/local/include/
ln -s /data/kudu/build/release/kudu/usr/local/lib64/* /usr/local/lib64/
ln -s /data/kudu/build/release/kudu/usr/local/share/* /usr/local/share/
======== MASTER ========
mkdir conf
cd conf
cat >>master.gflagfile<<EOF

Comma-separated list of the RPC addresses belonging to all Masters in this

cluster.

NOTE: if not specified, configures a non-replicated Master.

--master_addresses=kudu1:7051,kudu2:7051,kudu3:7051
--rpc_bind_addresses=kudu1:7051
--fs_wal_dir=/data/kudu_data/master/wal
--fs_data_dirs=/data/kudu_data/master/data
--enable_process_lifetime_heap_profiling=true
--heap_profile_path=/data/kudu_data/master/heap
--rpc-encryption=disabled
--rpc_authentication=disabled

--unlock_unsafe_flags=true

--allow_unsafe_replication_factor=true

--max_log_size=1800

--max_log_size=2048

--memory_limit_hard_bytes=0

--memory_limit_hard_bytes=1073741824
--default_num_replicas=3
--max_clock_sync_error_usec=10000000
--consensus_rpc_timeout_ms=30000
--follower_unavailable_considered_failed_sec=300
--leader_failure_max_missed_heartbeat_periods=3

--block_manager_max_open_files=10240

--server_thread_pool_max_thread_count=-1

--tserver_unresponsive_timeout_ms=60000
--rpc_num_service_threads=10
--max_negotiation_threads=50
--min_negotiation_threads=10
--rpc_negotiation_timeout_ms=3000
--rpc_default_keepalive_time_ms=65000

--rpc_num_acceptors_per_address=1

--rpc_num_acceptors_per_address=5

--master_ts_rpc_timeout_ms=30000

--master_ts_rpc_timeout_ms=60000

--remember_clients_ttl_ms=60000

--remember_clients_ttl_ms=3600000

--remember_responses_ttl_ms=60000

--remember_responses_ttl_ms=600000

--rpc_service_queue_length=50

--rpc_service_queue_length=1000

--raft_heartbeat_interval_ms=500

--raft_heartbeat_interval_ms=60000

--heartbeat_interval_ms=1000

--heartbeat_interval_ms=60000
--heartbeat_max_failures_before_backoff=3

You can avoid the dependency on ntpd by running Kudu with --use-hybrid-

clock=false

This is not recommended for production environment.

NOTE: If you run without hybrid time the tablet history

forever. Eventually you may run out of disk space.

--use_hybrid_clock=false
--webserver_enabled=true
--metrics_log_interval_ms=60000
--webserver_port=8051

--webserver_doc_root=/data/kudu/www

EOF
======== TSERVER =========
cat >>tserver.gflagfile<<EOF

Comma-separated list of the RPC addresses belonging to all Masters in this

cluster.

NOTE: if not specified, configures a non-replicated Master.

--tserver_master_addrs=kudu1:7051,kudu2:7051,kudu3:7051
--rpc_bind_addresses=kudu:7050
--log_dir=/data/kudu_data/tserver/logs
--log_filename=kudu1
--fs_wal_dir=/data/kudu_data/tserver/wal
--fs_data_dirs=/data/kudu_data/tserver/data
--enable_process_lifetime_heap_profiling=true
--heap_profile_path=/data/kudu_data/tserver/heap
--rpc-encryption=disabled
--rpc_authentication=disabled

--unlock_unsafe_flags=true

--allow_unsafe_replication_factor=true

--max_log_size=1800

--max_log_size=2048

--memory_limit_hard_bytes=0

--memory_limit_hard_bytes=1073741824
--default_num_replicas=3
--max_clock_sync_error_usec=10000000
--consensus_rpc_timeout_ms=30000
--follower_unavailable_considered_failed_sec=300
--leader_failure_max_missed_heartbeat_periods=3

--block_manager_max_open_files=10240

--server_thread_pool_max_thread_count=-1

--tserver_unresponsive_timeout_ms=60000
--rpc_num_service_threads=10
--max_negotiation_threads=50
--min_negotiation_threads=10
--rpc_negotiation_timeout_ms=3000
--rpc_default_keepalive_time_ms=65000

--rpc_num_acceptors_per_address=1

--rpc_num_acceptors_per_address=5

--master_ts_rpc_timeout_ms=30000

--master_ts_rpc_timeout_ms=60000

--remember_clients_ttl_ms=60000

--remember_responses_ttl_ms=60000

--remember_responses_ttl_ms=600000

--rpc_service_queue_length=50

--rpc_service_queue_length=1000

--raft_heartbeat_interval_ms=500

--raft_heartbeat_interval_ms=60000

--heartbeat_interval_ms=1000

--heartbeat_interval_ms=60000
--heartbeat_max_failures_before_backoff=3

You can avoid the dependency on ntpd by running Kudu with --use-hybrid-

clock=false

This is not recommended for production environment.

NOTE: If you run without hybrid time the tablet history GC will not work.

Therefore when you delete or update a row the history of that data will be

kept

forever. Eventually you may run out of disk space.

--use_hybrid_clock=false
--webserver_enabled=true
--metrics_log_interval_ms=60000
--webserver_port=8050

--webserver_doc_root=/data/kudu/www

EOF

配置系统systemd启动
========= MASTER =========
cat >>/usr/lib/systemd/system/kudu-master.service<<EOF
[Unit]
Description=Apache Kudu Master Server
Documentation=http://kudu.apache.org
[Service]
Environment=KUDU_HOME=/data/kudu
ExecStart=/data/kudu/build/release/bin/kudu-master --
flagfile=/data/kudu/build/release/conf/master.gflagfile
TimeoutStopSec=5
Restart=on-failure
User=kudu

LimitNOFILE=65535

LimitNPROC=10240

[Install]
WantedBy=multi-user.target
EOF
========= TSERVER =========
cat >>/usr/lib/systemd/system/kudu-tserver.service<<EOF
[Unit]
Description=Apache Kudu Master Server
Documentation=http://kudu.apache.orgEnviron...
ExecStart=/data/kudu/build/release/bin/kudu-tserver --
flagfile=/data/kudu/build/release/conf/tserver.gflagfile
TimeoutStopSec=5
Restart=on-failure
User=kudu

LimitNOFILE=65535

LimitNPROC=10240

[Install]
WantedBy=multi-user.target
EOF

创建进程用户
useradd kudu

创建数据目录(根据配置文件创建)
mkdir /data/kudu_data/{master,tserver}/{data,wal,logs,heap} -p
chown -R kudu.kudu /data/kudu_data/
chown -R kudu.kudu /data/kudu/
cd /data

配置环境
[/table]
[/table]
cat >>/etc/profile<<EOF
export PATH=${PATH}:/data/kudu/build/release/bin
EOF
source /etc/profile
启动程序
systemctl start kudu-master.service
systemctl start kudu-tserver.service

  1. 测试Kudu的存储

构建maven工程、导入依赖

[table=535]
<dependencies>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>

初始化方法
[table=535]
public class TestKudu {
//声明全局变量 KuduClient后期通过它来操作kudu表
private KuduClient kuduClient;
//指定kuduMaster地址
private String kuduMaster;
//指定表名
private String tableName;
@Before
public void init(){

//初始化操作
kuduMaster="node1:7051,node2:7051,node3:7051";
//指定表名
tableName="student";
KuduClient.KuduClientBuilder kuduClientBuilder = new

KuduClient.KuduClientBuilder(kuduMaster);

kuduClientBuilder.defaultSocketReadTimeoutMs(10000);
kuduClient=kuduClientBuilder.build();

}
}

创建表
/**

  • 创建表

*/
@Test
public void createTable() throws KuduException {
//判断表是否存在,不存在就构建
if(!kuduClient.tableExists(tableName)){

//构建创建表的schema信息-----就是表的字段和类型
ArrayList<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id",

Type.INT32).key(true).build());

columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name",

Type.STRING).build()); columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("sex",
Type.INT32).build());

Schema schema = new Schema(columnSchemas);
//指定创建表的相关属性
CreateTableOptions options = new CreateTableOptions();
ArrayList<String> partitionList = new ArrayList<String>();
//指定kudu表的分区字段是什么
partitionList.add("id");   // 按照 id.hashcode % 分区数 = 分区号
options.addHashPartitions(partitionList,6);
kuduClient.createTable(tableName,schema,options);

}
}

插入数据
/**

  • 向表加载数据

*/
@Test
public void insertTable() throws KuduException {
//向表加载数据需要一个kuduSession对象
KuduSession kuduSession = kuduClient.newSession();
kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
//需要使用kuduTable来构建Operation的子类实例对象
KuduTable kuduTable = kuduClient.openTable(tableName);
for(int i=1;i<=10;i++){

Insert insert = kuduTable.newInsert();
PartialRow row = insert.getRow();
row.addInt("id",i);
row.addString("name","zhangsan-"+i);
row.addInt("age",20+i);
row.addInt("sex",i%2);
kuduSession.apply(insert);//最后实现执行数据的加载操作

}
}

查询数据
/**

  • 查询表的数据结果

*/
@Test
public void queryData() throws KuduException {
//构建一个查询的扫描器
KuduScanner.KuduScannerBuilder kuduScannerBuilder =
kuduClient.newScannerBuilder(kuduClient.openTable(tableName)); columnsList.add("name");
columnsList.add("age");
columnsList.add("sex");
kuduScannerBuilder.setProjectedColumnNames(columnsList);
//返回结果集
KuduScanner kuduScanner = kuduScannerBuilder.build();
//遍历
while (kuduScanner.hasMoreRows()){

RowResultIterator rowResults = kuduScanner.nextRows();
while (rowResults.hasNext()){
  RowResult row = rowResults.next();
  int id = row.getInt("id");
  String name = row.getString("name");
  int age = row.getInt("age");
  int sex = row.getInt("sex");
  System.out.println("id="+id+" name="+name+" age="+age+"

sex="+sex);
}
}
}