Cannal实现数据异构
问题:
在大型网站架构中,DB会采用分库分表来解决容量和性能的问题。但这带来个新的问题:比如不同维度的查询或者聚合查询
方案:
一般会通过数据异构机制来解决问题。
具体示例:
为提升系统的接单能力,需要对订单表进行分库分表,随之而来的问题:用户如何查询自己的订单列表?
方法1:扫描所有订单表,然后内存聚合,在大流量的架构中肯定是不行的;
方法2:双写,但是双写无法保证一致性;
方法3:订阅数据库变更日志,比如订阅mysql的binlog日志模拟数据库的主从同步机制,然后解析变更日志写到订单列表,从而实现数据异构,
这种机制也能保证数据的一致性。
比如,订单中心按照订单号分库分表,然后异构出:订单列表按照用户分库分表,商家订单,订单缓存,ES搜索
MYSQL主从复制
原理
--canal类似该原理
1.准备:
github:https://github.com/alibaba/canal
包括
1-canal的文档
2-server端
3-client端的
4-例子
5-源码包等等
2.canal概述
canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。
早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。
说明:目前内部使用的同步,已经支持mysql5.x和Oracle部分版本的日志解析
canal server通过slave机制订阅数据库的binlong日志
基于日志增量订阅&消费支持的业务:
(1)数据库镜像
(2)数据库实时备份
(3)多级索引 (卖家和买家各自分库索引)
(4)search build
(5)业务cache刷新
(6)价格变化等重要业务消息
keyword:数据库同步,增量订阅&消费。
3.canal工作原理:
从上层来看,复制分成三步:
(1) master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
(2) slave将master的binary log events拷贝到它的中继日志(relay log);
(3) slave重做中继日志中的事件,将改变反映它自己的数据。
4.部署canal:
4.0 前提:
Cannot find a Java JDK. Please set either set JAVA or put java (>=1.5) in your PATH.
需要安装JDK
4.1 部署canal-server:
4.1.1数据库配置:
开启MySQL的binlog功能,并配置binlog模式为row。
在my.cnf 加入如下:
vi /etc/my.cnf
[mysqld]
log-bin=mysql-bin #开启二进制日志
binlog-format=ROW #选择row模式 ,不要使用statement或者mix模式
server_id=1 #配置主数据库ID,不能和从数据库重复,即不能和canal的slaveId重复,配置mysql replaction需要定义
binlog提供的三种记录模式:
见书,在使用Canal时建议使用row模式
另外在MYSQL中执行"show binary logs",将看到当前有哪些二进制文件及其大小
4.1.2在mysql中 配置canal数据库管理用户,配置相应权限(repication权限)
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
说明:一定要重启,否则不生效,避免类似这样的错误
4.1.3下载canal https://github.com/alibaba/canal/releases
并解压到相应文件夹,比如我下载的是canal.deployer-1.0.24.tar.gz
mkdir /usr/server/canal
cd /usr/server
tar -zxvf canal.tar.gz -C canal
canal 文件目录结构
drwxr-xr-x 2 jianghang jianghang 136 2013-02-05 21:51 bin
drwxr-xr-x 4 jianghang jianghang 160 2013-02-05 21:51 conf
drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
drwxr-xr-x 2 jianghang jianghang 48 2013-02-05 21:29 logs
4.1.4修改配置-canal的数据库实例 instance.properties
说明:这里可以使用已经存在的 example,也可以新起实例,这个名字和客户端java类中写的名字需要一致
这里我们新配置一个canal Server实例
// vi /usr/server/canal/conf/example/instance.properties
mkdir -p /usr/server/canal/conf/product
cp /usr/server/canal/conf/example/instance.properties /usr/server/canal/conf/product/
vi /usr/server/canal/conf/product/instance.properties
#################################################
## mysql serverId 必须和master的SQL的ID不一致
canal.instance.mysql.slaveId = 101
# position info:连接的数据库地址,从哪个二进制文件,哪个位置开始
canal.instance.master.address = 127.0.0.1:3306
# MYSQL主库连接时,起始的binlog文件
canal.instance.master.journal.name =
# MYSQL主库连接时,起始的binlog文件 偏移量
canal.instance.master.position =
# MYSQL主库连接时,起始的binlog文件 时间戳
canal.instance.master.timestamp =
#从库
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName = canal_test
canal.instance.connectionCharset = UTF-8
# 通过如下配置过滤订阅的是 哪个库中的哪些表,减少不必要的订阅;
# 比如只关注产品数据库,通过如下模式可只订阅产品数据库
# table regex
# canal.instance.filter.regex = .*\\..*
canal.instance.filter.regex = product_\d+\\.*
#################################################
说明:若多个库订阅,则需要配置多个实例,为每个数据库配置一个配置文件
4.1.5 进行canal Server 的配置,修改conf/canal.properties
vi /usr/server/canal/conf/canal.properties
canal.id= 1
canal.ip=
canal.port= 11111
canal.zkServers=
# 当前canalserver上部署的实例,配置多个时用逗号分隔,此处配置product
canal.destinations= product
# 使用zk持久化模式,这样可以保证集群数据共享,共享HA
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
4.1.5然后cd到bin目录 启动和停止canal-server
启动
/usr/server/canal/bin/startup.sh & tail -f /usr/server/canal/logs/canal/canal.log
停止
/usr/server/canal/bin/stop.sh
验证启动状态,查看log文件
tail -f /usr/server/canal/logs/canal/canal.log
2014-07-18 10:21:08.525 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2014-07-18 10:21:08.609 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.12.109.201:11111]
2014-07-18 10:21:09.037 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
---> 上述日志信息显示启动canal成功
*********canal server 集群**************
canal server可以部署一台,也可以部署多台,但是只有一台是活跃的,其它的作为备机;canalserver的高可用是通过zk维护的。
需要安装:zookeeper
配置文件如下修改:
vi /usr/server/canal/conf/canal.properties
canal.id= 1
canal.ip=
canal.port= 11111
canal.zkServers=127.0.0.1:2181
4.2运行canal-client实例:
4.2.1 建立实例maven工程
mvn archetype:create -DgroupId=com.alibaba.otter -DartifactId=canal.sample
[实践:手动创建Maven工程]
4.2.2 添加pom依赖:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.12</version>
</dependency>
4.2.3 更新依赖 mvn install
4.2.4 ClientSamplet.Java
实例代码
package canal.sample;
/**
* Created by hp on 14-7-17.
*/
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.client.*;
import com.google.protobuf.InvalidProtocolBufferException;
//import org.jetbrains.annotations.NotNull;
public class ClientSample {
public static void main(String args[]) throws Exception {
// 连接 canal Server
//CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.1.121",
11111), "example", "", "");
//11111), "chapter6", "", "");
/**通过zk连接canal Server
String zkServers = "192.168.1.121:2181";
//目标实例:可以自定义一个,例如product
String destination = "product";
//连接 canal Server
CanalConnector connector = CanalConnectors.newClusterConnector(zkServers, destination, "", "");
**/
int emptyCount = 0; //空跑的次数
int totalEmtryCount = 1200;//循环多少次为空时退出
try {
connector.connect(); //连接
connector.subscribe(".*\\..*"); //订阅所有,和不写此行一个效果
//connector.subscribe("product_.*\\.product_.*");//订阅product数据库下的product表
connector.rollback();
while (emptyCount < totalEmtryCount) {
//while(true){//一直循环
//批量获取1000个日志(不确认模式)
Message message = connector.getWithoutAck(1000);//这个值根据实际情况修改
long batchId = message.getId();
//以下为空跑计数
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
emptyCount = 0;
//做数据处理
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
//private static void printEntry(@NotNull List<Entry> entrys) {
private static void printEntry(List<Entry> entrys) throws InvalidProtocolBufferException {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
//如果是行数据
if(entry.getEntryType() == EntryType.ROWDATA){
//则解析行变更
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
EventType eventType = rowChage.getEventType();
//这里捕获binlog变更信息
// System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
// entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
// entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
// eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
//如果是删除,则获取删除的数据进行业务处理
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
//List<Column> columns = rowData.getBeforeColumnsList();
//delete(columns);
//如果是新增修改则获取新增修改数据进行处理
} else if (eventType == EventType.INSERT || eventType == eventType.UPDATE) {
//printColumn(rowData.getAfterColumnsList());
List<Column> columns = rowData.getAfterColumnsList();
save(columns);
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
}
//新增的异构操作
private static void save(List<Column> columns) {
for (Column col:columns) {
String name = col.getName();
String value = col.getValue();
System.out.println("name: "+ name + ",value:" + value);
//name: uid,value:4
//name: name,value:10
}
}
//private static void printColumn(@NotNull List<Column> columns) {
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
报错[有个jar包没找到注释掉了]
import org.jetbrains.annotations.NotNull;
通过以上代码,捕获数据库日志变更,然后进行相关业务的处理。无论是数据异构还是缓存更新
4.2.5 运行java实例
启动后看到控制端信息:
empty count : 1
empty count : 2
empty count : 3
empty count : 4
4.2.6触发数据库变更
create table test (
uid int (4) primary key not null auto_increment,
name varchar(10) not null
);
insert into test (name) values('10');
4.2.7 client 抓取mysql信息:
================> binlog[mysql-bin.000016:3281] , name[canal_test,test] , eventType : INSERT
uid : 7 update=false
name : 10 update=false
empty count : 1
empty count : 2
[发现没有捕获到信息]
tail -f /usr/server/canal/logs/example/example.log
2017-06-03 13:11:28.802 [destination = chapter6 , address = /127.0.0.1:3306 , EventParser] WARN c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status
2017-06-03 13:11:28.817 [destination = chapter6 , address = /127.0.0.1:3306 , EventParser] ERROR c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - dump address /127.0.0.1:3306 has an error, retrying. caused by
com.alibaba.otter.canal.parse.exception.CanalParseException: command : 'show master status' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation
2017-06-03 13:11:28.820 [destination = chapter6 , address = /127.0.0.1:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:chapter6[com.alibaba.otter.canal.parse.exception.CanalParseException: command : 'show master status' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation
]
原因很简单:因为Mysql需要开启binlog,我设置了,但是没重启。
这样可可以捕获到mysql信息了:
================> binlog[mysql-bin.000001:557] , name[chapter6,test] , eventType : INSERT
uid : 1 update=true
name : 10 update=true
empty count : 1
empty count : 2
5、部署过程中产生问题:
(1)启动失败,log日志中地址正在使用
1、11111端口正在被占用 可以用 ls -i:11111 查看监听进程谁占用端口 或者 用 ps -ef | grep 11111 查看哪个进程占用端口号 然后 kill -9 进程号 杀掉占用进程
2、可以编辑 canal/conf/canal.properties 中的端口号 ,改为不占用的端口
(2)canal无法抓取mysql触发数据库改变的信息
1、检查mysql是否打开binlog写入功能 检查binlog 是否为行模式。
show variables like "binlog_format"
2、检查my.cnf 和 instance.properties 等配置文件填写信息是否正确。
3、检查client 代码 调试实例代码
4、版本兼容问题,canal 1.8 换成 canal 1.7 继续测试
5、查看所有日志文件 分析日志