看,灰机...

Kibana优化过程(Optimize)过长或无法结束的解决方案

使用过Kibana的同学应该都知道,当我们在kibana的配置文件中打开或者关闭功能,或者安装、卸载额外的插件后,重启kibana会触发一个优化的过程(optimize),如下图:

20181212101105813.png

这个过程或长或短,视你电脑的性能而定。这里简单介绍一下该过程所要完成的事情。

Kibana是一个单页Web应用

首先,Kibana是一个单页的web应用。何为单页web应用?即所有的页面的读取都是在浏览器上完成,而与后台服务器无关。与后台服务器的通信只关乎数据,而非页面。所以,应用上所有的UI都被打包在一起,一次性的发送到了浏览器端,而不是通过URL到后台进行获取。所以,我们看到kibana的首页是下面这样的: http://localhost:5601/app/kibana#/ 注意这里的#后,代表#后面的内容会被浏览器提取,不往服务器端进行url的情况,而是在浏览器上进行内部重新渲染。因为所有的页面都是存储在浏览器的,所有在初次访问的时候,会加载大量的代码到浏览器端,这些代码都是被压缩过的bundle文件:

20181212101741186.png

而optimize的过程,就是把这些原本可读性的源代码压缩为bundle.js的过程。因此,每当你对Kibana进行裁剪之后重启,因为前端的部分是完全由浏览器负责的,所有bundle文件需要重新生成后再发给浏览器,所以会触发optimize的过程。

Kibana在6.2.0版本之后,常规版本已经默认自带了xpack(当然,你还是可以直接下载不带xpack的开源社区版),导致Kibana的size已经到了200M左右,而且越往后的版本,功能越多,代码量越大,每次optimize的过程都会耗费更多的时间。一般来说,我们会将Kibana部署在单独的机器上,因为这仅仅是一个web后端,通常我们不会分配比较优质的资源,(2C4G都算浪费的了),这种情况下面,每次我们裁剪后重启Kibana都会耗费半个小时~1个小时的时间,更有甚者直接hang住,查看系统日志才知道OOM了。

Nodejs的内存机制

Kibana是用Nodejs编写的程序,在一般的后端语言中,基本的内存使用上基本没有什么限制,但是在nodeJs中却只能使用部分内存。在64位系统下位约为1.4G,在32位系统下约为0.7G,造成这个问题的主要原因是因为nodeJs基于V8构建,V8使用自己的方式来管理和分配内存,这一套管理方式在浏览器端使用绰绰有余,但是在nodeJs中这却限制了开发者,在应用中如果碰到了这个限制,就会造成进程退出。

Nodejs内存机制对Kibana优化的影响

因为Kibana的代码体量越来越大,将所有的代码加载到内存之后,再解析语法树,进行bundle的转换所耗费的内存已经接近1.4G的限制了,当你安装更多插件,比如sentinl的时候,系统往往已经无法为继,导致Kibana无法启动

解决方案

这种情况下,我们需要在Kibana启动的时候,指定NodeJs使用更多的内存。这个可以通过设置Node的环境变量办到。

NODE_OPTIONS="--max-old-space-size=4096"

当然,我的建议是直接指定在kibana的启动脚本当中,修改/usr/share/kibana/bin/kibana文件为:

#!/bin/sh
SCRIPT=$0

# SCRIPT may be an arbitrarily deep series of symlinks. Loop until we have the concrete path.
while [ -h "$SCRIPT" ] ; do
  ls=$(ls -ld "$SCRIPT")
  # Drop everything prior to ->
  link=$(expr "$ls" : '.*-> \(.*\)$')
  if expr "$link" : '/.*' > /dev/null; then
    SCRIPT="$link"
  else
    SCRIPT=$(dirname "$SCRIPT")/"$link"
  fi
done

DIR="$(dirname "${SCRIPT}")/.."
NODE="${DIR}/node/bin/node"
test -x "$NODE" || NODE=$(which node)
if [ ! -x "$NODE" ]; then
  echo "unable to find usable node.js executable."
  exit 1
fi

NODE_ENV=production exec "${NODE}" $NODE_OPTIONS --max_old_space_size=3072 --no-warnings "${DIR}/src/cli" ${@}

改动在最后一句:NODE_ENV=production exec "${NODE}" $NODE_OPTIONS --max_old_space_size=3072 --no-warnings "${DIR}/src/cli" ${@}

这样,我们可以保证Kibana能顺利的完成optimize的过程

继续阅读 »

使用过Kibana的同学应该都知道,当我们在kibana的配置文件中打开或者关闭功能,或者安装、卸载额外的插件后,重启kibana会触发一个优化的过程(optimize),如下图:

20181212101105813.png

这个过程或长或短,视你电脑的性能而定。这里简单介绍一下该过程所要完成的事情。

Kibana是一个单页Web应用

首先,Kibana是一个单页的web应用。何为单页web应用?即所有的页面的读取都是在浏览器上完成,而与后台服务器无关。与后台服务器的通信只关乎数据,而非页面。所以,应用上所有的UI都被打包在一起,一次性的发送到了浏览器端,而不是通过URL到后台进行获取。所以,我们看到kibana的首页是下面这样的: http://localhost:5601/app/kibana#/ 注意这里的#后,代表#后面的内容会被浏览器提取,不往服务器端进行url的情况,而是在浏览器上进行内部重新渲染。因为所有的页面都是存储在浏览器的,所有在初次访问的时候,会加载大量的代码到浏览器端,这些代码都是被压缩过的bundle文件:

20181212101741186.png

而optimize的过程,就是把这些原本可读性的源代码压缩为bundle.js的过程。因此,每当你对Kibana进行裁剪之后重启,因为前端的部分是完全由浏览器负责的,所有bundle文件需要重新生成后再发给浏览器,所以会触发optimize的过程。

Kibana在6.2.0版本之后,常规版本已经默认自带了xpack(当然,你还是可以直接下载不带xpack的开源社区版),导致Kibana的size已经到了200M左右,而且越往后的版本,功能越多,代码量越大,每次optimize的过程都会耗费更多的时间。一般来说,我们会将Kibana部署在单独的机器上,因为这仅仅是一个web后端,通常我们不会分配比较优质的资源,(2C4G都算浪费的了),这种情况下面,每次我们裁剪后重启Kibana都会耗费半个小时~1个小时的时间,更有甚者直接hang住,查看系统日志才知道OOM了。

Nodejs的内存机制

Kibana是用Nodejs编写的程序,在一般的后端语言中,基本的内存使用上基本没有什么限制,但是在nodeJs中却只能使用部分内存。在64位系统下位约为1.4G,在32位系统下约为0.7G,造成这个问题的主要原因是因为nodeJs基于V8构建,V8使用自己的方式来管理和分配内存,这一套管理方式在浏览器端使用绰绰有余,但是在nodeJs中这却限制了开发者,在应用中如果碰到了这个限制,就会造成进程退出。

Nodejs内存机制对Kibana优化的影响

因为Kibana的代码体量越来越大,将所有的代码加载到内存之后,再解析语法树,进行bundle的转换所耗费的内存已经接近1.4G的限制了,当你安装更多插件,比如sentinl的时候,系统往往已经无法为继,导致Kibana无法启动

解决方案

这种情况下,我们需要在Kibana启动的时候,指定NodeJs使用更多的内存。这个可以通过设置Node的环境变量办到。

NODE_OPTIONS="--max-old-space-size=4096"

当然,我的建议是直接指定在kibana的启动脚本当中,修改/usr/share/kibana/bin/kibana文件为:

#!/bin/sh
SCRIPT=$0

# SCRIPT may be an arbitrarily deep series of symlinks. Loop until we have the concrete path.
while [ -h "$SCRIPT" ] ; do
  ls=$(ls -ld "$SCRIPT")
  # Drop everything prior to ->
  link=$(expr "$ls" : '.*-> \(.*\)$')
  if expr "$link" : '/.*' > /dev/null; then
    SCRIPT="$link"
  else
    SCRIPT=$(dirname "$SCRIPT")/"$link"
  fi
done

DIR="$(dirname "${SCRIPT}")/.."
NODE="${DIR}/node/bin/node"
test -x "$NODE" || NODE=$(which node)
if [ ! -x "$NODE" ]; then
  echo "unable to find usable node.js executable."
  exit 1
fi

NODE_ENV=production exec "${NODE}" $NODE_OPTIONS --max_old_space_size=3072 --no-warnings "${DIR}/src/cli" ${@}

改动在最后一句:NODE_ENV=production exec "${NODE}" $NODE_OPTIONS --max_old_space_size=3072 --no-warnings "${DIR}/src/cli" ${@}

这样,我们可以保证Kibana能顺利的完成optimize的过程

收起阅读 »

社区日报 第475期 (2018-12-11)

1、当Elasticsearch遇见Kafka
http://t.cn/EUwFsy6
2、Elasticsearch检索 — 聚合和LBS
http://t.cn/EU7qsRb
3、有赞订单管理的三生三世与 “十面埋伏”
http://t.cn/EU75ZTF

编辑:叮咚光军
归档:https://elasticsearch.cn/article/6187
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1、当Elasticsearch遇见Kafka
http://t.cn/EUwFsy6
2、Elasticsearch检索 — 聚合和LBS
http://t.cn/EU7qsRb
3、有赞订单管理的三生三世与 “十面埋伏”
http://t.cn/EU75ZTF

编辑:叮咚光军
归档:https://elasticsearch.cn/article/6187
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

Day 11 -父子关系维护检索实战一 - Elasticsearch 5.x-父子关系维护

本次分享包括两篇文章
  • 父子关系维护检索实战一 Elasticsearch 5.x 父子关系维护检索实战
  • 父子关系维护检索实战二 Elasticsearch 6.x 父子关系维护检索实战

本文是其中第一篇- Elasticsearch 5.x 父子关系维护检索实战,涵盖以下部分内容:
  1. Elasticsearch 5.x 中父子关系mapping结构设计
  2. Elasticsearch 5.x 中维护父子关系数据
  3. Elasticsearch 5.x 中has_child和has_parent查询的基本用法
  4. Elasticsearch 5.x 中如何在检索中同时返回父子数据

案例说明
以一个体检记录相关的数据来介绍本文涉及的相关功能,体检数据包括客户基本信息basic和客户医疗记录medical、客户体检记录exam、客户体检结果分析记录diagnosis,它们之间的关系图如下:
parent.png


我们采用Elasticsearch java客户端 bboss-elastic 来实现本文相关功能。

1.准备工作
参考文档《高性能elasticsearch ORM开发库使用介绍》导入和配置bboss客户端

2.定义mapping结构-Elasticsearch 5.x 中父子关系mapping结构设计
Elasticsearch 5.x中一个indice mapping支持多个mapping type,通过在子类型mapping中指定父类型的mapping type名字来设置父子关系,例如:
父类型
"basic": {
....
}
子类型:
"medical": { 
      "_parent": { "type": "basic" },
     .................
}
新建dsl配置文件-esmapper/Client_Info.xml,定义完整的mapping结构:createClientIndice
<properties>

<!--
创建客户信息索引索引表
-->
<property name="createClientIndice">
<![CDATA[{
"settings": {
"number_of_shards": 6,
"index.refresh_interval": "5s"
},
"mappings": {
"basic": { ##基本信息
"properties": {
"party_id": {
"type": "keyword"
},
"sex": {
"type": "keyword"
},
"mari_sts": {
"type": "keyword"
},
"ethnic": {
"type": "text"
},
"prof": {
"type": "text"
},
"province": {
"type": "text"
},
"city": {
"type": "text"
},
"client_type": {
"type": "keyword"
},
"client_name": {
"type": "text"
},
"age": {
"type": "integer"
},
"id_type": {
"type": "keyword"
},
"idno": {
"type": "keyword"
},
"education": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"birth_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"diagnosis": { ##结果分析
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"provider": {
"type": "text"
},
"subject": {
"type": "text"
},
"diagnosis_type": {
"type": "text"
},
"icd10_code": {
"type": "text",
"type": "keyword"
},
"sd_disease_name": {
"type": "text",
"type": "keyword"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"medical": { ##医疗情况
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"hos_name_yb": {
"type": "text"
},
"eivisions_name": {
"type": "text"
},
"medical_type": {
"type": "text"
},
"medical_common_name": {
"type": "text"
},
"medical_sale_name": {
"type": "text"
},
"medical_code": {
"type": "text"
},
"specification": {
"type": "text"
},
"usage_num": {
"type": "text"
},
"unit": {
"type": "text"
},
"usage_times": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"exam": { ##检查结果
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"hospital": {
"type": "text"
},
"dept": {
"type": "text"
},
"is_ok": {
"type": "text"
},
"exam_result": {
"type": "text"
},
"fld1": {
"type": "text"
},
"fld2": {
"type": "text"
},
"fld3": {
"type": "text"
},
"fld4": {
"type": "text"
},
"fld5": {
"type": "text"
},
"fld901": {
"type": "text"
},
"fld6": {
"type": "text"
},
"fld902": {
"type": "text"
},
"fld14": {
"type": "text"
},
"fld20": {
"type": "text"
},
"fld21": {
"type": "text"
},
"fld23": {
"type": "text"
},
"fld24": {
"type": "text"
},
"fld65": {
"type": "text"
},
"fld66": {
"type": "text"
},
"fld67": {
"type": "text"
},
"fld68": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
}
}
}]]>
</property>
</properties>

这个mapping中定义了4个索引类型:basic,exam,medical,diagnosis,其中basic是其他类型的父类型。
通过bboss客户端创建名称为client_info 的索引:
	public void createClientIndice(){
//定义客户端实例,加载上面建立的dsl配置文件
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");
try {
//client_info存在返回true,不存在返回false
boolean exist = clientUtil.existIndice("client_info");

//如果索引表client_info已经存在先删除mapping
if(exist) {//先删除mapping client_info
clientUtil.dropIndice("client_info");
}
} catch (ElasticSearchException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//创建mapping client_info
clientUtil.createIndiceMapping("client_info","createClientIndice");
String client_info = clientUtil.getIndice("client_info");//获取最新建立的索引表结构client_info
System.out.println("after createClientIndice clientUtil.getIndice(\"client_info\") response:"+client_info);
}


3.维护父子关系数据-Elasticsearch 5.x 中维护父子关系数据
  • 定义对象

首先定义四个对象,分别对应mapping中的四个索引类型,篇幅关系只列出主要属性
  • Basic
  • Medical
  • Exam
  • Diagnosis

通过注解@ESId指定基本信息文档_id
public class Basic extends ESBaseData {
/**
* 索引_id
*/
@ESId
private String party_id;
private String sex; // 性别
......
}
通过注解@ESParentId指定Medical关联的基本信息文档_id,Medical文档_id由ElasticSearch自动生成
public class Medical extends ESBaseData {
@ESParentId
private String party_id; //父id
private String hos_name_yb; //就诊医院
...
}
通过注解@ESParentId指定Exam关联的基本信息文档_id,Exam文档_id由ElasticSearch自动生成
public class Exam extends ESBaseData {
@ESParentId
private String party_id; //父id
private String hospital; // 就诊医院
....
}
通过注解@ESParentId指定Diagnosis关联的基本信息文档_id,Diagnosis文档_id由ElasticSearch自动生成
public class Diagnosis extends ESBaseData {
@ESParentId
private String party_id; //父id
private String provider; //诊断医院
private String subject; //科室
......
}

  • 通过api维护测试数据

对象定义好了后,通过bboss客户数据到之前建立好的索引client_info中。
	/**
* 录入体检医疗信息
*/
public void importClientInfoDataFromBeans() {
ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();

//导入基本信息,并且实时刷新,测试需要,实际环境不要带refresh
List<Basic> basics = buildBasics();
clientUtil.addDocuments("client_info","basic",basics,"refresh");

//导入医疗信息,并且实时刷新,测试需要,实际环境不要带refresh
List<Medical> medicals = buildMedicals();
clientUtil.addDocuments("client_info","medical",medicals,"refresh");

//导入体检结果数据,并且实时刷新,测试需要,实际环境不要带refresh
List<Exam> exams = buildExams();
clientUtil.addDocuments("client_info","exam",exams,"refresh");

//导入结果诊断数据,并且实时刷新,测试需要,实际环境不要带refresh
List<Diagnosis> diagnosiss = buildDiagnosiss();
clientUtil.addDocuments("client_info","diagnosis",diagnosiss,"refresh");
}
//构建基本信息集合
private List<Basic> buildBasics() {
List<Basic> basics = new ArrayList<Basic>();
Basic basic = new Basic();
basic.setParty_id("1");
basic.setAge(60);
basics.add(basic);
//继续添加其他数据
return basics;

}
//
构建医疗信息集合
private List<Medical> buildMedicals() {
List<Medical> medicals = new ArrayList<Medical>();
Medical medical = new Medical();
medical.setParty_id("1");//设置父文档id-基本信息文档_id
medical.setCreated_date(new Date());
medicals.add(medical);
//继续添加其他数据
return medicals;

}
//构建体检结果数据集合
private List<Exam> buildExams() {
List<Exam> exams = new ArrayList<Exam>();
Exam exam = new Exam();
exam.setParty_id("1");//设置父文档id-基本信息文档_id
exams.add(exam);
//继续添加其他数据
return exams;
}
//构建结果诊断数据集合
private List<Diagnosis> buildDiagnosiss() {
List<Diagnosis> diagnosiss = new ArrayList<Diagnosis>();
Diagnosis diagnosis = new Diagnosis();
diagnosis.setParty_id("1");//设置父文档id-基本信息文档_id
diagnosiss.add(diagnosis);
//继续添加其他数据
return diagnosiss;
}

  • 通过json报文批量导入测试数据

除了通过addDocuments录入数据,还可以通过json报文批量导入数据
在配置文件esmapper/Client_Info.xml增加以下内容:
    <!--
导入基本信息:
-->
<property name="bulkImportBasicData" trim="false">
<![CDATA[
{ "index": { "_id": "1" }}
{ "party_id":"1", "sex":"男", "mari_sts":"不详", "ethnic":"蒙古族", "prof":"放牧","birth_date":"1966-2-14 00:00:00", "province":"内蒙古", "city":"赤峰市","client_type":"1", "client_name":"安", "age":52,"id_type":"1", "idno":"1", "education":"初中","created_date":"2013-04-24 00:00:00","last_modified_date":"2013-04-24 00:00:00", "etl_date":"2013-04-24 00:00:00"}
{ "index": { "_id": "2" }}
{ "party_id":"2", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"公务员","birth_date":"1986-07-06 00:00:00", "province":"广东", "city":"深圳","client_type":"1", "client_name":"彭", "age":32,"id_type":"1", "idno":"2", "education":"本科", "created_date":"2013-05-09 15:49:47","last_modified_date":"2013-05-09 15:49:47", "etl_date":"2013-05-09 15:49:47"}
{ "index": { "_id": "3" }}
{ "party_id":"3", "sex":"男", "mari_sts":"未婚", "ethnic":"汉族", "prof":"无业","birth_date":"2000-08-15 00:00:00", "province":"广东", "city":"佛山","client_type":"1", "client_name":"浩", "age":18,"id_type":"1", "idno":"3", "education":"高中", "created_date":"2014-09-01 09:49:27","last_modified_date":"2014-09-01 09:49:27", "etl_date":"2014-09-01 09:49:27" }
{ "index": { "_id": "4" }}
{ "party_id":"4", "sex":"女", "mari_sts":"未婚", "ethnic":"满族", "prof":"工人","birth_date":"1996-03-14 00:00:00", "province":"江苏", "city":"扬州","client_type":"1", "client_name":"慧", "age":22,"id_type":"1", "idno":"4", "education":"高中", "created_date":"2014-09-16 09:30:37","last_modified_date":"2014-09-16 09:30:37", "etl_date":"2014-09-16 09:30:37" }
{ "index": { "_id": "5" }}
{ "party_id":"5", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"教师","birth_date":"1983-08-14 00:00:00", "province":"宁夏", "city":"灵武","client_type":"1", "client_name":"英", "age":35,"id_type":"1", "idno":"5", "education":"本科", "created_date":"2015-09-16 09:30:37","last_modified_date":"2015-09-16 09:30:37", "etl_date":"2015-09-16 09:30:37" }
{ "index": { "_id": "6" }}
{ "party_id":"6", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"工人","birth_date":"1959-07-04 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"岭", "age":59,"id_type":"1", "idno":"6", "education":"小学", "created_date":"2015-09-01 09:49:27","last_modified_date":"2015-09-01 09:49:27", "etl_date":"2015-09-01 09:49:27" }
{ "index": { "_id": "7" }}
{ "party_id":"7", "sex":"女", "mari_sts":"未婚", "ethnic":"汉族", "prof":"学生","birth_date":"1999-02-18 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"欣", "age":19,"id_type":"1", "idno":"7", "education":"高中", "created_date":"2016-12-01 09:49:27","last_modified_date":"2016-12-01 09:49:27", "etl_date":"2016-12-01 09:49:27" }
{ "index": { "_id": "8" }}
{ "party_id":"8", "sex":"女", "mari_sts":"未婚", "ethnic":"汉族", "prof":"学生","birth_date":"2007-11-18 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"梅", "age":10,"id_type":"1", "idno":"8", "education":"小学", "created_date":"2016-11-21 09:49:27","last_modified_date":"2016-11-21 09:49:27", "etl_date":"2016-11-21 09:49:27" }
{ "index": { "_id": "9" }}
{ "party_id":"9", "sex":"男", "mari_sts":"不详", "ethnic":"回族", "prof":"个体户","birth_date":"1978-03-29 00:00:00", "province":"北京", "city":"北京","client_type":"1", "client_name":"磊", "age":40,"id_type":"1", "idno":"9", "education":"高中", "created_date":"2017-09-01 09:49:27","last_modified_date":"2017-09-01 09:49:27", "etl_date":"2017-09-01 09:49:27" }
{ "index": { "_id": "10" }}
{ "party_id":"10", "sex":"男", "mari_sts":"已婚", "ethnic":"汉族", "prof":"农民","birth_date":"1970-11-14 00:00:00", "province":"浙江", "city":"台州","client_type":"1", "client_name":"强", "age":47,"id_type":"1", "idno":"10", "education":"初中", "created_date":"2018-09-01 09:49:27","last_modified_date":"2018-09-01 09:49:27", "etl_date":"2018-09-01 09:49:27" }
]]>
</property>
<!--
导入诊断信息
-->
<property name="bulkImportDiagnosisData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"J31.0", "sd_disease_name":"鼻炎","created_date":"2013-07-23 20:56:44", "last_modified_date":"2013-07-23 20:56:44", "etl_date":"2013-07-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"M47.8", "sd_disease_name":"颈椎病","created_date":"2013-09-23 20:56:44", "last_modified_date":"2013-09-23 20:56:44", "etl_date":"2013-09-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"E78.1", "sd_disease_name":"甘油三脂增高","created_date":"2018-09-20 09:27:44", "last_modified_date":"2018-09-20 09:27:44", "etl_date":"2018-09-20 09:27:44" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "provider":"江苏医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2011-05-19 15:52:55", "last_modified_date":"2011-05-19 15:52:55", "etl_date":"2011-05-19 15:52:55" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"H44", "sd_disease_name":"眼疾","created_date":"2016-04-08 10:42:18", "last_modified_date":"2016-04-08 10:42:18", "etl_date":"2016-04-08 10:42:18" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"M47.8", "sd_disease_name":"颈椎病","created_date":"2016-04-08 10:42:18", "last_modified_date":"2016-04-08 10:42:18", "etl_date":"2016-04-08 10:42:18" }

{ "index": { "parent": "7" }}
{ "party_id":"7", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2017-04-08 10:42:18", "last_modified_date":"2017-04-08 10:42:18", "etl_date":"2017-04-08 10:42:18" }

{ "index": { "parent": "8" }}
{ "party_id":"8", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2018-04-08 10:42:18", "last_modified_date":"2018-04-08 10:42:18", "etl_date":"2018-04-08 10:42:18" }

{ "index": { "parent": "9" }}
{ "party_id":"9", "provider":"朝阳医院", "subject":"","diagnosis_type":"","icd10_code":"A03.901", "sd_disease_name":"急性细菌性痢疾","created_date":"2015-06-08 10:42:18", "last_modified_date":"2015-06-08 10:42:18", "etl_date":"2015-06-08 10:42:18" }
]]>
</property>

<!--
导入医疗信息
-->
<property name="bulkImportMedicalData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"氟化钠", "medical_sale_name":"", "medical_code":"A01AA01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"四环素", "medical_sale_name":"", "medical_code":"A01AB13", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2016-05-31 00:00:00", "last_modified_date":"2016-05-31 00:00:00", "etl_date":"2016-05-31 00:00:00" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"", "medical_sale_name":"盐酸多西环素胶丸", "medical_code":"A01AB22", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2016-03-18 00:00:00", "last_modified_date":"2016-03-18 00:00:00", "etl_date":"2016-03-18 00:00:00" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"盐酸多西环素分散片", "medical_sale_name":"", "medical_code":"A01AB22", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2013-07-23 20:56:44", "last_modified_date":"2013-07-23 20:56:44", "etl_date":"2013-07-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"地塞米松", "medical_sale_name":"", "medical_code":"A01AC02", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2013-09-23 20:56:44", "last_modified_date":"2013-09-23 20:56:44", "etl_date":"2013-09-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"肾上腺素", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-09-20 09:27:44", "last_modified_date":"2018-09-20 09:27:44", "etl_date":"2018-09-20 09:27:44" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"地塞米松", "medical_sale_name":"", "medical_code":"A01AC02", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2011-05-19 15:52:55", "last_modified_date":"2011-05-19 15:52:55", "etl_date":"2011-05-19 15:52:55" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"四环素", "medical_sale_name":"", "medical_code":"A01AB13", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-04-08 10:42:18", "last_modified_date":"2018-04-08 10:42:18", "etl_date":"2018-04-08 10:42:18" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"诺氟沙星胶囊", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2015-06-08 10:42:18", "last_modified_date":"2015-06-08 10:42:18", "etl_date":"2015-06-08 10:42:18" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "hos_name_yb":"山东医院", "eivisions_name":"", "medical_type":"","medical_common_name":"盐酸异丙肾上腺素片", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-01-23 20:56:44", "last_modified_date":"2014-01-23 20:56:44", "etl_date":"2014-01-23 20:56:44" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "hos_name_yb":"山东医院", "eivisions_name":"", "medical_type":"","medical_common_name":"甲硝唑栓", "medical_sale_name":"", "medical_code":"A01AB17", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-06-08 10:42:18", "last_modified_date":"2018-06-08 10:42:18", "etl_date":"2018-06-08 10:42:18" }

{ "index": { "parent": "9" }}
{ "party_id":"9", "hos_name_yb":"朝阳医院", "eivisions_name":"", "medical_type":"","medical_common_name":"复方克霉唑乳膏", "medical_sale_name":"", "medical_code":"A01AB18", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-01-23 20:56:44", "last_modified_date":"2014-01-23 20:56:44", "etl_date":"2014-01-23 20:56:44"}
]]>
</property>

<!--
导入体检信息
-->
<property name="bulkImportExamData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"高血压","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "2" }}
{ "party_id":"2", "hospital":"", "dept":"", "is_ok":"Y", "exam_result":"轻度脂肪肝","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "3" }}
{ "party_id":"3", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"急性细菌性痢疾","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "5" }}
{ "party_id":"5", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "7" }}
{ "party_id":"7", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "8" }}
{ "party_id":"1", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "9" }}
{ "party_id":"9", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "10" }}
{ "party_id":"10", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
]]>
</property>







通过bboss提供的通用api,导入上面定义的数据:
	/**
* 通过读取配置文件中的dsl json数据导入医疗数据
*/
public void importClientInfoFromJsonData(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");

clientUtil.executeHttp("client_info/basic/_bulk?refresh","bulkImportBasicData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/diagnosis/_bulk?refresh","bulkImportDiagnosisData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/medical/_bulk?refresh","bulkImportMedicalData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/exam/_bulk?refresh","bulkImportExamData",ClientUtil.HTTP_POST);
统计导入的数据
		
long basiccount = clientUtil.countAll("client_info/basic");
System.out.println(basiccount);
long medicalcount = clientUtil.countAll("client_info/medical");
System.out.println(medicalcount);
long examcount = clientUtil.countAll("client_info/exam");
System.out.println(examcount);
long diagnosiscount = clientUtil.countAll("client_info/diagnosis");
System.out.println(diagnosiscount);
}
4.父子关系查询-Elasticsearch 5.x 中has_child和has_parent查询的基本用法​
  • 根据父查子-通过客户名称信息查询客户端体检结果

在配置文件esmapper/Client_Info.xml增加dsl语句:queryExamSearchByClientName
   <!--根据客户名称查询客户体检报告-->
<property name="queryExamSearchByClientName">
<![CDATA[
{
"query": {
## 最多返回size变量对应的记录条数
"size":#[size],
"has_parent": {
"type": "basic",
"query": {
"match": {
"client_name": #[clientName] ## 通过变量clientName设置客户名称
}
}
}
}
}
]]>
</property>

 执行查询,通过bboss的searchList 方法获取符合条件的体检报告以及总记录数据,返回size对应的1000条数据
	/**
* 根据客户名称查询客户体检报告
*/
public void queryExamSearchByClientName(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("clientName","张三");
params.put("size",1000);
ESDatas<Exam> exams = clientUtil.searchList("client_info/exam/_search","queryExamSearchByClientName",params,Exam.class);
List<Exam> examList = exams.getDatas();//获取符合条件的体检数据
long totalSize = exams.getTotalSize();//符合条件的总记录数据
}
 
  • 根据子查父数据-通过医疗信息编码查找客户基本数据

在配置文件esmapper/Client_Info.xml增加查询dsl语句:queryClientInfoByMedicalName
    <!--通过医疗信息编码查找客户基本数据-->
<property name="queryClientInfoByMedicalName">
<![CDATA[
{
"query": {
## 最多返回size变量对应的记录条数
"size":#[size],
"has_child": {
"type": "medical",
"score_mode": "max",
"query": {
"match": {
"medical_code": #[medicalCode] ## 通过变量medicalCode设置医疗编码
}
}
}
}
}
]]>
</property>
执行查询,通过bboss的searchList 方法获取符合条件的客户端基本信息以及总记录数据
	/**
* 通过医疗信息编码查找客户基本数据
*/
public void queryClientInfoByMedicalName(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("medicalCode","A01AA01"); //通过变量medicalCode设置医疗编码
params.put("size",1000); //最多返回size变量对应的记录条数
ESDatas<Basic> bascis = clientUtil.searchList("client_info/basic/_search","queryClientInfoByMedicalName",params,Basic.class);
List<Basic> bascisList = bascis.getDatas();//获取符合条件的客户信息
long totalSize = bascis.getTotalSize();
}
5.同时返回父子数据-Elasticsearch 5.x 中如何在检索中同时返回父子数据
这一节中我们介绍同时返回父子数据的玩法 :inner_hits的妙用
  • 根据父条件查询所有子数据集合并返回父数据,根据客户名称查询所有体检数据,同时返回客户信息

在配置文件esmapper/Client_Info.xml增加检索dsl-queryDiagnosisByClientName
    <!--根据客户名称获取客户体检诊断数据,并返回客户信息-->
<property name="queryDiagnosisByClientName">
<![CDATA[
{
"query": {
## 最多返回size变量对应的记录条数
"size":#[size],
"has_parent": {
"type": "basic",
"query": {
"match": {
"client_name": #[clientName] ## 通过变量clientName设置客户名称
}
},
"inner_hits": {} ## 通过变量inner_hits表示要返回对应的客户信息
}
}
}
]]>
</property>
执行检索并遍历结果
	/**
* 根据客户名称获取客户体检诊断数据,并返回客户数据
*/
public void queryDiagnosisByClientName(){

ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("clientName","张三");
params.put("size",1000);

try {
ESInnerHitSerialThreadLocal.setESInnerTypeReferences(Basic.class);//指定inner查询结果对应的客户基本信息类型,Basic只有一个文档类型,索引不需要显示指定basic对应的mapping type名称
ESDatas<Diagnosis> diagnosiss = clientUtil.searchList("client_info/diagnosis/_search",
"queryDiagnosisByClientName",params,Diagnosis.class);
List<Diagnosis> diagnosisList = diagnosiss.getDatas();//获取符合条件的体检报告数据
long totalSize = diagnosiss.getTotalSize();
//遍历诊断报告信息,并查看报告对应的客户基本信息
for(int i = 0; diagnosisList != null && i < diagnosisList.size(); i ++) {
Diagnosis diagnosis = diagnosisList.get(i);
List<Basic> basics = ResultUtil.getInnerHits(diagnosis.getInnerHits(), "basic");
if(basics != null) {
System.out.println(basics.size());
}
}
}
finally{
ESInnerHitSerialThreadLocal.clean();//清空inner查询结果对应的客户基本信息类型
}
}

  •  根据子条件查询父数据并返回符合条件的父的子数据集合,查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录

在配置文件esmapper/Client_Info.xml增加检索dsl-queryClientAndAllSons
    <!--查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录-->
<property name="queryClientAndAllSons">
<![CDATA[
{
"query": {
"bool": {
"should": [
{
"match_all":{}
}
]
,"must": [
{
"has_child": {
"score_mode": "none",
"type": "diagnosis"
,"query": {
"bool": {
"must": [
{
"term": {
"icd10_code": {
"value": "J00"
}
}
}
]
}
},"inner_hits":{}
}
}
]
,"should": [
{
"has_child": {
"score_mode": "none",
"type": "medical"
,"query": {
"match_all": {}

},"inner_hits":{}
}
}
]
,"should": [
{
"has_child": {
"type": "exam",
"query": {
"match_all": {}
},"inner_hits":{}
}
}
]
}
}
}
]]>
</property>
执行查询:
	/**
* 查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录
*/
public void queryClientAndAllSons(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");
Map<String,Object> params = null;//没有检索条件,构造一个空的参数对象

try {
//设置子文档的类型和对象映射关系
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("exam",Exam.class);//指定inner查询结果对于exam类型和对应的对象类型Exam
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("diagnosis",Diagnosis.class);//指定inner查询结果对于diagnosis类型和对应的对象类型Diagnosis
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("medical",Medical.class);//指定inner查询结果对于medical类型和对应的对象类型Medical
ESDatas<Basic> escompanys = clientUtil.searchList("client_info/basic/_search",
"queryClientAndAllSons",params,Basic.class);
//String response = clientUtil.executeRequest("client_info/basic/_search","queryClientAndAllSons",params);直接获取原始的json报文
// escompanys = clientUtil.searchAll("client_info",Basic.class);
long totalSize = escompanys.getTotalSize();
List<Basic> clientInfos = escompanys.getDatas();//获取符合条件的数据
//查看公司下面的雇员信息(符合检索条件的雇员信息)
for (int i = 0; clientInfos != null && i < clientInfos.size(); i++) {
Basic clientInfo = clientInfos.get(i);
List<Exam> exams = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "exam");
if(exams != null)
System.out.println(exams.size());
List<Diagnosis> diagnosiss = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "diagnosis");
if(diagnosiss != null)
System.out.println(diagnosiss.size());
List<Medical> medicals = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "medical");
if(medicals != null)
System.out.println(medicals.size());

}
}
finally{
ESInnerHitSerialThreadLocal.clean();//清空inner查询结果对于各种类型信息
}
}
最后我们按顺序执行所有方法,验证功能:
	@Test
