阿里开源数据同步组件Canal( 三 )

rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List columnList = rowData.getBeforeColumnsList();StringBuilder sql = new StringBuilder("delete from " + entry.getHeader().getTableName() + " where ");for (Column column : columnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName()).append("=").append(column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存插入语句*/private voidsaveInsertSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List columnList = rowData.getAfterColumnsList();StringBuilder sql = new StringBuilder("insert into " + entry.getHeader().getTableName() + " (");for (int i = 0; i < columnList.size(); i++) {sql.append(columnList.get(i).getName());if (i != columnList.size() - 1) {sql.append(",");}}sql.append(") VALUES (");for (int i = 0; i < columnList.size(); i++) {sql.append("'").append(columnList.get(i).getValue()).append("'");if (i != columnList.size() - 1) {sql.append(",");}}sql.append(")");SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 入库*/public void execute(String sql) {jdbcTemplate.execute(sql);}}
注意:
如果项目启动失败,并且抛出canal连接失败,这个大概率是防火墙的问题,因为canal默认的端口号是11111,这个默认是关闭的,所以需要手动开启
查看已经开启的端口号
firewall-cmd --zone=public --list-ports
开启11111端口
firewall-cmd --zone=public --add-port=11111/tcp --permanent
重启防火墙
firewall-cmd --reload
往数据库中插入数据
INSERT INTO tb_user (username, age) VALUES ('Tom', 20)
查看数据是否同步,可以看到数据同步成功
三.总结 canal对于代码没有侵入性,基于监听mysql 日志去实现数据同步,实时性也非常的不错,这里有官方发出的性能测试结果,日常工作一般是够用了