阿里开源数据同步组件Canal( 二 )

canal.client 1.1.4
修改启动类
需要在项目启动时便持续监听
package com.xx;import com.xx.client.CanalClient;import org.springframework.boot.CommandLineRunner;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import javax.annotation.Resource;/*** @author aqi* DateTime: 2021/2/24 4:43 下午* Description: canal启动类*/@SpringBootApplicationpublic class CanalApplication implements CommandLineRunner {@Resourceprivate CanalClient canalClient;public static void main(String[] args) {SpringApplication.run(CanalApplication.class, args);}@Overridepublic void run(String... args) throws Exception {// 项目启动,执行canal客户端监听canalClient.run();}}
编写canal客户端
这里操作数据库用的是 JdbcTemplate。
package com.xx.client;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.protocol.CanalEntry.*;import com.alibaba.otter.canal.protocol.Message;import com.google.protobuf.InvalidProtocolBufferException;import org.springframework.jdbc.core.JdbcTemplate;import org.springframework.stereotype.Component;import javax.annotation.Resource;import java.net.InetSocketAddress;import java.util.List;import java.util.Queue;import java.util.concurrent.ConcurrentLinkedQueue;/*** @author aqi* DateTime: 2021/2/24 4:48 下午* Description: No Description*/@Componentpublic class CanalClient {//sql队列private Queue> SQL_QUEUE = new ConcurrentLinkedQueue<>();@Resourceprivate JdbcTemplate jdbcTemplate;/*** canal入库方法*/public void run() {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("服务器ipd地址", 11111), "example", "", "");int batchSize = 1000;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();try {while (true) {//尝试从master那边拉去数据batchSize条记录,有多少取多少Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {Thread.sleep(1000);} else {dataHandle(message.getEntries());}connector.ack(batchId);//当队列里面堆积的sql大于一定数值的时候就模拟执行if (SQL_QUEUE.size() >= 1) {executeQueueSql();}}} catch (InterruptedException | InvalidProtocolBufferException e) {e.printStackTrace();}} finally {connector.disconnect();}}/*** 模拟执行队列里面的sql语句*/public void executeQueueSql() {int size = SQL_QUEUE.size();for (int i = 0; i < size; i++) {String sql = SQL_QUEUE.poll();System.out.println("[sql]----> " + sql);this.execute(sql);}}/*** 数据处理*/private void dataHandle(List entrys) throws InvalidProtocolBufferException {for (Entry entry : entrys) {if (EntryType.ROWDATA =http://www.kingceram.com/post/= entry.getEntryType()) {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());EventType eventType = 
rowChange.getEventType();if (eventType == EventType.DELETE) {saveDeleteSql(entry);} else if (eventType == EventType.UPDATE) {saveUpdateSql(entry);} else if (eventType == EventType.INSERT) {saveInsertSql(entry);}}}}/*** 保存更新语句*/private void saveUpdateSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List newColumnList = rowData.getAfterColumnsList();StringBuilder sql = new StringBuilder("update " + entry.getHeader().getTableName() + " set ");for (int i = 0; i < newColumnList.size(); i++) {sql.append(" ").append(newColumnList.get(i).getName()).append(" = '").append(newColumnList.get(i).getValue()).append("'");if (i != newColumnList.size() - 1) {sql.append(",");}}sql.append(" where ");List oldColumnList = rowData.getBeforeColumnsList();for (Column column : oldColumnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName()).append("=").append(column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存删除语句*/private void saveDeleteSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List