public void testMutil(){
this.createClientIndice();//创建indice client_info
// this.importClientInfoDataFromBeans(); //通过api添加测试数据
this.importClientInfoFromJsonData();//导入测试数据
this.queryExamSearchByClientName(); //根据客户端名称查询提交报告
this.queryClientInfoByMedicalName();//通过医疗信息编码查找客户基本数据
this.queryDiagnosisByClientName();//根据客户名称获取客户体检诊断数据,并返回客户数据
this.queryClientAndAllSons();//查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录
}
可以下载完整的demo工程运行本文中的测试用例方法,地址见相关资料。
到此Elasticsearch 5.x 父子关系维护检索实战介绍完毕,谢谢大家!

相关资料
完整demo工程  https://github.com/bbossgroups/eshelloword-booter
对应的类文件和配置文件
org.bboss.elasticsearchtest.parentchild.ParentChildTest
esmapper/Client_Info.xml
 
开发交流
bboss交流群 166471282
bboss公众号
getqrcode.jpg

 
敬请关注:父子关系维护检索实战二 Elasticsearch 6.x 父子关系维护检
继续阅读 »
本次分享包括两篇文章
  • 父子关系维护检索实战一 Elasticsearch 5.x 父子关系维护检索实战
  • 父子关系维护检索实战二 Elasticsearch 6.x 父子关系维护检索实战

本文是其中第一篇- Elasticsearch 5.x 父子关系维护检索实战,涵盖以下部分内容:
  1. Elasticsearch 5.x 中父子关系mapping结构设计
  2. Elasticsearch 5.x 中维护父子关系数据
  3. Elasticsearch 5.x 中has_child和has_parent查询的基本用法
  4. Elasticsearch 5.x 中如何在检索中同时返回父子数据

案例说明
以一个体检记录相关的数据来介绍本文涉及的相关功能,体检数据包括客户基本信息basic和客户医疗记录medical、客户体检记录exam、客户体检结果分析记录diagnosis,它们之间的关系图如下:
parent.png


我们采用Elasticsearch java客户端 bboss-elastic 来实现本文相关功能。

1.准备工作
参考文档《高性能elasticsearch ORM开发库使用介绍》导入和配置bboss客户端

2.定义mapping结构-Elasticsearch 5.x 中父子关系mapping结构设计
Elasticsearch 5.x中一个indice mapping支持多个mapping type,通过在子类型mapping中指定父类型的mapping type名字来设置父子关系,例如:
父类型
"basic": {
....
}
子类型:
"medical": { 
      "_parent": { "type": "basic" },
     .................
}
新建dsl配置文件-esmapper/Client_Info.xml,定义完整的mapping结构:createClientIndice
<properties>

<!--
创建客户信息索引索引表
-->
<property name="createClientIndice">
<![CDATA[{
"settings": {
"number_of_shards": 6,
"index.refresh_interval": "5s"
},
"mappings": {
"basic": { ##基本信息
"properties": {
"party_id": {
"type": "keyword"
},
"sex": {
"type": "keyword"
},
"mari_sts": {
"type": "keyword"
},
"ethnic": {
"type": "text"
},
"prof": {
"type": "text"
},
"province": {
"type": "text"
},
"city": {
"type": "text"
},
"client_type": {
"type": "keyword"
},
"client_name": {
"type": "text"
},
"age": {
"type": "integer"
},
"id_type": {
"type": "keyword"
},
"idno": {
"type": "keyword"
},
"education": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"birth_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"diagnosis": { ##结果分析
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"provider": {
"type": "text"
},
"subject": {
"type": "text"
},
"diagnosis_type": {
"type": "text"
},
"icd10_code": {
"type": "text",
"type": "keyword"
},
"sd_disease_name": {
"type": "text",
"type": "keyword"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"medical": { ##医疗情况
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"hos_name_yb": {
"type": "text"
},
"eivisions_name": {
"type": "text"
},
"medical_type": {
"type": "text"
},
"medical_common_name": {
"type": "text"
},
"medical_sale_name": {
"type": "text"
},
"medical_code": {
"type": "text"
},
"specification": {
"type": "text"
},
"usage_num": {
"type": "text"
},
"unit": {
"type": "text"
},
"usage_times": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"exam": { ##检查结果
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"hospital": {
"type": "text"
},
"dept": {
"type": "text"
},
"is_ok": {
"type": "text"
},
"exam_result": {
"type": "text"
},
"fld1": {
"type": "text"
},
"fld2": {
"type": "text"
},
"fld3": {
"type": "text"
},
"fld4": {
"type": "text"
},
"fld5": {
"type": "text"
},
"fld901": {
"type": "text"
},
"fld6": {
"type": "text"
},
"fld902": {
"type": "text"
},
"fld14": {
"type": "text"
},
"fld20": {
"type": "text"
},
"fld21": {
"type": "text"
},
"fld23": {
"type": "text"
},
"fld24": {
"type": "text"
},
"fld65": {
"type": "text"
},
"fld66": {
"type": "text"
},
"fld67": {
"type": "text"
},
"fld68": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
}
}
}]]>
</property>
</properties>

这个mapping中定义了4个索引类型:basic,exam,medical,diagnosis,其中basic是其他类型的父类型。
通过bboss客户端创建名称为client_info 的索引:
	public void createClientIndice(){
//定义客户端实例,加载上面建立的dsl配置文件
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");
try {
//client_info存在返回true,不存在返回false
boolean exist = clientUtil.existIndice("client_info");

//如果索引表client_info已经存在先删除mapping
if(exist) {//先删除mapping client_info
clientUtil.dropIndice("client_info");
}
} catch (ElasticSearchException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//创建mapping client_info
clientUtil.createIndiceMapping("client_info","createClientIndice");
String client_info = clientUtil.getIndice("client_info");//获取最新建立的索引表结构client_info
System.out.println("after createClientIndice clientUtil.getIndice(\"client_info\") response:"+client_info);
}


3.维护父子关系数据-Elasticsearch 5.x 中维护父子关系数据
  • 定义对象

首先定义四个对象,分别对应mapping中的四个索引类型,篇幅关系只列出主要属性
  • Basic
  • Medical
  • Exam
  • Diagnosis

通过注解@ESId指定基本信息文档_id
public class Basic extends ESBaseData {
/**
* 索引_id
*/
@ESId
private String party_id;
private String sex; // 性别
......
}
通过注解@ESParentId指定Medical关联的基本信息文档_id,Medical文档_id由ElasticSearch自动生成
public class Medical extends ESBaseData {
@ESParentId
private String party_id; //父id
private String hos_name_yb; //就诊医院
...
}
通过注解@ESParentId指定Exam关联的基本信息文档_id,Exam文档_id由ElasticSearch自动生成
public class Exam extends ESBaseData {
@ESParentId
private String party_id; //父id
private String hospital; // 就诊医院
....
}
通过注解@ESParentId指定Diagnosis关联的基本信息文档_id,Diagnosis文档_id由ElasticSearch自动生成
public class Diagnosis extends ESBaseData {
@ESParentId
private String party_id; //父id
private String provider; //诊断医院
private String subject; //科室
......
}

  • 通过api维护测试数据

对象定义好了后,通过bboss客户数据到之前建立好的索引client_info中。
	/**
* 录入体检医疗信息
*/
public void importClientInfoDataFromBeans() {
ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();

//导入基本信息,并且实时刷新,测试需要,实际环境不要带refresh
List<Basic> basics = buildBasics();
clientUtil.addDocuments("client_info","basic",basics,"refresh");

//导入医疗信息,并且实时刷新,测试需要,实际环境不要带refresh
List<Medical> medicals = buildMedicals();
clientUtil.addDocuments("client_info","medical",medicals,"refresh");

//导入体检结果数据,并且实时刷新,测试需要,实际环境不要带refresh
List<Exam> exams = buildExams();
clientUtil.addDocuments("client_info","exam",exams,"refresh");

//导入结果诊断数据,并且实时刷新,测试需要,实际环境不要带refresh
List<Diagnosis> diagnosiss = buildDiagnosiss();
clientUtil.addDocuments("client_info","diagnosis",diagnosiss,"refresh");
}
//构建基本信息集合
private List<Basic> buildBasics() {
List<Basic> basics = new ArrayList<Basic>();
Basic basic = new Basic();
basic.setParty_id("1");
basic.setAge(60);
basics.add(basic);
//继续添加其他数据
return basics;

}
//
构建医疗信息集合
private List<Medical> buildMedicals() {
List<Medical> medicals = new ArrayList<Medical>();
Medical medical = new Medical();
medical.setParty_id("1");//设置父文档id-基本信息文档_id
medical.setCreated_date(new Date());
medicals.add(medical);
//继续添加其他数据
return medicals;

}
//构建体检结果数据集合
private List<Exam> buildExams() {
List<Exam> exams = new ArrayList<Exam>();
Exam exam = new Exam();
exam.setParty_id("1");//设置父文档id-基本信息文档_id
exams.add(exam);
//继续添加其他数据
return exams;
}
//构建结果诊断数据集合
private List<Diagnosis> buildDiagnosiss() {
List<Diagnosis> diagnosiss = new ArrayList<Diagnosis>();
Diagnosis diagnosis = new Diagnosis();
diagnosis.setParty_id("1");//设置父文档id-基本信息文档_id
diagnosiss.add(diagnosis);
//继续添加其他数据
return diagnosiss;
}

  • 通过json报文批量导入测试数据

除了通过addDocuments录入数据,还可以通过json报文批量导入数据
在配置文件esmapper/Client_Info.xml增加以下内容:
    <!--
导入基本信息:
-->
<property name="bulkImportBasicData" trim="false">
<![CDATA[
{ "index": { "_id": "1" }}
{ "party_id":"1", "sex":"男", "mari_sts":"不详", "ethnic":"蒙古族", "prof":"放牧","birth_date":"1966-2-14 00:00:00", "province":"内蒙古", "city":"赤峰市","client_type":"1", "client_name":"安", "age":52,"id_type":"1", "idno":"1", "education":"初中","created_date":"2013-04-24 00:00:00","last_modified_date":"2013-04-24 00:00:00", "etl_date":"2013-04-24 00:00:00"}
{ "index": { "_id": "2" }}
{ "party_id":"2", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"公务员","birth_date":"1986-07-06 00:00:00", "province":"广东", "city":"深圳","client_type":"1", "client_name":"彭", "age":32,"id_type":"1", "idno":"2", "education":"本科", "created_date":"2013-05-09 15:49:47","last_modified_date":"2013-05-09 15:49:47", "etl_date":"2013-05-09 15:49:47"}
{ "index": { "_id": "3" }}
{ "party_id":"3", "sex":"男", "mari_sts":"未婚", "ethnic":"汉族", "prof":"无业","birth_date":"2000-08-15 00:00:00", "province":"广东", "city":"佛山","client_type":"1", "client_name":"浩", "age":18,"id_type":"1", "idno":"3", "education":"高中", "created_date":"2014-09-01 09:49:27","last_modified_date":"2014-09-01 09:49:27", "etl_date":"2014-09-01 09:49:27" }
{ "index": { "_id": "4" }}
{ "party_id":"4", "sex":"女", "mari_sts":"未婚", "ethnic":"满族", "prof":"工人","birth_date":"1996-03-14 00:00:00", "province":"江苏", "city":"扬州","client_type":"1", "client_name":"慧", "age":22,"id_type":"1", "idno":"4", "education":"高中", "created_date":"2014-09-16 09:30:37","last_modified_date":"2014-09-16 09:30:37", "etl_date":"2014-09-16 09:30:37" }
{ "index": { "_id": "5" }}
{ "party_id":"5", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"教师","birth_date":"1983-08-14 00:00:00", "province":"宁夏", "city":"灵武","client_type":"1", "client_name":"英", "age":35,"id_type":"1", "idno":"5", "education":"本科", "created_date":"2015-09-16 09:30:37","last_modified_date":"2015-09-16 09:30:37", "etl_date":"2015-09-16 09:30:37" }
{ "index": { "_id": "6" }}
{ "party_id":"6", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"工人","birth_date":"1959-07-04 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"岭", "age":59,"id_type":"1", "idno":"6", "education":"小学", "created_date":"2015-09-01 09:49:27","last_modified_date":"2015-09-01 09:49:27", "etl_date":"2015-09-01 09:49:27" }
{ "index": { "_id": "7" }}
{ "party_id":"7", "sex":"女", "mari_sts":"未婚", "ethnic":"汉族", "prof":"学生","birth_date":"1999-02-18 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"欣", "age":19,"id_type":"1", "idno":"7", "education":"高中", "created_date":"2016-12-01 09:49:27","last_modified_date":"2016-12-01 09:49:27", "etl_date":"2016-12-01 09:49:27" }
{ "index": { "_id": "8" }}
{ "party_id":"8", "sex":"女", "mari_sts":"未婚", "ethnic":"汉族", "prof":"学生","birth_date":"2007-11-18 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"梅", "age":10,"id_type":"1", "idno":"8", "education":"小学", "created_date":"2016-11-21 09:49:27","last_modified_date":"2016-11-21 09:49:27", "etl_date":"2016-11-21 09:49:27" }
{ "index": { "_id": "9" }}
{ "party_id":"9", "sex":"男", "mari_sts":"不详", "ethnic":"回族", "prof":"个体户","birth_date":"1978-03-29 00:00:00", "province":"北京", "city":"北京","client_type":"1", "client_name":"磊", "age":40,"id_type":"1", "idno":"9", "education":"高中", "created_date":"2017-09-01 09:49:27","last_modified_date":"2017-09-01 09:49:27", "etl_date":"2017-09-01 09:49:27" }
{ "index": { "_id": "10" }}
{ "party_id":"10", "sex":"男", "mari_sts":"已婚", "ethnic":"汉族", "prof":"农民","birth_date":"1970-11-14 00:00:00", "province":"浙江", "city":"台州","client_type":"1", "client_name":"强", "age":47,"id_type":"1", "idno":"10", "education":"初中", "created_date":"2018-09-01 09:49:27","last_modified_date":"2018-09-01 09:49:27", "etl_date":"2018-09-01 09:49:27" }
]]>
</property>
<!--
导入诊断信息
-->
<property name="bulkImportDiagnosisData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"J31.0", "sd_disease_name":"鼻炎","created_date":"2013-07-23 20:56:44", "last_modified_date":"2013-07-23 20:56:44", "etl_date":"2013-07-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"M47.8", "sd_disease_name":"颈椎病","created_date":"2013-09-23 20:56:44", "last_modified_date":"2013-09-23 20:56:44", "etl_date":"2013-09-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"E78.1", "sd_disease_name":"甘油三脂增高","created_date":"2018-09-20 09:27:44", "last_modified_date":"2018-09-20 09:27:44", "etl_date":"2018-09-20 09:27:44" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "provider":"江苏医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2011-05-19 15:52:55", "last_modified_date":"2011-05-19 15:52:55", "etl_date":"2011-05-19 15:52:55" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"H44", "sd_disease_name":"眼疾","created_date":"2016-04-08 10:42:18", "last_modified_date":"2016-04-08 10:42:18", "etl_date":"2016-04-08 10:42:18" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"M47.8", "sd_disease_name":"颈椎病","created_date":"2016-04-08 10:42:18", "last_modified_date":"2016-04-08 10:42:18", "etl_date":"2016-04-08 10:42:18" }

{ "index": { "parent": "7" }}
{ "party_id":"7", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2017-04-08 10:42:18", "last_modified_date":"2017-04-08 10:42:18", "etl_date":"2017-04-08 10:42:18" }

{ "index": { "parent": "8" }}
{ "party_id":"8", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2018-04-08 10:42:18", "last_modified_date":"2018-04-08 10:42:18", "etl_date":"2018-04-08 10:42:18" }

{ "index": { "parent": "9" }}
{ "party_id":"9", "provider":"朝阳医院", "subject":"","diagnosis_type":"","icd10_code":"A03.901", "sd_disease_name":"急性细菌性痢疾","created_date":"2015-06-08 10:42:18", "last_modified_date":"2015-06-08 10:42:18", "etl_date":"2015-06-08 10:42:18" }
]]>
</property>

<!--
导入医疗信息
-->
<property name="bulkImportMedicalData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"氟化钠", "medical_sale_name":"", "medical_code":"A01AA01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"四环素", "medical_sale_name":"", "medical_code":"A01AB13", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2016-05-31 00:00:00", "last_modified_date":"2016-05-31 00:00:00", "etl_date":"2016-05-31 00:00:00" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"", "medical_sale_name":"盐酸多西环素胶丸", "medical_code":"A01AB22", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2016-03-18 00:00:00", "last_modified_date":"2016-03-18 00:00:00", "etl_date":"2016-03-18 00:00:00" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"盐酸多西环素分散片", "medical_sale_name":"", "medical_code":"A01AB22", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2013-07-23 20:56:44", "last_modified_date":"2013-07-23 20:56:44", "etl_date":"2013-07-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"地塞米松", "medical_sale_name":"", "medical_code":"A01AC02", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2013-09-23 20:56:44", "last_modified_date":"2013-09-23 20:56:44", "etl_date":"2013-09-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"肾上腺素", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-09-20 09:27:44", "last_modified_date":"2018-09-20 09:27:44", "etl_date":"2018-09-20 09:27:44" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"地塞米松", "medical_sale_name":"", "medical_code":"A01AC02", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2011-05-19 15:52:55", "last_modified_date":"2011-05-19 15:52:55", "etl_date":"2011-05-19 15:52:55" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"四环素", "medical_sale_name":"", "medical_code":"A01AB13", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-04-08 10:42:18", "last_modified_date":"2018-04-08 10:42:18", "etl_date":"2018-04-08 10:42:18" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"诺氟沙星胶囊", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2015-06-08 10:42:18", "last_modified_date":"2015-06-08 10:42:18", "etl_date":"2015-06-08 10:42:18" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "hos_name_yb":"山东医院", "eivisions_name":"", "medical_type":"","medical_common_name":"盐酸异丙肾上腺素片", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-01-23 20:56:44", "last_modified_date":"2014-01-23 20:56:44", "etl_date":"2014-01-23 20:56:44" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "hos_name_yb":"山东医院", "eivisions_name":"", "medical_type":"","medical_common_name":"甲硝唑栓", "medical_sale_name":"", "medical_code":"A01AB17", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-06-08 10:42:18", "last_modified_date":"2018-06-08 10:42:18", "etl_date":"2018-06-08 10:42:18" }

{ "index": { "parent": "9" }}
{ "party_id":"9", "hos_name_yb":"朝阳医院", "eivisions_name":"", "medical_type":"","medical_common_name":"复方克霉唑乳膏", "medical_sale_name":"", "medical_code":"A01AB18", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-01-23 20:56:44", "last_modified_date":"2014-01-23 20:56:44", "etl_date":"2014-01-23 20:56:44"}
]]>
</property>

<!--
导入体检信息
-->
<property name="bulkImportExamData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"高血压","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "2" }}
{ "party_id":"2", "hospital":"", "dept":"", "is_ok":"Y", "exam_result":"轻度脂肪肝","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "3" }}
{ "party_id":"3", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"急性细菌性痢疾","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "5" }}
{ "party_id":"5", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "7" }}
{ "party_id":"7", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "8" }}
{ "party_id":"1", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "9" }}
{ "party_id":"9", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "10" }}
{ "party_id":"10", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
]]>
</property>







通过bboss提供的通用api,导入上面定义的数据:
	/**
* 通过读取配置文件中的dsl json数据导入医疗数据
*/
public void importClientInfoFromJsonData(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");

clientUtil.executeHttp("client_info/basic/_bulk?refresh","bulkImportBasicData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/diagnosis/_bulk?refresh","bulkImportDiagnosisData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/medical/_bulk?refresh","bulkImportMedicalData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/exam/_bulk?refresh","bulkImportExamData",ClientUtil.HTTP_POST);
统计导入的数据
		
long basiccount = clientUtil.countAll("client_info/basic");
System.out.println(basiccount);
long medicalcount = clientUtil.countAll("client_info/medical");
System.out.println(medicalcount);
long examcount = clientUtil.countAll("client_info/exam");
System.out.println(examcount);
long diagnosiscount = clientUtil.countAll("client_info/diagnosis");
System.out.println(diagnosiscount);
}
4.父子关系查询-Elasticsearch 5.x 中has_child和has_parent查询的基本用法​
  • 根据父查子-通过客户名称信息查询客户端体检结果

在配置文件esmapper/Client_Info.xml增加dsl语句:queryExamSearchByClientName
   <!--根据客户名称查询客户体检报告-->
<property name="queryExamSearchByClientName">
<![CDATA[
{
"query": {
## 最多返回size变量对应的记录条数
"size":#[size],
"has_parent": {
"type": "basic",
"query": {
"match": {
"client_name": #[clientName] ## 通过变量clientName设置客户名称
}
}
}
}
}
]]>
</property>

 执行查询,通过bboss的searchList 方法获取符合条件的体检报告以及总记录数据,返回size对应的1000条数据
	/**
* 根据客户名称查询客户体检报告
*/
public void queryExamSearchByClientName(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("clientName","张三");
params.put("size",1000);
ESDatas<Exam> exams = clientUtil.searchList("client_info/exam/_search","queryExamSearchByClientName",params,Exam.class);
List<Exam> examList = exams.getDatas();//获取符合条件的体检数据
long totalSize = exams.getTotalSize();//符合条件的总记录数据
}
 
  • 根据子查父数据-通过医疗信息编码查找客户基本数据

在配置文件esmapper/Client_Info.xml增加查询dsl语句:queryClientInfoByMedicalName
    <!--通过医疗信息编码查找客户基本数据-->
<property name="queryClientInfoByMedicalName">
<![CDATA[
{
"query": {
## 最多返回size变量对应的记录条数
"size":#[size],
"has_child": {
"type": "medical",
"score_mode": "max",
"query": {
"match": {
"medical_code": #[medicalCode] ## 通过变量medicalCode设置医疗编码
}
}
}
}
}
]]>
</property>
执行查询,通过bboss的searchList 方法获取符合条件的客户端基本信息以及总记录数据
	/**
* 通过医疗信息编码查找客户基本数据
*/
public void queryClientInfoByMedicalName(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("medicalCode","A01AA01"); //通过变量medicalCode设置医疗编码
params.put("size",1000); //最多返回size变量对应的记录条数
ESDatas<Basic> bascis = clientUtil.searchList("client_info/basic/_search","queryClientInfoByMedicalName",params,Basic.class);
List<Basic> bascisList = bascis.getDatas();//获取符合条件的客户信息
long totalSize = bascis.getTotalSize();
}
5.同时返回父子数据-Elasticsearch 5.x 中如何在检索中同时返回父子数据
这一节中我们介绍同时返回父子数据的玩法 :inner_hits的妙用
  • 根据父条件查询所有子数据集合并返回父数据,根据客户名称查询所有体检数据,同时返回客户信息

在配置文件esmapper/Client_Info.xml增加检索dsl-queryDiagnosisByClientName
    <!--根据客户名称获取客户体检诊断数据,并返回客户信息-->
<property name="queryDiagnosisByClientName">
<![CDATA[
{
"query": {
## 最多返回size变量对应的记录条数
"size":#[size],
"has_parent": {
"type": "basic",
"query": {
"match": {
"client_name": #[clientName] ## 通过变量clientName设置客户名称
}
},
"inner_hits": {} ## 通过变量inner_hits表示要返回对应的客户信息
}
}
}
]]>
</property>
执行检索并遍历结果
	/**
* 根据客户名称获取客户体检诊断数据,并返回客户数据
*/
public void queryDiagnosisByClientName(){

ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("clientName","张三");
params.put("size",1000);

try {
ESInnerHitSerialThreadLocal.setESInnerTypeReferences(Basic.class);//指定inner查询结果对应的客户基本信息类型,Basic只有一个文档类型,索引不需要显示指定basic对应的mapping type名称
ESDatas<Diagnosis> diagnosiss = clientUtil.searchList("client_info/diagnosis/_search",
"queryDiagnosisByClientName",params,Diagnosis.class);
List<Diagnosis> diagnosisList = diagnosiss.getDatas();//获取符合条件的体检报告数据
long totalSize = diagnosiss.getTotalSize();
//遍历诊断报告信息,并查看报告对应的客户基本信息
for(int i = 0; diagnosisList != null && i < diagnosisList.size(); i ++) {
Diagnosis diagnosis = diagnosisList.get(i);
List<Basic> basics = ResultUtil.getInnerHits(diagnosis.getInnerHits(), "basic");
if(basics != null) {
System.out.println(basics.size());
}
}
}
finally{
ESInnerHitSerialThreadLocal.clean();//清空inner查询结果对应的客户基本信息类型
}
}

  •  根据子条件查询父数据并返回符合条件的父的子数据集合,查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录

在配置文件esmapper/Client_Info.xml增加检索dsl-queryClientAndAllSons
    <!--查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录-->
<property name="queryClientAndAllSons">
<![CDATA[
{
"query": {
"bool": {
"should": [
{
"match_all":{}
}
]
,"must": [
{
"has_child": {
"score_mode": "none",
"type": "diagnosis"
,"query": {
"bool": {
"must": [
{
"term": {
"icd10_code": {
"value": "J00"
}
}
}
]
}
},"inner_hits":{}
}
}
]
,"should": [
{
"has_child": {
"score_mode": "none",
"type": "medical"
,"query": {
"match_all": {}

},"inner_hits":{}
}
}
]
,"should": [
{
"has_child": {
"type": "exam",
"query": {
"match_all": {}
},"inner_hits":{}
}
}
]
}
}
}
]]>
</property>
执行查询:
	/**
* 查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录
*/
public void queryClientAndAllSons(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");
Map<String,Object> params = null;//没有检索条件,构造一个空的参数对象

try {
//设置子文档的类型和对象映射关系
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("exam",Exam.class);//指定inner查询结果对于exam类型和对应的对象类型Exam
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("diagnosis",Diagnosis.class);//指定inner查询结果对于diagnosis类型和对应的对象类型Diagnosis
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("medical",Medical.class);//指定inner查询结果对于medical类型和对应的对象类型Medical
ESDatas<Basic> escompanys = clientUtil.searchList("client_info/basic/_search",
"queryClientAndAllSons",params,Basic.class);
//String response = clientUtil.executeRequest("client_info/basic/_search","queryClientAndAllSons",params);直接获取原始的json报文
// escompanys = clientUtil.searchAll("client_info",Basic.class);
long totalSize = escompanys.getTotalSize();
List<Basic> clientInfos = escompanys.getDatas();//获取符合条件的数据
//查看公司下面的雇员信息(符合检索条件的雇员信息)
for (int i = 0; clientInfos != null && i < clientInfos.size(); i++) {
Basic clientInfo = clientInfos.get(i);
List<Exam> exams = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "exam");
if(exams != null)
System.out.println(exams.size());
List<Diagnosis> diagnosiss = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "diagnosis");
if(diagnosiss != null)
System.out.println(diagnosiss.size());
List<Medical> medicals = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "medical");
if(medicals != null)
System.out.println(medicals.size());

}
}
finally{
ESInnerHitSerialThreadLocal.clean();//清空inner查询结果对于各种类型信息
}
}
最后我们按顺序执行所有方法,验证功能:
	@Test
