搭建Elasitc stack集群需要注意的日志问题
默认分类 • 点火三周 发表了文章 • 0 个评论 • 4317 次浏览 • 2018-12-12 11:07
@[toc]
搭建Elasitc stack集群时,我们往往把大部分注意力放在集群的搭建,索引的优化,分片的设置上等具体的调优参数上,很少有人会去关心Elasitc stack的日志配置的问题,大概是觉得,日志应该是一个公共的问题,默认的配置应该已经为我们处理好了。但很不幸,在不同的机器配置或者不同的运营策略下,如果采用默认的配置,会给我们带来麻烦。
默认配置带来的麻烦
以下例子是默认情况下,当Elasitc stack集群运行超过3个月之后的情况:
elasticsearch
elasticsearch默认情况下会每天rolling一个文件,当到达2G的时候,才开始清除超出的部分,当一个文件只有几十K的时候,文件会一直累计下来。
logstash
一直增长的gc文件和不停增多的rolling日志文件
kibana
默认日志输出到kibana.out文件当中,这个文件会变得越来越大
kafka
这里提到kafka是因为在大部分的架构当中,我们都会用到kafka作为中间件数据缓冲区,因此不得不维护kafka集群。同样,如果不做特定的配置,也会遇到日志的问题:不停增多的rolling日志文件
原因是kafka的默认log4j配置是使用DailyRollingFileAppender每隔一个小时生成一个文件 '.'yyyy-MM-dd-HH:
<br /> log4j.appender.stdout=org.apache.log4j.ConsoleAppender<br /> log4j.appender.stdout.layout=org.apache.log4j.PatternLayout<br /> log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n<br /> <br /> log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender<br /> log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH<br /> log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log<br /> log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout<br /> log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n<br /> <br /> log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender<br /> log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH<br /> log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log<br /> log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout<br /> log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n<br /> <br /> log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender<br /> log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH<br /> log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log<br /> log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout<br /> log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n<br /> <br /> log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender<br /> log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH<br /> log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log<br /> log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout<br /> log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n<br /> <br /> log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender<br /> log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH<br /> log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log<br /> log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout<br /> log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n<br /> <br /> log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender<br /> log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH<br /> log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log<br /> log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout<br /> log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n<br />
解决方案
因此,对于我们需要维护的这几个组件,需要配置合理的日志rotate策略。一个比较常用的策略就是时间+size,每天rotate一个日志文件或者每当日志文件大小超过256M,rotate一个新的日志文件,并且最多保留7天之内的日志文件。
elasticsearch
通过修改log4j2.properties文件来解决。该文件在/etc/elasticsesarch目录下(或者config目录)。
默认配置是:
<br /> appender.rolling.type = RollingFile <br /> appender.rolling.name = rolling<br /> appender.rolling.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}.log <br /> appender.rolling.layout.type = PatternLayout<br /> appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c{1.}] [%node_name]%marker %.-10000m%n<br /> appender.rolling.filePattern = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}-%d{yyyy-MM-dd}-%i.log.gz <br /> appender.rolling.policies.type = Policies<br /> appender.rolling.policies.time.type = TimeBasedTriggeringPolicy <br /> appender.rolling.policies.time.interval = 1 <br /> appender.rolling.policies.time.modulate = true <br /> appender.rolling.policies.size.type = SizeBasedTriggeringPolicy <br /> appender.rolling.policies.size.size = 256MB <br /> appender.rolling.strategy.type = DefaultRolloverStrategy<br /> appender.rolling.strategy.fileIndex = nomax<br /> appender.rolling.strategy.action.type = Delete <br /> appender.rolling.strategy.action.basepath = ${sys:es.logs.base_path}<br /> appender.rolling.strategy.action.condition.type = IfFileName <br /> appender.rolling.strategy.action.condition.glob = ${sys:es.logs.cluster_name}-* <br /> appender.rolling.strategy.action.condition.nested_condition.type = IfAccumulatedFileSize <br /> appender.rolling.strategy.action.condition.nested_condition.exceeds = 2GB <br />
以上默认配置,会保存2GB的日志,只有累计的日志大小超过2GB的时候,才会删除旧的日志文件。
建议改为如下配置,仅保留最近7天的日志
<br /> appender.rolling.strategy.type = DefaultRolloverStrategy<br /> appender.rolling.strategy.action.type = Delete<br /> appender.rolling.strategy.action.basepath = ${sys:es.logs.base_path}<br /> appender.rolling.strategy.action.condition.type = IfFileName<br /> appender.rolling.strategy.action.condition.glob = ${sys:es.logs.cluster_name}-*<br /> appender.rolling.strategy.action.condition.nested_condition.type = IfLastModified<br /> appender.rolling.strategy.action.condition.nested_condition.age = 7D<br />
这里必须注意,log4j2会因为末尾的空格导致无法识别配置
logstash
与elasticsearch类似,通过修改log4j2.properties文件来解决。该文件在/etc/logstash目录下(或者config目录)。
默认配置是不会删除历史日志的:
<br /> status = error<br /> name = LogstashPropertiesConfig<br /> <br /> appender.console.type = Console<br /> appender.console.name = plain_console<br /> appender.console.layout.type = PatternLayout<br /> appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %m%n<br /> <br /> appender.json_console.type = Console<br /> appender.json_console.name = json_console<br /> appender.json_console.layout.type = JSONLayout<br /> appender.json_console.layout.compact = true<br /> appender.json_console.layout.eventEol = true<br /> <br /> appender.rolling.type = RollingFile<br /> appender.rolling.name = plain_rolling<br /> appender.rolling.fileName = ${sys:ls.logs}/logstash-${sys:ls.log.format}.log<br /> appender.rolling.filePattern = ${sys:ls.logs}/logstash-${sys:ls.log.format}-%d{yyyy-MM-dd}.log<br /> appender.rolling.policies.type = Policies<br /> appender.rolling.policies.time.type = TimeBasedTriggeringPolicy<br /> appender.rolling.policies.time.interval = 1<br /> appender.rolling.policies.time.modulate = true<br /> appender.rolling.layout.type = PatternLayout<br /> appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %-.10000m%n<br />
需手动加上:
<br /> appender.rolling.strategy.type = DefaultRolloverStrategy<br /> appender.rolling.strategy.action.type = Delete<br /> appender.rolling.strategy.action.basepath = ${sys:ls.logs}<br /> appender.rolling.strategy.action.condition.type = IfFileName<br /> appender.rolling.strategy.action.condition.glob = ${sys:ls.logs}/logstash-${sys:ls.log.format}<br /> appender.rolling.strategy.action.condition.nested_condition.type = IfLastModified<br /> appender.rolling.strategy.action.condition.nested_condition.age = 7D<br />
kibana
在kibana的配置文件中,只有以下几个选项:
<br /> logging.dest:<br /> Default: stdout Enables you specify a file where Kibana stores log output.<br /> logging.quiet:<br /> Default: false Set the value of this setting to true to suppress all logging output other than error messages.<br /> logging.silent:<br /> Default: false Set the value of this setting to true to suppress all logging output.<br /> logging.verbose:<br /> Default: false Set the value of this setting to true to log all events, including system usage information and all requests. Supported on Elastic Cloud Enterprise.<br /> logging.timezone<br /> Default: UTC Set to the canonical timezone id (e.g. US/Pacific) to log events using that timezone. A list of timezones can be referenced at <a href="https://en.wikipedia.org/wiki/List_of_tz_database_time_zones." rel="nofollow" target="_blank">https://en.wikipedia.org/wiki/ ... ones.</a><br />
我们可以指定输出的日志文件与日志内容,但是却不可以配置日志的rotate。这时,我们需要使用logrotate,这个linux默认安装的工具。
首先,我们要在配置文件里面指定生成pid文件:
<br /> pid.file: "pid.log"<br />
然后,修改/etc/logrotate.conf:
<br /> /var/log/kibana {<br /> missingok<br /> notifempty<br /> sharedscripts<br /> daily<br /> rotate 7<br /> copytruncate<br /> /bin/kill -HUP $(cat /usr/share/kibana/pid.log 2>/dev/null) 2>/dev/null<br /> endscript<br /> }<br />
kafka
如果不想写脚本清理过多的文件的话,需要修改config/log4j.properties文件。使用RollingFileAppender代替DailyRollingFileAppender,同时设置MaxFileSize和MaxBackupIndex。即修改为:
```
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
log4j.appender.kafkaAppender.MaxFileSize=10MB
log4j.appender.kafkaAppender.MaxBackupIndex=10
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.stateChangeAppender=org.apache.log4j.RollingFileAppender
log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
log4j.appender.stateChangeAppender.MaxFileSize=10M
log4j.appender.stateChangeAppender.MaxBackupIndex=10
log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.requestAppender=org.apache.log4j.RollingFileAppender
log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
log4j.appender.requestAppender.MaxFileSize=10MB
log4j.appender.requestAppender.MaxBackupIndex=10
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.cleanerAppender=org.apache.log4j.RollingFileAppender
log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
log4j.appender.cleanerAppender.MaxFileSize=10MB
log4j.appender.cleanerAppender.MaxBackupIndex=10
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.controllerAppender=org.apache.log4j.RollingFileAppender
log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
log4j.appender.controllerAppender.MaxFileSize=10MB
log4j.appender.controllerAppender.MaxBackupIndex=10
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.authorizerAppender=org.apache.log4j.RollingFileAppender
log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
log4j.appender.authorizerAppender.MaxFileSize=10MB
log4j.appender.authorizerAppender.MaxBackupIndex=10
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
Turn on all our debugging info
log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
log4j.logger.kafka.perf=DEBUG, kafkaAppender
log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
log4j.logger.kafka=INFO, kafkaAppender
log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
log4j.additivity.kafka.network.RequestChannel$=false
log4j.logger.kafka.network.Processor=TRACE, requestAppender
log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
log4j.additivity.kafka.server.KafkaApis=false
log4j.logger.kafka.request.logger=WARN, requestAppender
log4j.additivity.kafka.request.logger=false
log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=false
log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
log4j.additivity.kafka.log.LogCleaner=false
log4j.logger.state.change.logger=TRACE, stateChangeAppender
log4j.additivity.state.change.logger=false
Change this to debug to get the actual audit log for authorizer.
log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false
```
Kibana优化过程(Optimize)过长或无法结束的解决方案
Kibana • 点火三周 发表了文章 • 5 个评论 • 7165 次浏览 • 2018-12-12 11:06
使用过Kibana的同学应该都知道,当我们在kibana的配置文件中打开或者关闭功能,或者安装、卸载额外的插件后,重启kibana会触发一个优化的过程(optimize),如下图:
这个过程或长或短,视你电脑的性能而定。这里简单介绍一下该过程所要完成的事情。
Kibana是一个单页Web应用
首先,Kibana是一个单页的web应用。何为单页web应用?即所有的页面的读取都是在浏览器上完成,而与后台服务器无关。与后台服务器的通信只关乎数据,而非页面。所以,应用上所有的UI都被打包在一起,一次性的发送到了浏览器端,而不是通过URL到后台进行获取。所以,我们看到kibana的首页是下面这样的:
<a href="http://localhost:5601/app/kibana#/" rel="nofollow" target="_blank">http://localhost:5601/app/kibana#/</a>
注意这里的#后,代表#后面的内容会被浏览器提取,不往服务器端进行url的情况,而是在浏览器上进行内部重新渲染。因为所有的页面都是存储在浏览器的,所有在初次访问的时候,会加载大量的代码到浏览器端,这些代码都是被压缩过的bundle文件:
而optimize的过程,就是把这些原本可读性的源代码压缩为bundle.js的过程。因此,每当你对Kibana进行裁剪之后重启,因为前端的部分是完全由浏览器负责的,所有bundle文件需要重新生成后再发给浏览器,所以会触发optimize的过程。
Kibana在6.2.0版本之后,常规版本已经默认自带了xpack(当然,你还是可以直接下载不带xpack的开源社区版),导致Kibana的size已经到了200M左右,而且越往后的版本,功能越多,代码量越大,每次optimize的过程都会耗费更多的时间。一般来说,我们会将Kibana部署在单独的机器上,因为这仅仅是一个web后端,通常我们不会分配比较优质的资源,(2C4G都算浪费的了),这种情况下面,每次我们裁剪后重启Kibana都会耗费半个小时~1个小时的时间,更有甚者直接hang住,查看系统日志才知道OOM了。
Nodejs的内存机制
Kibana是用Nodejs编写的程序,在一般的后端语言中,基本的内存使用上基本没有什么限制,但是在nodeJs中却只能使用部分内存。在64位系统下位约为1.4G,在32位系统下约为0.7G,造成这个问题的主要原因是因为nodeJs基于V8构建,V8使用自己的方式来管理和分配内存,这一套管理方式在浏览器端使用绰绰有余,但是在nodeJs中这却限制了开发者,在应用中如果碰到了这个限制,就会造成进程退出。
Nodejs内存机制对Kibana优化的影响
因为Kibana的代码体量越来越大,将所有的代码加载到内存之后,再解析语法树,进行bundle的转换所耗费的内存已经接近1.4G的限制了,当你安装更多插件,比如sentinl的时候,系统往往已经无法为继,导致Kibana无法启动
解决方案
这种情况下,我们需要在Kibana启动的时候,指定NodeJs使用更多的内存。这个可以通过设置Node的环境变量办到。
<br /> NODE_OPTIONS="--max-old-space-size=4096"<br />
当然,我的建议是直接指定在kibana的启动脚本当中,修改/usr/share/kibana/bin/kibana文件为:
```shell
!/bin/sh
SCRIPT=$0
SCRIPT may be an arbitrarily deep series of symlinks. Loop until we have the concrete path.
while [ -h "$SCRIPT" ] ; do
ls=$(ls -ld "$SCRIPT")
Drop everything prior to ->
link=$(expr "$ls" : '.-> (.)$')
if expr "$link" : '/.*' > /dev/null; then
SCRIPT="$link"
else
SCRIPT=$(dirname "$SCRIPT")/"$link"
fi
done
DIR="$(dirname "${SCRIPT}")/.."
NODE="${DIR}/node/bin/node"
test -x "$NODE" || NODE=$(which node)
if [ ! -x "$NODE" ]; then
echo "unable to find usable node.js executable."
exit 1
fi
NODE_ENV=production exec "${NODE}" $NODE_OPTIONS --max_old_space_size=3072 --no-warnings "${DIR}/src/cli" ${@}
``<br /> 改动在最后一句:NODE_ENV=production exec "${NODE}" $NODE_OPTIONS --max_old_space_size=3072 --no-warnings "${DIR}/src/cli" ${@}`
这样,我们可以保证Kibana能顺利的完成optimize的过程
ik分词器搜索不出单词、数字等
Elasticsearch • 端小强 回复了问题 • 5 人关注 • 4 个回复 • 8280 次浏览 • 2018-12-12 19:29
es索引更新时通过mapping限制指定字段更新
Elasticsearch • medcl 回复了问题 • 2 人关注 • 2 个回复 • 6867 次浏览 • 2018-12-11 20:34
elastic搜索排序问题,如果人为的影响score得分结果?
Elasticsearch • zz_hello 回复了问题 • 5 人关注 • 3 个回复 • 6108 次浏览 • 2018-12-12 10:26
ES5.3 BM25文档score=25000,explain后计算过程没问题,score为什么不在1-20范围呢?
Elasticsearch • medcl 回复了问题 • 4 人关注 • 2 个回复 • 4186 次浏览 • 2018-12-11 18:46
社区日报 第475期 (2018-12-11)
社区日报 • kimichen123 发表了文章 • 0 个评论 • 2049 次浏览 • 2018-12-11 14:22
http://t.cn/EUwFsy6
2、Elasticsearch检索 — 聚合和LBS
http://t.cn/EU7qsRb
3、有赞订单管理的三生三世与 “十面埋伏”
http://t.cn/EU75ZTF
编辑:叮咚光军
归档:https://elasticsearch.cn/article/6187
订阅:https://tinyletter.com/elastic-daily
ES集群多实例分片不能恢复;
Elasticsearch • zqc0512 回复了问题 • 4 人关注 • 6 个回复 • 3535 次浏览 • 2018-12-11 16:07
如何在搜索结果中加入概率?
Elasticsearch • medcl 回复了问题 • 3 人关注 • 2 个回复 • 1791 次浏览 • 2018-12-11 13:03
logstash日志grok然后output问题
Logstash • rochy 回复了问题 • 2 人关注 • 1 个回复 • 4401 次浏览 • 2018-12-11 11:16
Day 11 -父子关系维护检索实战一 - Elasticsearch 5.x-父子关系维护
Elasticsearch • yinbp 发表了文章 • 0 个评论 • 5355 次浏览 • 2018-12-11 10:00
- 父子关系维护检索实战一 Elasticsearch 5.x 父子关系维护检索实战
- 父子关系维护检索实战二 Elasticsearch 6.x 父子关系维护检索实战
本文是其中第一篇- Elasticsearch 5.x 父子关系维护检索实战,涵盖以下部分内容:
- Elasticsearch 5.x 中父子关系mapping结构设计
- Elasticsearch 5.x 中维护父子关系数据
- Elasticsearch 5.x 中has_child和has_parent查询的基本用法
- Elasticsearch 5.x 中如何在检索中同时返回父子数据
案例说明
以一个体检记录相关的数据来介绍本文涉及的相关功能,体检数据包括客户基本信息basic和客户医疗记录medical、客户体检记录exam、客户体检结果分析记录diagnosis,它们之间的关系图如下:
我们采用Elasticsearch java客户端 bboss-elastic 来实现本文相关功能。
1.准备工作
参考文档《高性能elasticsearch ORM开发库使用介绍》导入和配置bboss客户端
2.定义mapping结构-Elasticsearch 5.x 中父子关系mapping结构设计
Elasticsearch 5.x中一个indice mapping支持多个mapping type,通过在子类型mapping中指定父类型的mapping type名字来设置父子关系,例如:
父类型
"basic": {
....
}
子类型:
"medical": {
"_parent": { "type": "basic" },
.................
}
新建dsl配置文件-esmapper/Client_Info.xml,定义完整的mapping结构:createClientIndice
<properties>
<!--
创建客户信息索引索引表
-->
<property name="createClientIndice">
<![CDATA[{
"settings": {
"number_of_shards": 6,
"index.refresh_interval": "5s"
},
"mappings": {
"basic": { ##基本信息
"properties": {
"party_id": {
"type": "keyword"
},
"sex": {
"type": "keyword"
},
"mari_sts": {
"type": "keyword"
},
"ethnic": {
"type": "text"
},
"prof": {
"type": "text"
},
"province": {
"type": "text"
},
"city": {
"type": "text"
},
"client_type": {
"type": "keyword"
},
"client_name": {
"type": "text"
},
"age": {
"type": "integer"
},
"id_type": {
"type": "keyword"
},
"idno": {
"type": "keyword"
},
"education": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"birth_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"diagnosis": { ##结果分析
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"provider": {
"type": "text"
},
"subject": {
"type": "text"
},
"diagnosis_type": {
"type": "text"
},
"icd10_code": {
"type": "text",
"type": "keyword"
},
"sd_disease_name": {
"type": "text",
"type": "keyword"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"medical": { ##医疗情况
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"hos_name_yb": {
"type": "text"
},
"eivisions_name": {
"type": "text"
},
"medical_type": {
"type": "text"
},
"medical_common_name": {
"type": "text"
},
"medical_sale_name": {
"type": "text"
},
"medical_code": {
"type": "text"
},
"specification": {
"type": "text"
},
"usage_num": {
"type": "text"
},
"unit": {
"type": "text"
},
"usage_times": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"exam": { ##检查结果
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"hospital": {
"type": "text"
},
"dept": {
"type": "text"
},
"is_ok": {
"type": "text"
},
"exam_result": {
"type": "text"
},
"fld1": {
"type": "text"
},
"fld2": {
"type": "text"
},
"fld3": {
"type": "text"
},
"fld4": {
"type": "text"
},
"fld5": {
"type": "text"
},
"fld901": {
"type": "text"
},
"fld6": {
"type": "text"
},
"fld902": {
"type": "text"
},
"fld14": {
"type": "text"
},
"fld20": {
"type": "text"
},
"fld21": {
"type": "text"
},
"fld23": {
"type": "text"
},
"fld24": {
"type": "text"
},
"fld65": {
"type": "text"
},
"fld66": {
"type": "text"
},
"fld67": {
"type": "text"
},
"fld68": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
}
}
}]]>
</property>
</properties>这个mapping中定义了4个索引类型:basic,exam,medical,diagnosis,其中basic是其他类型的父类型。
通过bboss客户端创建名称为client_info 的索引:
public void createClientIndice(){
//定义客户端实例,加载上面建立的dsl配置文件
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");
try {
//client_info存在返回true,不存在返回false
boolean exist = clientUtil.existIndice("client_info");
//如果索引表client_info已经存在先删除mapping
if(exist) {//先删除mapping client_info
clientUtil.dropIndice("client_info");
}
} catch (ElasticSearchException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//创建mapping client_info
clientUtil.createIndiceMapping("client_info","createClientIndice");
String client_info = clientUtil.getIndice("client_info");//获取最新建立的索引表结构client_info
System.out.println("after createClientIndice clientUtil.getIndice(\"client_info\") response:"+client_info);
}3.维护父子关系数据-Elasticsearch 5.x 中维护父子关系数据
- 定义对象
首先定义四个对象,分别对应mapping中的四个索引类型,篇幅关系只列出主要属性
- Basic
- Medical
- Exam
- Diagnosis
通过注解@ESId指定基本信息文档_id
public class Basic extends ESBaseData {
/**
* 索引_id
*/
@ESId
private String party_id;
private String sex; // 性别
......
}
通过注解@ESParentId指定Medical关联的基本信息文档_id,Medical文档_id由ElasticSearch自动生成
public class Medical extends ESBaseData {
@ESParentId
private String party_id; //父id
private String hos_name_yb; //就诊医院
...
}
通过注解@ESParentId指定Exam关联的基本信息文档_id,Exam文档_id由ElasticSearch自动生成
public class Exam extends ESBaseData {
@ESParentId
private String party_id; //父id
private String hospital; // 就诊医院
....
}
通过注解@ESParentId指定Diagnosis关联的基本信息文档_id,Diagnosis文档_id由ElasticSearch自动生成
public class Diagnosis extends ESBaseData {
@ESParentId
private String party_id; //父id
private String provider; //诊断医院
private String subject; //科室
......
} - 通过api维护测试数据
对象定义好了后,通过bboss客户数据到之前建立好的索引client_info中。
/**
* 录入体检医疗信息
*/
public void importClientInfoDataFromBeans() {
ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();
//导入基本信息,并且实时刷新,测试需要,实际环境不要带refresh
List<Basic> basics = buildBasics();
clientUtil.addDocuments("client_info","basic",basics,"refresh");
//导入医疗信息,并且实时刷新,测试需要,实际环境不要带refresh
List<Medical> medicals = buildMedicals();
clientUtil.addDocuments("client_info","medical",medicals,"refresh");
//导入体检结果数据,并且实时刷新,测试需要,实际环境不要带refresh
List<Exam> exams = buildExams();
clientUtil.addDocuments("client_info","exam",exams,"refresh");
//导入结果诊断数据,并且实时刷新,测试需要,实际环境不要带refresh
List<Diagnosis> diagnosiss = buildDiagnosiss();
clientUtil.addDocuments("client_info","diagnosis",diagnosiss,"refresh");
}
//构建基本信息集合
private List<Basic> buildBasics() {
List<Basic> basics = new ArrayList<Basic>();
Basic basic = new Basic();
basic.setParty_id("1");
basic.setAge(60);
basics.add(basic);
//继续添加其他数据
return basics;
}
//
构建医疗信息集合
private List<Medical> buildMedicals() {
List<Medical> medicals = new ArrayList<Medical>();
Medical medical = new Medical();
medical.setParty_id("1");//设置父文档id-基本信息文档_id
medical.setCreated_date(new Date());
medicals.add(medical);
//继续添加其他数据
return medicals;
}
//构建体检结果数据集合
private List<Exam> buildExams() {
List<Exam> exams = new ArrayList<Exam>();
Exam exam = new Exam();
exam.setParty_id("1");//设置父文档id-基本信息文档_id
exams.add(exam);
//继续添加其他数据
return exams;
}
//构建结果诊断数据集合
private List<Diagnosis> buildDiagnosiss() {
List<Diagnosis> diagnosiss = new ArrayList<Diagnosis>();
Diagnosis diagnosis = new Diagnosis();
diagnosis.setParty_id("1");//设置父文档id-基本信息文档_id
diagnosiss.add(diagnosis);
//继续添加其他数据
return diagnosiss;
}- 通过json报文批量导入测试数据
除了通过addDocuments录入数据,还可以通过json报文批量导入数据
在配置文件esmapper/Client_Info.xml增加以下内容:
<!--
导入基本信息:
-->
<property name="bulkImportBasicData" trim="false">
<![CDATA[
{ "index": { "_id": "1" }}
{ "party_id":"1", "sex":"男", "mari_sts":"不详", "ethnic":"蒙古族", "prof":"放牧","birth_date":"1966-2-14 00:00:00", "province":"内蒙古", "city":"赤峰市","client_type":"1", "client_name":"安", "age":52,"id_type":"1", "idno":"1", "education":"初中","created_date":"2013-04-24 00:00:00","last_modified_date":"2013-04-24 00:00:00", "etl_date":"2013-04-24 00:00:00"}
{ "index": { "_id": "2" }}
{ "party_id":"2", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"公务员","birth_date":"1986-07-06 00:00:00", "province":"广东", "city":"深圳","client_type":"1", "client_name":"彭", "age":32,"id_type":"1", "idno":"2", "education":"本科", "created_date":"2013-05-09 15:49:47","last_modified_date":"2013-05-09 15:49:47", "etl_date":"2013-05-09 15:49:47"}
{ "index": { "_id": "3" }}
{ "party_id":"3", "sex":"男", "mari_sts":"未婚", "ethnic":"汉族", "prof":"无业","birth_date":"2000-08-15 00:00:00", "province":"广东", "city":"佛山","client_type":"1", "client_name":"浩", "age":18,"id_type":"1", "idno":"3", "education":"高中", "created_date":"2014-09-01 09:49:27","last_modified_date":"2014-09-01 09:49:27", "etl_date":"2014-09-01 09:49:27" }
{ "index": { "_id": "4" }}
{ "party_id":"4", "sex":"女", "mari_sts":"未婚", "ethnic":"满族", "prof":"工人","birth_date":"1996-03-14 00:00:00", "province":"江苏", "city":"扬州","client_type":"1", "client_name":"慧", "age":22,"id_type":"1", "idno":"4", "education":"高中", "created_date":"2014-09-16 09:30:37","last_modified_date":"2014-09-16 09:30:37", "etl_date":"2014-09-16 09:30:37" }
{ "index": { "_id": "5" }}
{ "party_id":"5", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"教师","birth_date":"1983-08-14 00:00:00", "province":"宁夏", "city":"灵武","client_type":"1", "client_name":"英", "age":35,"id_type":"1", "idno":"5", "education":"本科", "created_date":"2015-09-16 09:30:37","last_modified_date":"2015-09-16 09:30:37", "etl_date":"2015-09-16 09:30:37" }
{ "index": { "_id": "6" }}
{ "party_id":"6", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"工人","birth_date":"1959-07-04 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"岭", "age":59,"id_type":"1", "idno":"6", "education":"小学", "created_date":"2015-09-01 09:49:27","last_modified_date":"2015-09-01 09:49:27", "etl_date":"2015-09-01 09:49:27" }
{ "index": { "_id": "7" }}
{ "party_id":"7", "sex":"女", "mari_sts":"未婚", "ethnic":"汉族", "prof":"学生","birth_date":"1999-02-18 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"欣", "age":19,"id_type":"1", "idno":"7", "education":"高中", "created_date":"2016-12-01 09:49:27","last_modified_date":"2016-12-01 09:49:27", "etl_date":"2016-12-01 09:49:27" }
{ "index": { "_id": "8" }}
{ "party_id":"8", "sex":"女", "mari_sts":"未婚", "ethnic":"汉族", "prof":"学生","birth_date":"2007-11-18 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"梅", "age":10,"id_type":"1", "idno":"8", "education":"小学", "created_date":"2016-11-21 09:49:27","last_modified_date":"2016-11-21 09:49:27", "etl_date":"2016-11-21 09:49:27" }
{ "index": { "_id": "9" }}
{ "party_id":"9", "sex":"男", "mari_sts":"不详", "ethnic":"回族", "prof":"个体户","birth_date":"1978-03-29 00:00:00", "province":"北京", "city":"北京","client_type":"1", "client_name":"磊", "age":40,"id_type":"1", "idno":"9", "education":"高中", "created_date":"2017-09-01 09:49:27","last_modified_date":"2017-09-01 09:49:27", "etl_date":"2017-09-01 09:49:27" }
{ "index": { "_id": "10" }}
{ "party_id":"10", "sex":"男", "mari_sts":"已婚", "ethnic":"汉族", "prof":"农民","birth_date":"1970-11-14 00:00:00", "province":"浙江", "city":"台州","client_type":"1", "client_name":"强", "age":47,"id_type":"1", "idno":"10", "education":"初中", "created_date":"2018-09-01 09:49:27","last_modified_date":"2018-09-01 09:49:27", "etl_date":"2018-09-01 09:49:27" }
]]>
</property>
<!--
导入诊断信息
-->
<property name="bulkImportDiagnosisData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"J31.0", "sd_disease_name":"鼻炎","created_date":"2013-07-23 20:56:44", "last_modified_date":"2013-07-23 20:56:44", "etl_date":"2013-07-23 20:56:44" }
{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"M47.8", "sd_disease_name":"颈椎病","created_date":"2013-09-23 20:56:44", "last_modified_date":"2013-09-23 20:56:44", "etl_date":"2013-09-23 20:56:44" }
{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"E78.1", "sd_disease_name":"甘油三脂增高","created_date":"2018-09-20 09:27:44", "last_modified_date":"2018-09-20 09:27:44", "etl_date":"2018-09-20 09:27:44" }
{ "index": { "parent": "4" }}
{ "party_id":"4", "provider":"江苏医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2011-05-19 15:52:55", "last_modified_date":"2011-05-19 15:52:55", "etl_date":"2011-05-19 15:52:55" }
{ "index": { "parent": "6" }}
{ "party_id":"6", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"H44", "sd_disease_name":"眼疾","created_date":"2016-04-08 10:42:18", "last_modified_date":"2016-04-08 10:42:18", "etl_date":"2016-04-08 10:42:18" }
{ "index": { "parent": "6" }}
{ "party_id":"6", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"M47.8", "sd_disease_name":"颈椎病","created_date":"2016-04-08 10:42:18", "last_modified_date":"2016-04-08 10:42:18", "etl_date":"2016-04-08 10:42:18" }
{ "index": { "parent": "7" }}
{ "party_id":"7", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2017-04-08 10:42:18", "last_modified_date":"2017-04-08 10:42:18", "etl_date":"2017-04-08 10:42:18" }
{ "index": { "parent": "8" }}
{ "party_id":"8", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2018-04-08 10:42:18", "last_modified_date":"2018-04-08 10:42:18", "etl_date":"2018-04-08 10:42:18" }
{ "index": { "parent": "9" }}
{ "party_id":"9", "provider":"朝阳医院", "subject":"","diagnosis_type":"","icd10_code":"A03.901", "sd_disease_name":"急性细菌性痢疾","created_date":"2015-06-08 10:42:18", "last_modified_date":"2015-06-08 10:42:18", "etl_date":"2015-06-08 10:42:18" }
]]>
</property>
<!--
导入医疗信息
-->
<property name="bulkImportMedicalData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"氟化钠", "medical_sale_name":"", "medical_code":"A01AA01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"四环素", "medical_sale_name":"", "medical_code":"A01AB13", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2016-05-31 00:00:00", "last_modified_date":"2016-05-31 00:00:00", "etl_date":"2016-05-31 00:00:00" }
{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"", "medical_sale_name":"盐酸多西环素胶丸", "medical_code":"A01AB22", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2016-03-18 00:00:00", "last_modified_date":"2016-03-18 00:00:00", "etl_date":"2016-03-18 00:00:00" }
{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"盐酸多西环素分散片", "medical_sale_name":"", "medical_code":"A01AB22", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2013-07-23 20:56:44", "last_modified_date":"2013-07-23 20:56:44", "etl_date":"2013-07-23 20:56:44" }
{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"地塞米松", "medical_sale_name":"", "medical_code":"A01AC02", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2013-09-23 20:56:44", "last_modified_date":"2013-09-23 20:56:44", "etl_date":"2013-09-23 20:56:44" }
{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"肾上腺素", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-09-20 09:27:44", "last_modified_date":"2018-09-20 09:27:44", "etl_date":"2018-09-20 09:27:44" }
{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"地塞米松", "medical_sale_name":"", "medical_code":"A01AC02", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2011-05-19 15:52:55", "last_modified_date":"2011-05-19 15:52:55", "etl_date":"2011-05-19 15:52:55" }
{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"四环素", "medical_sale_name":"", "medical_code":"A01AB13", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-04-08 10:42:18", "last_modified_date":"2018-04-08 10:42:18", "etl_date":"2018-04-08 10:42:18" }
{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"诺氟沙星胶囊", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2015-06-08 10:42:18", "last_modified_date":"2015-06-08 10:42:18", "etl_date":"2015-06-08 10:42:18" }
{ "index": { "parent": "6" }}
{ "party_id":"6", "hos_name_yb":"山东医院", "eivisions_name":"", "medical_type":"","medical_common_name":"盐酸异丙肾上腺素片", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-01-23 20:56:44", "last_modified_date":"2014-01-23 20:56:44", "etl_date":"2014-01-23 20:56:44" }
{ "index": { "parent": "6" }}
{ "party_id":"6", "hos_name_yb":"山东医院", "eivisions_name":"", "medical_type":"","medical_common_name":"甲硝唑栓", "medical_sale_name":"", "medical_code":"A01AB17", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-06-08 10:42:18", "last_modified_date":"2018-06-08 10:42:18", "etl_date":"2018-06-08 10:42:18" }
{ "index": { "parent": "9" }}
{ "party_id":"9", "hos_name_yb":"朝阳医院", "eivisions_name":"", "medical_type":"","medical_common_name":"复方克霉唑乳膏", "medical_sale_name":"", "medical_code":"A01AB18", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-01-23 20:56:44", "last_modified_date":"2014-01-23 20:56:44", "etl_date":"2014-01-23 20:56:44"}
]]>
</property>
<!--
导入体检信息
-->
<property name="bulkImportExamData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"高血压","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
{ "index": { "parent": "2" }}
{ "party_id":"2", "hospital":"", "dept":"", "is_ok":"Y", "exam_result":"轻度脂肪肝","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
{ "index": { "parent": "3" }}
{ "party_id":"3", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"急性细菌性痢疾","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
{ "index": { "parent": "4" }}
{ "party_id":"4", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
{ "index": { "parent": "5" }}
{ "party_id":"5", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
{ "index": { "parent": "6" }}
{ "party_id":"6", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
{ "index": { "parent": "7" }}
{ "party_id":"7", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
{ "index": { "parent": "8" }}
{ "party_id":"1", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
{ "index": { "parent": "9" }}
{ "party_id":"9", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
{ "index": { "parent": "10" }}
{ "party_id":"10", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
]]>
</property>
通过bboss提供的通用api,导入上面定义的数据: /**
* 通过读取配置文件中的dsl json数据导入医疗数据
*/
public void importClientInfoFromJsonData(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");
clientUtil.executeHttp("client_info/basic/_bulk?refresh","bulkImportBasicData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/diagnosis/_bulk?refresh","bulkImportDiagnosisData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/medical/_bulk?refresh","bulkImportMedicalData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/exam/_bulk?refresh","bulkImportExamData",ClientUtil.HTTP_POST);统计导入的数据
long basiccount = clientUtil.countAll("client_info/basic");
System.out.println(basiccount);
long medicalcount = clientUtil.countAll("client_info/medical");
System.out.println(medicalcount);
long examcount = clientUtil.countAll("client_info/exam");
System.out.println(examcount);
long diagnosiscount = clientUtil.countAll("client_info/diagnosis");
System.out.println(diagnosiscount);
}4.父子关系查询-Elasticsearch 5.x 中has_child和has_parent查询的基本用法- 根据父查子-通过客户名称信息查询客户端体检结果
在配置文件esmapper/Client_Info.xml增加dsl语句:queryExamSearchByClientName
<!--根据客户名称查询客户体检报告-->
<property name="queryExamSearchByClientName">
<![CDATA[
{
"query": {
## 最多返回size变量对应的记录条数
"size":#[size],
"has_parent": {
"type": "basic",
"query": {
"match": {
"client_name": #[clientName] ## 通过变量clientName设置客户名称
}
}
}
}
}
]]>
</property>执行查询,通过bboss的searchList 方法获取符合条件的体检报告以及总记录数据,返回size对应的1000条数据
/**
* 根据客户名称查询客户体检报告
*/
public void queryExamSearchByClientName(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("clientName","张三");
params.put("size",1000);
ESDatas<Exam> exams = clientUtil.searchList("client_info/exam/_search","queryExamSearchByClientName",params,Exam.class);
List<Exam> examList = exams.getDatas();//获取符合条件的体检数据
long totalSize = exams.getTotalSize();//符合条件的总记录数据
} - 根据子查父数据-通过医疗信息编码查找客户基本数据
在配置文件esmapper/Client_Info.xml增加查询dsl语句:queryClientInfoByMedicalName
<!--通过医疗信息编码查找客户基本数据-->
<property name="queryClientInfoByMedicalName">
<![CDATA[
{
"query": {
## 最多返回size变量对应的记录条数
"size":#[size],
"has_child": {
"type": "medical",
"score_mode": "max",
"query": {
"match": {
"medical_code": #[medicalCode] ## 通过变量medicalCode设置医疗编码
}
}
}
}
}
]]>
</property>执行查询,通过bboss的searchList 方法获取符合条件的客户端基本信息以及总记录数据 /**
* 通过医疗信息编码查找客户基本数据
*/
public void queryClientInfoByMedicalName(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("medicalCode","A01AA01"); //通过变量medicalCode设置医疗编码
params.put("size",1000); //最多返回size变量对应的记录条数
ESDatas<Basic> bascis = clientUtil.searchList("client_info/basic/_search","queryClientInfoByMedicalName",params,Basic.class);
List<Basic> bascisList = bascis.getDatas();//获取符合条件的客户信息
long totalSize = bascis.getTotalSize();
}5.同时返回父子数据-Elasticsearch 5.x 中如何在检索中同时返回父子数据这一节中我们介绍同时返回父子数据的玩法 :inner_hits的妙用
- 根据父条件查询所有子数据集合并返回父数据,根据客户名称查询所有体检数据,同时返回客户信息
在配置文件esmapper/Client_Info.xml增加检索dsl-queryDiagnosisByClientName
<!--根据客户名称获取客户体检诊断数据,并返回客户信息-->
<property name="queryDiagnosisByClientName">
<![CDATA[
{
"query": {
## 最多返回size变量对应的记录条数
"size":#[size],
"has_parent": {
"type": "basic",
"query": {
"match": {
"client_name": #[clientName] ## 通过变量clientName设置客户名称
}
},
"inner_hits": {} ## 通过变量inner_hits表示要返回对应的客户信息
}
}
}
]]>
</property>执行检索并遍历结果 /**
* 根据客户名称获取客户体检诊断数据,并返回客户数据
*/
public void queryDiagnosisByClientName(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("clientName","张三");
params.put("size",1000);
try {
ESInnerHitSerialThreadLocal.setESInnerTypeReferences(Basic.class);//指定inner查询结果对应的客户基本信息类型,Basic只有一个文档类型,索引不需要显示指定basic对应的mapping type名称
ESDatas<Diagnosis> diagnosiss = clientUtil.searchList("client_info/diagnosis/_search",
"queryDiagnosisByClientName",params,Diagnosis.class);
List<Diagnosis> diagnosisList = diagnosiss.getDatas();//获取符合条件的体检报告数据
long totalSize = diagnosiss.getTotalSize();
//遍历诊断报告信息,并查看报告对应的客户基本信息
for(int i = 0; diagnosisList != null && i < diagnosisList.size(); i ++) {
Diagnosis diagnosis = diagnosisList.get(i);
List<Basic> basics = ResultUtil.getInnerHits(diagnosis.getInnerHits(), "basic");
if(basics != null) {
System.out.println(basics.size());
}
}
}
finally{
ESInnerHitSerialThreadLocal.clean();//清空inner查询结果对应的客户基本信息类型
}
}- 根据子条件查询父数据并返回符合条件的父的子数据集合,查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录
在配置文件esmapper/Client_Info.xml增加检索dsl-queryClientAndAllSons
<!--查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录-->
<property name="queryClientAndAllSons">
<![CDATA[
{
"query": {
"bool": {
"should": [
{
"match_all":{}
}
]
,"must": [
{
"has_child": {
"score_mode": "none",
"type": "diagnosis"
,"query": {
"bool": {
"must": [
{
"term": {
"icd10_code": {
"value": "J00"
}
}
}
]
}
},"inner_hits":{}
}
}
]
,"should": [
{
"has_child": {
"score_mode": "none",
"type": "medical"
,"query": {
"match_all": {}
},"inner_hits":{}
}
}
]
,"should": [
{
"has_child": {
"type": "exam",
"query": {
"match_all": {}
},"inner_hits":{}
}
}
]
}
}
}
]]>
</property>执行查询: /**
* 查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录
*/
public void queryClientAndAllSons(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");
Map<String,Object> params = null;//没有检索条件,构造一个空的参数对象
try {
//设置子文档的类型和对象映射关系
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("exam",Exam.class);//指定inner查询结果对于exam类型和对应的对象类型Exam
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("diagnosis",Diagnosis.class);//指定inner查询结果对于diagnosis类型和对应的对象类型Diagnosis
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("medical",Medical.class);//指定inner查询结果对于medical类型和对应的对象类型Medical
ESDatas<Basic> escompanys = clientUtil.searchList("client_info/basic/_search",
"queryClientAndAllSons",params,Basic.class);
//String response = clientUtil.executeRequest("client_info/basic/_search","queryClientAndAllSons",params);直接获取原始的json报文
// escompanys = clientUtil.searchAll("client_info",Basic.class);
long totalSize = escompanys.getTotalSize();
List<Basic> clientInfos = escompanys.getDatas();//获取符合条件的数据
//查看公司下面的雇员信息(符合检索条件的雇员信息)
for (int i = 0; clientInfos != null && i < clientInfos.size(); i++) {
Basic clientInfo = clientInfos.get(i);
List<Exam> exams = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "exam");
if(exams != null)
System.out.println(exams.size());
List<Diagnosis> diagnosiss = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "diagnosis");
if(diagnosiss != null)
System.out.println(diagnosiss.size());
List<Medical> medicals = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "medical");
if(medicals != null)
System.out.println(medicals.size());
}
}
finally{
ESInnerHitSerialThreadLocal.clean();//清空inner查询结果对于各种类型信息
}
}最后我们按顺序执行所有方法,验证功能: @Test
public void testMutil(){
this.createClientIndice();//创建indice client_info
// this.importClientInfoDataFromBeans(); //通过api添加测试数据
this.importClientInfoFromJsonData();//导入测试数据
this.queryExamSearchByClientName(); //根据客户端名称查询提交报告
this.queryClientInfoByMedicalName();//通过医疗信息编码查找客户基本数据
this.queryDiagnosisByClientName();//根据客户名称获取客户体检诊断数据,并返回客户数据
this.queryClientAndAllSons();//查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录
}可以下载完整的demo工程运行本文中的测试用例方法,地址见相关资料。到此Elasticsearch 5.x 父子关系维护检索实战介绍完毕,谢谢大家!
相关资料
完整demo工程 https://github.com/bbossgroups/eshelloword-booter
对应的类文件和配置文件
org.bboss.elasticsearchtest.parentchild.ParentChildTest
esmapper/Client_Info.xml
开发交流
bboss交流群 166471282
bboss公众号
敬请关注:父子关系维护检索实战二 Elasticsearch 6.x 父子关系维护检
restclient请求超时后,不断retry,尝试链接停掉的节点,如何限制retry次数?
Elasticsearch • rochy 回复了问题 • 2 人关注 • 1 个回复 • 3804 次浏览 • 2018-12-11 09:53
TF/IDF 计算fieldNorm,不同文档字段的词数(fieldNum)不一样,为什么得分结果都一样(版本2.3)
Elasticsearch • medcl 回复了问题 • 4 人关注 • 5 个回复 • 2438 次浏览 • 2018-12-11 12:52
logstash input插件开发
Logstash • bellengao 发表了文章 • 0 个评论 • 5066 次浏览 • 2018-12-10 19:56
logstash作为一个数据管道中间件,支持对各种类型数据的采集与转换,并将数据发送到各种类型的存储库,比如实现消费kafka数据并且写入到Elasticsearch, 日志文件同步到对象存储S3等,mysql数据同步到Elasticsearch等。
logstash内部主要包含三个模块:
- input: 从数据源获取数据
- filter: 过滤、转换数据
- output: 输出数据

不同类型的数据都可以通过对应的input-plugin, output-plugin完成数据的输入与输出。如需要消费kafka中的数据并写入到Elasticsearch中,则需要使用logstash的kafka-input-plugin完成数据输入,logstash-output-elasticsearch完成数据输出。如果需要对输入数据进行过滤或者转换,比如根据关键词过滤掉不需要的内容,或者时间字段的格式转换,就需要又filter-plugin完成了。
logstash的input插件目前已经有几十种了,支持大多数比较通用或开源的数据源的输入。但如果公司内部开发的数据库或其它存储类的服务不能和开源产品在接口协议上兼容,比如腾讯自研的消息队列服务CMQ不依赖于其它的开源消息队列产品,所以不能直接使用logstash的logstash-input-kafka或logstash-input-rabbitmq同步CMQ中的数据;腾讯云对象存储服务COS, 在鉴权方式上和AWS的S3存在差异,也不能直接使用logstash-input-s3插件从COS中读取数据,对于这种情况,就需要自己开发logstash的input插件了。
本文以开发logstash的cos input插件为例,介绍如何开发logstash的input插件。
logstash官方提供了有个简单的input plugin example可供参考:
[https://github.com/logstash-pl ... mple/](https://github.com/logstash-pl ... ample/)
环境准备
logstash使用jruby开发,首先要配置jruby环境:
- 安装rvm:
rvm是一个ruby管理器,可以安装并管理ruby环境,也可以通过命令行切换到不同的ruby版本。
<br /> gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3 7D2BAF1CF37B13E2069D6956105BD0E739499BDB<br />
<br /> \curl -sSL <a href="https://get.rvm.io" rel="nofollow" target="_blank">https://get.rvm.io</a> | bash -s stable<br />
<br /> source /etc/profile.d/rvm.sh<br /> - 安装jruby
<br /> rvm install jruby<br />
<br /> rvm use jruby<br /> - 安装包管理工具bundle和测试工具rspec
<br /> gem install bundle<br /> gem install rspec<br />
从example开始
- clone logstash-input-example
<br /> git clone <a href="https://github.com/logstash-plugins/logstash-input-example.git" rel="nofollow" target="_blank">https://github.com/logstash-pl ... e.git</a><br /> - 将clone出来的logstash-input-example源码copy到logstash-input-cos目录,并删除.git文件夹,目的是以logstash-input-example的源码为参考进行开发,同时把需要改动名称的地方修改一下:
<br /> mv logstash-input-example.gemspec logstash-input-cos.gemspec<br /> mv lib/logstash/inputs/example.rb lib/logstash/inputs/cos.rb<br /> mv spec/inputs/example_spec.rb spec/inputs/cos_spec.rb<br /> - 建立的源码目录结构如图所示:

其中,重要文件的作用说明如下:
- cos.rb: 主文件,在该文件中编写logstash配置文件的读写与源数据获取的代码,需要继承LogStash::Inputs::Base基类
- cos_spec.rb: 单元测试文件,通过rspec可以对cos.rb中的代码进行测试
- logstash-input-cos.gemspec: 类似于maven中的pom.xml文件,配置工程的版本、名称、licene,包依赖等,通过bundle命令可以下载依赖包
配置并下载依赖
因为腾讯云COS服务没有ruby sdk, 因为只能依赖其Java sdk进行开发,首先添加对cos java sdk的依赖。在logstash-input-cos.gemspec中Gem dependencies配置栏中增加以下内容:
```Gem dependencies
s.requirements << "jar 'com.qcloud:cos_api', '5.4.4'"
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'logstash-codec-plain'
s.add_runtime_dependency 'stud', '>= 0.0.22'
s.add_runtime_dependency 'jar-dependencies'
s.add_development_dependency 'logstash-devutils', '1.3.6'
<br /> 相比logstash-input-example.gemspec,增加了对com.qcloud:cos_api包以及jar-dependencies包的依赖,jar-dependencies用于在ruby环境中管理jar包,并且可以跟踪jar包的加载状态。<br /> <br /> 然后,在logstash-input-cos.gemspec中增加配置:<br /> <br />
s.platform = 'java'
<br /> 这样可以成功下载java依赖包,并且可以在ruby代码中直接调用java代码。<br /> <br /> 最后,执行以下命令下载依赖:<br /> <br />
bundle install
```
编写代码
logstash-input-cos的代码逻辑其实比较简单,主要是通过执行定时任务,调用cos java sdk中的listObjects方法,获取到指定bucket里的数据,并在每次定时任务执行结束后设置marker保存在本地,再次执行时从marker位置获取数据,以实现数据的增量同步。
jar包的引用
因为要调用cos java sdk中的代码,先引用该jar包:
<br /> require 'cos_api-5.4.4.jar'<br /> java_import com.qcloud.cos.COSClient;<br /> java_import com.qcloud.cos.ClientConfig;<br /> java_import com.qcloud.cos.auth.BasicCOSCredentials;<br /> java_import com.qcloud.cos.auth.COSCredentials;<br /> java_import com.qcloud.cos.exception.CosClientException;<br /> java_import com.qcloud.cos.exception.CosServiceException;<br /> java_import com.qcloud.cos.model.COSObjectSummary;<br /> java_import com.qcloud.cos.model.ListObjectsRequest;<br /> java_import com.qcloud.cos.model.ObjectListing;<br /> java_import com.qcloud.cos.region.Region;<br />
读取配置文件
logstash配置文件读取的代码如图所示:

config_name为cos,其它的配置项读取代码按照ruby的代码规范编写,添加类型校验与默认值,就可以从以下配置文件中读取配置项:
<br /> input {<br /> cos {<br /> "endpoint" => "cos.ap-guangzhou.myqcloud.com"<br /> "access_key_id" => "*****"<br /> "access_key_secret" => "****"<br /> "bucket" => "******"<br /> "region" => "ap-guangzhou"<br /> "appId" => "**********"<br /> "interval" => 60<br /> }<br /> }<br /> <br /> output {<br /> stdout {<br /> codec=>rubydebug<br /> }<br /> }<br />
实现register方法
logstash input插件必须实现另个方法:register 和run
register方法类似于初始化方法,在该方法中可以直接使用从配置文件读取并赋值的变量,完成cos client的初始化,代码如下:
```1 初始化用户身份信息(appid, secretId, secretKey)
cred = com.qcloud.cos.auth.BasicCOSCredentials.new(@access_key_id, @access_key_secret)
2 设置bucket的区域, COS地域的简称请参照 https://www.qcloud.com/document/product/436/6224
clientConfig = com.qcloud.cos.ClientConfig.new(com.qcloud.cos.region.Region.new(@region))
3 生成cos客户端
@cosclient = com.qcloud.cos.COSClient.new(cred, clientConfig)
bucket名称, 需包含appid
bucketName = @bucket + "-"+ @appId
@bucketName = bucketName
@listObjectsRequest = com.qcloud.cos.model.ListObjectsRequest.new()设置bucket名称
@listObjectsRequest.setBucketName(bucketName)
prefix表示列出的object的key以prefix开始
@listObjectsRequest.setPrefix(@prefix)
设置最大遍历出多少个对象, 一次listobject最大支持1000
@listObjectsRequest.setMaxKeys(1000)
@listObjectsRequest.setMarker(@markerConfig.getMarker)
```
示例代码中设置了@cosclient和@listObjectRequest为全局变量, 因为在run方法中会用到这两个变量。
注意在ruby中调用java代码的方式:没有变量描述符;不能直接new Object(),而只能Object.new().
实现run方法
run方法获取数据并将数据流转换成event事件
最简单的run方法为:
<br /> def run(queue)<br /> Stud.interval(@interval) do<br /> event = LogStash::Event.new("message" => @message, "host" => @host)<br /> decorate(event)<br /> queue << event<br /> end # loop<br /> end # def run<br />
代码说明:
- 通过Stud ruby模块执行定时任务,interval可自定义,从配置文件中读取
- 生成event, 示例代码生成了一个包含两个字段数据的event
- 调用decorate()方法, 给该event打上tag,如果配置的话
- queue<<event, 将event插入到数据管道中,发送给filter处理
logstash-input-cos的run方法实现为:
```
def run(queue)
@current_thread = Thread.current
Stud.interval(@interval) do
process(queue)
end
end
def process(queue)
@logger.info('Marker from: ' + @markerConfig.getMarker)
objectListing = @cosclient.listObjects(@listObjectsRequest)
nextMarker = objectListing.getNextMarker()
cosObjectSummaries = objectListing.getObjectSummaries()
cosObjectSummaries.each do |obj|
文件的路径key
key = obj.getKey()
if stop?
@logger.info("stop while attempting to read log file")
break
end根据key获取内容
getObject(key) { |log|
发送消息
@codec.decode(log) do |event|
decorate(event)
queue << event
end
}
记录 marker
@markerConfig.setMarker(key)
@logger.info('Marker end: ' + @markerConfig.getMarker)
end
end
获取下载输入流
def getObject(key, &block)
getObjectRequest = com.qcloud.cos.model.GetObjectRequest.new(@bucketName, key)
cosObject = @cosclient.getObject(getObjectRequest)
cosObjectInput = cosObject.getObjectContent()
buffered =BufferedReader.new(InputStreamReader.new(cosObjectInput))
while (line = buffered.readLine())
block.call(line)
end
end
```
测试代码
在spec/inputs/cos_spec.rb中增加如下测试代码:
```encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/cos"
describe LogStash::Inputs::Cos do
it_behaves_like "an interruptible input plugin" do
let(:config) { {
"endpoint" => 'cos.ap-guangzhou.myqcloud.com',
"access_key_id" => '',
"access_key_secret" => '',
"bucket" => '',
"region" => 'ap-guangzhou',
"appId" => '',
"interval" => 60 } }
end
end
<br /> <br /> rspec是一个ruby测试库,通过bundle命令执行rspec:<br /> <br />
bundle exec rspec
<br /> 如果cos.rb中的代码没有语法或运行时错误,则会出现如果信息表明测试成功:<br /> <br />
Finished in 0.8022 seconds (files took 3.45 seconds to load)
1 example, 0 failures
```
构建并测试input-plugin-cos
build
使用gem对input-plugin-cos插件源码进行build:
<br /> gem build logstash-input-cos.gemspec<br />
构建完成后会生成一个名为logstash-input-cos-0.0.1-java.gem的文件
test
在logstash的解压目录下,执行一下命令安装logstash-input-cos plugin:
<br /> ./bin/logstash-plugin install /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem<br />
执行结果为:
<br /> Validating /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem<br /> Installing logstash-input-cos<br /> Installation successful<br />
另外,可以通过./bin/logstash-plugin list命令查看logstash已经安装的所有input/output/filter/codec插件。
生成配置文件cos.logstash.conf,内容为:
<br /> input {<br /> cos {<br /> "endpoint" => "cos.ap-guangzhou.myqcloud.com"<br /> "access_key_id" => "*****"<br /> "access_key_secret" => "****"<br /> "bucket" => "******"<br /> "region" => "ap-guangzhou"<br /> "appId" => "**********"<br /> "interval" => 60<br /> }<br /> }<br /> <br /> output {<br /> stdout {<br /> codec=>rubydebug<br /> }<br /> }<br />
该配置文件使用腾讯云官网账号的secret_id和secret_key进行权限验证,拉取指定bucket里的数据,为了测试,将output设置为标准输出。
执行logstash:
<br /> ./bin/logstash -f cos.logstash.conf<br />
输出结果为:
<br /> Sending Logstash's logs to /root/logstash-5.6.4/logs which is now configured via log4j2.properties<br /> [2018-07-30T19:26:17,039][WARN ][logstash.runner ] --config.debug was specified, but log.level was not set to 'debug'! No config info will be logged.<br /> [2018-07-30T19:26:17,048][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/root/logstash-5.6.4/modules/netflow/configuration"}<br /> [2018-07-30T19:26:17,049][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/root/logstash-5.6.4/modules/fb_apache/configuration"}<br /> [2018-07-30T19:26:17,252][INFO ][logstash.inputs.cos ] Using version 0.1.x input plugin 'cos'. This plugin isn't well supported by the community and likely has no maintainer.<br /> [2018-07-30T19:26:17,341][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}<br /> [2018-07-30T19:26:17,362][INFO ][logstash.inputs.cos ] Registering cos input {:bucket=>"bellengao", :region=>"ap-guangzhou"}<br /> [2018-07-30T19:26:17,528][INFO ][logstash.pipeline ] Pipeline main started<br /> [2018-07-30T19:26:17,530][INFO ][logstash.inputs.cos ] Marker from:<br /> log4j:WARN No appenders could be found for logger (org.apache.http.client.protocol.RequestAddCookies).<br /> log4j:WARN Please initialize the log4j system properly.<br /> log4j:WARN See <a href="http://logging.apache.org/log4j/1.2/faq.html#noconfig" rel="nofollow" target="_blank">http://logging.apache.org/log4 ... onfig</a> for more info.<br /> [2018-07-30T19:26:17,574][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}<br /> [2018-07-30T19:26:17,714][INFO ][logstash.inputs.cos ] Marker end: access.log<br /> {<br /> "message" => "77.179.66.156 - - [25/Oct/2016:14:49:33 +0200] \"GET / HTTP/1.1\" 200 612 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",<br /> "@version" => "1",<br /> "@timestamp" => 2018-07-30T11:26:17.710Z<br /> }<br /> {<br /> "message" => "77.179.66.156 - - [25/Oct/2016:14:49:34 +0200] \"GET /favicon.ico HTTP/1.1\" 404 571 \"<a href="http://localhost:8080/" rel="nofollow" target="_blank">http://localhost:8080/</a>\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",<br /> "@version" => "1",<br /> "@timestamp" => 2018-07-30T11:26:17.711Z<br /> }<br />
在cos中的bucket里上传了名为access.log的nginx日志,上述输出结果中最后打印出来的每个json结构体构成一个event, 其中message消息即为access.log中每一条日志。
- cos.rb: 主文件,在该文件中编写logstash配置文件的读写与源数据获取的代码,需要继承LogStash::Inputs::Base基类
- 安装rvm:









