将数据库数据的变化, 通过canal解析binlog日志, 实时更新到solr的索引库中.
不会的话,可以看我之前写的博客:https://www.cnblogs.com/dalianpai/p/11671722.html
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.8.9</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.solr/solr-solrj -->
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
<version>8.5.0</version>
</dependency>
</dependencies>
package com.dalianpai.canal.pojo;
import org.apache.solr.client.solrj.beans.Field;
import java.util.Date;
public class Book {
@Field("id")
private Integer id;
@Field("book_name")
private String name;
@Field("book_author")
private String author;
@Field("book_publishtime")
private Date publishtime;
@Field("book_price")
private Double price;
@Field("book_publishgroup")
private String publishgroup;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getAuthor() {
return author;
}
public void setAuthor(String author) {
this.author = author;
}
public Date getPublishtime() {
return publishtime;
}
public void setPublishtime(Date publishtime) {
this.publishtime = publishtime;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
public String getPublishgroup() {
return publishgroup;
}
public void setPublishgroup(String publishgroup) {
this.publishgroup = publishgroup;
}
@Override
public String toString() {
return "Book{" +
"id=" + id +
", name=‘" + name + ‘\‘‘ +
", author=‘" + author + ‘\‘‘ +
", publishtime=" + publishtime +
", price=" + price +
", publishgroup=‘" + publishgroup + ‘\‘‘ +
‘}‘;
}
}
solr中也要建相关Field,在这里添加即可
public class SyncDataBootStart {
private static Logger logger = LoggerFactory.getLogger(SyncDataBootStart.class);
public static void main(String[] args) throws Exception {
String destination = "example";
CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.1.121", 11111),
destination,
"canal",
"canal");
//获取CanalServer 连接
//连接CanalServer
canalConnector.connect();
//订阅Destination
canalConnector.subscribe();
//轮询拉取数据
Integer batchSize = 5*1024;
while (true){
Message message = canalConnector.getWithoutAck(batchSize);
long messageId = message.getId();
int size = message.getEntries().size();
if(messageId == -1 || size == 0){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}else{
//进行数据同步
//1. 解析Message对象
List<InnerBinlogEntry> innerBinlogEntries = CanalDataParser.convertToInnerBinlogEntry(message);
//2. 将解析后的数据信息 同步到Solr的索引库中.
syncDataToSolr(innerBinlogEntries);
}
//提交确认
canalConnector.ack(messageId);
}
}
private static void syncDataToSolr(List<InnerBinlogEntry> innerBinlogEntries) throws Exception {
//获取solr的连接
SolrClient solrClient = new HttpSolrClient.Builder("http://192.168.1.119:8983/solr/canal").build();
//遍历数据集合 , 根据数据集合中的数据信息, 来决定执行增加, 修改 , 删除操作 .
if(innerBinlogEntries != null){
for (InnerBinlogEntry innerBinlogEntry : innerBinlogEntries) {
CanalEntry.EventType eventType = innerBinlogEntry.getEventType();
//如果是Insert, update , 则需要同步数据到 solr 索引库
if(eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE){
List<Map<String, BinlogValue>> rows = innerBinlogEntry.getRows();
if(rows != null){
for (Map<String, BinlogValue> row : rows) {
BinlogValue id = row.get("id");
BinlogValue name = row.get("name");
BinlogValue author = row.get("author");
BinlogValue publishtime = row.get("publishtime");
BinlogValue price = row.get("price");
BinlogValue publishgroup = row.get("publishgroup");
Book book = new Book();
book.setId(Integer.parseInt(id.getValue()));
book.setName(name.getValue());
book.setAuthor(author.getValue());
book.setPrice(Double.parseDouble(price.getValue()));
book.setPublishgroup(publishgroup.getValue());
book.setPublishtime(DateUtils.parseDate(publishtime.getValue()));
//导入数据到solr索引库
solrClient.addBean(book);
solrClient.commit();
}
}
}else if(eventType == CanalEntry.EventType.DELETE){
//如果是Delete操作, 则需要删除solr索引库中的数据 .
List<Map<String, BinlogValue>> rows = innerBinlogEntry.getRows();
if(rows != null){
for (Map<String, BinlogValue> row : rows) {
BinlogValue id = row.get("id");
//根据ID删除solr的索引库
solrClient.deleteById(id.getValue());
solrClient.commit();
}
}
}
}
}
solrClient.close();
}
}
进行测试
部分日志:
****************************************************
* Batch Id: [18] ,count : [3] , memsize : [217] , Time : 2020-09-05 17:41:52
* Start : [mysql-bin.000004:3957:1599298910000(2020-09-05 17:41:50)]
* End : [mysql-bin.000004:4207:1599298910000(2020-09-05 17:41:50)]
****************************************************
17:41:52.189 [main] INFO c.d.canal.util.CanalDataParser - BEGIN ----> Thread id: 16
17:41:52.189 [main] INFO c.d.canal.util.CanalDataParser -
================> binlog[mysql-bin.000004:3957] , executeTime : 1599298910000 , delay : 2189ms
17:41:52.189 [main] INFO c.d.canal.util.CanalDataParser -
----------------> binlog[mysql-bin.000004:4094] , name[canal,tb_book] , eventType : DELETE , executeTime : 1599298910000 , delay : 2189ms
17:41:52.189 [main] INFO c.d.canal.util.CanalDataParser - ########################### Data Parse Result ###########################
17:41:52.190 [main] INFO c.d.canal.util.CanalDataParser - rowlog schema[canal], table[tb_book], event[DELETE] , {"author":{"value":"吴瀚请","beforeValue":"吴瀚请"},"price":{"value":"99.0","beforeValue":"99.0"},"publishtime":{"value":"2020-09-05 09:13:28","beforeValue":"2020-09-05 09:13:28"},"name":{"value":"白帽子讲安全协议","beforeValue":"白帽子讲安全协议"},"id":{"value":"3","beforeValue":"3"},"publishgroup":{"value":"电子工业出版社","beforeValue":"电子工业出版社"}}
17:41:52.190 [main] INFO c.d.canal.util.CanalDataParser - ########################### Data Parse Result ###########################
17:41:52.190 [main] INFO c.d.canal.util.CanalDataParser -
17:41:52.190 [main] INFO c.d.canal.util.CanalDataParser - END ----> transaction id: 256
17:41:52.190 [main] INFO c.d.canal.util.CanalDataParser -
================> binlog[mysql-bin.000004:4207] , executeTime : 1599298910000 , delay : 2190ms
17:41:52.190 [main] DEBUG o.a.s.c.solrj.impl.HttpClientUtil - Creating new http client, config:followRedirects=false&allowCompression=false
17:41:52.193 [main] DEBUG o.a.h.c.protocol.RequestAddCookies - CookieSpec selected: default
17:41:52.193 [main] DEBUG o.a.h.c.protocol.RequestAuthCache - Auth cache not set in the context
17:41:52.193 [main] DEBUG o.a.h.i.c.PoolingHttpClientConnectionManager - Connection request: [route: {}->http://192.168.1.119:8983][state: class org.apache.solr.client.solrj.impl.HttpSolrClient][total kept alive: 0; route allocated: 0 of 10000; total allocated: 0 of 10000]
17:41:52.193 [main] DEBUG o.a.h.i.c.PoolingHttpClientConnectionManager - Connection leased: [id: 2][route: {}->http://192.168.1.119:8983][total kept alive: 0; route allocated: 1 of 10000; total allocated: 1 of 10000]
17:41:52.193 [main] DEBUG o.a.h.impl.execchain.MainClientExec - Opening connection {}->http://192.168.1.119:8983
17:41:52.193 [main] DEBUG o.a.h.i.c.DefaultHttpClientConnectionOperator - Connecting to /192.168.1.119:8983
17:41:52.212 [main] DEBUG o.a.h.i.c.DefaultHttpClientConnectionOperator - Connection established 10.0.0.2:9769<->192.168.1.119:8983
17:41:52.212 [main] DEBUG o.a.h.i.c.DefaultManagedHttpClientConnection - http-outgoing-2: set socket timeout to 120000
17:41:52.212 [main] DEBUG o.a.h.impl.execchain.MainClientExec - Executing request POST /solr/canal/update?wt=javabin&version=2 HTTP/1.1
17:41:52.212 [main] DEBUG o.a.h.impl.execchain.MainClientExec - Target auth state: UNCHALLENGED
17:41:52.212 [main] DEBUG o.a.h.impl.execchain.MainClientExec - Proxy auth state: UNCHALLENGED
17:41:52.212 [main] DEBUG org.apache.http.headers - http-outgoing-2 >> POST /solr/canal/update?wt=javabin&version=2 HTTP/1.1
17:41:52.212 [main] DEBUG org.apache.http.headers - http-outgoing-2 >> Content-Type: application/javabin
17:41:52.212 [main] DEBUG org.apache.http.headers - http-outgoing-2 >> User-Agent: Solr[org.apache.solr.client.solrj.impl.HttpSolrClient] 1.0
17:41:52.212 [main] DEBUG org.apache.http.headers - http-outgoing-2 >> Transfer-Encoding: chunked
17:41:52.212 [main] DEBUG org.apache.http.headers - http-outgoing-2 >> Host: 192.168.1.119:8983
17:41:52.212 [main] DEBUG org.apache.http.headers - http-outgoing-2 >> Connection: Keep-Alive
17:41:52.212 [main] DEBUG org.apache.http.wire - http-outgoing-2 >> "POST /solr/canal/update?wt=javabin&version=2 HTTP/1.1[\r][\n]"
17:41:52.212 [main] DEBUG org.apache.http.wire - http-outgoing-2 >> "Content-Type: application/javabin[\r][\n]"
17:41:52.212 [main] DEBUG org.apache.http.wire - http-outgoing-2 >> "User-Agent: Solr[org.apache.solr.client.solrj.impl.HttpSolrClient] 1.0[\r][\n]"
17:41:52.212 [main] DEBUG org.apache.http.wire - http-outgoing-2 >> "Transfer-Encoding: chunked[\r][\n]"
17:41:52.212 [main] DEBUG org.apache.http.wire - http-outgoing-2 >> "Host: 192.168.1.119:8983[\r][\n]"
17:41:52.212 [main] DEBUG org.apache.http.wire - http-outgoing-2 >> "Connection: Keep-Alive[\r][\n]"
17:41:52.213 [main] DEBUG org.apache.http.wire - http-outgoing-2 >> "[\r][\n]"
17:41:52.213 [main] DEBUG org.apache.http.wire - http-outgoing-2 >> "2d[\r][\n]"
17:41:52.213 [main] DEBUG org.apache.http.wire - http-outgoing-2 >> "[0x2][0xc4][0xe0]¶ms[0xc0][0xe0]*delByIdMap[\n]"
17:41:52.213 [main] DEBUG org.apache.http.wire - http-outgoing-2 >> "[0x1][0xe0]!3[0x0][0xe0]&delByQ[0x0][0xe0]$docs[0x0][\r][\n]"
17:41:52.213 [main] DEBUG org.apache.http.wire - http-outgoing-2 >> "0[\r][\n]"
17:41:52.213 [main] DEBUG org.apache.http.wire - http-outgoing-2 >> "[\r][\n]"
17:41:52.285 [main] DEBUG org.apache.http.wire - http-outgoing-2 << "HTTP/1.1 200 OK[\r][\n]"
17:41:52.285 [main] DEBUG org.apache.http.wire - http-outgoing-2 << "Content-Security-Policy: default-src ‘none‘; base-uri ‘none‘; connect-src ‘self‘; form-action ‘self‘; font-src ‘self‘; frame-ancestors ‘none‘; img-src ‘self‘; media-src ‘self‘; style-src ‘self‘ ‘unsafe-inline‘; script-src ‘self‘; worker-src ‘self‘;[\r][\n]"
17:41:52.285 [main] DEBUG org.apache.http.wire - http-outgoing-2 << "X-Content-Type-Options: nosniff[\r][\n]"
17:41:52.286 [main] DEBUG org.apache.http.wire - http-outgoing-2 << "X-Frame-Options: SAMEORIGIN[\r][\n]"
17:41:52.286 [main] DEBUG org.apache.http.wire - http-outgoing-2 << "X-XSS-Protection: 1; mode=block[\r][\n]"
17:41:52.286 [main] DEBUG org.apache.http.wire - http-outgoing-2 << "Content-Type: application/octet-stream[\r][\n]"
17:41:52.286 [main] DEBUG org.apache.http.wire - http-outgoing-2 << "Content-Length: 41[\r][\n]"
17:41:52.286 [main] DEBUG org.apache.http.wire - http-outgoing-2 << "[\r][\n]"
17:41:52.286 [main] DEBUG org.apache.http.wire - http-outgoing-2 << "[0x2][0xa1][0xe0].responseHeader[0xa2][0xe0]&status[0x6][0x0][0x0][0x0][0x0][0xe0]%QTimeU[0x2]"
17:41:52.286 [main] DEBUG org.apache.http.headers - http-outgoing-2 << HTTP/1.1 200 OK
17:41:52.286 [main] DEBUG org.apache.http.headers - http-outgoing-2 << Content-Security-Policy: default-src ‘none‘; base-uri ‘none‘; connect-src ‘self‘; form-action ‘self‘; font-src ‘self‘; frame-ancestors ‘none‘; img-src ‘self‘; media-src ‘self‘; style-src ‘self‘ ‘unsafe-inline‘; script-src ‘self‘; worker-src ‘self‘;
17:41:52.286 [main] DEBUG org.apache.http.headers - http-outgoing-2 << X-Content-Type-Options: nosniff
17:41:52.286 [main] DEBUG org.apache.http.headers - http-outgoing-2 << X-Frame-Options: SAMEORIGIN
17:41:52.286 [main] DEBUG org.apache.http.headers - http-outgoing-2 << X-XSS-Protection: 1; mode=block
17:41:52.286 [main] DEBUG org.apache.http.headers - http-outgoing-2 << Content-Type: application/octet-stream
17:41:52.286 [main] DEBUG org.apache.http.headers - http-outgoing-2 << Content-Length: 41
17:41:52.286 [main] DEBUG o.a.h.impl.execchain.MainClientExec - Connection can be kept alive indefinitely
17:41:52.287 [main] DEBUG o.a.h.i.c.PoolingHttpClientConnectionManager - Connection [id: 2][route: {}->http://192.168.1.119:8983][state: class org.apache.solr.client.solrj.impl.HttpSolrClient] can be kept alive indefinitely
17:41:52.287 [main] DEBUG o.a.h.i.c.DefaultManagedHttpClientConnection - http-outgoing-2: set socket timeout to 0
17:41:52.287 [main] DEBUG o.a.h.i.c.PoolingHttpClientConnectionManager - Connection released: [id: 2][route: {}->http://192.168.1.119:8983][state: class org.apache.solr.client.solrj.impl.HttpSolrClient][total kept alive: 1; route allocated: 1 of 10000; total allocated: 1 of 10000]
17:41:52.288 [main] DEBUG o.a.h.c.protocol.RequestAddCookies - CookieSpec selected: default
17:41:52.288 [main] DEBUG o.a.h.c.protocol.RequestAuthCache - Auth cache not set in the context
17:41:52.288 [main] DEBUG o.a.h.i.c.PoolingHttpClientConnectionManager - Connection request: [route: {}->http://192.168.1.119:8983][state: class org.apache.solr.client.solrj.impl.HttpSolrClient][total kept alive: 1; route allocated: 1 of 10000; total allocated: 1 of 10000]
17:41:52.288 [main] DEBUG o.a.h.i.c.PoolingHttpClientConnectionManager - Connection leased: [id: 2][route: {}->http://192.168.1.119:8983][state: class org.apache.solr.client.solrj.impl.HttpSolrClient][total kept alive: 0; route allocated: 1 of 10000; total allocated: 1 of 10000]
17:41:52.288 [main] DEBUG o.a.h.i.c.DefaultManagedHttpClientConnection - http-outgoing-2: set socket timeout to 0
17:41:52.288 [main] DEBUG o.a.h.i.c.DefaultManagedHttpClientConnection - http-outgoing-2: set socket timeout to 120000
17:41:52.288 [main] DEBUG o.a.h.impl.execchain.MainClientExec - Executing request POST /solr/canal/update HTTP/1.1
17:41:52.288 [main] DEBUG o.a.h.impl.execchain.MainClientExec - Target auth state: UNCHALLENGED
17:41:52.288 [main] DEBUG o.a.h.impl.execchain.MainClientExec - Proxy auth state: UNCHALLENGED
17:41:52.288 [main] DEBUG org.apache.http.headers - http-outgoing-2 >> POST /solr/canal/update HTTP/1.1
17:41:52.288 [main] DEBUG org.apache.http.headers - http-outgoing-2 >> Content-Type: application/x-www-form-urlencoded; charset=UTF-8
17:41:52.288 [main] DEBUG org.apache.http.headers - http-outgoing-2 >> User-Agent: Solr[org.apache.solr.client.solrj.impl.HttpSolrClient] 1.0
17:41:52.288 [main] DEBUG org.apache.http.headers - http-outgoing-2 >> Content-Length: 67
17:41:52.288 [main] DEBUG org.apache.http.headers - http-outgoing-2 >> Host: 192.168.1.119:8983
17:41:52.288 [main] DEBUG org.apache.http.headers - http-outgoing-2 >> Connection: Keep-Alive
17:41:52.288 [main] DEBUG org.apache.http.wire - http-outgoing-2 >> "POST /solr/canal/update HTTP/1.1[\r][\n]"
17:41:52.288 [main] DEBUG org.apache.http.wire - http-outgoing-2 >> "Content-Type: application/x-www-form-urlencoded; charset=UTF-8[\r][\n]"
17:41:52.289 [main] DEBUG org.apache.http.wire - http-outgoing-2 >> "User-Agent: Solr[org.apache.solr.client.solrj.impl.HttpSolrClient] 1.0[\r][\n]"
17:41:52.289 [main] DEBUG org.apache.http.wire - http-outgoing-2 >> "Content-Length: 67[\r][\n]"
17:41:52.289 [main] DEBUG org.apache.http.wire - http-outgoing-2 >> "Host: 192.168.1.119:8983[\r][\n]"
17:41:52.289 [main] DEBUG org.apache.http.wire - http-outgoing-2 >> "Connection: Keep-Alive[\r][\n]"
17:41:52.289 [main] DEBUG org.apache.http.wire - http-outgoing-2 >> "[\r][\n]"
17:41:52.289 [main] DEBUG org.apache.http.wire - http-outgoing-2 >> "commit=true&softCommit=false&waitSearcher=true&wt=javabin&version=2"
17:41:52.426 [main] DEBUG org.apache.http.wire - http-outgoing-2 << "HTTP/1.1 200 OK[\r][\n]"
17:41:52.427 [main] DEBUG org.apache.http.wire - http-outgoing-2 << "Content-Security-Policy: default-src ‘none‘; base-uri ‘none‘; connect-src ‘self‘; form-action ‘self‘; font-src ‘self‘; frame-ancestors ‘none‘; img-src ‘self‘; media-src ‘self‘; style-src ‘self‘ ‘unsafe-inline‘; script-src ‘self‘; worker-src ‘self‘;[\r][\n]"
17:41:52.427 [main] DEBUG org.apache.http.wire - http-outgoing-2 << "X-Content-Type-Options: nosniff[\r][\n]"
17:41:52.427 [main] DEBUG org.apache.http.wire - http-outgoing-2 << "X-Frame-Options: SAMEORIGIN[\r][\n]"
17:41:52.427 [main] DEBUG org.apache.http.wire - http-outgoing-2 << "X-XSS-Protection: 1; mode=block[\r][\n]"
17:41:52.427 [main] DEBUG org.apache.http.wire - http-outgoing-2 << "Content-Type: application/octet-stream[\r][\n]"
17:41:52.427 [main] DEBUG org.apache.http.wire - http-outgoing-2 << "Content-Length: 41[\r][\n]"
17:41:52.427 [main] DEBUG org.apache.http.wire - http-outgoing-2 << "[\r][\n]"
17:41:52.428 [main] DEBUG org.apache.http.wire - http-outgoing-2 << "[0x2][0xa1][0xe0].responseHeader[0xa2][0xe0]&status[0x6][0x0][0x0][0x0][0x0][0xe0]%QTime[[0x6]"
17:41:52.428 [main] DEBUG org.apache.http.headers - http-outgoing-2 << HTTP/1.1 200 OK
17:41:52.428 [main] DEBUG org.apache.http.headers - http-outgoing-2 << Content-Security-Policy: default-src ‘none‘; base-uri ‘none‘; connect-src ‘self‘; form-action ‘self‘; font-src ‘self‘; frame-ancestors ‘none‘; img-src ‘self‘; media-src ‘self‘; style-src ‘self‘ ‘unsafe-inline‘; script-src ‘self‘; worker-src ‘self‘;
17:41:52.429 [main] DEBUG org.apache.http.headers - http-outgoing-2 << X-Content-Type-Options: nosniff
17:41:52.429 [main] DEBUG org.apache.http.headers - http-outgoing-2 << X-Frame-Options: SAMEORIGIN
17:41:52.429 [main] DEBUG org.apache.http.headers - http-outgoing-2 << X-XSS-Protection: 1; mode=block
17:41:52.429 [main] DEBUG org.apache.http.headers - http-outgoing-2 << Content-Type: application/octet-stream
17:41:52.429 [main] DEBUG org.apache.http.headers - http-outgoing-2 << Content-Length: 41
17:41:52.429 [main] DEBUG o.a.h.impl.execchain.MainClientExec - Connection can be kept alive indefinitely
17:41:52.430 [main] DEBUG o.a.h.i.c.PoolingHttpClientConnectionManager - Connection [id: 2][route: {}->http://192.168.1.119:8983][state: class org.apache.solr.client.solrj.impl.HttpSolrClient] can be kept alive indefinitely
17:41:52.430 [main] DEBUG o.a.h.i.c.DefaultManagedHttpClientConnection - http-outgoing-2: set socket timeout to 0
17:41:52.431 [main] DEBUG o.a.h.i.c.PoolingHttpClientConnectionManager - Connection released: [id: 2][route: {}->http://192.168.1.119:8983][state: class org.apache.solr.client.solrj.impl.HttpSolrClient][total kept alive: 1; route allocated: 1 of 10000; total allocated: 1 of 10000]
17:41:52.431 [main] DEBUG o.a.h.i.c.PoolingHttpClientConnectionManager - Connection manager is shutting down
17:41:52.432 [main] DEBUG o.a.h.i.c.DefaultManagedHttpClientConnection - http-outgoing-2: Close connection
17:41:52.432 [main] DEBUG o.a.h.i.c.PoolingHttpClientConnectionManager - Connection manager shut down
Process finished with exit code -1
原文:https://www.cnblogs.com/dalianpai/p/13619038.html