public void testMutil(){
this.createClientIndice();//创建indice client_info
// this.importClientInfoDataFromBeans(); //通过api添加测试数据
this.importClientInfoFromJsonData();//导入测试数据
this.queryExamSearchByClientName(); //根据客户端名称查询提交报告
this.queryClientInfoByMedicalName();//通过医疗信息编码查找客户基本数据
this.queryDiagnosisByClientName();//根据客户名称获取客户体检诊断数据,并返回客户数据
this.queryClientAndAllSons();//查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录
}
可以下载完整的demo工程运行本文中的测试用例方法,地址见相关资料。
到此Elasticsearch 5.x 父子关系维护检索实战介绍完毕,谢谢大家!

相关资料
完整demo工程  https://github.com/bbossgroups/eshelloword-booter
对应的类文件和配置文件
org.bboss.elasticsearchtest.parentchild.ParentChildTest
esmapper/Client_Info.xml
 
开发交流
bboss交流群 166471282
bboss公众号
getqrcode.jpg

 
敬请关注:父子关系维护检索实战二 Elasticsearch 6.x 父子关系维护检 收起阅读 »

logstash input插件开发

logstash作为一个数据管道中间件,支持对各种类型数据的采集与转换,并将数据发送到各种类型的存储库,比如实现消费kafka数据并且写入到Elasticsearch, 日志文件同步到对象存储S3等,mysql数据同步到Elasticsearch等。

logstash内部主要包含三个模块:

* input: 从数据源获取数据
* filter: 过滤、转换数据
* output: 输出数据

不同类型的数据都可以通过对应的input-plugin, output-plugin完成数据的输入与输出。如需要消费kafka中的数据并写入到Elasticsearch中,则需要使用logstash的kafka-input-plugin完成数据输入,logstash-output-elasticsearch完成数据输出。如果需要对输入数据进行过滤或者转换,比如根据关键词过滤掉不需要的内容,或者时间字段的格式转换,就需要又filter-plugin完成了。

logstash的input插件目前已经有几十种了,支持大多数比较通用或开源的数据源的输入。但如果公司内部开发的数据库或其它存储类的服务不能和开源产品在接口协议上兼容,比如腾讯自研的消息队列服务CMQ不依赖于其它的开源消息队列产品,所以不能直接使用logstash的logstash-input-kafka或logstash-input-rabbitmq同步CMQ中的数据;腾讯云对象存储服务COS, 在鉴权方式上和AWS的S3存在差异,也不能直接使用logstash-input-s3插件从COS中读取数据,对于这种情况,就需要自己开发logstash的input插件了。

本文以开发logstash的cos input插件为例,介绍如何开发logstash的input插件。

logstash官方提供了有个简单的input plugin example可供参考: https://github.com/logstash-plugins/logstash-input-example/

环境准备

logstash使用jruby开发,首先要配置jruby环境:

  1. 安装rvm:

    rvm是一个ruby管理器,可以安装并管理ruby环境,也可以通过命令行切换到不同的ruby版本。

    gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3 7D2BAF1CF37B13E2069D6956105BD0E739499BDB
    \curl -sSL https://get.rvm.io | bash -s stable
    source /etc/profile.d/rvm.sh
  2. 安装jruby

    rvm install jruby
    rvm use jruby
  3. 安装包管理工具bundle和测试工具rspec

    gem install bundle
    gem install rspec

从example开始

  1. clone logstash-input-example

    git clone https://github.com/logstash-plugins/logstash-input-example.git
  2. 将clone出来的logstash-input-example源码copy到logstash-input-cos目录,并删除.git文件夹,目的是以logstash-input-example的源码为参考进行开发,同时把需要改动名称的地方修改一下:

    mv logstash-input-example.gemspec logstash-input-cos.gemspec
    mv lib/logstash/inputs/example.rb lib/logstash/inputs/cos.rb
    mv spec/inputs/example_spec.rb spec/inputs/cos_spec.rb
  3. 建立的源码目录结构如图所示:

其中,重要文件的作用说明如下:

  • cos.rb: 主文件,在该文件中编写logstash配置文件的读写与源数据获取的代码,需要继承LogStash::Inputs::Base基类
  • cos_spec.rb: 单元测试文件,通过rspec可以对cos.rb中的代码进行测试
  • logstash-input-cos.gemspec: 类似于maven中的pom.xml文件,配置工程的版本、名称、licene,包依赖等,通过bundle命令可以下载依赖包

配置并下载依赖

因为腾讯云COS服务没有ruby sdk, 因为只能依赖其Java sdk进行开发,首先添加对cos java sdk的依赖。在logstash-input-cos.gemspec中Gem dependencies配置栏中增加以下内容:

# Gem dependencies
  s.requirements << "jar 'com.qcloud:cos_api', '5.4.4'"
  s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
  s.add_runtime_dependency 'logstash-codec-plain'
  s.add_runtime_dependency 'stud', '>= 0.0.22'
  s.add_runtime_dependency 'jar-dependencies'
  s.add_development_dependency 'logstash-devutils', '1.3.6'

相比logstash-input-example.gemspec,增加了对com.qcloud:cos_api包以及jar-dependencies包的依赖,jar-dependencies用于在ruby环境中管理jar包,并且可以跟踪jar包的加载状态。

然后,在logstash-input-cos.gemspec中增加配置:

s.platform = 'java'

这样可以成功下载java依赖包,并且可以在ruby代码中直接调用java代码。

最后,执行以下命令下载依赖:

bundle install

编写代码

logstash-input-cos的代码逻辑其实比较简单,主要是通过执行定时任务,调用cos java sdk中的listObjects方法,获取到指定bucket里的数据,并在每次定时任务执行结束后设置marker保存在本地,再次执行时从marker位置获取数据,以实现数据的增量同步。

jar包的引用

因为要调用cos java sdk中的代码,先引用该jar包:

require 'cos_api-5.4.4.jar'
java_import com.qcloud.cos.COSClient;
java_import com.qcloud.cos.ClientConfig;
java_import com.qcloud.cos.auth.BasicCOSCredentials;
java_import com.qcloud.cos.auth.COSCredentials;
java_import com.qcloud.cos.exception.CosClientException;
java_import com.qcloud.cos.exception.CosServiceException;
java_import com.qcloud.cos.model.COSObjectSummary;
java_import com.qcloud.cos.model.ListObjectsRequest;
java_import com.qcloud.cos.model.ObjectListing;
java_import com.qcloud.cos.region.Region;

读取配置文件

logstash配置文件读取的代码如图所示:

config_name为cos,其它的配置项读取代码按照ruby的代码规范编写,添加类型校验与默认值,就可以从以下配置文件中读取配置项:

input {
    cos {
        "endpoint" => "cos.ap-guangzhou.myqcloud.com"
        "access_key_id" => "*****"
        "access_key_secret" => "****"
        "bucket" => "******"
        "region" => "ap-guangzhou"
        "appId" => "**********"
        "interval" => 60
    }
}

output {
    stdout {
        codec=>rubydebug
    }
}

实现register方法

logstash input插件必须实现另个方法:register 和run

register方法类似于初始化方法,在该方法中可以直接使用从配置文件读取并赋值的变量,完成cos client的初始化,代码如下:

    # 1 初始化用户身份信息(appid, secretId, secretKey)
    cred = com.qcloud.cos.auth.BasicCOSCredentials.new(@access_key_id, @access_key_secret)
    # 2 设置bucket的区域, COS地域的简称请参照 https://www.qcloud.com/document/product/436/6224
    clientConfig = com.qcloud.cos.ClientConfig.new(com.qcloud.cos.region.Region.new(@region))
    # 3 生成cos客户端
    @cosclient = com.qcloud.cos.COSClient.new(cred, clientConfig)
    # bucket名称, 需包含appid
    bucketName = @bucket + "-"+ @appId
    @bucketName = bucketName

    @listObjectsRequest = com.qcloud.cos.model.ListObjectsRequest.new()
    # 设置bucket名称
    @listObjectsRequest.setBucketName(bucketName)
    # prefix表示列出的object的key以prefix开始
    @listObjectsRequest.setPrefix(@prefix)
    # 设置最大遍历出多少个对象, 一次listobject最大支持1000
    @listObjectsRequest.setMaxKeys(1000)
    @listObjectsRequest.setMarker(@markerConfig.getMarker)

示例代码中设置了@cosclient和@listObjectRequest为全局变量, 因为在run方法中会用到这两个变量。

注意在ruby中调用java代码的方式:没有变量描述符;不能直接new Object(),而只能Object.new().

实现run方法

run方法获取数据并将数据流转换成event事件

最简单的run方法为:

def run(queue)
    Stud.interval(@interval) do
      event = LogStash::Event.new("message" => @message, "host" => @host)
      decorate(event)
      queue << event
    end # loop
  end # def run

代码说明:

  • 通过Stud ruby模块执行定时任务,interval可自定义,从配置文件中读取
  • 生成event, 示例代码生成了一个包含两个字段数据的event
  • 调用decorate()方法, 给该event打上tag,如果配置的话
  • queue<<event, 将event插入到数据管道中,发送给filter处理

logstash-input-cos的run方法实现为:

def run(queue)
    @current_thread = Thread.current
    Stud.interval(@interval) do
      process(queue)
    end
end

def process(queue)
    @logger.info('Marker from: ' + @markerConfig.getMarker)

    objectListing = @cosclient.listObjects(@listObjectsRequest)
    nextMarker = objectListing.getNextMarker()
    cosObjectSummaries = objectListing.getObjectSummaries()
    cosObjectSummaries.each do |obj|
       # 文件的路径key
       key = obj.getKey()

       if stop?
         @logger.info("stop while attempting to read log file")
         break
       end
       # 根据key获取内容
       getObject(key) { |log|
         # 发送消息
         @codec.decode(log) do |event|
           decorate(event)
           queue << event
         end
       }

       #记录 marker
       @markerConfig.setMarker(key)
       @logger.info('Marker end: ' + @markerConfig.getMarker)
    end
  end

  # 获取下载输入流
 def getObject(key, &block)
    getObjectRequest = com.qcloud.cos.model.GetObjectRequest.new(@bucketName, key)
    cosObject = @cosclient.getObject(getObjectRequest)
    cosObjectInput = cosObject.getObjectContent()
    buffered =BufferedReader.new(InputStreamReader.new(cosObjectInput))
    while (line = buffered.readLine())
      block.call(line)
    end
  end

测试代码

在spec/inputs/cos_spec.rb中增加如下测试代码:

# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/cos"

describe LogStash::Inputs::Cos do

  it_behaves_like "an interruptible input plugin" do
    let(:config) { {
        "endpoint" => 'cos.ap-guangzhou.myqcloud.com',
        "access_key_id" => '*',
        "access_key_secret" => '*',
        "bucket" => '*',
         "region" => 'ap-guangzhou',
         "appId" => '*',
        "interval" => 60 } }
  end
end

rspec是一个ruby测试库,通过bundle命令执行rspec:

bundle exec rspec

如果cos.rb中的代码没有语法或运行时错误,则会出现如果信息表明测试成功:

Finished in 0.8022 seconds (files took 3.45 seconds to load)
1 example, 0 failures

构建并测试input-plugin-cos

build

使用gem对input-plugin-cos插件源码进行build:

gem build logstash-input-cos.gemspec

构建完成后会生成一个名为logstash-input-cos-0.0.1-java.gem的文件

test

在logstash的解压目录下,执行一下命令安装logstash-input-cos plugin:

./bin/logstash-plugin install /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem

执行结果为:

Validating /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem
Installing logstash-input-cos
Installation successful

另外,可以通过./bin/logstash-plugin list命令查看logstash已经安装的所有input/output/filter/codec插件。

生成配置文件cos.logstash.conf,内容为:

input {
    cos {
        "endpoint" => "cos.ap-guangzhou.myqcloud.com"
        "access_key_id" => "*****"
        "access_key_secret" => "****"
        "bucket" => "******"
        "region" => "ap-guangzhou"
        "appId" => "**********"
        "interval" => 60
    }
}

output {
    stdout {
        codec=>rubydebug
    }
}

该配置文件使用腾讯云官网账号的secret_id和secret_key进行权限验证,拉取指定bucket里的数据,为了测试,将output设置为标准输出。

执行logstash:

./bin/logstash -f cos.logstash.conf

输出结果为:

