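As demand for full-text search keeps growing, many companies add Elasticsearch on top of their relational databases to get fast full-text search, which makes keeping Elasticsearch in sync with the relational data especially important. This article shows how to synchronize PostgreSQL data to Elasticsearch in near real time using the Logstash JDBC input plugin.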
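The data model here was made up just for this example. A real project would not be structured this way, so please bear with it. There is a student table and a teacher table, linked through the course. (Nobody would model this relationship this way in practice, but it is enough to illustrate the point, so everything is kept as simple as possible.)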
create table student(
    id bigint primary key NOT NULL,
    name varchar(5) not null,
    sex char(1) default '男',          -- '男' = male
    age int check(age>1),
    courses varchar(20),
    create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
create table teacher(
    id bigint primary key NOT NULL,
    name varchar(5) not null,
    sex char(1) default '男',
    courses varchar(20) default '语文', -- '语文' = Chinese (the subject)
    create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
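The docker-compose.yml is shown below. In it, logstash.conf is the pipeline configuration file that Logstash loads; its contents are shown further down.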
version: '3.7'
services:
  logstash_test:
    image: logstash:7.6.2
    container_name: logstash_test
    volumes:
      # holds the PostgreSQL JDBC driver jar and jdbc_test.sql
      - './services/logstash_test/data/:/usr/data/'
      - './services/logstash_test/config/logstash.yml:/usr/share/logstash/config/logstash.yml'
      - './services/logstash_test/config/pipelines.yml:/usr/share/logstash/config/pipelines.yml'
      # the pipeline definition, mounted as logstash.conf
      - './services/logstash_test/config/logstash_test.conf:/usr/share/logstash/pipeline/logstash.conf'
pipelines.yml: path.config must match the logstash.conf file mounted into the container.
- pipeline.id: student_test_xhh
  path.config: "/usr/share/logstash/pipeline/*.conf"
  pipeline.batch.size: 500
  pipeline.batch.delay: 200
  pipeline.workers: 1
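The requirement above can be checked with a quick sketch. It assumes that Logstash treats path.config as a per-segment shell-style glob, which Python's PurePosixPath.match approximates; the helper name conf_is_loaded is hypothetical and not part of Logstash.

```python
from pathlib import PurePosixPath

# Container-side target of the logstash_test.conf mount in docker-compose.yml
MOUNTED_CONF = "/usr/share/logstash/pipeline/logstash.conf"
# path.config from pipelines.yml
PATH_CONFIG = "/usr/share/logstash/pipeline/*.conf"


def conf_is_loaded(conf_path: str, pattern: str) -> bool:
    """Hypothetical helper: does the path.config glob match the mounted file?

    With an absolute pattern, PurePosixPath.match requires the whole path to
    match, and "*" does not cross "/" (an approximation of Logstash's glob).
    """
    return PurePosixPath(conf_path).match(pattern)


print(conf_is_loaded(MOUNTED_CONF, PATH_CONFIG))  # True
print(conf_is_loaded("/usr/share/logstash/pipeline/sub/logstash.conf", PATH_CONFIG))  # False
```

If the conf file were mounted anywhere outside /usr/share/logstash/pipeline/, or without the .conf extension, the check would fail, and so would the pipeline.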
logstash.yml
config:
  reload:
    automatic: true
    interval: 3s
The pipeline configuration (logstash.conf), which uses the jdbc input for synchronization:
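input {
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://host:port/db_name"
    jdbc_driver_library => "/usr/data/postgresql-42.2.14.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_user => "db_user"
    jdbc_password => "db_password"
    # fetch the result set in pages of 50 rows
    jdbc_paging_enabled => true
    jdbc_page_size => 50
    statement_filepath => "/usr/data/jdbc_test.sql"
    # persist the last tracking value to last_run_metadata_path
    record_last_run => true
    # reset sql_last_value whenever the pipeline starts
    clean_run => true
    # track the unix_ts_in_secs column (a timestamp) instead of the last run time
    tracking_column_type => "timestamp"
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    last_run_metadata_path => "/usr/share/logstash/config/student"
    # 6-field cron, seconds first: run at second 2 of every minute
    schedule => "2 * * * * *"
  }
}
filter {
  # parse the teacher column (a JSON string) into an object
  json {
    source => "teacher"
    target => "teacher"
  }
  # keep the primary key in @metadata so it can serve as the ES document _id
  mutate {
    add_field => {"[@metadata][doc_id]" => "%{id}"}
  }
  # drop the ids from the document body
  mutate {
    remove_field => ["[teacher][id]", "[id]"]
  }
}
output {
  # print each event to the console for debugging
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => "http://elasticserver:9200"
    user => "elasticuser"
    password => "elasticpassword"
    document_id => "%{[@metadata][doc_id]}"
    index => "student_test_xhh"
  }
}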
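To make the filter block concrete, here is a plain-Python sketch of what it does to a single event. It is not Logstash: apply_filter is a hypothetical name, and the input row is modeled on student 4 from the search result in this article. Note that Logstash does not send @metadata fields to outputs, so doc_id only feeds document_id and never appears in the stored document.

```python
import json


def apply_filter(event: dict) -> dict:
    """Hypothetical sketch mirroring the json + mutate filters on a plain dict."""
    out = dict(event)
    # json { source => "teacher" target => "teacher" }: parse the string column
    out["teacher"] = json.loads(out["teacher"])
    # mutate add_field: "%{id}" renders the id as a string in @metadata
    out["@metadata"] = {"doc_id": str(out["id"])}
    # mutate remove_field: drop [teacher][id] and [id] from the document body
    out["teacher"].pop("id", None)
    out.pop("id", None)
    return out


# A row as the SQL returns it: no teacher matched, so every teacher value is null
row = {
    "id": 4,
    "name": "暗影岛",
    "courses": "英语",
    "teacher": '{"id": null, "sex": null, "name": null, "courses": null}',
}
doc = apply_filter(row)
print(doc["@metadata"]["doc_id"])  # 4
print(doc["teacher"])  # {'sex': None, 'name': None, 'courses': None}
```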
A closer look at a few parameters. First, the query in /usr/data/jdbc_test.sql (statement_filepath):
select s.id,
       s.name,
       s.sex,
       s.age,
       s.courses,
       s.create_time,
       s.update_time,
       -- pack the teacher columns into a JSON string; the json filter parses it
       cast(jsonb_build_object(
           'id', t.id,
           'name', t.name,
           'sex', t.sex,
           'courses', t.courses
       ) as varchar) as teacher,
       -- tracking column for incremental sync
       s.update_time AS unix_ts_in_secs
from student s left join teacher t on s.courses = t.courses
where (s.update_time > cast(:sql_last_value as timestamp) and s.update_time < NOW())
group by s.id, t.id
order by s.update_time
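Because jdbc_paging_enabled => true and jdbc_page_size => 50, the plugin does not fetch the query's result in one go; it reads it page by page. The sketch below approximates that by wrapping the statement in an outer query with LIMIT/OFFSET. The function name paged_statements and the alias t1 are assumptions; the actual SQL is generated by the plugin and may differ in detail.

```python
from typing import List


def paged_statements(statement: str, page_size: int, pages: int) -> List[str]:
    """Hypothetical illustration of LIMIT/OFFSET paging around a statement."""
    inner = statement.strip().rstrip(";")
    return [
        f"SELECT * FROM ({inner}) AS t1 LIMIT {page_size} OFFSET {page * page_size}"
        for page in range(pages)
    ]


# jdbc_page_size => 50: the first three pages of a (shortened) query
for sql in paged_statements("select s.id from student s order by s.update_time", 50, 3):
    print(sql)
```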
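unix_ts_in_secs: the student's update_time, used as the tracking_column. After each run, Logstash saves the value from the last row it fetched to last_run_metadata_path.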
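sql_last_value: a built-in Logstash parameter holding the last recorded tracking value. On the first run, when nothing has been recorded for unix_ts_in_secs yet, sql_last_value starts from timestamp 0 (1970-01-01 00:00:00). With clean_run => true, it is also reset this way whenever the pipeline restarts.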
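The incremental logic can be simulated in a few lines. This is a sketch under stated assumptions, not the plugin's code: run_once is a hypothetical helper, rows are plain dicts, and update_time stands in for its alias unix_ts_in_secs. Each call plays one scheduled run: it applies the query's WHERE and ORDER BY, then keeps the tracking value of the last fetched row (or the old value if nothing changed).

```python
from datetime import datetime, timedelta, timezone

# Initial sql_last_value for a timestamp tracking column
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def run_once(rows, last_value, now):
    """Hypothetical simulation of one scheduled run of the jdbc input."""
    # WHERE update_time > :sql_last_value AND update_time < NOW() ORDER BY update_time
    batch = sorted(
        (r for r in rows if last_value < r["update_time"] < now),
        key=lambda r: r["update_time"],
    )
    # use_column_value => true: remember the tracking column of the last row
    new_last = batch[-1]["update_time"] if batch else last_value
    return batch, new_last


t0 = datetime(2020, 8, 6, 11, 42, 59, tzinfo=timezone.utc)
rows = [
    {"id": 1, "update_time": t0},
    {"id": 2, "update_time": t0 + timedelta(seconds=1)},
]

batch, last = run_once(rows, EPOCH, now=t0 + timedelta(minutes=1))
print([r["id"] for r in batch], last)  # [1, 2] 2020-08-06 11:43:00+00:00

batch, last = run_once(rows, last, now=t0 + timedelta(minutes=2))
print([r["id"] for r in batch])  # []

rows[0]["update_time"] = t0 + timedelta(minutes=2, seconds=30)  # row 1 is edited
batch, last = run_once(rows, last, now=t0 + timedelta(minutes=3))
print([r["id"] for r in batch])  # [1]
```

Because the comparison is strictly greater than, a row whose update_time equals the stored value but which becomes visible after the run would be skipped; this is one reason to make sure update_time is refreshed on every change.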
Run the following in the ELK stack (for example in Kibana Dev Tools):
GET /student_test_xhh/_search
It returns the following data:
{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 4,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "student_test_xhh",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "create_time" : "2020-08-06T11:42:59.454Z",
          "update_time" : "2020-08-06T11:42:59.454Z",
          "teacher" : {
            "sex" : "男",
            "name" : "张老师",
            "courses" : "语文"
          },
          "sex" : "男",
          "unix_ts_in_secs" : "2020-08-06T11:42:59.454Z",
          "courses" : "语文",
          "name" : "李青",
          "age" : 12,
          "@timestamp" : "2020-08-06T12:10:02.849Z",
          "@version" : "1"
        }
      },
      {
        "_index" : "student_test_xhh",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "create_time" : "2020-08-06T11:42:59.454Z",
          "update_time" : "2020-08-06T11:42:59.454Z",
          "teacher" : {
            "sex" : "女",
            "name" : "王老师",
            "courses" : "数学"
          },
          "sex" : "女",
          "unix_ts_in_secs" : "2020-08-06T11:42:59.454Z",
          "courses" : "数学",
          "name" : "艾欧尼亚",
          "age" : 11,
          "@timestamp" : "2020-08-06T12:10:02.857Z",
          "@version" : "1"
        }
      },
      {
        "_index" : "student_test_xhh",
        "_type" : "_doc",
        "_id" : "3",
        "_score" : 1.0,
        "_source" : {
          "create_time" : "2020-08-06T11:42:59.454Z",
          "update_time" : "2020-08-06T11:42:59.454Z",
          "teacher" : {
            "sex" : "女",
            "name" : "李老师",
            "courses" : "历史"
          },
          "sex" : "男",
          "unix_ts_in_secs" : "2020-08-06T11:42:59.454Z",
          "courses" : "历史",
          "name" : "巨龙之巢",
          "age" : 10,
          "@timestamp" : "2020-08-06T12:10:02.860Z",
          "@version" : "1"
        }
      },
      {
        "_index" : "student_test_xhh",
        "_type" : "_doc",
        "_id" : "4",
        "_score" : 1.0,
        "_source" : {
          "create_time" : "2020-08-06T11:42:59.454Z",
          "update_time" : "2020-08-06T11:42:59.454Z",
          "teacher" : {
            "sex" : null,
            "name" : null,
            "courses" : null
          },
          "sex" : "男",
          "unix_ts_in_secs" : "2020-08-06T11:42:59.454Z",
          "courses" : "英语",
          "name" : "暗影岛",
          "age" : 9,
          "@timestamp" : "2020-08-06T12:10:02.862Z",
          "@version" : "1"
        }
      }
    ]
  }
}
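The same check can be scripted. The sketch below only uses the Python standard library; build_search_request and summarize are hypothetical helpers, and the host, index, user, and password are the placeholder values from the output section of logstash.conf. The request is built but not sent, so the snippet runs without a cluster.

```python
import base64
import json
import urllib.request


def build_search_request(host: str, index: str, user: str, password: str) -> urllib.request.Request:
    """Hypothetical helper: GET /<index>/_search with HTTP basic auth (not sent here)."""
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        f"{host.rstrip('/')}/{index}/_search",
        headers={"Authorization": f"Basic {token}"},
        method="GET",
    )


def summarize(response: dict) -> dict:
    """Map each document _id to (student name, teacher name) from a _search response."""
    return {
        hit["_id"]: (hit["_source"]["name"], hit["_source"]["teacher"]["name"])
        for hit in response["hits"]["hits"]
    }


req = build_search_request("http://elasticserver:9200", "student_test_xhh",
                           "elasticuser", "elasticpassword")
# Against a reachable cluster:
# with urllib.request.urlopen(req) as resp:
#     print(summarize(json.load(resp)))

# Trimmed-down hit modeled on document 4 above
sample = json.loads(
    '{"hits": {"hits": [{"_id": "4", "_source": {"name": "暗影岛", "teacher": {"name": null}}}]}}'
)
print(summarize(sample))  # {'4': ('暗影岛', None)}
```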
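The data was synchronized successfully! Note that document 4's teacher fields are null: no teacher teaches 英语 (English), so the left join found no matching teacher row.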
Original author: xaohuihui
[Best Practice Example] Logstash JDBC: syncing Elasticsearch with the relational database Postgre
Original article: https://blog.51cto.com/14612701/2517862