<?php
// Export the documents of an Elasticsearch index to STDOUT as
// newline-delimited JSON using the scroll API, then compare the number of
// exported rows with what the _count endpoint reports for the same query.
//
// Long options:
//   --input   index URL, e.g. http://host:9200/index (required)
//   --query   raw JSON search body (default: match_all); ignored when
//             --start is given, because a date-range query is built instead
//   --start   first day to export (any format strtotime() understands)
//   --end     end day; defaults to the day after --start
//   --output  accepted by getopt() but not read anywhere in this script
//
// Exit codes: 1 missing --input, 2/3 unusable --input, 4 too many failed
// search requests, 5 unparsable --start/--end.
date_default_timezone_set('Asia/Shanghai');

$longopts = array(
    'input:',
    'output:',
    'query:',
    'start:',
    'end:',
);
$options = getopt('a', $longopts);
if (!isset($options['input'])) {
    fwrite(STDERR, "input必须设置索引地址\n");
    exit(1);
}

// Probe the index with _count before doing any real work.
$check = get($options['input'].'/_count');
if ($check === false) {
    fwrite(STDERR, "input索引地址无效:{$options['input']}\n");
    exit(2);
}
$check = json_decode($check, true);
if (!isset($check['count'])) {
    fwrite(STDERR, "input索引地址无效:{$options['input']}\n");
    exit(3);
}

// Split the input URL into the server base address and the index name.
$components = parse_url($options['input']);
if (!is_array($components) || !isset($components['host'])) {
    fwrite(STDERR, "input索引地址无效:{$options['input']}\n");
    exit(2);
}
$scheme = isset($components['scheme']) ? $components['scheme'] : 'http';
$host = "{$scheme}://{$components['host']}";
if (isset($components['port'])) {
    // Only append the port when the URL actually carries one.
    $host .= ":{$components['port']}";
}
$index = isset($components['path']) ? basename($components['path']) : '';

$query = isset($options['query']) ? $options['query'] : '{"query":{"match_all":{}}}';

// Defined up front because the error messages below interpolate them even
// when no date range was requested.
$start = '';
$end = '';

if (isset($options['start'])) {
    $start_time = strtotime($options['start']);
    if ($start_time === false) {
        fwrite(STDERR, "start日期无效:{$options['start']}\n");
        exit(5);
    }
    // The +0800 suffix matches the Asia/Shanghai timezone set above.
    $start = date('Y-m-d', $start_time).'T00:00:00+0800';

    if (isset($options['end'])) {
        $end_time = strtotime($options['end']);
        if ($end_time === false) {
            fwrite(STDERR, "end日期无效:{$options['end']}\n");
            exit(5);
        }
        $end = date('Y-m-d', $end_time).'T00:00:00+0800';
    } else {
        $end = date('Y-m-d', $start_time + 86400).'T00:00:00+0800';
    }

    // Indices whose name contains "analysis" are filtered and sorted on
    // create_time; every other index uses date.
    $field = strpos($index, 'analysis') !== false ? 'create_time' : 'date';

    if (strpos($index, 'eventlogs') !== false) {
        // Event-log indices additionally drop every document that has any
        // of these fields.
        $excluded_fields = array(
            'nsp3hq', 'q0i8u1', 'eyn916', '20mqd8',
            'wwbkux', 'r5ua96', 'easiz', 'dexusu', 'earts',
            'ealu', 'ealf', 'eal', 'ears', 'ealuf',
            'ealus', 'eaatf', 'enail', 'enuail', 'test',
        );
        $must_not = array();
        foreach ($excluded_fields as $name) {
            $must_not[] = array('exists' => array('field' => $name));
        }
        // NOTE(review): this branch uses an inclusive upper bound ("lte")
        // and a hard-coded "date" field, unlike the generic branch ("lt",
        // $field). Kept as in the original - confirm whether intentional.
        $filter = array(
            'bool' => array(
                'must' => array(
                    array('range' => array('date' => array('gte' => $start, 'lte' => $end))),
                ),
                'must_not' => $must_not,
            ),
        );
    } else {
        $filter = array(
            'range' => array($field => array('gte' => $start, 'lt' => $end)),
        );
    }

    $query = json_encode(array(
        'size' => 1000,
        'sort' => array($field => array('order' => 'asc')),
        'query' => array('filtered' => array('filter' => $filter)),
    ));
}

// Page through the result set: the first request opens the scroll, each
// later request fetches the next page until an empty page comes back.
$scroll_id = null;
$retry = 0;
$num = 0;
while (true) {
    if (is_null($scroll_id)) {
        $result = post("{$host}/{$index}/_search?scroll=2m", $query);
    } else {
        // The scroll id is opaque; encode it so characters such as "=" or
        // "+" survive being placed in the query string.
        $result = get("{$host}/_search/scroll?scroll=2m&scroll_id=".rawurlencode($scroll_id));
    }
    $json = ($result === false) ? null : json_decode($result, true);

    if (!isset($json['_scroll_id'])) {
        fwrite(STDERR, "查询失败:索引-{$index} 起始-{$start} 截止-{$end}\n");
        $retry++;
        if ($retry > 10) {
            exit(4);
        }
        sleep(5);
        // Retry the same request instead of processing a failed response.
        continue;
    }

    $scroll_id = $json['_scroll_id'];
    $hits = (isset($json['hits']['hits']) && is_array($json['hits']['hits']))
        ? $json['hits']['hits']
        : array();
    if (count($hits) == 0) {
        break;
    }
    foreach ($hits as $row) {
        fwrite(STDOUT, json_encode($row)."\n");
        $num++;
    }
    usleep(100000);
}

// Verify that the number of exported rows matches the index's own count.
// Decode to objects (not associative arrays) so that empty JSON objects such
// as {"match_all":{}} are re-encoded as {} rather than [].
$count_body = json_decode($query);
if (is_object($count_body)) {
    // size and sort are stripped before the body is sent to _count.
    unset($count_body->size, $count_body->sort);
}
$result = post("{$host}/{$index}/_count", json_encode($count_body));
$json = ($result === false) ? null : json_decode($result, true);
$reported = isset($json['count']) ? $json['count'] : '';
if ($reported === '' || intval($reported) !== $num) {
    fwrite(STDERR, "校验失败:索引-{$index} 起始-{$start} 截止-{$end} 记录条数-{$reported} 导出条数-{$num}\n");
}
/**
 * Fetch a URL through curl without sending a request body.
 *
 * No request method is set explicitly, so curl's default applies.
 *
 * @param string $url Address to request.
 * @return string|false Response body without headers, or false when
 *                      curl_exec() fails.
 */
function get($url)
{
    $curl = curl_init();
    curl_setopt_array($curl, array(
        CURLOPT_HEADER         => 0,
        CURLOPT_RETURNTRANSFER => 1,
        CURLOPT_URL            => $url,
    ));
    $body = curl_exec($curl);
    return $body;
}
/**
 * Send a POST request through curl with the given payload.
 *
 * @param string $url  Address to request.
 * @param mixed  $data Payload handed to CURLOPT_POSTFIELDS unchanged.
 * @return string|false Response body without headers, or false when
 *                      curl_exec() fails.
 */
function post($url, $data)
{
    $curl = curl_init();
    curl_setopt_array($curl, array(
        CURLOPT_POST           => 1,
        CURLOPT_HEADER         => 0,
        CURLOPT_RETURNTRANSFER => 1,
        CURLOPT_URL            => $url,
        CURLOPT_POSTFIELDS     => $data,
    ));
    $body = curl_exec($curl);
    return $body;
}