Sending Logstash's logs to /root/logstash-5.6.4/logs which is now configured via log4j2.properties
[2018-07-30T19:26:17,039][WARN ][logstash.runner          ] --config.debug was specified, but log.level was not set to 'debug'! No config info will be logged.
[2018-07-30T19:26:17,048][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/root/logstash-5.6.4/modules/netflow/configuration"}
[2018-07-30T19:26:17,049][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/root/logstash-5.6.4/modules/fb_apache/configuration"}
[2018-07-30T19:26:17,252][INFO ][logstash.inputs.cos      ] Using version 0.1.x input plugin 'cos'. This plugin isn't well supported by the community and likely has no maintainer.
[2018-07-30T19:26:17,341][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2018-07-30T19:26:17,362][INFO ][logstash.inputs.cos      ] Registering cos input {:bucket=>"bellengao", :region=>"ap-guangzhou"}
[2018-07-30T19:26:17,528][INFO ][logstash.pipeline        ] Pipeline main started
[2018-07-30T19:26:17,530][INFO ][logstash.inputs.cos      ] Marker from:
log4j:WARN No appenders could be found for logger (org.apache.http.client.protocol.RequestAddCookies).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[2018-07-30T19:26:17,574][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-07-30T19:26:17,714][INFO ][logstash.inputs.cos      ] Marker end: access.log
{
       "message" => "77.179.66.156 - - [25/Oct/2016:14:49:33 +0200] \"GET / HTTP/1.1\" 200 612 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",
      "@version" => "1",
    "@timestamp" => 2018-07-30T11:26:17.710Z
}
{
       "message" => "77.179.66.156 - - [25/Oct/2016:14:49:34 +0200] \"GET /favicon.ico HTTP/1.1\" 404 571 \"http://localhost:8080/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",
      "@version" => "1",
    "@timestamp" => 2018-07-30T11:26:17.711Z
}

在cos中的bucket里上传了名为access.log的nginx日志,上述输出结果中最后打印出来的每个json结构体构成一个event, 其中message消息即为access.log中每一条日志。

继续阅读 »

logstash作为一个数据管道中间件,支持对各种类型数据的采集与转换,并将数据发送到各种类型的存储库,比如实现消费kafka数据并且写入到Elasticsearch, 日志文件同步到对象存储S3等,mysql数据同步到Elasticsearch等。

logstash内部主要包含三个模块:

* input: 从数据源获取数据
* filter: 过滤、转换数据
* output: 输出数据

不同类型的数据都可以通过对应的input-plugin, output-plugin完成数据的输入与输出。如需要消费kafka中的数据并写入到Elasticsearch中,则需要使用logstash的kafka-input-plugin完成数据输入,logstash-output-elasticsearch完成数据输出。如果需要对输入数据进行过滤或者转换,比如根据关键词过滤掉不需要的内容,或者时间字段的格式转换,就需要又filter-plugin完成了。

logstash的input插件目前已经有几十种了,支持大多数比较通用或开源的数据源的输入。但如果公司内部开发的数据库或其它存储类的服务不能和开源产品在接口协议上兼容,比如腾讯自研的消息队列服务CMQ不依赖于其它的开源消息队列产品,所以不能直接使用logstash的logstash-input-kafka或logstash-input-rabbitmq同步CMQ中的数据;腾讯云对象存储服务COS, 在鉴权方式上和AWS的S3存在差异,也不能直接使用logstash-input-s3插件从COS中读取数据,对于这种情况,就需要自己开发logstash的input插件了。

本文以开发logstash的cos input插件为例,介绍如何开发logstash的input插件。

logstash官方提供了有个简单的input plugin example可供参考: https://github.com/logstash-plugins/logstash-input-example/

环境准备

logstash使用jruby开发,首先要配置jruby环境:

  1. 安装rvm:

    rvm是一个ruby管理器,可以安装并管理ruby环境,也可以通过命令行切换到不同的ruby版本。

    gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3 7D2BAF1CF37B13E2069D6956105BD0E739499BDB
    \curl -sSL https://get.rvm.io | bash -s stable
    source /etc/profile.d/rvm.sh
  2. 安装jruby

    rvm install jruby
    rvm use jruby
  3. 安装包管理工具bundle和测试工具rspec

    gem install bundle
    gem install rspec

从example开始

  1. clone logstash-input-example

    git clone https://github.com/logstash-plugins/logstash-input-example.git
  2. 将clone出来的logstash-input-example源码copy到logstash-input-cos目录,并删除.git文件夹,目的是以logstash-input-example的源码为参考进行开发,同时把需要改动名称的地方修改一下:

    mv logstash-input-example.gemspec logstash-input-cos.gemspec
    mv lib/logstash/inputs/example.rb lib/logstash/inputs/cos.rb
    mv spec/inputs/example_spec.rb spec/inputs/cos_spec.rb
  3. 建立的源码目录结构如图所示:

其中,重要文件的作用说明如下:

  • cos.rb: 主文件,在该文件中编写logstash配置文件的读写与源数据获取的代码,需要继承LogStash::Inputs::Base基类
  • cos_spec.rb: 单元测试文件,通过rspec可以对cos.rb中的代码进行测试
  • logstash-input-cos.gemspec: 类似于maven中的pom.xml文件,配置工程的版本、名称、licene,包依赖等,通过bundle命令可以下载依赖包

配置并下载依赖

因为腾讯云COS服务没有ruby sdk, 因为只能依赖其Java sdk进行开发,首先添加对cos java sdk的依赖。在logstash-input-cos.gemspec中Gem dependencies配置栏中增加以下内容:

# Gem dependencies
  s.requirements << "jar 'com.qcloud:cos_api', '5.4.4'"
  s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
  s.add_runtime_dependency 'logstash-codec-plain'
  s.add_runtime_dependency 'stud', '>= 0.0.22'
  s.add_runtime_dependency 'jar-dependencies'
  s.add_development_dependency 'logstash-devutils', '1.3.6'

相比logstash-input-example.gemspec,增加了对com.qcloud:cos_api包以及jar-dependencies包的依赖,jar-dependencies用于在ruby环境中管理jar包,并且可以跟踪jar包的加载状态。

然后,在logstash-input-cos.gemspec中增加配置:

s.platform = 'java'

这样可以成功下载java依赖包,并且可以在ruby代码中直接调用java代码。

最后,执行以下命令下载依赖:

bundle install

编写代码

logstash-input-cos的代码逻辑其实比较简单,主要是通过执行定时任务,调用cos java sdk中的listObjects方法,获取到指定bucket里的数据,并在每次定时任务执行结束后设置marker保存在本地,再次执行时从marker位置获取数据,以实现数据的增量同步。

jar包的引用

因为要调用cos java sdk中的代码,先引用该jar包:

require 'cos_api-5.4.4.jar'
java_import com.qcloud.cos.COSClient;
java_import com.qcloud.cos.ClientConfig;
java_import com.qcloud.cos.auth.BasicCOSCredentials;
java_import com.qcloud.cos.auth.COSCredentials;
java_import com.qcloud.cos.exception.CosClientException;
java_import com.qcloud.cos.exception.CosServiceException;
java_import com.qcloud.cos.model.COSObjectSummary;
java_import com.qcloud.cos.model.ListObjectsRequest;
java_import com.qcloud.cos.model.ObjectListing;
java_import com.qcloud.cos.region.Region;

读取配置文件

logstash配置文件读取的代码如图所示:

config_name为cos,其它的配置项读取代码按照ruby的代码规范编写,添加类型校验与默认值,就可以从以下配置文件中读取配置项:

input {
    cos {
        "endpoint" => "cos.ap-guangzhou.myqcloud.com"
        "access_key_id" => "*****"
        "access_key_secret" => "****"
        "bucket" => "******"
        "region" => "ap-guangzhou"
        "appId" => "**********"
        "interval" => 60
    }
}

output {
    stdout {
        codec=>rubydebug
    }
}

实现register方法

logstash input插件必须实现另个方法:register 和run

register方法类似于初始化方法,在该方法中可以直接使用从配置文件读取并赋值的变量,完成cos client的初始化,代码如下:

    # 1 初始化用户身份信息(appid, secretId, secretKey)
    cred = com.qcloud.cos.auth.BasicCOSCredentials.new(@access_key_id, @access_key_secret)
    # 2 设置bucket的区域, COS地域的简称请参照 https://www.qcloud.com/document/product/436/6224
    clientConfig = com.qcloud.cos.ClientConfig.new(com.qcloud.cos.region.Region.new(@region))
    # 3 生成cos客户端
    @cosclient = com.qcloud.cos.COSClient.new(cred, clientConfig)
    # bucket名称, 需包含appid
    bucketName = @bucket + "-"+ @appId
    @bucketName = bucketName

    @listObjectsRequest = com.qcloud.cos.model.ListObjectsRequest.new()
    # 设置bucket名称
    @listObjectsRequest.setBucketName(bucketName)
    # prefix表示列出的object的key以prefix开始
    @listObjectsRequest.setPrefix(@prefix)
    # 设置最大遍历出多少个对象, 一次listobject最大支持1000
    @listObjectsRequest.setMaxKeys(1000)
    @listObjectsRequest.setMarker(@markerConfig.getMarker)

示例代码中设置了@cosclient和@listObjectRequest为全局变量, 因为在run方法中会用到这两个变量。

注意在ruby中调用java代码的方式:没有变量描述符;不能直接new Object(),而只能Object.new().

实现run方法

run方法获取数据并将数据流转换成event事件

最简单的run方法为:

def run(queue)
    Stud.interval(@interval) do
      event = LogStash::Event.new("message" => @message, "host" => @host)
      decorate(event)
      queue << event
    end # loop
  end # def run

代码说明:

  • 通过Stud ruby模块执行定时任务,interval可自定义,从配置文件中读取
  • 生成event, 示例代码生成了一个包含两个字段数据的event
  • 调用decorate()方法, 给该event打上tag,如果配置的话
  • queue<<event, 将event插入到数据管道中,发送给filter处理

logstash-input-cos的run方法实现为:

def run(queue)
    @current_thread = Thread.current
    Stud.interval(@interval) do
      process(queue)
    end
end

def process(queue)
    @logger.info('Marker from: ' + @markerConfig.getMarker)

    objectListing = @cosclient.listObjects(@listObjectsRequest)
    nextMarker = objectListing.getNextMarker()
    cosObjectSummaries = objectListing.getObjectSummaries()
    cosObjectSummaries.each do |obj|
       # 文件的路径key
       key = obj.getKey()

       if stop?
         @logger.info("stop while attempting to read log file")
         break
       end
       # 根据key获取内容
       getObject(key) { |log|
         # 发送消息
         @codec.decode(log) do |event|
           decorate(event)
           queue << event
         end
       }

       #记录 marker
       @markerConfig.setMarker(key)
       @logger.info('Marker end: ' + @markerConfig.getMarker)
    end
  end

  # 获取下载输入流
 def getObject(key, &block)
    getObjectRequest = com.qcloud.cos.model.GetObjectRequest.new(@bucketName, key)
    cosObject = @cosclient.getObject(getObjectRequest)
    cosObjectInput = cosObject.getObjectContent()
    buffered =BufferedReader.new(InputStreamReader.new(cosObjectInput))
    while (line = buffered.readLine())
      block.call(line)
    end
  end

测试代码

在spec/inputs/cos_spec.rb中增加如下测试代码:

# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/cos"

describe LogStash::Inputs::Cos do

  it_behaves_like "an interruptible input plugin" do
    let(:config) { {
        "endpoint" => 'cos.ap-guangzhou.myqcloud.com',
        "access_key_id" => '*',
        "access_key_secret" => '*',
        "bucket" => '*',
         "region" => 'ap-guangzhou',
         "appId" => '*',
        "interval" => 60 } }
  end
end

rspec是一个ruby测试库,通过bundle命令执行rspec:

bundle exec rspec

如果cos.rb中的代码没有语法或运行时错误,则会出现如果信息表明测试成功:

Finished in 0.8022 seconds (files took 3.45 seconds to load)
1 example, 0 failures

构建并测试input-plugin-cos

build

使用gem对input-plugin-cos插件源码进行build:

gem build logstash-input-cos.gemspec

构建完成后会生成一个名为logstash-input-cos-0.0.1-java.gem的文件

test

在logstash的解压目录下,执行一下命令安装logstash-input-cos plugin:

./bin/logstash-plugin install /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem

执行结果为:

Validating /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem
Installing logstash-input-cos
Installation successful

另外,可以通过./bin/logstash-plugin list命令查看logstash已经安装的所有input/output/filter/codec插件。

生成配置文件cos.logstash.conf,内容为:

input {
    cos {
        "endpoint" => "cos.ap-guangzhou.myqcloud.com"
        "access_key_id" => "*****"
        "access_key_secret" => "****"
        "bucket" => "******"
        "region" => "ap-guangzhou"
        "appId" => "**********"
        "interval" => 60
    }
}

output {
    stdout {
        codec=>rubydebug
    }
}

该配置文件使用腾讯云官网账号的secret_id和secret_key进行权限验证,拉取指定bucket里的数据,为了测试,将output设置为标准输出。

执行logstash:

./bin/logstash -f cos.logstash.conf

输出结果为:

Sending Logstash's logs to /root/logstash-5.6.4/logs which is now configured via log4j2.properties
[2018-07-30T19:26:17,039][WARN ][logstash.runner          ] --config.debug was specified, but log.level was not set to 'debug'! No config info will be logged.
[2018-07-30T19:26:17,048][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/root/logstash-5.6.4/modules/netflow/configuration"}
[2018-07-30T19:26:17,049][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/root/logstash-5.6.4/modules/fb_apache/configuration"}
[2018-07-30T19:26:17,252][INFO ][logstash.inputs.cos      ] Using version 0.1.x input plugin 'cos'. This plugin isn't well supported by the community and likely has no maintainer.
[2018-07-30T19:26:17,341][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2018-07-30T19:26:17,362][INFO ][logstash.inputs.cos      ] Registering cos input {:bucket=>"bellengao", :region=>"ap-guangzhou"}
[2018-07-30T19:26:17,528][INFO ][logstash.pipeline        ] Pipeline main started
[2018-07-30T19:26:17,530][INFO ][logstash.inputs.cos      ] Marker from:
log4j:WARN No appenders could be found for logger (org.apache.http.client.protocol.RequestAddCookies).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[2018-07-30T19:26:17,574][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-07-30T19:26:17,714][INFO ][logstash.inputs.cos      ] Marker end: access.log
{
       "message" => "77.179.66.156 - - [25/Oct/2016:14:49:33 +0200] \"GET / HTTP/1.1\" 200 612 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",
      "@version" => "1",
    "@timestamp" => 2018-07-30T11:26:17.710Z
}
{
       "message" => "77.179.66.156 - - [25/Oct/2016:14:49:34 +0200] \"GET /favicon.ico HTTP/1.1\" 404 571 \"http://localhost:8080/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",
      "@version" => "1",
    "@timestamp" => 2018-07-30T11:26:17.711Z
}

在cos中的bucket里上传了名为access.log的nginx日志,上述输出结果中最后打印出来的每个json结构体构成一个event, 其中message消息即为access.log中每一条日志。

收起阅读 »

Day 10 - Elasticsearch 分片恢复并发数过大引发的bug分析

       大家好,今天为大家分享一次 ES 的填坑经验。主要是关于集群恢复过程中,分片恢复并发数调整过大导致集群 hang 死的问题。

场景描述

       废话不多说,先来描述场景。某日,腾讯云线上某 ES 集群,15个节点,2700+ 索引,15000+ 分片,数十 TB 数据。由于机器故障,某个节点被重启,此时集群有大量的 unassigned 分片,集群处于 yellow 状态。为了加快集群恢复的速度,手动调整分片恢复并发数,原本想将默认值为2的 node_concurrent_recoveries 调整为10,结果手一抖多加了一个0,设定了如下参数:

curl -X PUT "localhost:9200/_cluster/settings" -H 'Content-Type: application/json' -d'
{
    "persistent": {
        "cluster.routing.allocation.node_concurrent_recoveries": 100,
        "indices.recovery.max_bytes_per_sec": "40mb"
    }
}
'

       设定之后,观察集群 unassigned 分片,一开始下降的速度很快。大约几分钟后,数量维持在一个固定值不变了,然后,然后就没有然后了,集群所有节点 generic 线程池卡死,虽然已存在的索引读写没问题,但是新建索引以及所有涉及 generic 线程池的操作全部卡住。立马修改分片恢复并发数到10,通过管控平台一把重启了全部节点,约15分钟后集群恢复正常。接下来会先介绍一些基本的概念,然后再重现这个问题并做详细分析。

基本概念

ES 线程池(thread pool)

ES 中每个节点有多种线程池,各有用途。重要的有:

  • generic :通用线程池,后台的 node discovery,上述的分片恢复(node recovery)等等一些通用后台的操作都会用到该线程池。该线程池线程数量默认为配置的处理器数量(processors)* 4,最小128,最大512。
  • index :index/delete 等索引操作会用到该线程池,包括自动创建索引等。默认线程数量为配置的处理器数量,默认队列大小:200.
  • search :查询请求处理线程池。默认线程数量:int((# of available_processors * 3) / 2) + 1,默认队列大小:1000.
  • get :get 请求处理线程池。默认线程数量为配置的处理器数量,默认队列大小:1000.
  • write :单个文档的 index/delete/update 以及 bulk 请求处理线程。默认线程数量为配置的处理器数量,默认队列大小:200,在写多的日志场景我们一般会将队列调大。 还有其它线程池,例如备份回档(snapshot)、analyze、refresh 等,这里就不一一介绍了。详细可参考官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-threadpool.html

集群恢复之分片恢复

       我们知道 ES 集群状态分为三种,green、yellow、red。green 状态表示所有分片包括主副本均正常被分配;yellow 状态表示所有主分片已分配,但是有部分副本分片未分配;red 表示有部分主分片未分配。 一般当集群中某个节点因故障失联或者重启之后,如果集群索引有副本的场景,集群将进入分片恢复阶段(recovery)。此时一般是 master 节点发起更新集群元数据任务,分片的分配策略由 master 决定,具体分配策略可以参考腾讯云+社区的这篇文章了解细节:https://cloud.tencent.com/developer/article/1334743 。各节点收到集群元数据更新请求,检查分片状态并触发分片恢复流程,根据分片数据所在的位置,有多种恢复的方式,主要有以下几种:

  • EXISTING_STORE : 数据在节点本地存在,从本地节点恢复。
  • PEER :本地数据不可用或不存在,从远端节点(源分片,一般是主分片)恢复。
  • SNAPSHOT : 数据从备份仓库恢复。
  • LOCAL_SHARDS : 分片合并(shrink)场景,从本地别的分片恢复。

PEER 场景分片恢复并发数主要由如下参数控制:

  • cluster.routing.allocation.node_concurrent_incoming_recoveries:节点上最大接受的分片恢复并发数。一般指分片从其它节点恢复至本节点。
  • cluster.routing.allocation.node_concurrent_outgoing_recoveries :节点上最大发送的分片恢复并发数。一般指分片从本节点恢复至其它节点。
  • cluster.routing.allocation.node_concurrent_recoveries :该参数同时设置上述接受发送分片恢复并发数为相同的值。 详细参数可参考官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/current/shards-allocation.html

       集群卡住的主要原因就是从远端节点恢复(PEER Recovery)的并发数过多,导致 generic 线程池被用完。涉及目标节点(target)和源节点(source)的恢复交互流程,后面分析问题时我们再来详细讨论。

问题复现与剖析

       为了便于描述,我用 ES 6.4.3版本重新搭建了一个三节点的集群。单节点 1 core,2GB memory。新建了300个 index, 单个 index 5个分片一个副本,共 3000 个 shard。每个 index 插入大约100条数据。 先设定分片恢复并发数,为了夸张一点,我直接调整到200,如下所示:

curl -X PUT "localhost:9200/_cluster/settings" -H 'Content-Type: application/json' -d'
{
    "persistent": {
        "cluster.routing.allocation.node_concurrent_recoveries": 200 // 设定分片恢复并发数
    }
}
'

  接下来停掉某节点,模拟机器挂掉场景。几分钟后,观察集群分片恢复数量,卡在固定数值不再变化:

分片恢复统计信息.png

  通过 allocation explain 查看分片分配状态,未分配的原因是受到最大恢复并发数的限制:

分片恢复限制.png
  观察线程池的数量,generic 线程池打满128.

线程池统计.png

此时查询或写入已有索引不受影响,但是新建索引这种涉及到 generic 线程池的操作都会卡住。 通过堆栈分析,128 个 generic 线程全部卡在 PEER recovery 阶段。

堆栈分析.png
         现象有了,我们来分析一下这种场景,远程分片恢复(PEER Recovery)流程为什么会导致集群卡住。

       当集群中有分片的状态发生变更时,master 节点会发起集群元数据更新(cluster state update)请求给所有节点。其它节点收到该请求后,感知到分片状态的变更,启动分片恢复流程。部分分片需要从其它节点恢复,代码层面,涉及分片分配的目标节点(target)和源节点(source)的交互流程如下:

远端恢复时序分析.png
 

       6.x 版本之后引入了 seqNo,恢复会涉及到 seqNo+translog,这也是6.x提升恢复速度的一大改进。我们重点关注流程中第 2、4、5、7、10、12 步骤中的远程调用,他们的作用分别是:

  • 第2步:分片分配的目标节点向源节点(一般是主分片)发起分片恢复请求,携带起始 seqNo 和 syncId。
  • 第4步:发送数据文件信息,告知目标节点待接收的文件清单。
  • 第5步:发送 diff 数据文件给目标节点。
  • 第7步:源节点发送 prepare translog 请求给目标节点,等目标节点打开 shard level 引擎,准备接受 translog。
  • 第10步:源节点发送指定范围的 translog 快照给目标节点。
  • 第12步:结束恢复流程。

       我们可以看到除第5步发送数据文件外,多次远程交互 submitRequest 都会调用 txGet,这个调用底层用的是基于 AQS 改造过的 sync 对象,是一个同步调用。 如果一端 generic 线程池被这些请求打满,发出的请求等待对端返回,而发出的这些请求由于对端 generic 线程池同样的原因被打满,只能 pending 在队列中,这样两边的线程池都满了而且相互等待对端队列中的线程返回,就出现了分布式死锁现象。

问题处理

       为了避免改动太大带来不确定的 side effect,针对腾讯云 ES 集群我们目前先在 rest 层拒掉了并发数超过一定值的参数设定请求并提醒用户。与此同时,我们向官方提交了 issue:https://github.com/elastic/elasticsearch/issues/36195 进行跟踪。

总结

       本文旨在描述集群恢复过程出现的集群卡死场景,避免更多的 ES 用户踩坑,没有对整体分片恢复做详细的分析,大家想了解详细的分片恢复流程可以参考腾讯云+社区 Elasticsearch 专栏相关的文章:https://cloud.tencent.com/developer/column/2428

完结,谢谢!

继续阅读 »

       大家好,今天为大家分享一次 ES 的填坑经验。主要是关于集群恢复过程中,分片恢复并发数调整过大导致集群 hang 死的问题。

场景描述

       废话不多说,先来描述场景。某日,腾讯云线上某 ES 集群,15个节点,2700+ 索引,15000+ 分片,数十 TB 数据。由于机器故障,某个节点被重启,此时集群有大量的 unassigned 分片,集群处于 yellow 状态。为了加快集群恢复的速度,手动调整分片恢复并发数,原本想将默认值为2的 node_concurrent_recoveries 调整为10,结果手一抖多加了一个0,设定了如下参数:

curl -X PUT "localhost:9200/_cluster/settings" -H 'Content-Type: application/json' -d'
{
    "persistent": {
        "cluster.routing.allocation.node_concurrent_recoveries": 100,
        "indices.recovery.max_bytes_per_sec": "40mb"
    }
}
'

       设定之后,观察集群 unassigned 分片,一开始下降的速度很快。大约几分钟后,数量维持在一个固定值不变了,然后,然后就没有然后了,集群所有节点 generic 线程池卡死,虽然已存在的索引读写没问题,但是新建索引以及所有涉及 generic 线程池的操作全部卡住。立马修改分片恢复并发数到10,通过管控平台一把重启了全部节点,约15分钟后集群恢复正常。接下来会先介绍一些基本的概念,然后再重现这个问题并做详细分析。

基本概念

ES 线程池(thread pool)

ES 中每个节点有多种线程池,各有用途。重要的有:

  • generic :通用线程池,后台的 node discovery,上述的分片恢复(node recovery)等等一些通用后台的操作都会用到该线程池。该线程池线程数量默认为配置的处理器数量(processors)* 4,最小128,最大512。
  • index :index/delete 等索引操作会用到该线程池,包括自动创建索引等。默认线程数量为配置的处理器数量,默认队列大小:200.
  • search :查询请求处理线程池。默认线程数量:int((# of available_processors * 3) / 2) + 1,默认队列大小:1000.
  • get :get 请求处理线程池。默认线程数量为配置的处理器数量,默认队列大小:1000.
  • write :单个文档的 index/delete/update 以及 bulk 请求处理线程。默认线程数量为配置的处理器数量,默认队列大小:200,在写多的日志场景我们一般会将队列调大。 还有其它线程池,例如备份回档(snapshot)、analyze、refresh 等,这里就不一一介绍了。详细可参考官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-threadpool.html

集群恢复之分片恢复

       我们知道 ES 集群状态分为三种,green、yellow、red。green 状态表示所有分片包括主副本均正常被分配;yellow 状态表示所有主分片已分配,但是有部分副本分片未分配;red 表示有部分主分片未分配。 一般当集群中某个节点因故障失联或者重启之后,如果集群索引有副本的场景,集群将进入分片恢复阶段(recovery)。此时一般是 master 节点发起更新集群元数据任务,分片的分配策略由 master 决定,具体分配策略可以参考腾讯云+社区的这篇文章了解细节:https://cloud.tencent.com/developer/article/1334743 。各节点收到集群元数据更新请求,检查分片状态并触发分片恢复流程,根据分片数据所在的位置,有多种恢复的方式,主要有以下几种:

  • EXISTING_STORE : 数据在节点本地存在,从本地节点恢复。
  • PEER :本地数据不可用或不存在,从远端节点(源分片,一般是主分片)恢复。
  • SNAPSHOT : 数据从备份仓库恢复。
  • LOCAL_SHARDS : 分片合并(shrink)场景,从本地别的分片恢复。

PEER 场景分片恢复并发数主要由如下参数控制:

  • cluster.routing.allocation.node_concurrent_incoming_recoveries:节点上最大接受的分片恢复并发数。一般指分片从其它节点恢复至本节点。
  • cluster.routing.allocation.node_concurrent_outgoing_recoveries :节点上最大发送的分片恢复并发数。一般指分片从本节点恢复至其它节点。
  • cluster.routing.allocation.node_concurrent_recoveries :该参数同时设置上述接受发送分片恢复并发数为相同的值。 详细参数可参考官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/current/shards-allocation.html

       集群卡住的主要原因就是从远端节点恢复(PEER Recovery)的并发数过多,导致 generic 线程池被用完。涉及目标节点(target)和源节点(source)的恢复交互流程,后面分析问题时我们再来详细讨论。

问题复现与剖析

       为了便于描述,我用 ES 6.4.3版本重新搭建了一个三节点的集群。单节点 1 core,2GB memory。新建了300个 index, 单个 index 5个分片一个副本,共 3000 个 shard。每个 index 插入大约100条数据。 先设定分片恢复并发数,为了夸张一点,我直接调整到200,如下所示:

curl -X PUT "localhost:9200/_cluster/settings" -H 'Content-Type: application/json' -d'
{
    "persistent": {
        "cluster.routing.allocation.node_concurrent_recoveries": 200 // 设定分片恢复并发数
    }
}
'

  接下来停掉某节点,模拟机器挂掉场景。几分钟后,观察集群分片恢复数量,卡在固定数值不再变化:

分片恢复统计信息.png

  通过 allocation explain 查看分片分配状态,未分配的原因是受到最大恢复并发数的限制:

分片恢复限制.png
  观察线程池的数量,generic 线程池打满128.

线程池统计.png

此时查询或写入已有索引不受影响,但是新建索引这种涉及到 generic 线程池的操作都会卡住。 通过堆栈分析,128 个 generic 线程全部卡在 PEER recovery 阶段。

堆栈分析.png
         现象有了,我们来分析一下这种场景,远程分片恢复(PEER Recovery)流程为什么会导致集群卡住。

       当集群中有分片的状态发生变更时,master 节点会发起集群元数据更新(cluster state update)请求给所有节点。其它节点收到该请求后,感知到分片状态的变更,启动分片恢复流程。部分分片需要从其它节点恢复,代码层面,涉及分片分配的目标节点(target)和源节点(source)的交互流程如下:

远端恢复时序分析.png
 

       6.x 版本之后引入了 seqNo,恢复会涉及到 seqNo+translog,这也是6.x提升恢复速度的一大改进。我们重点关注流程中第 2、4、5、7、10、12 步骤中的远程调用,他们的作用分别是:

  • 第2步:分片分配的目标节点向源节点(一般是主分片)发起分片恢复请求,携带起始 seqNo 和 syncId。
  • 第4步:发送数据文件信息,告知目标节点待接收的文件清单。
  • 第5步:发送 diff 数据文件给目标节点。
  • 第7步:源节点发送 prepare translog 请求给目标节点,等目标节点打开 shard level 引擎,准备接受 translog。
  • 第10步:源节点发送指定范围的 translog 快照给目标节点。
  • 第12步:结束恢复流程。

       我们可以看到除第5步发送数据文件外,多次远程交互 submitRequest 都会调用 txGet,这个调用底层用的是基于 AQS 改造过的 sync 对象,是一个同步调用。 如果一端 generic 线程池被这些请求打满,发出的请求等待对端返回,而发出的这些请求由于对端 generic 线程池同样的原因被打满,只能 pending 在队列中,这样两边的线程池都满了而且相互等待对端队列中的线程返回,就出现了分布式死锁现象。

问题处理

       为了避免改动太大带来不确定的 side effect,针对腾讯云 ES 集群我们目前先在 rest 层拒掉了并发数超过一定值的参数设定请求并提醒用户。与此同时,我们向官方提交了 issue:https://github.com/elastic/elasticsearch/issues/36195 进行跟踪。

总结

       本文旨在描述集群恢复过程出现的集群卡死场景,避免更多的 ES 用户踩坑,没有对整体分片恢复做详细的分析,大家想了解详细的分片恢复流程可以参考腾讯云+社区 Elasticsearch 专栏相关的文章:https://cloud.tencent.com/developer/column/2428

完结,谢谢!

收起阅读 »

社区日报 第474期 (2018-12-10)

1. 如何设置kibana的堆大小避免oom
http://t.cn/Ey1omYc

2. Es 另外一款web管理UI,包含导入,查看,编辑等功能
http://t.cn/Ey1dKAj

3. 使用elasticseach 搜索emoji表情
http://t.cn/Rf5r848

编辑:cyberdak
归档:https://elasticsearch.cn/article/6183
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1. 如何设置kibana的堆大小避免oom
http://t.cn/Ey1omYc

2. Es 另外一款web管理UI,包含导入,查看,编辑等功能
http://t.cn/Ey1dKAj

3. 使用elasticseach 搜索emoji表情
http://t.cn/Rf5r848

编辑:cyberdak
归档:https://elasticsearch.cn/article/6183
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

Day 9 - 动手实现一个自定义beat

参考

https://elasticsearch.cn/article/113

https://www.elastic.co/blog/build-your-own-beat

介绍

公司内部有统一的log收集系统,并且实现了定制的filebeat进行log收集。为了实现实时报警和监控,自定义的beat并没有直接把输出发给elasticsearch后端,而是中间会经过storm或者durid进行实时分析,然后落入es或者hdfs。同时由于是统一log收集,所以目前还没有针对具体的不同应用进行log的内容的切分,导致所有的log都是以一行为单位落入后端存储。于是需要针对不同的业务部门定制不同的beat。 本文初步尝试定制一个可以在beat端解析hdfs audit log的beat,限于篇幅,只实现了基本的文件解析功能。下面会从环境配置,代码实现,运行测试三个方面进行讲解。

环境配置

go version go1.9.4 linux/amd64

python version: 2.7.9

不得不吐槽下python的安装,各种坑。因为系统默认的python版本是2.7.5,而cookiecutter建议使用2.7.9

下面的工具会提供python本身需要依赖的一些native包

yum install openssl -y
yum install openssl-devel -y
yum install zlib-devel -y
yum install zlib -y

安装python

wget https://www.python.org/ftp/python/2.7.9/Python-2.7.9.tgz
tar -zxvf Python-2.7.9.tgz
cd ~/python/Python-2.7.9
./configure --prefix=/usr/local/python-2.7.9
make
make install

rm -f /bin/python
ln -s /usr/local/python-2.7.9/bin/python /bin/python

安装工具包 distribute, setuptools, pip

cd ~/python/setuptools-19.6 && python setup.py install
cd ~/python/pip-1.5.4  && python setup.py install
cd ~/python/distribute-0.7.3  && python setup.py install

安装cookiecutter

pip install --user cookiecutter

安装cookiecutter所依赖的工具

pip install backports.functools-lru-cache
pip install six
pip install virtualenv

*** virtualenv 安装好了之后,所在目录是在python的目录里面 (/usr/local/python-2.7.9/bin/virtualenv),需要配置好PATH,这个工具稍后会被beat的Makefile用到

代码实现

需要实现的功能比较简单,目标是打开hdfs-audit.log文件,然后逐行读取,同时解析出必要的信息,组装成event,然后发送出去,如果对接的es的话,需要同时支持自动在es端创建正确的mapping

使用官方提供的beat模板创建自己的beat

  • 需要设置好环境变量$GOPATH,本例子中GOPATH=/root/go
$ go get github.com/elastic/beats
$ cd $GOPATH/src/github.com/elastic/beats
$ git checkout 5.1

[root@minikube-2830379 suxingfate]# cookiecutter /root/go/src/github.com/elastic/beats/generate/beat
project_name [Examplebeat]: hdfsauditbeat
github_name [your-github-name]: suxingfate
beat [hdfsauditbeat]:
beat_path [github.com/suxingfate]:
full_name [Firstname Lastname]: xinglong

make setup

到这里,模板就生成了,然后就是定制需要的东西。

  • 1 _meta/beat.yml # 配置模板文件,定义我们的beat会接受哪些配置项
  • 2 config/config.go #使用go的struct定义整个config对象,包含所有的配置项
  • 3 beater/hdfsauditbeat.go # 核心逻辑代码
  • 4 _meta/fields.yml #这里是跟es对接的时候给es定义的mapping

1 _meta/beat.yml

这里增加了path,为后面配置hdfs-audit.log文件的位置留好坑

[root@minikube-2830379 hdfsauditbeat]# cat _meta/beat.yml
################### Hdfsauditbeat Configuration Example #########################

############################# Hdfsauditbeat ######################################

hdfsauditbeat:
  # Defines how often an event is sent to the output
  period: 1s
  path: "."

2 config/config.go

这里把path定义到struct里面,后面核心代码就可以从config对象获得path了

[root@minikube-2830379 hdfsauditbeat]# cat config/config.go
// Config is put into a different package to prevent cyclic imports in case
// it is needed in several locations

package config

import "time"

type Config struct {
    Period time.Duration `config:"period"`
        Path   string        `config:"path"`
}

var DefaultConfig = Config{
    Period: 1 * time.Second,
        Path:   ".",
}

3 beater/hdfsauditbeat.go

这里需要改动的地方是: 3.1 定义了一个catAudit函数来解析目标文件的每一行,同时生成自定义的event,然后发送出去 3.2 Run函数调用自定义的catAudit函数,从而把我们的功能嵌入

[root@minikube-2830379 hdfsauditbeat]# cat beater/hdfsauditbeat.go
package beater

import (
    "fmt"
    "time"
        "os"
        "io"
        "bufio"
        "strings"
    "github.com/elastic/beats/libbeat/beat"
    "github.com/elastic/beats/libbeat/common"
    "github.com/elastic/beats/libbeat/logp"
    "github.com/elastic/beats/libbeat/publisher"

    "github.com/suxingfate/hdfsauditbeat/config"
)

type Hdfsauditbeat struct {
    done   chan struct{}
    config config.Config
    client publisher.Client
}

// Creates beater
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
    config := config.DefaultConfig
    if err := cfg.Unpack(&config); err != nil {
        return nil, fmt.Errorf("Error reading config file: %v", err)
    }

    bt := &Hdfsauditbeat{
        done: make(chan struct{}),
        config: config,
    }
    return bt, nil
}

func (bt *Hdfsauditbeat) Run(b *beat.Beat) error {
    logp.Info("hdfsauditbeat is running! Hit CTRL-C to stop it.")

    bt.client = b.Publisher.Connect()
    ticker := time.NewTicker(bt.config.Period)
    counter := 1
    for {
        select {
        case <-bt.done:
            return nil
        case <-ticker.C:
        }

        bt.catAudit(bt.config.Path)

        logp.Info("Event sent")
        counter++
    }
}

func (bt *Hdfsauditbeat) Stop() {
    bt.client.Close()
    close(bt.done)
}

func (bt *Hdfsauditbeat) catAudit(auditFile string) {
    file, err := os.OpenFile(auditFile, os.O_RDWR, 0666)
    if err != nil {
        //fmt.Println("Open file error!", err)
        return
    }
    defer file.Close()

    buf := bufio.NewReader(file)
    for {
        line, err := buf.ReadString('\n')
        line = strings.TrimSpace(line)
        if line == "" {
            return
        }

    timeEnd := strings.Index(line, ",")
        timeString := line[0 :timeEnd]
        tm, _ := time.Parse("2006-01-02 03:04:05", timeString)

        ugiStart := strings.Index(line, "ugi=") + 4
        ugiEnd := strings.Index(line, " (auth")
        ugi := line[ugiStart :ugiEnd]

        cmdStart := strings.Index(line, "cmd=") + 4
        line = line[cmdStart:len(line)]
        cmdEnd := strings.Index(line, " ")
        cmd := line[0 : cmdEnd]

        srcStart := strings.Index(line, "src=") + 4
        line = line[srcStart:len(line)]
        srcEnd := strings.Index(line, " ")
        src := line[0:srcEnd]

        dstStart := strings.Index(line, "dst=") + 4
        line = line[dstStart:len(line)]
        dstEnd := strings.Index(line, " ")
        dst := line[0:dstEnd]

        event := common.MapStr{
                "@timestamp": common.Time(time.Unix(tm.Unix(), 0)),
                "ugi":       ugi,
                "cmd":       cmd,
                "src":    src,
                "dst":   dst,
            }
            bt.client.PublishEvent(event)

        if err != nil {
            if err == io.EOF {
                //fmt.Println("File read ok!")
                break
            } else {
                //fmt.Println("Read file error!", err)
                return
            }
        }
    }
}

4 _meat/fields.yml

[root@minikube-2830379 hdfsauditbeat]# less _meta/fields.yml
- key: hdfsauditbeat
  title: hdfsauditbeat
  description:
  fields:
    - name: counter
      type: long
      required: true
      description: >
        PLEASE UPDATE DOCUMENTATION
    #new fiels added hdfsaudit
    - name: entrytime
      type: date
    - name: ugi
      type: keyword
    - name: cmd
      type: keyword
    - name: src
      type: keyword
    - name: dst
      type: keyword

测试

首先编译好项目

make update
make

然后会发现生成了一个hdfsauditbeat文件,这个就是二进制的可执行文件。下面进行测试,这里偷了个懒,没有发给es,而是吐到console进行观察。 修改了一下配置文件,需要指定正确的需要消费的audit log文件的路径,另外就是修改了output为console

[root@minikube-2830379 hdfsauditbeat]# cat hdfsauditbeat.yml
################### Hdfsauditbeat Configuration Example #########################

############################# Hdfsauditbeat ######################################

hdfsauditbeat:
  # Defines how often an event is sent to the output
  period: 1s
  path: "/root/go/hdfs-audit.log"

#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
#  env: staging

#================================ Outputs =====================================

# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.
#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
#  hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------
#output.logstash:
  # The Logstash hosts
  #hosts: ["localhost:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

output.console:
  pretty: true
#================================ Logging =====================================

# Sets log level. The default log level is info.
# Available log levels are: critical, error, warning, info, debug
#logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]

开始执行

[root@minikube-2830379 hdfsauditbeat]# ./hdfsauditbeat
{
  "@timestamp": "2018-12-09T03:00:00.000Z",
  "beat": {
    "hostname": "minikube-2830379.lvs02.dev.abc.com",
    "name": "minikube-2830379.lvs02.dev.abc.com",
    "version": "5.1.3"
  },
  "cmd": "create",
  "dst": "null",
  "src": "/app-logs/app/logs/application_1540949675029_717305/lvsdpehdc25dn0444.stratus.lvs.abc.com_8042.tmp",
  "ugi": "appmon@APD.ABC.COM"
}
{
  "@timestamp": "2018-12-09T03:00:00.000Z",
  "beat": {
    "hostname": "minikube-2830379.lvs02.dev.abc.com",
    "name": "minikube-2830379.lvs02.dev.abc.com",
    "version": "5.1.3"
  },
  "cmd": "create",
  "dst": "null",
  "src": "/app-logs/appmon/logs/application_1540949675029_717305/lvsdpehdc25dn0444.stratus.lvs.abc.com_8042.tmp",
  "ugi": "appmon@APD.ABC.COM"
}

结束

使用自定义beat给我们提供了很大的灵活性,虽然pipline或者logstash也可以做到,但是使用场景还是有很大差别的。如果是调用特殊的命令获得输出,或者是本文的场景都更适合定制化beat。

继续阅读 »

参考

https://elasticsearch.cn/article/113

https://www.elastic.co/blog/build-your-own-beat

介绍

公司内部有统一的log收集系统,并且实现了定制的filebeat进行log收集。为了实现实时报警和监控,自定义的beat并没有直接把输出发给elasticsearch后端,而是中间会经过storm或者durid进行实时分析,然后落入es或者hdfs。同时由于是统一log收集,所以目前还没有针对具体的不同应用进行log的内容的切分,导致所有的log都是以一行为单位落入后端存储。于是需要针对不同的业务部门定制不同的beat。 本文初步尝试定制一个可以在beat端解析hdfs audit log的beat,限于篇幅,只实现了基本的文件解析功能。下面会从环境配置,代码实现,运行测试三个方面进行讲解。

环境配置

go version go1.9.4 linux/amd64

python version: 2.7.9

不得不吐槽下python的安装,各种坑。因为系统默认的python版本是2.7.5,而cookiecutter建议使用2.7.9

下面的工具会提供python本身需要依赖的一些native包

yum install openssl -y
yum install openssl-devel -y
yum install zlib-devel -y
yum install zlib -y

安装python

wget https://www.python.org/ftp/python/2.7.9/Python-2.7.9.tgz
tar -zxvf Python-2.7.9.tgz
cd ~/python/Python-2.7.9
./configure --prefix=/usr/local/python-2.7.9
make
make install

rm -f /bin/python
ln -s /usr/local/python-2.7.9/bin/python /bin/python

安装工具包 distribute, setuptools, pip

cd ~/python/setuptools-19.6 && python setup.py install
cd ~/python/pip-1.5.4  && python setup.py install
cd ~/python/distribute-0.7.3  && python setup.py install

安装cookiecutter

pip install --user cookiecutter

安装cookiecutter所依赖的工具

pip install backports.functools-lru-cache
pip install six
pip install virtualenv

*** virtualenv 安装好了之后,所在目录是在python的目录里面 (/usr/local/python-2.7.9/bin/virtualenv),需要配置好PATH,这个工具稍后会被beat的Makefile用到

代码实现

需要实现的功能比较简单,目标是打开hdfs-audit.log文件,然后逐行读取,同时解析出必要的信息,组装成event,然后发送出去,如果对接的es的话,需要同时支持自动在es端创建正确的mapping

使用官方提供的beat模板创建自己的beat

  • 需要设置好环境变量$GOPATH,本例子中GOPATH=/root/go
$ go get github.com/elastic/beats
$ cd $GOPATH/src/github.com/elastic/beats
$ git checkout 5.1

[root@minikube-2830379 suxingfate]# cookiecutter /root/go/src/github.com/elastic/beats/generate/beat
project_name [Examplebeat]: hdfsauditbeat
github_name [your-github-name]: suxingfate
beat [hdfsauditbeat]:
beat_path [github.com/suxingfate]:
full_name [Firstname Lastname]: xinglong

make setup

到这里,模板就生成了,然后就是定制需要的东西。

  • 1 _meta/beat.yml # 配置模板文件,定义我们的beat会接受哪些配置项
  • 2 config/config.go #使用go的struct定义整个config对象,包含所有的配置项
  • 3 beater/hdfsauditbeat.go # 核心逻辑代码
  • 4 _meta/fields.yml #这里是跟es对接的时候给es定义的mapping

1 _meta/beat.yml

这里增加了path,为后面配置hdfs-audit.log文件的位置留好坑

[root@minikube-2830379 hdfsauditbeat]# cat _meta/beat.yml
################### Hdfsauditbeat Configuration Example #########################

############################# Hdfsauditbeat ######################################

hdfsauditbeat:
  # Defines how often an event is sent to the output
  period: 1s
  path: "."

2 config/config.go

这里把path定义到struct里面,后面核心代码就可以从config对象获得path了

[root@minikube-2830379 hdfsauditbeat]# cat config/config.go
// Config is put into a different package to prevent cyclic imports in case
// it is needed in several locations

package config

import "time"

type Config struct {
    Period time.Duration `config:"period"`
        Path   string        `config:"path"`
}

var DefaultConfig = Config{
    Period: 1 * time.Second,
        Path:   ".",
}

3 beater/hdfsauditbeat.go

这里需要改动的地方是: 3.1 定义了一个catAudit函数来解析目标文件的每一行,同时生成自定义的event,然后发送出去 3.2 Run函数调用自定义的catAudit函数,从而把我们的功能嵌入

[root@minikube-2830379 hdfsauditbeat]# cat beater/hdfsauditbeat.go
package beater

import (
    "fmt"
    "time"
        "os"
        "io"
        "bufio"
        "strings"
    "github.com/elastic/beats/libbeat/beat"
    "github.com/elastic/beats/libbeat/common"
    "github.com/elastic/beats/libbeat/logp"
    "github.com/elastic/beats/libbeat/publisher"

    "github.com/suxingfate/hdfsauditbeat/config"
)

type Hdfsauditbeat struct {
    done   chan struct{}
    config config.Config
    client publisher.Client
}

// Creates beater
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
    config := config.DefaultConfig
    if err := cfg.Unpack(&config); err != nil {
        return nil, fmt.Errorf("Error reading config file: %v", err)
    }

    bt := &Hdfsauditbeat{
        done: make(chan struct{}),
        config: config,
    }
    return bt, nil
}

func (bt *Hdfsauditbeat) Run(b *beat.Beat) error {
    logp.Info("hdfsauditbeat is running! Hit CTRL-C to stop it.")

    bt.client = b.Publisher.Connect()
    ticker := time.NewTicker(bt.config.Period)
    counter := 1
    for {
        select {
        case <-bt.done:
            return nil
        case <-ticker.C:
        }

        bt.catAudit(bt.config.Path)

        logp.Info("Event sent")
        counter++
    }
}

func (bt *Hdfsauditbeat) Stop() {
    bt.client.Close()
    close(bt.done)
}

func (bt *Hdfsauditbeat) catAudit(auditFile string) {
    file, err := os.OpenFile(auditFile, os.O_RDWR, 0666)
    if err != nil {
        //fmt.Println("Open file error!", err)
        return
    }
    defer file.Close()

    buf := bufio.NewReader(file)
    for {
        line, err := buf.ReadString('\n')
        line = strings.TrimSpace(line)
        if line == "" {
            return
        }

    timeEnd := strings.Index(line, ",")
        timeString := line[0 :timeEnd]
        tm, _ := time.Parse("2006-01-02 03:04:05", timeString)

        ugiStart := strings.Index(line, "ugi=") + 4
        ugiEnd := strings.Index(line, " (auth")
        ugi := line[ugiStart :ugiEnd]

        cmdStart := strings.Index(line, "cmd=") + 4
        line = line[cmdStart:len(line)]
        cmdEnd := strings.Index(line, " ")
        cmd := line[0 : cmdEnd]

        srcStart := strings.Index(line, "src=") + 4
        line = line[srcStart:len(line)]
        srcEnd := strings.Index(line, " ")
        src := line[0:srcEnd]

        dstStart := strings.Index(line, "dst=") + 4
        line = line[dstStart:len(line)]
        dstEnd := strings.Index(line, " ")
        dst := line[0:dstEnd]

        event := common.MapStr{
                "@timestamp": common.Time(time.Unix(tm.Unix(), 0)),
                "ugi":       ugi,
                "cmd":       cmd,
                "src":    src,
                "dst":   dst,
            }
            bt.client.PublishEvent(event)

        if err != nil {
            if err == io.EOF {
                //fmt.Println("File read ok!")
                break
            } else {
                //fmt.Println("Read file error!", err)
                return
            }
        }
    }
}

4 _meat/fields.yml

[root@minikube-2830379 hdfsauditbeat]# less _meta/fields.yml
- key: hdfsauditbeat
  title: hdfsauditbeat
  description:
  fields:
    - name: counter
      type: long
      required: true
      description: >
        PLEASE UPDATE DOCUMENTATION
    #new fiels added hdfsaudit
    - name: entrytime
      type: date
    - name: ugi
      type: keyword
    - name: cmd
      type: keyword
    - name: src
      type: keyword
    - name: dst
      type: keyword

测试

首先编译好项目

make update
make

然后会发现生成了一个hdfsauditbeat文件,这个就是二进制的可执行文件。下面进行测试,这里偷了个懒,没有发给es,而是吐到console进行观察。 修改了一下配置文件,需要指定正确的需要消费的audit log文件的路径,另外就是修改了output为console

[root@minikube-2830379 hdfsauditbeat]# cat hdfsauditbeat.yml
################### Hdfsauditbeat Configuration Example #########################

############################# Hdfsauditbeat ######################################

hdfsauditbeat:
  # Defines how often an event is sent to the output
  period: 1s
  path: "/root/go/hdfs-audit.log"

#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
#  env: staging

#================================ Outputs =====================================

# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.
#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
#  hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------
#output.logstash:
  # The Logstash hosts
  #hosts: ["localhost:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

output.console:
  pretty: true
#================================ Logging =====================================

# Sets log level. The default log level is info.
# Available log levels are: critical, error, warning, info, debug
#logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]

开始执行

[root@minikube-2830379 hdfsauditbeat]# ./hdfsauditbeat
{
  "@timestamp": "2018-12-09T03:00:00.000Z",
  "beat": {
    "hostname": "minikube-2830379.lvs02.dev.abc.com",
    "name": "minikube-2830379.lvs02.dev.abc.com",
    "version": "5.1.3"
  },
  "cmd": "create",
  "dst": "null",
  "src": "/app-logs/app/logs/application_1540949675029_717305/lvsdpehdc25dn0444.stratus.lvs.abc.com_8042.tmp",
  "ugi": "appmon@APD.ABC.COM"
}
{
  "@timestamp": "2018-12-09T03:00:00.000Z",
  "beat": {
    "hostname": "minikube-2830379.lvs02.dev.abc.com",
    "name": "minikube-2830379.lvs02.dev.abc.com",
    "version": "5.1.3"
  },
  "cmd": "create",
  "dst": "null",
  "src": "/app-logs/appmon/logs/application_1540949675029_717305/lvsdpehdc25dn0444.stratus.lvs.abc.com_8042.tmp",
  "ugi": "appmon@APD.ABC.COM"
}

结束

使用自定义beat给我们提供了很大的灵活性,虽然pipline或者logstash也可以做到,但是使用场景还是有很大差别的。如果是调用特殊的命令获得输出,或者是本文的场景都更适合定制化beat。

收起阅读 »

社区日报 第473期 (2018-12-09)

1.ElasticSearch连接:Has_Child,Has_parent查询。
http://t.cn/EyEJZGO
2.(自备梯子)将full-scale ELK栈部署到Kubernetes。
http://t.cn/EyEiOtk
3.(自备梯子)Facebook建立在不平等的基础之上。
http://t.cn/EyE6quM

编辑:至尊宝
归档:https://elasticsearch.cn/article/6181
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1.ElasticSearch连接:Has_Child,Has_parent查询。
http://t.cn/EyEJZGO
2.(自备梯子)将full-scale ELK栈部署到Kubernetes。
http://t.cn/EyEiOtk
3.(自备梯子)Facebook建立在不平等的基础之上。
http://t.cn/EyE6quM

编辑:至尊宝
归档:https://elasticsearch.cn/article/6181
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第472期 (2018-12-08)

  1. 用于ES数据警报的实时守护进程。 http://t.cn/EyQmiuE

  2. 最小的ibana Docker镜像。 http://t.cn/EyQbwGk

  3. Lucene列式存储格式DocValues详解。 http://t.cn/EyQ6pkK
继续阅读 »
  1. 用于ES数据警报的实时守护进程。 http://t.cn/EyQmiuE

  2. 最小的ibana Docker镜像。 http://t.cn/EyQbwGk

  3. Lucene列式存储格式DocValues详解。 http://t.cn/EyQ6pkK
收起阅读 »

Day 8 - 如何使用Spark快速将数据写入Elasticsearch

如何使用Spark快速将数据写入Elasticsearch

说到数据写入Elasticsearch,最先想到的肯定是Logstash。Logstash因为其简单上手、可扩展、可伸缩等优点被广大用户接受。但是尺有所短,寸有所长,Logstash肯定也有它无法适用的应用场景,比如:

  • 海量数据ETL
  • 海量数据聚合
  • 多源数据处理

为了满足这些场景,很多同学都会选择Spark,借助Spark算子进行数据处理,最后将处理结果写入Elasticsearch。

我们部门之前利用Spark对Nginx日志进行分析,统计我们的Web服务访问情况,将Nginx日志每分钟聚合一次最后将结果写入Elasticsearch,然后利用Kibana配置实时监控Dashboard。Elasticsearch和Kibana都很方便、实用,但是随着类似需求越来越多,如何快速通过Spark将数据写入Elasticsearch成为了我们的一大问题。

今天给大家推荐一款能够实现数据快速写入的黑科技——Waterdrop,一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在Spark之上,简单易用,灵活配置,无需开发。

wd.png

Kafka to Elasticsearch

和Logstash一样,Waterdrop同样支持多种类型的数据输入,这里我们以最常见的Kakfa作为输入源为例,讲解如何使用Waterdrop将数据快速写入Elasticsearch

Log Sample

原始日志格式如下:

127.0.0.1 elasticsearch.cn 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:21:54:32 +0800] "GET /article HTTP/1.1" 200 123 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)"

Elasticsearch Document

我们想要统计,一分钟每个域名的访问情况,聚合完的数据有以下字段:

domain String
hostname String
status int
datetime String
count int

Waterdrop with Elasticsearch

接下来会给大家详细介绍,我们如何通过Waterdrop读取Kafka中的数据,对数据进行解析以及聚合,最后将处理结果写入Elasticsearch中。

Waterdrop

Waterdrop同样拥有着非常丰富的插件,支持从Kafka、HDFS、Hive中读取数据,进行各种各样的数据处理,并将结果写入Elasticsearch、Kudu或者Kafka中。

Prerequisites

首先我们需要安装Waterdrop,安装十分简单,无需配置系统环境变量

  1. 准备Spark环境
  2. 安装Waterdrop
  3. 配置Waterdrop

以下是简易步骤,具体安装可以参照Quick Start

cd /usr/local
wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar -xvf https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
wget https://github.com/InterestingLab/waterdrop/releases/download/v1.1.1/waterdrop-1.1.1.zip
unzip waterdrop-1.1.1.zip
cd waterdrop-1.1.1

vim config/waterdrop-env.sh
# 指定Spark安装路径
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}

Waterdrop Pipeline

与Logstash一样,我们仅需要编写一个Waterdrop Pipeline的配置文件即可完成数据的导入,相信了解Logstash的朋友可以很快入手Waterdrop配置。

配置文件包括四个部分,分别是Spark、Input、filter和Output。

Spark

这一部分是Spark的相关配置,主要配置Spark执行时所需的资源大小。

spark {
  spark.app.name = "Waterdrop"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

Input

这一部分定义数据源,如下是从Kafka中读取数据的配置案例,

kafkaStream {
    topics = "waterdrop-es"
    consumer.bootstrap.servers = "localhost:9092"
    consumer.group.id = "waterdrop_es_group"
    consumer.rebalance.max.retries = 100
}

Filter

在Filter部分,这里我们配置一系列的转化,包括正则解析将日志进行拆分、时间转换将HTTPDATE转化为Elasticsearch支持的日期格式、对Number类型的字段进行类型转换以及通过SQL进行数据聚合

filter {
    # 使用正则解析原始日志
    # 最开始数据都在raw_message字段中
    grok {
        source_field = "raw_message"
        pattern = '%{NOTSPACE:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
   }
    # 将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为
    # Elasticsearch中支持的格式
    date {
        source_field = "timestamp"
        target_field = "datetime"
        source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
        target_time_format = "yyyy-MM-dd'T'HH:mm:ss.SSS+08:00"
    }
    ## 利用SQL对数据进行聚合
    sql {
        table_name = "access_log"
        sql = "select domain, hostname, int(status), datetime, count(*) from access_log group by domain, hostname, status, datetime"
    }
 }

Output

最后我们将处理好的结构化数据写入Elasticsearch。

output {
    elasticsearch {
        hosts = ["localhost:9200"]
        index = "waterdrop-${now}"
        es.batch.size.entries = 100000
        index_time_format = "yyyy.MM.dd"
    }
}

Running Waterdrop

我们将上述四部分配置组合成为我们的配置文件config/batch.conf

vim config/batch.conf
spark {
  spark.app.name = "Waterdrop"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}
input {
    kafkaStream {
        topics = "waterdrop-es"
        consumer.bootstrap.servers = "localhost:9092"
        consumer.group.id = "waterdrop_es_group"
        consumer.rebalance.max.retries = 100
    }
}
filter {
    # 使用正则解析原始日志
    # 最开始数据都在raw_message字段中
    grok {
        source_field = "raw_message"
        pattern = '%{IP:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
   }
    # 将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为
    # Elasticsearch中支持的格式
    date {
        source_field = "timestamp"
        target_field = "datetime"
        source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
        target_time_format = "yyyy-MM-dd'T'HH:mm:00.SSS+08:00"
    }
    ## 利用SQL对数据进行聚合
    sql {
        table_name = "access_log"
        sql = "select domain, hostname, status, datetime, count(*) from access_log group by domain, localhost, status, datetime"
    }
 }
output {
    elasticsearch {
        hosts = ["localhost:9200"]
        index = "waterdrop-${now}"
        es.batch.size.entries = 100000
        index_time_format = "yyyy.MM.dd"
    }
}

执行命令,指定配置文件,运行Waterdrop,即可将数据写入Elasticsearch。这里我们以本地模式为例。

./bin/start-waterdrop.sh --config config/batch.conf -e client -m 'local[2]'

最后,写入Elasticsearch中的数据如下,再配上Kibana就可以实现Web服务的实时监控了^_^.

"_source": {
    "domain": "elasticsearch.cn",
    "hostname": "localhost",
    "status": "200",
    "datetime": "2018-11-26T21:54:00.000+08:00",
    "count": 26
  }

Conclusion

在这篇文章中,我们介绍了如何通过Waterdrop将Kafka中的数据写入Elasticsearch中。仅仅通过一个配置文件便可快速运行一个Spark Application,完成数据的处理、写入,无需编写任何代码,十分简单。

当数据处理过程中有遇到Logstash无法支持的场景或者Logstah性能无法达到预期的情况下,都可以尝试使用Waterdrop解决问题。

希望了解Waterdrop与Elasticsearch、Kafka、Hadoop结合使用的更多功能和案例,可以直接进入项目主页https://github.com/InterestingLab/waterdrop

我们近期会再发布一篇《如何用Spark和Elasticsearch做交互式数据分析》,敬请期待.

Contract us

欢迎联系我们交流Spark和Elasticsearch:

Garyelephant: 微信: garyelephant

RickyHuo: 微信: chodomatte1994

继续阅读 »

如何使用Spark快速将数据写入Elasticsearch

说到数据写入Elasticsearch,最先想到的肯定是Logstash。Logstash因为其简单上手、可扩展、可伸缩等优点被广大用户接受。但是尺有所短,寸有所长,Logstash肯定也有它无法适用的应用场景,比如:

  • 海量数据ETL
  • 海量数据聚合
  • 多源数据处理

为了满足这些场景,很多同学都会选择Spark,借助Spark算子进行数据处理,最后将处理结果写入Elasticsearch。

我们部门之前利用Spark对Nginx日志进行分析,统计我们的Web服务访问情况,将Nginx日志每分钟聚合一次最后将结果写入Elasticsearch,然后利用Kibana配置实时监控Dashboard。Elasticsearch和Kibana都很方便、实用,但是随着类似需求越来越多,如何快速通过Spark将数据写入Elasticsearch成为了我们的一大问题。

今天给大家推荐一款能够实现数据快速写入的黑科技——Waterdrop,一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在Spark之上,简单易用,灵活配置,无需开发。

wd.png

Kafka to Elasticsearch

和Logstash一样,Waterdrop同样支持多种类型的数据输入,这里我们以最常见的Kakfa作为输入源为例,讲解如何使用Waterdrop将数据快速写入Elasticsearch

Log Sample

原始日志格式如下:

127.0.0.1 elasticsearch.cn 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:21:54:32 +0800] "GET /article HTTP/1.1" 200 123 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)"

Elasticsearch Document

我们想要统计,一分钟每个域名的访问情况,聚合完的数据有以下字段:

domain String
hostname String
status int
datetime String
count int

Waterdrop with Elasticsearch

接下来会给大家详细介绍,我们如何通过Waterdrop读取Kafka中的数据,对数据进行解析以及聚合,最后将处理结果写入Elasticsearch中。

Waterdrop

Waterdrop同样拥有着非常丰富的插件,支持从Kafka、HDFS、Hive中读取数据,进行各种各样的数据处理,并将结果写入Elasticsearch、Kudu或者Kafka中。

Prerequisites

首先我们需要安装Waterdrop,安装十分简单,无需配置系统环境变量

  1. 准备Spark环境
  2. 安装Waterdrop
  3. 配置Waterdrop

以下是简易步骤,具体安装可以参照Quick Start

cd /usr/local
wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar -xvf https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
wget https://github.com/InterestingLab/waterdrop/releases/download/v1.1.1/waterdrop-1.1.1.zip
unzip waterdrop-1.1.1.zip
cd waterdrop-1.1.1

vim config/waterdrop-env.sh
# 指定Spark安装路径
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}

Waterdrop Pipeline

与Logstash一样,我们仅需要编写一个Waterdrop Pipeline的配置文件即可完成数据的导入,相信了解Logstash的朋友可以很快入手Waterdrop配置。

配置文件包括四个部分,分别是Spark、Input、filter和Output。

Spark

这一部分是Spark的相关配置,主要配置Spark执行时所需的资源大小。

spark {
  spark.app.name = "Waterdrop"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

Input

这一部分定义数据源,如下是从Kafka中读取数据的配置案例,

kafkaStream {
    topics = "waterdrop-es"
    consumer.bootstrap.servers = "localhost:9092"
    consumer.group.id = "waterdrop_es_group"
    consumer.rebalance.max.retries = 100
}

Filter

在Filter部分,这里我们配置一系列的转化,包括正则解析将日志进行拆分、时间转换将HTTPDATE转化为Elasticsearch支持的日期格式、对Number类型的字段进行类型转换以及通过SQL进行数据聚合

filter {
    # 使用正则解析原始日志
    # 最开始数据都在raw_message字段中
    grok {
        source_field = "raw_message"
        pattern = '%{NOTSPACE:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
   }
    # 将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为
    # Elasticsearch中支持的格式
    date {
        source_field = "timestamp"
        target_field = "datetime"
        source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
        target_time_format = "yyyy-MM-dd'T'HH:mm:ss.SSS+08:00"
    }
    ## 利用SQL对数据进行聚合
    sql {
        table_name = "access_log"
        sql = "select domain, hostname, int(status), datetime, count(*) from access_log group by domain, hostname, status, datetime"
    }
 }

Output

最后我们将处理好的结构化数据写入Elasticsearch。

output {
    elasticsearch {
        hosts = ["localhost:9200"]
        index = "waterdrop-${now}"
        es.batch.size.entries = 100000
        index_time_format = "yyyy.MM.dd"
    }
}

Running Waterdrop

我们将上述四部分配置组合成为我们的配置文件config/batch.conf

vim config/batch.conf
spark {
  spark.app.name = "Waterdrop"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}
input {
    kafkaStream {
        topics = "waterdrop-es"
        consumer.bootstrap.servers = "localhost:9092"
        consumer.group.id = "waterdrop_es_group"
        consumer.rebalance.max.retries = 100
    }
}
filter {
    # 使用正则解析原始日志
    # 最开始数据都在raw_message字段中
    grok {
        source_field = "raw_message"
        pattern = '%{IP:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
   }
    # 将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为
    # Elasticsearch中支持的格式
    date {
        source_field = "timestamp"
        target_field = "datetime"
        source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
        target_time_format = "yyyy-MM-dd'T'HH:mm:00.SSS+08:00"
    }
    ## 利用SQL对数据进行聚合
    sql {
        table_name = "access_log"
        sql = "select domain, hostname, status, datetime, count(*) from access_log group by domain, localhost, status, datetime"
    }
 }
output {
    elasticsearch {
        hosts = ["localhost:9200"]
        index = "waterdrop-${now}"
        es.batch.size.entries = 100000
        index_time_format = "yyyy.MM.dd"
    }
}

执行命令,指定配置文件,运行Waterdrop,即可将数据写入Elasticsearch。这里我们以本地模式为例。

./bin/start-waterdrop.sh --config config/batch.conf -e client -m 'local[2]'

最后,写入Elasticsearch中的数据如下,再配上Kibana就可以实现Web服务的实时监控了^_^.

"_source": {
    "domain": "elasticsearch.cn",
    "hostname": "localhost",
    "status": "200",
    "datetime": "2018-11-26T21:54:00.000+08:00",
    "count": 26
  }

Conclusion

在这篇文章中,我们介绍了如何通过Waterdrop将Kafka中的数据写入Elasticsearch中。仅仅通过一个配置文件便可快速运行一个Spark Application,完成数据的处理、写入,无需编写任何代码,十分简单。

当数据处理过程中有遇到Logstash无法支持的场景或者Logstah性能无法达到预期的情况下,都可以尝试使用Waterdrop解决问题。

希望了解Waterdrop与Elasticsearch、Kafka、Hadoop结合使用的更多功能和案例,可以直接进入项目主页https://github.com/InterestingLab/waterdrop

我们近期会再发布一篇《如何用Spark和Elasticsearch做交互式数据分析》,敬请期待.

Contract us

欢迎联系我们交流Spark和Elasticsearch:

Garyelephant: 微信: garyelephant

RickyHuo: 微信: chodomatte1994

收起阅读 »

Day 7 - Elasticsearch中数据是如何存储的

前言

很多使用Elasticsearch的同学会关心数据存储在ES中的存储容量,会有这样的疑问:xxTB的数据入到ES会使用多少存储空间。这个问题其实很难直接回答的,只有数据写入ES后,才能观察到实际的存储空间。比如同样是1TB的数据,写入ES的存储空间可能差距会非常大,可能小到只有300~400GB,也可能多到6-7TB,为什么会造成这么大的差距呢?究其原因,我们来探究下Elasticsearch中的数据是如何存储。文章中我以Elasticsearch 2.3版本为示例,对应的lucene版本是5.5,Elasticsearch现在已经来到了6.5版本,数字类型、列存等存储结构有些变化,但基本的概念变化不多,文章中的内容依然适用。

Elasticsearch索引结构

Elasticsearch对外提供的是index的概念,可以类比为DB,用户查询是在index上完成的,每个index由若干个shard组成,以此来达到分布式可扩展的能力。比如下图是一个由10个shard组成的index。

elasticsearch_store_arc.png

shard是Elasticsearch数据存储的最小单位,index的存储容量为所有shard的存储容量之和。Elasticsearch集群的存储容量则为所有index存储容量之和。

一个shard就对应了一个lucene的library。对于一个shard,Elasticsearch增加了translog的功能,类似于HBase WAL,是数据写入过程中的中间数据,其余的数据都在lucene库中管理的。

所以Elasticsearch索引使用的存储内容主要取决于lucene中的数据存储。

lucene数据存储

下面我们主要看下lucene的文件内容,在了解lucene文件内容前,大家先了解些lucene的基本概念。

lucene基本概念

  • segment : lucene内部的数据是由一个个segment组成的,写入lucene的数据并不直接落盘,而是先写在内存中,经过了refresh间隔,lucene才将该时间段写入的全部数据refresh成一个segment,segment多了之后会进行merge成更大的segment。lucene查询时会遍历每个segment完成。由于lucene* 写入的数据是在内存中完成,所以写入效率非常高。但是也存在丢失数据的风险,所以Elasticsearch基于此现象实现了translog,只有在segment数据落盘后,Elasticsearch才会删除对应的translog。
  • doc : doc表示lucene中的一条记录
  • field :field表示记录中的字段概念,一个doc由若干个field组成。
  • term :term是lucene中索引的最小单位,某个field对应的内容如果是全文检索类型,会将内容进行分词,分词的结果就是由term组成的。如果是不分词的字段,那么该字段的内容就是一个term。
  • 倒排索引(inverted index): lucene索引的通用叫法,即实现了term到doc list的映射。
  • 正排数据:搜索引擎的通用叫法,即原始数据,可以理解为一个doc list。
  • docvalues :Elasticsearch中的列式存储的名称,Elasticsearch除了存储原始存储、倒排索引,还存储了一份docvalues,用作分析和排序。

lucene文件内容

lucene包的文件是由很多segment文件组成的,segments_xxx文件记录了lucene包下面的segment文件数量。每个segment会包含如下的文件。

Name Extension Brief Description
Segment Info .si segment的元数据文件
Compound File .cfs, .cfe 一个segment包含了如下表的各个文件,为减少打开文件的数量,在segment小的时候,segment的所有文件内容都保存在cfs文件中,cfe文件保存了lucene各文件在cfs文件的位置信息
Fields .fnm 保存了fields的相关信息
Field Index .fdx 正排存储文件的元数据信息
Field Data .fdt 存储了正排存储数据,写入的原文存储在这
Term Dictionary .tim 倒排索引的元数据信息
Term Index .tip 倒排索引文件,存储了所有的倒排索引数据
Frequencies .doc 保存了每个term的doc id列表和term在doc中的词频
Positions .pos Stores position information about where a term occurs in the index
全文索引的字段,会有该文件,保存了term在doc中的位置
Payloads .pay Stores additional per-position metadata information such as character offsets and user payloads
全文索引的字段,使用了一些像payloads的高级特性会有该文件,保存了term在doc中的一些高级特性
Norms .nvd, .nvm 文件保存索引字段加权数据
Per-Document Values .dvd, .dvm lucene的docvalues文件,即数据的列式存储,用作聚合和排序
Term Vector Data .tvx, .tvd, .tvf Stores offset into the document data file
保存索引字段的矢量信息,用在对term进行高亮,计算文本相关性中使用
Live Documents .liv 记录了segment中删除的doc

测试数据示例

下面我们以真实的数据作为示例,看看lucene中各类型数据的容量占比。

写100w数据,有一个uuid字段,写入的是长度为36位的uuid,字符串总为3600w字节,约为35M。

数据使用一个shard,不带副本,使用默认的压缩算法,写入完成后merge成一个segment方便观察。

使用线上默认的配置,uuid存为不分词的字符串类型。创建如下索引:

PUT test_field
{
  "settings": {
    "index": {
      "number_of_shards": "1",
      "number_of_replicas": "0",
      "refresh_interval": "30s"
    }
  },
  "mappings": {
    "type": {
      "_all": {
        "enabled": false
      }, 
      "properties": {
        "uuid": {
          "type": "string",
          "index": "not_analyzed"
        }
      }
    }
  }
}

首先写入100w不同的uuid,使用磁盘容量细节如下:


health status index      pri rep docs.count docs.deleted store.size pri.store.size 
green  open   test_field   1   0    1000000            0    122.7mb        122.7mb 

-rw-r--r--  1 weizijun  staff    41M Aug 19 21:23 _8.fdt
-rw-r--r--  1 weizijun  staff    17K Aug 19 21:23 _8.fdx
-rw-r--r--  1 weizijun  staff   688B Aug 19 21:23 _8.fnm
-rw-r--r--  1 weizijun  staff   494B Aug 19 21:23 _8.si
-rw-r--r--  1 weizijun  staff   265K Aug 19 21:23 _8_Lucene50_0.doc
-rw-r--r--  1 weizijun  staff    44M Aug 19 21:23 _8_Lucene50_0.tim
-rw-r--r--  1 weizijun  staff   340K Aug 19 21:23 _8_Lucene50_0.tip
-rw-r--r--  1 weizijun  staff    37M Aug 19 21:23 _8_Lucene54_0.dvd
-rw-r--r--  1 weizijun  staff   254B Aug 19 21:23 _8_Lucene54_0.dvm
-rw-r--r--  1 weizijun  staff   195B Aug 19 21:23 segments_2
-rw-r--r--  1 weizijun  staff     0B Aug 19 21:20 write.lock

可以看到正排数据、倒排索引数据,列存数据容量占比几乎相同,正排数据和倒排数据还会存储Elasticsearch的唯一id字段,所以容量会比列存多一些。

35M的uuid存入Elasticsearch后,数据膨胀了3倍,达到了122.7mb。Elasticsearch竟然这么消耗资源,不要着急下结论,接下来看另一个测试结果。

我们写入100w一样的uuid,然后看看Elasticsearch使用的容量。

health status index      pri rep docs.count docs.deleted store.size pri.store.size 
green  open   test_field   1   0    1000000            0     13.2mb         13.2mb 

-rw-r--r--  1 weizijun  staff   5.5M Aug 19 21:29 _6.fdt
-rw-r--r--  1 weizijun  staff    15K Aug 19 21:29 _6.fdx
-rw-r--r--  1 weizijun  staff   688B Aug 19 21:29 _6.fnm
-rw-r--r--  1 weizijun  staff   494B Aug 19 21:29 _6.si
-rw-r--r--  1 weizijun  staff   309K Aug 19 21:29 _6_Lucene50_0.doc
-rw-r--r--  1 weizijun  staff   7.0M Aug 19 21:29 _6_Lucene50_0.tim
-rw-r--r--  1 weizijun  staff   195K Aug 19 21:29 _6_Lucene50_0.tip
-rw-r--r--  1 weizijun  staff   244K Aug 19 21:29 _6_Lucene54_0.dvd
-rw-r--r--  1 weizijun  staff   252B Aug 19 21:29 _6_Lucene54_0.dvm
-rw-r--r--  1 weizijun  staff   195B Aug 19 21:29 segments_2
-rw-r--r--  1 weizijun  staff     0B Aug 19 21:26 write.lock

这回35M的数据Elasticsearch容量只有13.2mb,其中还有主要的占比还是Elasticsearch的唯一id,100w的uuid几乎不占存储容积。

所以在Elasticsearch中建立索引的字段如果基数越大(count distinct),越占用磁盘空间。

我们再看看存100w个不一样的整型会是如何。

health status index      pri rep docs.count docs.deleted store.size pri.store.size 
green  open   test_field   1   0    1000000            0     13.6mb         13.6mb 

-rw-r--r--  1 weizijun  staff   6.1M Aug 28 10:19 _42.fdt
-rw-r--r--  1 weizijun  staff    22K Aug 28 10:19 _42.fdx
-rw-r--r--  1 weizijun  staff   688B Aug 28 10:19 _42.fnm
-rw-r--r--  1 weizijun  staff   503B Aug 28 10:19 _42.si
-rw-r--r--  1 weizijun  staff   2.8M Aug 28 10:19 _42_Lucene50_0.doc
-rw-r--r--  1 weizijun  staff   2.2M Aug 28 10:19 _42_Lucene50_0.tim
-rw-r--r--  1 weizijun  staff    83K Aug 28 10:19 _42_Lucene50_0.tip
-rw-r--r--  1 weizijun  staff   2.5M Aug 28 10:19 _42_Lucene54_0.dvd
-rw-r--r--  1 weizijun  staff   228B Aug 28 10:19 _42_Lucene54_0.dvm
-rw-r--r--  1 weizijun  staff   196B Aug 28 10:19 segments_2
-rw-r--r--  1 weizijun  staff     0B Aug 28 10:16 write.lock

从结果可以看到,100w整型数据,Elasticsearch的存储开销为13.6mb。如果以int型计算100w数据的长度的话,为400w字节,大概是3.8mb数据。忽略Elasticsearch唯一id字段的影响,Elasticsearch实际存储容量跟整型数据长度差不多。

我们再看一下开启最佳压缩参数对存储空间的影响:

health status index      pri rep docs.count docs.deleted store.size pri.store.size 
green  open   test_field   1   0    1000000            0    107.2mb        107.2mb 

-rw-r--r--  1 weizijun  staff    25M Aug 20 12:30 _5.fdt
-rw-r--r--  1 weizijun  staff   6.0K Aug 20 12:30 _5.fdx
-rw-r--r--  1 weizijun  staff   688B Aug 20 12:31 _5.fnm
-rw-r--r--  1 weizijun  staff   500B Aug 20 12:31 _5.si
-rw-r--r--  1 weizijun  staff   265K Aug 20 12:31 _5_Lucene50_0.doc
-rw-r--r--  1 weizijun  staff    44M Aug 20 12:31 _5_Lucene50_0.tim
-rw-r--r--  1 weizijun  staff   322K Aug 20 12:31 _5_Lucene50_0.tip
-rw-r--r--  1 weizijun  staff    37M Aug 20 12:31 _5_Lucene54_0.dvd
-rw-r--r--  1 weizijun  staff   254B Aug 20 12:31 _5_Lucene54_0.dvm
-rw-r--r--  1 weizijun  staff   224B Aug 20 12:31 segments_4
-rw-r--r--  1 weizijun  staff     0B Aug 20 12:00 write.lock

结果中可以发现,只有正排数据会启动压缩,压缩能力确实强劲,不考虑唯一id字段,存储容量大概压缩到接近50%。

我们还做了一些实验,Elasticsearch默认是开启_all参数的,_all可以让用户传入的整体json数据作为全文检索的字段,可以更方便的检索,但在现实场景中已经使用的不多,相反会增加很多存储容量的开销,可以看下开启_all的磁盘空间使用情况:


health status index      pri rep docs.count docs.deleted store.size pri.store.size 
green  open   test_field   1   0    1000000            0    162.4mb        162.4mb 

-rw-r--r--  1 weizijun  staff    41M Aug 18 22:59 _20.fdt
-rw-r--r--  1 weizijun  staff    18K Aug 18 22:59 _20.fdx
-rw-r--r--  1 weizijun  staff   777B Aug 18 22:59 _20.fnm
-rw-r--r--  1 weizijun  staff    59B Aug 18 22:59 _20.nvd
-rw-r--r--  1 weizijun  staff    78B Aug 18 22:59 _20.nvm
-rw-r--r--  1 weizijun  staff   539B Aug 18 22:59 _20.si
-rw-r--r--  1 weizijun  staff   7.2M Aug 18 22:59 _20_Lucene50_0.doc
-rw-r--r--  1 weizijun  staff   4.2M Aug 18 22:59 _20_Lucene50_0.pos
-rw-r--r--  1 weizijun  staff    73M Aug 18 22:59 _20_Lucene50_0.tim
-rw-r--r--  1 weizijun  staff   832K Aug 18 22:59 _20_Lucene50_0.tip
-rw-r--r--  1 weizijun  staff    37M Aug 18 22:59 _20_Lucene54_0.dvd
-rw-r--r--  1 weizijun  staff   254B Aug 18 22:59 _20_Lucene54_0.dvm
-rw-r--r--  1 weizijun  staff   196B Aug 18 22:59 segments_2
-rw-r--r--  1 weizijun  staff     0B Aug 18 22:53 write.lock

开启_all比不开启多了40mb的存储空间,多的数据都在倒排索引上,大约会增加30%多的存储开销。所以线上都直接禁用。

然后我还做了其他几个尝试,为了验证存储容量是否和数据量成正比,写入1000w数据的uuid,发现存储容量基本为100w数据的10倍。我还验证了数据长度是否和数据量成正比,发现把uuid增长2倍、4倍,存储容量也响应的增加了2倍和4倍。在此就不一一列出数据了。

lucene各文件具体内容和实现

lucene数据元信息文件

文件名为:segments_xxx

该文件为lucene数据文件的元信息文件,记录所有segment的元数据信息。

该文件主要记录了目前有多少segment,每个segment有一些基本信息,更新这些信息定位到每个segment的元信息文件。

lucene元信息文件还支持记录userData,Elasticsearch可以在此记录translog的一些相关信息。

文件示例

elasticsearch_store_segments.png

具体实现类

public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo> {
  // generation是segment的版本的概念,从文件名中提取出来,实例中为:2t/101
  private long generation;     // generation of the "segments_N" for the next commit

  private long lastGeneration; // generation of the "segments_N" file we last successfully read
                               // or wrote; this is normally the same as generation except if
                               // there was an IOException that had interrupted a commit

  /** Id for this commit; only written starting with Lucene 5.0 */
  private byte[] id;

  /** Which Lucene version wrote this commit, or null if this commit is pre-5.3. */
  private Version luceneVersion;

  /** Counts how often the index has been changed.  */
  public long version;

  /** Used to name new segments. */
  // TODO: should this be a long ...?
  public int counter;

  /** Version of the oldest segment in the index, or null if there are no segments. */
  private Version minSegmentLuceneVersion;

  private List<SegmentCommitInfo> segments = new ArrayList<>();

  /** Opaque Map&lt;String, String&gt; that user can specify during IndexWriter.commit */
  public Map<String,String> userData = Collections.emptyMap();
}

/** Embeds a [read-only] SegmentInfo and adds per-commit
 *  fields.
 *
 *  @lucene.experimental */
public class SegmentCommitInfo {

  /** The {@link SegmentInfo} that we wrap. */
  public final SegmentInfo info;

  // How many deleted docs in the segment:
  private int delCount;

  // Generation number of the live docs file (-1 if there
  // are no deletes yet):
  private long delGen;

  // Normally 1+delGen, unless an exception was hit on last
  // attempt to write:
  private long nextWriteDelGen;

  // Generation number of the FieldInfos (-1 if there are no updates)
  private long fieldInfosGen;

  // Normally 1+fieldInfosGen, unless an exception was hit on last attempt to
  // write
  private long nextWriteFieldInfosGen; //fieldInfosGen == -1 ? 1 : fieldInfosGen + 1;

  // Generation number of the DocValues (-1 if there are no updates)
  private long docValuesGen;

  // Normally 1+dvGen, unless an exception was hit on last attempt to
  // write
  private long nextWriteDocValuesGen; //docValuesGen == -1 ? 1 : docValuesGen + 1;

  // TODO should we add .files() to FieldInfosFormat, like we have on
  // LiveDocsFormat?
  // track the fieldInfos update files
  private final Set<String> fieldInfosFiles = new HashSet<>();

  // Track the per-field DocValues update files
  private final Map<Integer,Set<String>> dvUpdatesFiles = new HashMap<>();

  // Track the per-generation updates files
  @Deprecated
  private final Map<Long,Set<String>> genUpdatesFiles = new HashMap<>();

  private volatile long sizeInBytes = -1;
}

segment的元信息文件

文件后缀:.si

每个segment都有一个.si文件,记录了该segment的元信息。

segment元信息文件中记录了segment的文档数量,segment对应的文件列表等信息。

文件示例

elasticsearch_store_si.png

具体实现类

/**
 * Information about a segment such as its name, directory, and files related
 * to the segment.
 *
 * @lucene.experimental
 */
public final class SegmentInfo {

  // _bl
  public final String name;

  /** Where this segment resides. */
  public final Directory dir;

  /** Id that uniquely identifies this segment. */
  private final byte[] id;

  private Codec codec;

  // Tracks the Lucene version this segment was created with, since 3.1. Null
  // indicates an older than 3.0 index, and it's used to detect a too old index.
  // The format expected is "x.y" - "2.x" for pre-3.0 indexes (or null), and
  // specific versions afterwards ("3.0.0", "3.1.0" etc.).
  // see o.a.l.util.Version.
  private Version version;

  private int maxDoc;         // number of docs in seg

  private boolean isCompoundFile;

  private Map<String,String> diagnostics;

  private Set<String> setFiles;

  private final Map<String,String> attributes;
}

fields信息文件

文件后缀:.fnm

该文件存储了fields的基本信息。

fields信息中包括field的数量,field的类型,以及IndexOpetions,包括是否存储、是否索引,是否分词,是否需要列存等等。

文件示例

elasticsearch_store_fnm.png

具体实现类

/**
 *  Access to the Field Info file that describes document fields and whether or
 *  not they are indexed. Each segment has a separate Field Info file. Objects
 *  of this class are thread-safe for multiple readers, but only one thread can
 *  be adding documents at a time, with no other reader or writer threads
 *  accessing this object.
 **/
public final class FieldInfo {
  /** Field's name */
  public final String name;

  /** Internal field number */
  //field在内部的编号
  public final int number;

  //field docvalues的类型
  private DocValuesType docValuesType = DocValuesType.NONE;

  // True if any document indexed term vectors
  private boolean storeTermVector;

  private boolean omitNorms; // omit norms associated with indexed fields 

  //index的配置项
  private IndexOptions indexOptions = IndexOptions.NONE;

  private boolean storePayloads; // whether this field stores payloads together with term positions 

  private final Map<String,String> attributes;

  // docvalues的generation
  private long dvGen;
}

数据存储文件

文件后缀:.fdx, .fdt

索引文件为.fdx,数据文件为.fdt,数据存储文件功能为根据自动的文档id,得到文档的内容,搜索引擎的术语习惯称之为正排数据,即doc_id -> content,es的_source数据就存在这

索引文件记录了快速定位文档数据的索引信息,数据文件记录了所有文档id的具体内容。

文件示例

elasticsearch_store_fdt.png

具体实现类

/**
 * Random-access reader for {@link CompressingStoredFieldsIndexWriter}.
 * @lucene.internal
 */
public final class CompressingStoredFieldsIndexReader implements Cloneable, Accountable {
  private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(CompressingStoredFieldsIndexReader.class);

  final int maxDoc;

  //docid索引,快速定位某个docid的数组坐标
  final int[] docBases;

  //快速定位某个docid所在的文件offset的startPointer
  final long[] startPointers;

  //平均一个chunk的文档数
  final int[] avgChunkDocs;

  //平均一个chunk的size
  final long[] avgChunkSizes;

  final PackedInts.Reader[] docBasesDeltas; // delta from the avg

  final PackedInts.Reader[] startPointersDeltas; // delta from the avg
}

/**
 * {@link StoredFieldsReader} impl for {@link CompressingStoredFieldsFormat}.
 * @lucene.experimental
 */
public final class CompressingStoredFieldsReader extends StoredFieldsReader {

  //从fdt正排索引文件中获得
  private final int version;

  // field的基本信息
  private final FieldInfos fieldInfos;

  //fdt正排索引文件reader
  private final CompressingStoredFieldsIndexReader indexReader;

  //从fdt正排索引文件中获得,用于指向fdx数据文件的末端,指向numChunks地址4
  private final long maxPointer;

  //fdx正排数据文件句柄
  private final IndexInput fieldsStream;

  //块大小
  private final int chunkSize;

  private final int packedIntsVersion;

  //压缩类型
  private final CompressionMode compressionMode;

  //解压缩处理对象
  private final Decompressor decompressor;

  //文档数量,从segment元数据中获得
  private final int numDocs;

  //是否正在merge,默认为false
  private final boolean merging;

  //初始化时new了一个BlockState,BlockState记录下当前正排文件读取的状态信息
  private final BlockState state;
  //chunk的数量
  private final long numChunks; // number of compressed blocks written

  //dirty chunk的数量
  private final long numDirtyChunks; // number of incomplete compressed blocks written

  //是否close,默认为false
  private boolean closed;
}

倒排索引文件

索引后缀:.tip,.tim

倒排索引也包含索引文件和数据文件,.tip为索引文件,.tim为数据文件,索引文件包含了每个字段的索引元信息,数据文件有具体的索引内容。

5.5.0版本的倒排索引实现为FST tree,FST tree的最大优势就是内存空间占用非常低 ,具体可以参看下这篇文章:http://www.cnblogs.com/bonelee/p/6226185.html

http://examples.mikemccandless.com/fst.py?terms=&cmd=Build+it 为FST图实例,可以根据输入的数据构造出FST图

输入到 FST 中的数据为:
String inputValues[] = {"mop","moth","pop","star","stop","top"};
long outputValues[] = {0,1,2,3,4,5};

生成的 FST 图为:

elasticsearch_store_tip1.png

elasticsearch_store_tip2.png

文件示例

elasticsearch_store_tip3.png

具体实现类

public final class BlockTreeTermsReader extends FieldsProducer {
  // Open input to the main terms dict file (_X.tib)
  final IndexInput termsIn;
  // Reads the terms dict entries, to gather state to
  // produce DocsEnum on demand
  final PostingsReaderBase postingsReader;
  private final TreeMap<String,FieldReader> fields = new TreeMap<>();

  /** File offset where the directory starts in the terms file. */
  /索引数据文件tim的数据的尾部的元数据的地址
  private long dirOffset;
  /** File offset where the directory starts in the index file. */

  //索引文件tip的数据的尾部的元数据的地址
  private long indexDirOffset;

  //semgent的名称
  final String segment;

  //版本号
  final int version;

  //5.3.x index, we record up front if we may have written any auto-prefix terms,示例中记录的是false
  final boolean anyAutoPrefixTerms;
}

/**
 * BlockTree's implementation of {@link Terms}.
 * @lucene.internal
 */
public final class FieldReader extends Terms implements Accountable {

  //term的数量
  final long numTerms;

  //field信息
  final FieldInfo fieldInfo;

  final long sumTotalTermFreq;

  //总的文档频率
  final long sumDocFreq;

  //文档数量
  final int docCount;

  //字段在索引文件tip中的起始位置
  final long indexStartFP;

  final long rootBlockFP;

  final BytesRef rootCode;

  final BytesRef minTerm;

  final BytesRef maxTerm;

  //longs:metadata buffer, holding monotonic values
  final int longsSize;

  final BlockTreeTermsReader parent;

  final FST<BytesRef> index;
}

倒排链文件

文件后缀:.doc, .pos, .pay

.doc保存了每个term的doc id列表和term在doc中的词频

全文索引的字段,会有.pos文件,保存了term在doc中的位置

全文索引的字段,使用了一些像payloads的高级特性才会有.pay文件,保存了term在doc中的一些高级特性

文件示例

elasticsearch_store_doc.png

具体实现类

/**
 * Concrete class that reads docId(maybe frq,pos,offset,payloads) list
 * with postings format.
 *
 * @lucene.experimental
 */
public final class Lucene50PostingsReader extends PostingsReaderBase {
  private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Lucene50PostingsReader.class);
  private final IndexInput docIn;
  private final IndexInput posIn;
  private final IndexInput payIn;
  final ForUtil forUtil;
  private int version;

  //不分词的字段使用的是该对象,基于skiplist实现了倒排链
  final class BlockDocsEnum extends PostingsEnum {
  }

  //全文检索字段使用的是该对象
  final class BlockPostingsEnum extends PostingsEnum {
  }

  //包含高级特性的字段使用的是该对象
  final class EverythingEnum extends PostingsEnum {
  }
}

列存文件(docvalues)

文件后缀:.dvm, .dvd

索引文件为.dvm,数据文件为.dvd。

lucene实现的docvalues有如下类型:

  • 1、NONE 不开启docvalue时的状态
  • 2、NUMERIC 单个数值类型的docvalue主要包括(int,long,float,double)
  • 3、BINARY 二进制类型值对应不同的codes最大值可能超过32766字节,
  • 4、SORTED 有序增量字节存储,仅仅存储不同部分的值和偏移量指针,值必须小于等于32766字节
  • 5、SORTED_NUMERIC 存储数值类型的有序数组列表
  • 6、SORTED_SET 可以存储多值域的docvalue值,但返回时,仅仅只能返回多值域的第一个docvalue
  • 7、对应not_anaylized的string字段,使用的是SORTED_SET类型,number的类型是SORTED_NUMERIC类型

其中SORTED_SET 的 SORTED_SINGLE_VALUED类型包括了两类数据 : binary + numeric, binary是按ord排序的term的列表,numeric是doc到ord的映射。

文件示例

elasticsearch_store_dvd.png

具体实现类

/** reader for {@link Lucene54DocValuesFormat} */
final class Lucene54DocValuesProducer extends DocValuesProducer implements Closeable {
  //number类型的field的列存列表
  private final Map<String,NumericEntry> numerics = new HashMap<>();

  //字符串类型的field的列存列表
  private final Map<String,BinaryEntry> binaries = new HashMap<>();

  //有序字符串类型的field的列存列表
  private final Map<String,SortedSetEntry> sortedSets = new HashMap<>();

  //有序number类型的field的列存列表
  private final Map<String,SortedSetEntry> sortedNumerics = new HashMap<>();

  //字符串类型的field的ords列表
  private final Map<String,NumericEntry> ords = new HashMap<>();

  //docId -> address -> ord 中field的ords列表
  private final Map<String,NumericEntry> ordIndexes = new HashMap<>();

  //field的数量
  private final int numFields;

  //内存使用量
  private final AtomicLong ramBytesUsed;

  //数据源的文件句柄
  private final IndexInput data;

  //文档数
  private final int maxDoc;
  // memory-resident structures
  private final Map<String,MonotonicBlockPackedReader> addressInstances = new HashMap<>();
  private final Map<String,ReverseTermsIndex> reverseIndexInstances = new HashMap<>();
  private final Map<String,DirectMonotonicReader.Meta> directAddressesMeta = new HashMap<>();

  //是否正在merge
  private final boolean merging;
}

/** metadata entry for a numeric docvalues field */
  static class NumericEntry {
    private NumericEntry() {}
    /** offset to the bitset representing docsWithField, or -1 if no documents have missing values */
    long missingOffset;

    /** offset to the actual numeric values */
    //field的在数据文件中的起始地址
    public long offset;

    /** end offset to the actual numeric values */
    //field的在数据文件中的结尾地址
    public long endOffset;

    /** bits per value used to pack the numeric values */
    public int bitsPerValue;

    //format类型
    int format;
    /** count of values written */
    public long count;
    /** monotonic meta */
    public DirectMonotonicReader.Meta monotonicMeta;

    //最小的value
    long minValue;

    //Compressed by computing the GCD
    long gcd;

    //Compressed by giving IDs to unique values.
    long table[];
    /** for sparse compression */
    long numDocsWithValue;
    NumericEntry nonMissingValues;
    NumberType numberType;
  }

  /** metadata entry for a binary docvalues field */
  static class BinaryEntry {
    private BinaryEntry() {}
    /** offset to the bitset representing docsWithField, or -1 if no documents have missing values */
    long missingOffset;
    /** offset to the actual binary values */
    //field的在数据文件中的起始地址
    long offset;
    int format;
    /** count of values written */
    public long count;

    //最短字符串的长度
    int minLength;

    //最长字符串的长度
    int maxLength;
    /** offset to the addressing data that maps a value to its slice of the byte[] */
    public long addressesOffset, addressesEndOffset;
    /** meta data for addresses */
    public DirectMonotonicReader.Meta addressesMeta;
    /** offset to the reverse index */
    public long reverseIndexOffset;
    /** packed ints version used to encode addressing information */
    public int packedIntsVersion;
    /** packed ints blocksize */
    public int blockSize;
  }

参考资料

lucene source code

lucene document

lucene字典实现原理——FST

继续阅读 »

前言

很多使用Elasticsearch的同学会关心数据存储在ES中的存储容量,会有这样的疑问:xxTB的数据入到ES会使用多少存储空间。这个问题其实很难直接回答的,只有数据写入ES后,才能观察到实际的存储空间。比如同样是1TB的数据,写入ES的存储空间可能差距会非常大,可能小到只有300~400GB,也可能多到6-7TB,为什么会造成这么大的差距呢?究其原因,我们来探究下Elasticsearch中的数据是如何存储。文章中我以Elasticsearch 2.3版本为示例,对应的lucene版本是5.5,Elasticsearch现在已经来到了6.5版本,数字类型、列存等存储结构有些变化,但基本的概念变化不多,文章中的内容依然适用。

Elasticsearch索引结构

Elasticsearch对外提供的是index的概念,可以类比为DB,用户查询是在index上完成的,每个index由若干个shard组成,以此来达到分布式可扩展的能力。比如下图是一个由10个shard组成的index。

elasticsearch_store_arc.png

shard是Elasticsearch数据存储的最小单位,index的存储容量为所有shard的存储容量之和。Elasticsearch集群的存储容量则为所有index存储容量之和。

一个shard就对应了一个lucene的library。对于一个shard,Elasticsearch增加了translog的功能,类似于HBase WAL,是数据写入过程中的中间数据,其余的数据都在lucene库中管理的。

所以Elasticsearch索引使用的存储内容主要取决于lucene中的数据存储。

lucene数据存储

下面我们主要看下lucene的文件内容,在了解lucene文件内容前,大家先了解些lucene的基本概念。

lucene基本概念

  • segment : lucene内部的数据是由一个个segment组成的,写入lucene的数据并不直接落盘,而是先写在内存中,经过了refresh间隔,lucene才将该时间段写入的全部数据refresh成一个segment,segment多了之后会进行merge成更大的segment。lucene查询时会遍历每个segment完成。由于lucene* 写入的数据是在内存中完成,所以写入效率非常高。但是也存在丢失数据的风险,所以Elasticsearch基于此现象实现了translog,只有在segment数据落盘后,Elasticsearch才会删除对应的translog。
  • doc : doc表示lucene中的一条记录
  • field :field表示记录中的字段概念,一个doc由若干个field组成。
  • term :term是lucene中索引的最小单位,某个field对应的内容如果是全文检索类型,会将内容进行分词,分词的结果就是由term组成的。如果是不分词的字段,那么该字段的内容就是一个term。
  • 倒排索引(inverted index): lucene索引的通用叫法,即实现了term到doc list的映射。
  • 正排数据:搜索引擎的通用叫法,即原始数据,可以理解为一个doc list。
  • docvalues :Elasticsearch中的列式存储的名称,Elasticsearch除了存储原始存储、倒排索引,还存储了一份docvalues,用作分析和排序。

lucene文件内容

lucene包的文件是由很多segment文件组成的,segments_xxx文件记录了lucene包下面的segment文件数量。每个segment会包含如下的文件。

Name Extension Brief Description
Segment Info .si segment的元数据文件
Compound File .cfs, .cfe 一个segment包含了如下表的各个文件,为减少打开文件的数量,在segment小的时候,segment的所有文件内容都保存在cfs文件中,cfe文件保存了lucene各文件在cfs文件的位置信息
Fields .fnm 保存了fields的相关信息
Field Index .fdx 正排存储文件的元数据信息
Field Data .fdt 存储了正排存储数据,写入的原文存储在这
Term Dictionary .tim 倒排索引的元数据信息
Term Index .tip 倒排索引文件,存储了所有的倒排索引数据
Frequencies .doc 保存了每个term的doc id列表和term在doc中的词频
Positions .pos Stores position information about where a term occurs in the index
全文索引的字段,会有该文件,保存了term在doc中的位置
Payloads .pay Stores additional per-position metadata information such as character offsets and user payloads
全文索引的字段,使用了一些像payloads的高级特性会有该文件,保存了term在doc中的一些高级特性
Norms .nvd, .nvm 文件保存索引字段加权数据
Per-Document Values .dvd, .dvm lucene的docvalues文件,即数据的列式存储,用作聚合和排序
Term Vector Data .tvx, .tvd, .tvf Stores offset into the document data file
保存索引字段的矢量信息,用在对term进行高亮,计算文本相关性中使用
Live Documents .liv 记录了segment中删除的doc

测试数据示例

下面我们以真实的数据作为示例,看看lucene中各类型数据的容量占比。

写100w数据,有一个uuid字段,写入的是长度为36位的uuid,字符串总为3600w字节,约为35M。

数据使用一个shard,不带副本,使用默认的压缩算法,写入完成后merge成一个segment方便观察。

使用线上默认的配置,uuid存为不分词的字符串类型。创建如下索引:

PUT test_field
{
  "settings": {
    "index": {
      "number_of_shards": "1",
      "number_of_replicas": "0",
      "refresh_interval": "30s"
    }
  },
  "mappings": {
    "type": {
      "_all": {
        "enabled": false
      }, 
      "properties": {
        "uuid": {
          "type": "string",
          "index": "not_analyzed"
        }
      }
    }
  }
}

首先写入100w不同的uuid,使用磁盘容量细节如下:


health status index      pri rep docs.count docs.deleted store.size pri.store.size 
green  open   test_field   1   0    1000000            0    122.7mb        122.7mb 

-rw-r--r--  1 weizijun  staff    41M Aug 19 21:23 _8.fdt
-rw-r--r--  1 weizijun  staff    17K Aug 19 21:23 _8.fdx
-rw-r--r--  1 weizijun  staff   688B Aug 19 21:23 _8.fnm
-rw-r--r--  1 weizijun  staff   494B Aug 19 21:23 _8.si
-rw-r--r--  1 weizijun  staff   265K Aug 19 21:23 _8_Lucene50_0.doc
-rw-r--r--  1 weizijun  staff    44M Aug 19 21:23 _8_Lucene50_0.tim
-rw-r--r--  1 weizijun  staff   340K Aug 19 21:23 _8_Lucene50_0.tip
-rw-r--r--  1 weizijun  staff    37M Aug 19 21:23 _8_Lucene54_0.dvd
-rw-r--r--  1 weizijun  staff   254B Aug 19 21:23 _8_Lucene54_0.dvm
-rw-r--r--  1 weizijun  staff   195B Aug 19 21:23 segments_2
-rw-r--r--  1 weizijun  staff     0B Aug 19 21:20 write.lock

可以看到正排数据、倒排索引数据,列存数据容量占比几乎相同,正排数据和倒排数据还会存储Elasticsearch的唯一id字段,所以容量会比列存多一些。

35M的uuid存入Elasticsearch后,数据膨胀了3倍,达到了122.7mb。Elasticsearch竟然这么消耗资源,不要着急下结论,接下来看另一个测试结果。

我们写入100w一样的uuid,然后看看Elasticsearch使用的容量。

health status index      pri rep docs.count docs.deleted store.size pri.store.size 
green  open   test_field   1   0    1000000            0     13.2mb         13.2mb 

-rw-r--r--  1 weizijun  staff   5.5M Aug 19 21:29 _6.fdt
-rw-r--r--  1 weizijun  staff    15K Aug 19 21:29 _6.fdx
-rw-r--r--  1 weizijun  staff   688B Aug 19 21:29 _6.fnm
-rw-r--r--  1 weizijun  staff   494B Aug 19 21:29 _6.si
-rw-r--r--  1 weizijun  staff   309K Aug 19 21:29 _6_Lucene50_0.doc
-rw-r--r--  1 weizijun  staff   7.0M Aug 19 21:29 _6_Lucene50_0.tim
-rw-r--r--  1 weizijun  staff   195K Aug 19 21:29 _6_Lucene50_0.tip
-rw-r--r--  1 weizijun  staff   244K Aug 19 21:29 _6_Lucene54_0.dvd
-rw-r--r--  1 weizijun  staff   252B Aug 19 21:29 _6_Lucene54_0.dvm
-rw-r--r--  1 weizijun  staff   195B Aug 19 21:29 segments_2
-rw-r--r--  1 weizijun  staff     0B Aug 19 21:26 write.lock

这回35M的数据Elasticsearch容量只有13.2mb,其中还有主要的占比还是Elasticsearch的唯一id,100w的uuid几乎不占存储容积。

所以在Elasticsearch中建立索引的字段如果基数越大(count distinct),越占用磁盘空间。

我们再看看存100w个不一样的整型会是如何。

health status index      pri rep docs.count docs.deleted store.size pri.store.size 
green  open   test_field   1   0    1000000            0     13.6mb         13.6mb 

-rw-r--r--  1 weizijun  staff   6.1M Aug 28 10:19 _42.fdt
-rw-r--r--  1 weizijun  staff    22K Aug 28 10:19 _42.fdx
-rw-r--r--  1 weizijun  staff   688B Aug 28 10:19 _42.fnm
-rw-r--r--  1 weizijun  staff   503B Aug 28 10:19 _42.si
-rw-r--r--  1 weizijun  staff   2.8M Aug 28 10:19 _42_Lucene50_0.doc
-rw-r--r--  1 weizijun  staff   2.2M Aug 28 10:19 _42_Lucene50_0.tim
-rw-r--r--  1 weizijun  staff    83K Aug 28 10:19 _42_Lucene50_0.tip
-rw-r--r--  1 weizijun  staff   2.5M Aug 28 10:19 _42_Lucene54_0.dvd
-rw-r--r--  1 weizijun  staff   228B Aug 28 10:19 _42_Lucene54_0.dvm
-rw-r--r--  1 weizijun  staff   196B Aug 28 10:19 segments_2
-rw-r--r--  1 weizijun  staff     0B Aug 28 10:16 write.lock

从结果可以看到,100w整型数据,Elasticsearch的存储开销为13.6mb。如果以int型计算100w数据的长度的话,为400w字节,大概是3.8mb数据。忽略Elasticsearch唯一id字段的影响,Elasticsearch实际存储容量跟整型数据长度差不多。

我们再看一下开启最佳压缩参数对存储空间的影响:

health status index      pri rep docs.count docs.deleted store.size pri.store.size 
green  open   test_field   1   0    1000000            0    107.2mb        107.2mb 

-rw-r--r--  1 weizijun  staff    25M Aug 20 12:30 _5.fdt
-rw-r--r--  1 weizijun  staff   6.0K Aug 20 12:30 _5.fdx
-rw-r--r--  1 weizijun  staff   688B Aug 20 12:31 _5.fnm
-rw-r--r--  1 weizijun  staff   500B Aug 20 12:31 _5.si
-rw-r--r--  1 weizijun  staff   265K Aug 20 12:31 _5_Lucene50_0.doc
-rw-r--r--  1 weizijun  staff    44M Aug 20 12:31 _5_Lucene50_0.tim
-rw-r--r--  1 weizijun  staff   322K Aug 20 12:31 _5_Lucene50_0.tip
-rw-r--r--  1 weizijun  staff    37M Aug 20 12:31 _5_Lucene54_0.dvd
-rw-r--r--  1 weizijun  staff   254B Aug 20 12:31 _5_Lucene54_0.dvm
-rw-r--r--  1 weizijun  staff   224B Aug 20 12:31 segments_4
-rw-r--r--  1 weizijun  staff     0B Aug 20 12:00 write.lock

结果中可以发现,只有正排数据会启动压缩,压缩能力确实强劲,不考虑唯一id字段,存储容量大概压缩到接近50%。

我们还做了一些实验,Elasticsearch默认是开启_all参数的,_all可以让用户传入的整体json数据作为全文检索的字段,可以更方便的检索,但在现实场景中已经使用的不多,相反会增加很多存储容量的开销,可以看下开启_all的磁盘空间使用情况:


health status index      pri rep docs.count docs.deleted store.size pri.store.size 
green  open   test_field   1   0    1000000            0    162.4mb        162.4mb 

-rw-r--r--  1 weizijun  staff    41M Aug 18 22:59 _20.fdt
-rw-r--r--  1 weizijun  staff    18K Aug 18 22:59 _20.fdx
-rw-r--r--  1 weizijun  staff   777B Aug 18 22:59 _20.fnm
-rw-r--r--  1 weizijun  staff    59B Aug 18 22:59 _20.nvd
-rw-r--r--  1 weizijun  staff    78B Aug 18 22:59 _20.nvm
-rw-r--r--  1 weizijun  staff   539B Aug 18 22:59 _20.si
-rw-r--r--  1 weizijun  staff   7.2M Aug 18 22:59 _20_Lucene50_0.doc
-rw-r--r--  1 weizijun  staff   4.2M Aug 18 22:59 _20_Lucene50_0.pos
-rw-r--r--  1 weizijun  staff    73M Aug 18 22:59 _20_Lucene50_0.tim
-rw-r--r--  1 weizijun  staff   832K Aug 18 22:59 _20_Lucene50_0.tip
-rw-r--r--  1 weizijun  staff    37M Aug 18 22:59 _20_Lucene54_0.dvd
-rw-r--r--  1 weizijun  staff   254B Aug 18 22:59 _20_Lucene54_0.dvm
-rw-r--r--  1 weizijun  staff   196B Aug 18 22:59 segments_2
-rw-r--r--  1 weizijun  staff     0B Aug 18 22:53 write.lock

开启_all比不开启多了40mb的存储空间,多的数据都在倒排索引上,大约会增加30%多的存储开销。所以线上都直接禁用。

然后我还做了其他几个尝试,为了验证存储容量是否和数据量成正比,写入1000w数据的uuid,发现存储容量基本为100w数据的10倍。我还验证了数据长度是否和数据量成正比,发现把uuid增长2倍、4倍,存储容量也响应的增加了2倍和4倍。在此就不一一列出数据了。

lucene各文件具体内容和实现

lucene数据元信息文件

文件名为:segments_xxx

该文件为lucene数据文件的元信息文件,记录所有segment的元数据信息。

该文件主要记录了目前有多少segment,每个segment有一些基本信息,更新这些信息定位到每个segment的元信息文件。

lucene元信息文件还支持记录userData,Elasticsearch可以在此记录translog的一些相关信息。

文件示例

elasticsearch_store_segments.png

具体实现类

public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo> {
  // generation是segment的版本的概念,从文件名中提取出来,实例中为:2t/101
  private long generation;     // generation of the "segments_N" for the next commit

  private long lastGeneration; // generation of the "segments_N" file we last successfully read
                               // or wrote; this is normally the same as generation except if
                               // there was an IOException that had interrupted a commit

  /** Id for this commit; only written starting with Lucene 5.0 */
  private byte[] id;

  /** Which Lucene version wrote this commit, or null if this commit is pre-5.3. */
  private Version luceneVersion;

  /** Counts how often the index has been changed.  */
  public long version;

  /** Used to name new segments. */
  // TODO: should this be a long ...?
  public int counter;

  /** Version of the oldest segment in the index, or null if there are no segments. */
  private Version minSegmentLuceneVersion;

  private List<SegmentCommitInfo> segments = new ArrayList<>();

  /** Opaque Map&lt;String, String&gt; that user can specify during IndexWriter.commit */
  public Map<String,String> userData = Collections.emptyMap();
}

/** Embeds a [read-only] SegmentInfo and adds per-commit
 *  fields.
 *
 *  @lucene.experimental */
public class SegmentCommitInfo {

  /** The {@link SegmentInfo} that we wrap. */
  public final SegmentInfo info;

  // How many deleted docs in the segment:
  private int delCount;

  // Generation number of the live docs file (-1 if there
  // are no deletes yet):
  private long delGen;

  // Normally 1+delGen, unless an exception was hit on last
  // attempt to write:
  private long nextWriteDelGen;

  // Generation number of the FieldInfos (-1 if there are no updates)
  private long fieldInfosGen;

  // Normally 1+fieldInfosGen, unless an exception was hit on last attempt to
  // write
  private long nextWriteFieldInfosGen; //fieldInfosGen == -1 ? 1 : fieldInfosGen + 1;

  // Generation number of the DocValues (-1 if there are no updates)
  private long docValuesGen;

  // Normally 1+dvGen, unless an exception was hit on last attempt to
  // write
  private long nextWriteDocValuesGen; //docValuesGen == -1 ? 1 : docValuesGen + 1;

  // TODO should we add .files() to FieldInfosFormat, like we have on
  // LiveDocsFormat?
  // track the fieldInfos update files
  private final Set<String> fieldInfosFiles = new HashSet<>();

  // Track the per-field DocValues update files
  private final Map<Integer,Set<String>> dvUpdatesFiles = new HashMap<>();

  // Track the per-generation updates files
  @Deprecated
  private final Map<Long,Set<String>> genUpdatesFiles = new HashMap<>();

  private volatile long sizeInBytes = -1;
}

segment的元信息文件

文件后缀:.si

每个segment都有一个.si文件,记录了该segment的元信息。

segment元信息文件中记录了segment的文档数量,segment对应的文件列表等信息。

文件示例

elasticsearch_store_si.png

具体实现类

/**
 * Information about a segment such as its name, directory, and files related
 * to the segment.
 *
 * @lucene.experimental
 */
public final class SegmentInfo {

  // _bl
  public final String name;

  /** Where this segment resides. */
  public final Directory dir;

  /** Id that uniquely identifies this segment. */
  private final byte[] id;

  private Codec codec;

  // Tracks the Lucene version this segment was created with, since 3.1. Null
  // indicates an older than 3.0 index, and it's used to detect a too old index.
  // The format expected is "x.y" - "2.x" for pre-3.0 indexes (or null), and
  // specific versions afterwards ("3.0.0", "3.1.0" etc.).
  // see o.a.l.util.Version.
  private Version version;

  private int maxDoc;         // number of docs in seg

  private boolean isCompoundFile;

  private Map<String,String> diagnostics;

  private Set<String> setFiles;

  private final Map<String,String> attributes;
}

fields信息文件

文件后缀:.fnm

该文件存储了fields的基本信息。

fields信息中包括field的数量,field的类型,以及IndexOpetions,包括是否存储、是否索引,是否分词,是否需要列存等等。

文件示例

elasticsearch_store_fnm.png

具体实现类

/**
 *  Access to the Field Info file that describes document fields and whether or
 *  not they are indexed. Each segment has a separate Field Info file. Objects
 *  of this class are thread-safe for multiple readers, but only one thread can
 *  be adding documents at a time, with no other reader or writer threads
 *  accessing this object.
 **/
public final class FieldInfo {
  /** Field's name */
  public final String name;

  /** Internal field number */
  //field在内部的编号
  public final int number;

  //field docvalues的类型
  private DocValuesType docValuesType = DocValuesType.NONE;

  // True if any document indexed term vectors
  private boolean storeTermVector;

  private boolean omitNorms; // omit norms associated with indexed fields 

  //index的配置项
  private IndexOptions indexOptions = IndexOptions.NONE;

  private boolean storePayloads; // whether this field stores payloads together with term positions 

  private final Map<String,String> attributes;

  // docvalues的generation
  private long dvGen;
}

数据存储文件

文件后缀:.fdx, .fdt

索引文件为.fdx,数据文件为.fdt,数据存储文件功能为根据自动的文档id,得到文档的内容,搜索引擎的术语习惯称之为正排数据,即doc_id -> content,es的_source数据就存在这

索引文件记录了快速定位文档数据的索引信息,数据文件记录了所有文档id的具体内容。

文件示例

elasticsearch_store_fdt.png

具体实现类

/**
 * Random-access reader for {@link CompressingStoredFieldsIndexWriter}.
 * @lucene.internal
 */
public final class CompressingStoredFieldsIndexReader implements Cloneable, Accountable {
  private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(CompressingStoredFieldsIndexReader.class);

  final int maxDoc;

  //docid索引,快速定位某个docid的数组坐标
  final int[] docBases;

  //快速定位某个docid所在的文件offset的startPointer
  final long[] startPointers;

  //平均一个chunk的文档数
  final int[] avgChunkDocs;

  //平均一个chunk的size
  final long[] avgChunkSizes;

  final PackedInts.Reader[] docBasesDeltas; // delta from the avg

  final PackedInts.Reader[] startPointersDeltas; // delta from the avg
}

/**
 * {@link StoredFieldsReader} impl for {@link CompressingStoredFieldsFormat}.
 * @lucene.experimental
 */
public final class CompressingStoredFieldsReader extends StoredFieldsReader {

  //从fdt正排索引文件中获得
  private final int version;

  // field的基本信息
  private final FieldInfos fieldInfos;

  //fdt正排索引文件reader
  private final CompressingStoredFieldsIndexReader indexReader;

  //从fdt正排索引文件中获得,用于指向fdx数据文件的末端,指向numChunks地址4
  private final long maxPointer;

  //fdx正排数据文件句柄
  private final IndexInput fieldsStream;

  //块大小
  private final int chunkSize;

  private final int packedIntsVersion;

  //压缩类型
  private final CompressionMode compressionMode;

  //解压缩处理对象
  private final Decompressor decompressor;

  //文档数量,从segment元数据中获得
  private final int numDocs;

  //是否正在merge,默认为false
  private final boolean merging;

  //初始化时new了一个BlockState,BlockState记录下当前正排文件读取的状态信息
  private final BlockState state;
  //chunk的数量
  private final long numChunks; // number of compressed blocks written

  //dirty chunk的数量
  private final long numDirtyChunks; // number of incomplete compressed blocks written

  //是否close,默认为false
  private boolean closed;
}

倒排索引文件

索引后缀:.tip,.tim

倒排索引也包含索引文件和数据文件,.tip为索引文件,.tim为数据文件,索引文件包含了每个字段的索引元信息,数据文件有具体的索引内容。

5.5.0版本的倒排索引实现为FST tree,FST tree的最大优势就是内存空间占用非常低 ,具体可以参看下这篇文章:http://www.cnblogs.com/bonelee/p/6226185.html

http://examples.mikemccandless.com/fst.py?terms=&cmd=Build+it 为FST图实例,可以根据输入的数据构造出FST图

输入到 FST 中的数据为:
String inputValues[] = {"mop","moth","pop","star","stop","top"};
long outputValues[] = {0,1,2,3,4,5};

生成的 FST 图为:

elasticsearch_store_tip1.png

elasticsearch_store_tip2.png

文件示例

elasticsearch_store_tip3.png

具体实现类

public final class BlockTreeTermsReader extends FieldsProducer {
  // Open input to the main terms dict file (_X.tib)
  final IndexInput termsIn;
  // Reads the terms dict entries, to gather state to
  // produce DocsEnum on demand
  final PostingsReaderBase postingsReader;
  private final TreeMap<String,FieldReader> fields = new TreeMap<>();

  /** File offset where the directory starts in the terms file. */
  /索引数据文件tim的数据的尾部的元数据的地址
  private long dirOffset;
  /** File offset where the directory starts in the index file. */

  //索引文件tip的数据的尾部的元数据的地址
  private long indexDirOffset;

  //semgent的名称
  final String segment;

  //版本号
  final int version;

  //5.3.x index, we record up front if we may have written any auto-prefix terms,示例中记录的是false
  final boolean anyAutoPrefixTerms;
}

/**
 * BlockTree's implementation of {@link Terms}.
 * @lucene.internal
 */
public final class FieldReader extends Terms implements Accountable {

  //term的数量
  final long numTerms;

  //field信息
  final FieldInfo fieldInfo;

  final long sumTotalTermFreq;

  //总的文档频率
  final long sumDocFreq;

  //文档数量
  final int docCount;

  //字段在索引文件tip中的起始位置
  final long indexStartFP;

  final long rootBlockFP;

  final BytesRef rootCode;

  final BytesRef minTerm;

  final BytesRef maxTerm;

  //longs:metadata buffer, holding monotonic values
  final int longsSize;

  final BlockTreeTermsReader parent;

  final FST<BytesRef> index;
}

倒排链文件

文件后缀:.doc, .pos, .pay

.doc保存了每个term的doc id列表和term在doc中的词频

全文索引的字段,会有.pos文件,保存了term在doc中的位置

全文索引的字段,使用了一些像payloads的高级特性才会有.pay文件,保存了term在doc中的一些高级特性

文件示例

elasticsearch_store_doc.png

具体实现类

/**
 * Concrete class that reads docId(maybe frq,pos,offset,payloads) list
 * with postings format.
 *
 * @lucene.experimental
 */
public final class Lucene50PostingsReader extends PostingsReaderBase {
  private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Lucene50PostingsReader.class);
  private final IndexInput docIn;
  private final IndexInput posIn;
  private final IndexInput payIn;
  final ForUtil forUtil;
  private int version;

  //不分词的字段使用的是该对象,基于skiplist实现了倒排链
  final class BlockDocsEnum extends PostingsEnum {
  }

  //全文检索字段使用的是该对象
  final class BlockPostingsEnum extends PostingsEnum {
  }

  //包含高级特性的字段使用的是该对象
  final class EverythingEnum extends PostingsEnum {
  }
}

