Apache Kudu入门
Apache Kudu是由Cloudera开源的存储引擎,可以同时提供低延迟的随机读写和高效的数据分析能力。
Kudu支持水平扩展,使用Raft协议进行一致性保证,并且与Cloudera Impala和Apache Spark等当前流
行的大数据查询和分析工具结合紧密。本文将为您介绍Kudu的一些基本概念和架构以及在企业中的应用,使您
对Kudu有一个较为全面的了解。
- 为什么需要Kudu ?
提起大数据存储,我们能想到的技术有很多,比如HDFS,以及在HDFS上的列式存储技术Parquet,ORC,还有以KV形式存储半结构化数据的HBase和Cassandra等。既然有了如此多的存储技术,Cloudera公司为什么要开发出一款全新的存储引擎Kudu呢?事实上,当前的这些存储技术都存在着一定的局限性。对于会被用来进行分析的静态数据集来说,使用Parquet或者ORC存储是一种明智的选择。但是目前的列式存储技术都不能更新数据,而且随机读写性能较差。我们可以使用高效随机读写的HBase、Cassandra等数据库来代替。所以现在的企业中,既要实现实时读写数据,又要对数据进行分析分析,就需要使用两套系统,并且让他们同步,维护成本太高,我们可以用一套系统来搞定这两种需求,这种系统就是Kudu。Kudu是介于HBase和HDFS之间的存储系统,,在读写性能上不如HDFS,在实时读取写入性能上不如HBase。
- Kudu的架构
与HDFS和HBase相似,Kudu使用单个的Master节点,用来管理集群的元数据,并且使用任意数量的
Tablet Server 节点用来存储数据。为了保证主节点高何用,可以部署多个Master节点来提高容错性。
如下图:
架构节点总结:
- Matser:负责管理表元数据
- Tablet Server:负责存储表数据。
数据是如何存储的:
- Table:是数据库中用来存储数据的对象,是有结构的数据集合。kudu中的表具有schema(比如
分区的规则和副本机制)和全局有序的primary key 。
- Tablet
Apache Kudu是由Cloudera开源的存储引擎,可以同时提供低延迟的随机读写和高效的数据分析能力。
Kudu支持水平扩展,使用Raft协议进行一致性保证,并且与Cloudera Impala和Apache Spark等当前流
行的大数据查询和分析工具结合紧密。本文将为您介绍Kudu的一些基本概念和架构以及在企业中的应用,使您
对Kudu有一个较为全面的了解。
北京市昌平区建材城西路金燕龙办公楼一层 电话:400-618-9090
- kudu中一个table如果数据量很大,会被水平分成多个被称之为 tablet 片段。一个 tablet 是
一张 table连续的片段,比如tablet1存储都是pk001-pk100 的数据,tablet2存储都是pk101-
pk200 的数据。
- 为了保证每个 tablet 数据存储的安全性,我们为其设置多个副本,每个副本都存储在 Tablet
Server中。
- tablet 的多个副本会设置主从关系,由一个leader tablet 和多个 follower tablet 组成。 在写
数据的时候,客户端只跟leader tablet所在的Tablet Server交互。
- Tablet的安装
安装依赖
yum install autoconf automake cyrus-sasl-devel cyrus-sasl-gssapi cyrus-sasl-plain flex gcc gcc-c++ gdb git java-1.8.0-openjdk-devel krb5-server krb5-workstation libtool make openssl-devel patch pkgconfig redhat-lsb-core rsyncunzip vim-common which -y
GitHub下载kudu-1.8.0源码包
cd /data
git clone https://github.com/apache/kudu
cd kudu
build-support/enable_devtoolset.sh
下载程序依赖
mkdir thirdparty/src/
cd thirdparty/src/
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfront ... elease-1.8.0.tar.gz
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
9335b81317a6451d5a37c5dc7ec088eecbf68c82.tar.gz
wget http://d3dr9sfxru4sde.cloudfr...
87a592e8aa04497764c533acd6e887618ca7b8a8.tar.gz
wget http://d3dr9sfxru4sde.cloudfr...
7a179d1ac2e08a5cc1622bec900d1e0452776713.tar.gz
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
42148a6df6986a257ab21c80f8eca2e54544ac4d.tar.gz
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfront ... iwyu-0.9.src.tar.gz
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
9eac2058b70615519b2c4d8c6bdbfca1bd079e39.tar.gz
wget http://d3dr9sfxru4sde.cloudfr...
47a55825ca3b35eab1ca22b7ab82b9544e32a9af.tar.gz
wget http://d3dr9sfxru4sde.cloudfr...
824860bb76893d163efbcff330734b9f62eecb17.tar.gz
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
498021fa15186aee8b282d3c032fbd2cede6bec4-stripped.tar.gz
wget http://d3dr9sfxru4sde.cloudfr...
wget http://d3dr9sfxru4sde.cloudfr...
2c9a927a9e87cba0e4c0f34fc0b55887c6636927-bin.tar.gz
编译程序依赖并构建kudu安装配置
cd ../../
thirdparty/build-if-necessary.sh # 这个命令会把上面下载的依赖全部安装执行Kudu
的./configura
创建编译后的安装目录
mkdir build/release -p
cd build/release/
../../build-support/enable_devtoolset.sh
编译kudu并安装
../../thirdparty/installed/common/bin/cmake DCMAKE_BUILD_TYPE=release ../..
make -j4
make DESTDIR=/data/kudu/build/release/kudu install
对lib文件做软链接
ln -s /data/kudu/build/release/kudu/usr/local/include/* /usr/local/include/
ln -s /data/kudu/build/release/kudu/usr/local/lib64/* /usr/local/lib64/
ln -s /data/kudu/build/release/kudu/usr/local/share/* /usr/local/share/
======== MASTER ========
mkdir conf
cd conf
cat >>master.gflagfile<<EOF
Comma-separated list of the RPC addresses belonging to all Masters in this
cluster.
NOTE: if not specified, configures a non-replicated Master.
--master_addresses=kudu1:7051,kudu2:7051,kudu3:7051
--rpc_bind_addresses=kudu1:7051
--fs_wal_dir=/data/kudu_data/master/wal
--fs_data_dirs=/data/kudu_data/master/data
--enable_process_lifetime_heap_profiling=true
--heap_profile_path=/data/kudu_data/master/heap
--rpc-encryption=disabled
--rpc_authentication=disabled
--unlock_unsafe_flags=true
--allow_unsafe_replication_factor=true
--max_log_size=1800
--max_log_size=2048
--memory_limit_hard_bytes=0
--memory_limit_hard_bytes=1073741824
--default_num_replicas=3
--max_clock_sync_error_usec=10000000
--consensus_rpc_timeout_ms=30000
--follower_unavailable_considered_failed_sec=300
--leader_failure_max_missed_heartbeat_periods=3
--block_manager_max_open_files=10240
--server_thread_pool_max_thread_count=-1
--tserver_unresponsive_timeout_ms=60000
--rpc_num_service_threads=10
--max_negotiation_threads=50
--min_negotiation_threads=10
--rpc_negotiation_timeout_ms=3000
--rpc_default_keepalive_time_ms=65000
--rpc_num_acceptors_per_address=1
--rpc_num_acceptors_per_address=5
--master_ts_rpc_timeout_ms=30000
--master_ts_rpc_timeout_ms=60000
--remember_clients_ttl_ms=60000
--remember_clients_ttl_ms=3600000
--remember_responses_ttl_ms=60000
--remember_responses_ttl_ms=600000
--rpc_service_queue_length=50
--rpc_service_queue_length=1000
--raft_heartbeat_interval_ms=500
--raft_heartbeat_interval_ms=60000
--heartbeat_interval_ms=1000
--heartbeat_interval_ms=60000
--heartbeat_max_failures_before_backoff=3
You can avoid the dependency on ntpd by running Kudu with --use-hybrid-
clock=false
This is not recommended for production environment.
NOTE: If you run without hybrid time the tablet history
forever. Eventually you may run out of disk space.
--use_hybrid_clock=false
--webserver_enabled=true
--metrics_log_interval_ms=60000
--webserver_port=8051
--webserver_doc_root=/data/kudu/www
EOF
======== TSERVER =========
cat >>tserver.gflagfile<<EOF
Comma-separated list of the RPC addresses belonging to all Masters in this
cluster.
NOTE: if not specified, configures a non-replicated Master.
--tserver_master_addrs=kudu1:7051,kudu2:7051,kudu3:7051
--rpc_bind_addresses=kudu:7050
--log_dir=/data/kudu_data/tserver/logs
--log_filename=kudu1
--fs_wal_dir=/data/kudu_data/tserver/wal
--fs_data_dirs=/data/kudu_data/tserver/data
--enable_process_lifetime_heap_profiling=true
--heap_profile_path=/data/kudu_data/tserver/heap
--rpc-encryption=disabled
--rpc_authentication=disabled
--unlock_unsafe_flags=true
--allow_unsafe_replication_factor=true
--max_log_size=1800
--max_log_size=2048
--memory_limit_hard_bytes=0
--memory_limit_hard_bytes=1073741824
--default_num_replicas=3
--max_clock_sync_error_usec=10000000
--consensus_rpc_timeout_ms=30000
--follower_unavailable_considered_failed_sec=300
--leader_failure_max_missed_heartbeat_periods=3
--block_manager_max_open_files=10240
--server_thread_pool_max_thread_count=-1
--tserver_unresponsive_timeout_ms=60000
--rpc_num_service_threads=10
--max_negotiation_threads=50
--min_negotiation_threads=10
--rpc_negotiation_timeout_ms=3000
--rpc_default_keepalive_time_ms=65000
--rpc_num_acceptors_per_address=1
--rpc_num_acceptors_per_address=5
--master_ts_rpc_timeout_ms=30000
--master_ts_rpc_timeout_ms=60000
--remember_clients_ttl_ms=60000
--remember_responses_ttl_ms=60000
--remember_responses_ttl_ms=600000
--rpc_service_queue_length=50
--rpc_service_queue_length=1000
--raft_heartbeat_interval_ms=500
--raft_heartbeat_interval_ms=60000
--heartbeat_interval_ms=1000
--heartbeat_interval_ms=60000
--heartbeat_max_failures_before_backoff=3
You can avoid the dependency on ntpd by running Kudu with --use-hybrid-
clock=false
This is not recommended for production environment.
NOTE: If you run without hybrid time the tablet history GC will not work.
Therefore when you delete or update a row the history of that data will be
kept
forever. Eventually you may run out of disk space.
--use_hybrid_clock=false
--webserver_enabled=true
--metrics_log_interval_ms=60000
--webserver_port=8050
--webserver_doc_root=/data/kudu/www
EOF
配置系统systemd启动
========= MASTER =========
cat >>/usr/lib/systemd/system/kudu-master.service<<EOF
[Unit]
Description=Apache Kudu Master Server
Documentation=http://kudu.apache.org
[Service]
Environment=KUDU_HOME=/data/kudu
ExecStart=/data/kudu/build/release/bin/kudu-master --
flagfile=/data/kudu/build/release/conf/master.gflagfile
TimeoutStopSec=5
Restart=on-failure
User=kudu
LimitNOFILE=65535
LimitNPROC=10240
[Install]
WantedBy=multi-user.target
EOF
========= TSERVER =========
cat >>/usr/lib/systemd/system/kudu-tserver.service<<EOF
[Unit]
Description=Apache Kudu Master Server
Documentation=http://kudu.apache.orgEnviron...
ExecStart=/data/kudu/build/release/bin/kudu-tserver --
flagfile=/data/kudu/build/release/conf/tserver.gflagfile
TimeoutStopSec=5
Restart=on-failure
User=kudu
LimitNOFILE=65535
LimitNPROC=10240
[Install]
WantedBy=multi-user.target
EOF
创建进程用户
useradd kudu
创建数据目录(根据配置文件创建)
mkdir /data/kudu_data/{master,tserver}/{data,wal,logs,heap} -p
chown -R kudu.kudu /data/kudu_data/
chown -R kudu.kudu /data/kudu/
cd /data
配置环境
[/table]
[/table]
cat >>/etc/profile<<EOF
export PATH=${PATH}:/data/kudu/build/release/bin
EOF
source /etc/profile
启动程序
systemctl start kudu-master.service
systemctl start kudu-tserver.service
- 测试Kudu的存储
构建maven工程、导入依赖
[table=535]
<dependencies>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
初始化方法
[table=535]
public class TestKudu {
//声明全局变量 KuduClient后期通过它来操作kudu表
private KuduClient kuduClient;
//指定kuduMaster地址
private String kuduMaster;
//指定表名
private String tableName;
@Before
public void init(){
//初始化操作 kuduMaster="node1:7051,node2:7051,node3:7051"; //指定表名 tableName="student"; KuduClient.KuduClientBuilder kuduClientBuilder = new
KuduClient.KuduClientBuilder(kuduMaster);
kuduClientBuilder.defaultSocketReadTimeoutMs(10000); kuduClient=kuduClientBuilder.build();
}
}
创建表
/**
- 创建表
*/
@Test
public void createTable() throws KuduException {
//判断表是否存在,不存在就构建
if(!kuduClient.tableExists(tableName)){
//构建创建表的schema信息-----就是表的字段和类型 ArrayList<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>(); columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id",
Type.INT32).key(true).build());
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name",
Type.STRING).build()); columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("sex",
Type.INT32).build());
Schema schema = new Schema(columnSchemas); //指定创建表的相关属性 CreateTableOptions options = new CreateTableOptions(); ArrayList<String> partitionList = new ArrayList<String>(); //指定kudu表的分区字段是什么 partitionList.add("id"); // 按照 id.hashcode % 分区数 = 分区号 options.addHashPartitions(partitionList,6); kuduClient.createTable(tableName,schema,options);
}
}
插入数据
/**
- 向表加载数据
*/
@Test
public void insertTable() throws KuduException {
//向表加载数据需要一个kuduSession对象
KuduSession kuduSession = kuduClient.newSession();
kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
//需要使用kuduTable来构建Operation的子类实例对象
KuduTable kuduTable = kuduClient.openTable(tableName);
for(int i=1;i<=10;i++){
Insert insert = kuduTable.newInsert(); PartialRow row = insert.getRow(); row.addInt("id",i); row.addString("name","zhangsan-"+i); row.addInt("age",20+i); row.addInt("sex",i%2); kuduSession.apply(insert);//最后实现执行数据的加载操作
}
}
查询数据
/**
- 查询表的数据结果
*/
@Test
public void queryData() throws KuduException {
//构建一个查询的扫描器
KuduScanner.KuduScannerBuilder kuduScannerBuilder =
kuduClient.newScannerBuilder(kuduClient.openTable(tableName)); columnsList.add("name");
columnsList.add("age");
columnsList.add("sex");
kuduScannerBuilder.setProjectedColumnNames(columnsList);
//返回结果集
KuduScanner kuduScanner = kuduScannerBuilder.build();
//遍历
while (kuduScanner.hasMoreRows()){
RowResultIterator rowResults = kuduScanner.nextRows(); while (rowResults.hasNext()){ RowResult row = rowResults.next(); int id = row.getInt("id"); String name = row.getString("name"); int age = row.getInt("age"); int sex = row.getInt("sex"); System.out.println("id="+id+" name="+name+" age="+age+"
sex="+sex);
}
}
}