我计划以后将单位的搜索项目,慢慢过渡到ES集群,然后实现准实时的搜索.
canal会把自己伪装成MySQL的slave,通过解析binlog来感知数据库的变化;我可以通过canal获取这些数据变更,然后批量刷新到ES集群.
canal是阿里巴巴开源的一款产品.
虽然感觉国产开源不靠谱.但是没办法..洋人没做,自己又不会做.
主页:
https://github.com/alibaba/canal
下载地址:
https://github.com/alibaba/canal/releases
开发API:
https://github.com/alibaba/canal/wiki/ClientAPI
下载canal,解压然后修改配置文件.
canal.properties
canal.zkServers=192.168.16.105:2181,192.168.16.106:2181,192.168.16.108:2181,192.168.16.109:2181,192.168.16.110:2181
canal.destinations= songod
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
songod/instance.properties
canal.instance.master.address = 192.168.16.98:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
# username/password
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName = songod
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex = .*\\..*
# table black regex
canal.instance.filter.black.regex =
然后执行 startup.sh
运行如下程序,接收数据变更.
-
import java.util.HashMap;
-
import java.util.List;
-
import java.util.Map;
-
import java.util.concurrent.TimeUnit;
-
-
import com.alibaba.otter.canal.client.CanalConnector;
-
import com.alibaba.otter.canal.client.CanalConnectors;
-
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
-
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
-
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
-
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
-
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
-
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
-
import com.alibaba.otter.canal.protocol.Message;
-
-
public class T {
-
public static void main(String args[]) {
-
-
CanalConnector connector = CanalConnectors.newClusterConnector("192.168.16.105:2181,192.168.16.108:2181",
-
"songod", "", "");
-
int batchSize = 100;
-
-
connector.connect();
-
connector.subscribe(".*\\..*");
-
while (true) {
-
long batchId = -1;
-
try {
-
Message message = connector.getWithoutAck(batchSize, new Long(5), TimeUnit.SECONDS);
-
batchId = message.getId();
-
int size = message.getEntries().size();
-
System.out.println("batchSize:" + size);
-
printEntry(message.getEntries());
-
-
connector.ack(batchId);
-
} catch (Exception e) {
-
connector.rollback(batchId);
-
connector.disconnect();
-
}
-
}
-
-
}
-
-
private static void printEntry(List<Entry> entrys) {
-
for (Entry entry : entrys) {
-
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
-
|| entry.getEntryType() == EntryType.TRANSACTIONEND) {
-
continue;
-
}
-
-
RowChange rowChage = null;
-
try {
-
rowChage = RowChange.parseFrom(entry.getStoreValue());
-
} catch (Exception e) {
-
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
-
e);
-
}
-
EventType eventType = rowChage.getEventType();
-
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
-
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
-
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
-
-
for (RowData rowData : rowChage.getRowDatasList()) {
-
Map<String, Object> map = null;
-
if (eventType == EventType.DELETE) {
-
map = printColumn(rowData);
-
} else if (eventType == EventType.INSERT) {
-
map = printColumn(rowData);
-
} else if (eventType == EventType.UPDATE) {
-
map = printColumn(rowData);
-
}
-
System.out.print(eventType + ":");
-
System.out.println(map);
-
}
-
}
-
}
-
-
private static Map<String, Object> printColumn(RowData rowData) {
-
Map<String, Object> map = new HashMap<String, Object>();
-
for (Column column : rowData.getBeforeColumnsList()) {
-
map.put(column.getName(), column.getValue());
-
}
-
return map;
-
}
-
}
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.21</version>
</dependency>
参考:
http://blog.csdn.net/bbirdsky/article/details/41479479
canal初探
原文:http://blog.itpub.net/29254281/viewspace-2102706/