列存文件(docvalues)

文件后缀:.dvm, .dvd

索引文件为.dvm,数据文件为.dvd。

lucene实现的docvalues有如下类型:

  • 1、NONE 不开启docvalue时的状态
  • 2、NUMERIC 单个数值类型的docvalue主要包括(int,long,float,double)
  • 3、BINARY 二进制类型值对应不同的codes最大值可能超过32766字节,
  • 4、SORTED 有序增量字节存储,仅仅存储不同部分的值和偏移量指针,值必须小于等于32766字节
  • 5、SORTED_NUMERIC 存储数值类型的有序数组列表
  • 6、SORTED_SET 可以存储多值域的docvalue值,但返回时,仅仅只能返回多值域的第一个docvalue
  • 7、对应not_anaylized的string字段,使用的是SORTED_SET类型,number的类型是SORTED_NUMERIC类型

其中SORTED_SET 的 SORTED_SINGLE_VALUED类型包括了两类数据 : binary + numeric, binary是按ord排序的term的列表,numeric是doc到ord的映射。

文件示例

elasticsearch_store_dvd.png

具体实现类

/** reader for {@link Lucene54DocValuesFormat} */
final class Lucene54DocValuesProducer extends DocValuesProducer implements Closeable {
  //number类型的field的列存列表
  private final Map<String,NumericEntry> numerics = new HashMap<>();

  //字符串类型的field的列存列表
  private final Map<String,BinaryEntry> binaries = new HashMap<>();

  //有序字符串类型的field的列存列表
  private final Map<String,SortedSetEntry> sortedSets = new HashMap<>();

  //有序number类型的field的列存列表
  private final Map<String,SortedSetEntry> sortedNumerics = new HashMap<>();

  //字符串类型的field的ords列表
  private final Map<String,NumericEntry> ords = new HashMap<>();

  //docId -> address -> ord 中field的ords列表
  private final Map<String,NumericEntry> ordIndexes = new HashMap<>();

  //field的数量
  private final int numFields;

  //内存使用量
  private final AtomicLong ramBytesUsed;

  //数据源的文件句柄
  private final IndexInput data;

  //文档数
  private final int maxDoc;
  // memory-resident structures
  private final Map<String,MonotonicBlockPackedReader> addressInstances = new HashMap<>();
  private final Map<String,ReverseTermsIndex> reverseIndexInstances = new HashMap<>();
  private final Map<String,DirectMonotonicReader.Meta> directAddressesMeta = new HashMap<>();

  //是否正在merge
  private final boolean merging;
}

/** metadata entry for a numeric docvalues field */
  static class NumericEntry {
    private NumericEntry() {}
    /** offset to the bitset representing docsWithField, or -1 if no documents have missing values */
    long missingOffset;

    /** offset to the actual numeric values */
    //field的在数据文件中的起始地址
    public long offset;

    /** end offset to the actual numeric values */
    //field的在数据文件中的结尾地址
    public long endOffset;

    /** bits per value used to pack the numeric values */
    public int bitsPerValue;

    //format类型
    int format;
    /** count of values written */
    public long count;
    /** monotonic meta */
    public DirectMonotonicReader.Meta monotonicMeta;

    //最小的value
    long minValue;

    //Compressed by computing the GCD
    long gcd;

    //Compressed by giving IDs to unique values.
    long table[];
    /** for sparse compression */
    long numDocsWithValue;
    NumericEntry nonMissingValues;
    NumberType numberType;
  }

  /** metadata entry for a binary docvalues field */
  static class BinaryEntry {
    private BinaryEntry() {}
    /** offset to the bitset representing docsWithField, or -1 if no documents have missing values */
    long missingOffset;
    /** offset to the actual binary values */
    //field的在数据文件中的起始地址
    long offset;
    int format;
    /** count of values written */
    public long count;

    //最短字符串的长度
    int minLength;

    //最长字符串的长度
    int maxLength;
    /** offset to the addressing data that maps a value to its slice of the byte[] */
    public long addressesOffset, addressesEndOffset;
    /** meta data for addresses */
    public DirectMonotonicReader.Meta addressesMeta;
    /** offset to the reverse index */
    public long reverseIndexOffset;
    /** packed ints version used to encode addressing information */
    public int packedIntsVersion;
    /** packed ints blocksize */
    public int blockSize;
  }

参考资料

lucene source code

lucene document

lucene字典实现原理——FST

收起阅读 »

社区日报 第471期 (2018-12-07)

1、使用Elasticsearch作为主数据存储实践
http://t.cn/EyS6wcL
2、Elasticsearch 6.x启动过程
http://t.cn/EyJgQ7U
3、Elasticsearch你知道多少?
http://t.cn/EyS6I4P

编辑:铭毅天下
归档: https://elasticsearch.cn/article/6177
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1、使用Elasticsearch作为主数据存储实践
http://t.cn/EyS6wcL
2、Elasticsearch 6.x启动过程
http://t.cn/EyJgQ7U
3、Elasticsearch你知道多少?
http://t.cn/EyS6I4P

编辑:铭毅天下
归档: https://elasticsearch.cn/article/6177
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

Day 6 - Logstash Pipeline-to-Pipeline 尝鲜

Logstash 在 6.0 推出了 multiple pipeline 的解决方案,即在一个 logstash 实例中可以同时进行多个独立数据流程的处理工作,如下图所示。

而在这之前用户只能通过在单机运行多个 logstash 实例或者在配置文件中增加大量 if-else 条件判断语句来解决。要使用 multiple pipeline 也很简单,只需要将不同的 pipeline 在 config/pipeline.yml中定义好即可,如下所示:

- pipeline.id: apache
  pipeline.batch.size: 125
  queue.type: persisted
  path.config: "/path/to/config/apache.cfg"
- pipeline.id: nginx
  path.config: "/path/to/config/nginx.cfg"

其中 apachenginx作为独立的 pipeline 执行,而且配置也可以独立设置,互不干扰。pipeline.yml的引入极大地简化了 logstash 的配置管理工作,使得新手也可以很快完成复杂的 ETL 配置。

在 6.3 版本中,Logstash 又增加了 Pipeline-to-Pipeline的管道机制(beta),即管道和管道之间可以连接在一起组成一个完成的数据处理流。熟悉 linux 的管道命令 |的同学应该可以很快明白这种模式的好处。这无疑使得 Logstash 的配置会更加灵活,今天我们就来了解下这种灵活自由的配置方式。

1. 上手

废话少说,快速上手。修改 config/pipeline.yml文件如下:

 - pipeline.id: upstream
   config.string: input { stdin {} } output { pipeline { send_to => [test_output] } }
 - pipeline.id: downstream
   config.string: input { pipeline { address => test_output } } output{ stdout{}}

然后运行 logstash,其中 -r 表示配置文件有改动时自动重新加载,方便我们调试。

bin/logstash -r

在终端随意输入字符(比如aaa)后回车,会看到屏幕输出了类似下面的内容,代表运行成功了。

{
    "@timestamp" => 2018-12-06T14:43:50.310Z,
    "@version" => "1",
    "message" => "aaa",
    "host" => "rockybean-MacBook-Pro.local"
}

我们再回头看下这个配置,upstreamoutput 使用了名为 pipeline 的 plugin,然后 send_to的输出对象test_output是在 downstreaminput pipeline plugin 中定义的。通过这个唯一的address(虚拟地址)就能够把不同的 pipeline 连接在一起组成一个更长的pipeline来处理数据。类似下图所示:

当数据由 upstream传递给 downstream时会进行一个复制操作,这也意味着在这两个 pipeline 中的数据是完全独立的,互不影响。有一点要注意的是:数据的复制会增加额外的性能开销,比如会加大 JVM Heap 的使用。

2. 使用场景

使用方法是不是很简单,接下来我们来看下官方为我们开的几个脑洞。

2.1 Distributor Pattern 分发者模式

该模式执行效果类似下图所示:

在一个 pipeline 处理输入,然后根据不同的数据类型再分发到对应的 Pipeline 去处理。这种模式的好处在于统一输入端口,隔离不同类型的处理配置文件,减少由于配置文件混合在一起带来的维护成本。大家可以想一想如果不用这种Pipeline-to-Pipeline的方式,我们如果轻松做到一个端口处理多个来源的数据呢?

这种模式的参考配置如下所示:

# config/pipelines.yml
- pipeline.id: beats-server
  config.string: |
    input { beats { port => 5044 } }
    output {
        if [type] == apache {
          pipeline { send_to => weblogs }
        } else if [type] == system {
          pipeline { send_to => syslog }
        } else {
          pipeline { send_to => fallback }
        }
    }
- pipeline.id: weblog-processing
  config.string: |
    input { pipeline { address => weblogs } }
    filter {
       # Weblog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_a_host] }
    }
- pipeline.id: syslog-processing
  config.string: |
    input { pipeline { address => syslog } }
    filter {
       # Syslog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_b_host] }
    }
- pipeline.id: fallback-processing
    config.string: |
    input { pipeline { address => fallback } }
    output { elasticsearch { hosts => [es_cluster_b_host] } }

2.2 Output Isolator Pattern 输出隔离模式

虽然 Logstash 的一个 pipeline 可以配置多个 output,但是这多个 output 会相依为命,一旦某一个 output 出问题,会导致另一个 output 也无法接收新数据。而通过这种模式可以完美解决这个问题。其运行方式如下图所示:

通过输出到两个独立的 pipeline,解除相互之间的影响,比如 http service 出问题的时候,es 依然可以正常接收数据,而且两个 pipeline 可以配置独立的队列来保障数据的完备性,其配置如下所示:

# config/pipelines.yml
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [es, http] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => es } }
    output { elasticsearch { } }
- pipeline.id: buffered-http
  queue.type: persisted
  config.string: |
    input { pipeline { address => http } }
    output { http { } }

2.3 Forked Path Pattern 克隆路径模式

这个模式类似 Output Isolator Pattern,只是在不同的 output pipeline 中可以配置不同的 filter 来完成各自输出的数据处理需求,这里就不展开讲了,可以参考如下的配置,其中不同 output pipeline 的 filter 是不同的,比如 partner 这个 pipeline 去掉了一些敏感数据:

# config/pipelines.yml
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => ["internal-es", "partner-s3"] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => "internal-es" } }
    # Index the full event
    output { elasticsearch { } }
- pipeline.id: partner
  queue.type: persisted
  config.string: |
    input { pipeline { address => "partner-s3" } }
    filter {
      # Remove the sensitive data
      mutate { remove_field => 'sensitive-data' }
    }
    output { s3 { } } # Output to partner's bucket

2.4 Collector Pattern 收集者模式

从名字可以看出,该模式是将所有 Pipeline 汇集于一处的处理模式,如下图所示:

其配置参考如下:

# config/pipelines.yml
- pipeline.id: beats
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: kafka
  config.string: |
    input { kafka { ... } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: partner
  # This common pipeline enforces the same logic whether data comes from Kafka or Beats
  config.string: |
    input { pipeline { address => commonOut } }
    filter {
      # Always remove sensitive data from all input sources
      mutate { remove_field => 'sensitive-data' }
    }
    output { elasticsearch { } }

3. 总结

本文简单给大家讲解了 Pipeline-to-Pipeline的使用方法及官方推荐的几种模式,希望可以给大家有所帮助。另外这个机制目前还处于 Beta 阶段,尝鲜需谨慎!

继续阅读 »

Logstash 在 6.0 推出了 multiple pipeline 的解决方案,即在一个 logstash 实例中可以同时进行多个独立数据流程的处理工作,如下图所示。

而在这之前用户只能通过在单机运行多个 logstash 实例或者在配置文件中增加大量 if-else 条件判断语句来解决。要使用 multiple pipeline 也很简单,只需要将不同的 pipeline 在 config/pipeline.yml中定义好即可,如下所示:

- pipeline.id: apache
  pipeline.batch.size: 125
  queue.type: persisted
  path.config: "/path/to/config/apache.cfg"
- pipeline.id: nginx
  path.config: "/path/to/config/nginx.cfg"

其中 apachenginx作为独立的 pipeline 执行,而且配置也可以独立设置,互不干扰。pipeline.yml的引入极大地简化了 logstash 的配置管理工作,使得新手也可以很快完成复杂的 ETL 配置。

在 6.3 版本中,Logstash 又增加了 Pipeline-to-Pipeline的管道机制(beta),即管道和管道之间可以连接在一起组成一个完成的数据处理流。熟悉 linux 的管道命令 |的同学应该可以很快明白这种模式的好处。这无疑使得 Logstash 的配置会更加灵活,今天我们就来了解下这种灵活自由的配置方式。

1. 上手

废话少说,快速上手。修改 config/pipeline.yml文件如下:

 - pipeline.id: upstream
   config.string: input { stdin {} } output { pipeline { send_to => [test_output] } }
 - pipeline.id: downstream
   config.string: input { pipeline { address => test_output } } output{ stdout{}}

然后运行 logstash,其中 -r 表示配置文件有改动时自动重新加载,方便我们调试。

bin/logstash -r

在终端随意输入字符(比如aaa)后回车,会看到屏幕输出了类似下面的内容,代表运行成功了。

{
    "@timestamp" => 2018-12-06T14:43:50.310Z,
    "@version" => "1",
    "message" => "aaa",
    "host" => "rockybean-MacBook-Pro.local"
}

我们再回头看下这个配置,upstreamoutput 使用了名为 pipeline 的 plugin,然后 send_to的输出对象test_output是在 downstreaminput pipeline plugin 中定义的。通过这个唯一的address(虚拟地址)就能够把不同的 pipeline 连接在一起组成一个更长的pipeline来处理数据。类似下图所示:

当数据由 upstream传递给 downstream时会进行一个复制操作,这也意味着在这两个 pipeline 中的数据是完全独立的,互不影响。有一点要注意的是:数据的复制会增加额外的性能开销,比如会加大 JVM Heap 的使用。

2. 使用场景

使用方法是不是很简单,接下来我们来看下官方为我们开的几个脑洞。

2.1 Distributor Pattern 分发者模式

该模式执行效果类似下图所示:

在一个 pipeline 处理输入,然后根据不同的数据类型再分发到对应的 Pipeline 去处理。这种模式的好处在于统一输入端口,隔离不同类型的处理配置文件,减少由于配置文件混合在一起带来的维护成本。大家可以想一想如果不用这种Pipeline-to-Pipeline的方式,我们如果轻松做到一个端口处理多个来源的数据呢?

这种模式的参考配置如下所示:

# config/pipelines.yml
- pipeline.id: beats-server
  config.string: |
    input { beats { port => 5044 } }
    output {
        if [type] == apache {
          pipeline { send_to => weblogs }
        } else if [type] == system {
          pipeline { send_to => syslog }
        } else {
          pipeline { send_to => fallback }
        }
    }
- pipeline.id: weblog-processing
  config.string: |
    input { pipeline { address => weblogs } }
    filter {
       # Weblog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_a_host] }
    }
- pipeline.id: syslog-processing
  config.string: |
    input { pipeline { address => syslog } }
    filter {
       # Syslog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_b_host] }
    }
- pipeline.id: fallback-processing
    config.string: |
    input { pipeline { address => fallback } }
    output { elasticsearch { hosts => [es_cluster_b_host] } }

2.2 Output Isolator Pattern 输出隔离模式

虽然 Logstash 的一个 pipeline 可以配置多个 output,但是这多个 output 会相依为命,一旦某一个 output 出问题,会导致另一个 output 也无法接收新数据。而通过这种模式可以完美解决这个问题。其运行方式如下图所示:

通过输出到两个独立的 pipeline,解除相互之间的影响,比如 http service 出问题的时候,es 依然可以正常接收数据,而且两个 pipeline 可以配置独立的队列来保障数据的完备性,其配置如下所示:

# config/pipelines.yml
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [es, http] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => es } }
    output { elasticsearch { } }
- pipeline.id: buffered-http
  queue.type: persisted
  config.string: |
    input { pipeline { address => http } }
    output { http { } }

2.3 Forked Path Pattern 克隆路径模式

这个模式类似 Output Isolator Pattern,只是在不同的 output pipeline 中可以配置不同的 filter 来完成各自输出的数据处理需求,这里就不展开讲了,可以参考如下的配置,其中不同 output pipeline 的 filter 是不同的,比如 partner 这个 pipeline 去掉了一些敏感数据:

# config/pipelines.yml
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => ["internal-es", "partner-s3"] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => "internal-es" } }
    # Index the full event
    output { elasticsearch { } }
- pipeline.id: partner
  queue.type: persisted
  config.string: |
    input { pipeline { address => "partner-s3" } }
    filter {
      # Remove the sensitive data
      mutate { remove_field => 'sensitive-data' }
    }
    output { s3 { } } # Output to partner's bucket

2.4 Collector Pattern 收集者模式

从名字可以看出,该模式是将所有 Pipeline 汇集于一处的处理模式,如下图所示:

其配置参考如下:

# config/pipelines.yml
- pipeline.id: beats
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: kafka
  config.string: |
    input { kafka { ... } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: partner
  # This common pipeline enforces the same logic whether data comes from Kafka or Beats
  config.string: |
    input { pipeline { address => commonOut } }
    filter {
      # Always remove sensitive data from all input sources
      mutate { remove_field => 'sensitive-data' }
    }
    output { elasticsearch { } }

3. 总结

本文简单给大家讲解了 Pipeline-to-Pipeline的使用方法及官方推荐的几种模式,希望可以给大家有所帮助。另外这个机制目前还处于 Beta 阶段,尝鲜需谨慎!

收起阅读 »

社区日报 第470期 (2018-12-06)

1.图解 Elasticsearch2.2.0 原理
http://t.cn/Eybu8a3
2.ElasticSearch 恢复类型之对等恢复
http://t.cn/EyaCYFV
3.使用带注释的文本插件搜索事物
http://t.cn/EyaCntg

编辑:金桥
归档:https://elasticsearch.cn/article/6175
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1.图解 Elasticsearch2.2.0 原理
http://t.cn/Eybu8a3
2.ElasticSearch 恢复类型之对等恢复
http://t.cn/EyaCYFV
3.使用带注释的文本插件搜索事物
http://t.cn/EyaCntg

编辑:金桥
归档:https://elasticsearch.cn/article/6175
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

es创建索引失败

#10001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/10001/logs/mstore.log
  fields:
    env: uat-10001-log
  include_lines: ['ERROR']
#10001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/10001/logs/catalina.out
  fields:
    env: uat-10001-catalina
  include_lines: ['ERROR']
#11001-log
- type: log
  enabled: true
  paths:
    - /mkt/tomcat/8.5.32/11001/logs/delivery.log
  include_lines: ['ERROR']
  fields:
    env: uat-11001-log
#11001-catalina.out
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/11001/logs/catalina.out
  fields:
    env: uat-11001-catalina
  include_lines: ['ERROR']
#12001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/12001/logs/mstore.log
  fields:
    env: uat-12001-log
  include_lines: ['ERROR']
#12001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/12001/logs/catalina.out
  fields:
    env: uat-12001-catalina
  include_lines: ['ERROR']
#13001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/13001/logs/pay-web-boss.log
  fields:
    env: uat-13001-log
    include_lines: ['ERROR']
#13001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/13001/logs/catalina.out
  fields:
    env: uat-13001-catalina
  include_lines: ['ERROR']
#14001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/14001/logs/pay-web-gateway.log
  fields:
    env: uat-14001-log
  include_lines: ['ERROR']
#14001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/14001/logs/catalina.out
  fields:
    env: uat-14001-catalina
  include_lines: ['ERROR']
#15001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/15001/logs/roncoo-pay-web-merchant.log
  fields:
    env: uat-15001-log
  include_lines: ['ERROR']
#15001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/15001/logs/catalina.out
  fields:
    env: uat-15001-catalina
  include_lines: ['ERROR']      每次创建索引会少uat-11001-log和uat-12001-log,filebeat读取了这个两个日志文件
继续阅读 »
#10001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/10001/logs/mstore.log
  fields:
    env: uat-10001-log
  include_lines: ['ERROR']
#10001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/10001/logs/catalina.out
  fields:
    env: uat-10001-catalina
  include_lines: ['ERROR']
#11001-log
- type: log
  enabled: true
  paths:
    - /mkt/tomcat/8.5.32/11001/logs/delivery.log
  include_lines: ['ERROR']
  fields:
    env: uat-11001-log
#11001-catalina.out
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/11001/logs/catalina.out
  fields:
    env: uat-11001-catalina
  include_lines: ['ERROR']
#12001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/12001/logs/mstore.log
  fields:
    env: uat-12001-log
  include_lines: ['ERROR']
#12001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/12001/logs/catalina.out
  fields:
    env: uat-12001-catalina
  include_lines: ['ERROR']
#13001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/13001/logs/pay-web-boss.log
  fields:
    env: uat-13001-log
    include_lines: ['ERROR']
#13001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/13001/logs/catalina.out
  fields:
    env: uat-13001-catalina
  include_lines: ['ERROR']
#14001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/14001/logs/pay-web-gateway.log
  fields:
    env: uat-14001-log
  include_lines: ['ERROR']
#14001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/14001/logs/catalina.out
  fields:
    env: uat-14001-catalina
  include_lines: ['ERROR']
#15001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/15001/logs/roncoo-pay-web-merchant.log
  fields:
    env: uat-15001-log
  include_lines: ['ERROR']
#15001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/15001/logs/catalina.out
  fields:
    env: uat-15001-catalina
  include_lines: ['ERROR']      每次创建索引会少uat-11001-log和uat-12001-log,filebeat读取了这个两个日志文件 收起阅读 »