绊脚石乃是进身之阶。

【2019.01.12】OpenResty x Open Talk 丨深圳站

一、活动介绍

OpenResty 是一个基于 Nginx 与 Lua 的高性能 Web 平台,其内部集成了大量精良的 Lua 库、第三方模块以及大多数的依赖项。用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。目前 OpenResty 是全球占有率排名第五的 Web 服务器,超过 1 万多家商业公司都在使用 OpenResty,比如又拍云、腾讯、奇虎360、京东、12306、Kong 等公司,应用在 CDN、广告系统、搜索、实现业务逻辑、余票查询、网关、ingress controller 等各种不同的场景下。

又拍云 Open Talk 是由又拍云发起的系列主题分享沙龙,秉承又拍云帮助企业提升发展速度的初衷,从 2015 年开启以来,Open Talk 至今已成功举办 45 期,辐射线上线下近十万技术人群,分别在北京、上海、广州、深圳、杭州等 12 座城市举办,覆盖腾讯、华为、网易、京东、唯品会、哔哩哔哩、美团点评、有赞、无码科技等诸多知名企业,往期的活动的讲稿及视频详见:https://opentalk.upyun.com

为促进 OpenResty 在技术圈的发展,增进 OpenResty 使用者的交流与学习,OpenResty 中国社区联合又拍云,举办 “OpenResty × Open Talk” 全国巡回沙龙,邀请业内资深的 OpenResty 技术专家,分享 OpenResty 实战经验,推动 OpenResty 开源项目的发展,促进互联网技术的教育。

二、活动时间

2019 年 01 月 12 日(周六)

三、活动地址

广东省深圳市南山区深圳虚拟大学园 R3-B 栋一楼触梦社区

四、报名地址

http://www.huodongxing.com/event/5470242889300

五、嘉宾及议题介绍

alt

分享嘉宾一:温铭 / OpenResty 软件基金会主席

OpenResty 软件基金会主席,前360开源委员会委员;现为 OpenResty Inc. 合伙人,致力于打造基于 OpenResty、Linux 内核等基础平台的企业级产品和解决方案。创业之前在互联网安全公司工作了 10 年,主要从事服务端的开发和架构,负责开发过木马云查杀、反钓鱼系统和企业安全产品。

分享主题:《 使用 OpenResty 实现 memcached server 》

OpenResty 软件基金会主席,前360开源委员会委员;现为 OpenResty Inc. 合伙人,致力于打造基于 OpenResty、Linux 内核等基础平台的企业级产品和解决方案。创业之前在互联网安全公司工作了 10 年,主要从事服务端的开发和架构,负责开发过木马云查杀、反钓鱼系统和企业安全产品。

alt

分享嘉宾二:张聪 / 又拍云首席架构师

多年 CDN 行业产品设计、技术开发和团队管理相关经验,个人技术方向集中在 Nginx、OpenResty 等高性能 Web 服务器方面,国内 OpenResty 技术早期推广者之一;目前担任又拍云内容加速部技术负责人,主导又拍云 CDN 技术平台的建设和发展。

分享主题:《 OpenResty 动态流控的几种姿势 》

本次分享主要介绍 OpenResty 上几种常用的请求限制和流量整形的方式,特别值得一提的是,详细介绍在又拍云网关层的实际应用案例,以及又拍云贡献的开源 Resty 库。

alt

分享嘉宾三:张波 / 虎牙直播运维研发架构师

目前主要负责虎牙直播运维体系的建设,主要针对 web 和后台类程序的发布、监控、运维自动化相关的运维系统的设计和开发。

分享主题:《 掘金 Nginx 日志 》

Nginx 是现在最流行的负载均衡和反向代理服务器之一,如果你是一位中小微型网站的开发运维人员,可能仅 Nginx 每天就会产生上百 M 甚至数以十 G 的日志文件,Nginx 日志被删除以前,或者我们可以想想,其中是否蕴含着位置的金矿等待挖掘?

alt

分享嘉宾四:陈乾龙 / 京东微信手 Q 业务部

负责微信、手Q抢购后台,以及 OpenResty 入口流量的网关

分享主题:《 基于 OpenResty 流量的防刷及容灾 》

本次分享主要介绍基于 OpenResty 搭建流量网关的实践以及开发过程中碰到的问题及优化。

六、现场礼品

alt

继续阅读 »

一、活动介绍

OpenResty 是一个基于 Nginx 与 Lua 的高性能 Web 平台,其内部集成了大量精良的 Lua 库、第三方模块以及大多数的依赖项。用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。目前 OpenResty 是全球占有率排名第五的 Web 服务器,超过 1 万多家商业公司都在使用 OpenResty,比如又拍云、腾讯、奇虎360、京东、12306、Kong 等公司,应用在 CDN、广告系统、搜索、实现业务逻辑、余票查询、网关、ingress controller 等各种不同的场景下。

又拍云 Open Talk 是由又拍云发起的系列主题分享沙龙,秉承又拍云帮助企业提升发展速度的初衷,从 2015 年开启以来,Open Talk 至今已成功举办 45 期,辐射线上线下近十万技术人群,分别在北京、上海、广州、深圳、杭州等 12 座城市举办,覆盖腾讯、华为、网易、京东、唯品会、哔哩哔哩、美团点评、有赞、无码科技等诸多知名企业,往期的活动的讲稿及视频详见:https://opentalk.upyun.com

为促进 OpenResty 在技术圈的发展,增进 OpenResty 使用者的交流与学习,OpenResty 中国社区联合又拍云,举办 “OpenResty × Open Talk” 全国巡回沙龙,邀请业内资深的 OpenResty 技术专家,分享 OpenResty 实战经验,推动 OpenResty 开源项目的发展,促进互联网技术的教育。

二、活动时间

2019 年 01 月 12 日(周六)

三、活动地址

广东省深圳市南山区深圳虚拟大学园 R3-B 栋一楼触梦社区

四、报名地址

http://www.huodongxing.com/event/5470242889300

五、嘉宾及议题介绍

alt

分享嘉宾一:温铭 / OpenResty 软件基金会主席

OpenResty 软件基金会主席,前360开源委员会委员;现为 OpenResty Inc. 合伙人,致力于打造基于 OpenResty、Linux 内核等基础平台的企业级产品和解决方案。创业之前在互联网安全公司工作了 10 年,主要从事服务端的开发和架构,负责开发过木马云查杀、反钓鱼系统和企业安全产品。

分享主题:《 使用 OpenResty 实现 memcached server 》

OpenResty 软件基金会主席,前360开源委员会委员;现为 OpenResty Inc. 合伙人,致力于打造基于 OpenResty、Linux 内核等基础平台的企业级产品和解决方案。创业之前在互联网安全公司工作了 10 年,主要从事服务端的开发和架构,负责开发过木马云查杀、反钓鱼系统和企业安全产品。

alt

分享嘉宾二:张聪 / 又拍云首席架构师

多年 CDN 行业产品设计、技术开发和团队管理相关经验,个人技术方向集中在 Nginx、OpenResty 等高性能 Web 服务器方面,国内 OpenResty 技术早期推广者之一;目前担任又拍云内容加速部技术负责人,主导又拍云 CDN 技术平台的建设和发展。

分享主题:《 OpenResty 动态流控的几种姿势 》

本次分享主要介绍 OpenResty 上几种常用的请求限制和流量整形的方式,特别值得一提的是,详细介绍在又拍云网关层的实际应用案例,以及又拍云贡献的开源 Resty 库。

alt

分享嘉宾三:张波 / 虎牙直播运维研发架构师

目前主要负责虎牙直播运维体系的建设,主要针对 web 和后台类程序的发布、监控、运维自动化相关的运维系统的设计和开发。

分享主题:《 掘金 Nginx 日志 》

Nginx 是现在最流行的负载均衡和反向代理服务器之一,如果你是一位中小微型网站的开发运维人员,可能仅 Nginx 每天就会产生上百 M 甚至数以十 G 的日志文件,Nginx 日志被删除以前,或者我们可以想想,其中是否蕴含着位置的金矿等待挖掘?

alt

分享嘉宾四:陈乾龙 / 京东微信手 Q 业务部

负责微信、手Q抢购后台,以及 OpenResty 入口流量的网关

分享主题:《 基于 OpenResty 流量的防刷及容灾 》

本次分享主要介绍基于 OpenResty 搭建流量网关的实践以及开发过程中碰到的问题及优化。

六、现场礼品

alt

收起阅读 »

Day 16 - Elasticsearch性能调优

因为总是看到很多同学在说elasticsearch性能不够好,集群不够稳定,询问关于elasticsearch的调优,但是每次都是一个个点的单独讲,很多时候都是case by case的解答,今天简单梳理下日常的elasticsearch使用调优,以下仅为自己日常经验之谈,如有疏漏,还请大家帮忙指正。

一、配置文件调优

elasticsearch.yml

内存锁定

bootstrap.memory_lock:true 允许 JVM 锁住内存,禁止操作系统交换出去。

zen.discovery

Elasticsearch 默认被配置为使用单播发现,以防止节点无意中加入集群。组播发现应该永远不被使用在生产环境了,否则你得到的结果就是一个节点意外的加入到了你的生产环境,仅仅是因为他们收到了一个错误的组播信号。 ES是一个P2P类型的分布式系统,使用gossip协议,集群的任意请求都可以发送到集群的任一节点,然后es内部会找到需要转发的节点,并且与之进行通信。 在es1.x的版本,es默认是开启组播,启动es之后,可以快速将局域网内集群名称,默认端口的相同实例加入到一个大的集群,后续再es2.x之后,都调整成了单播,避免安全问题和网络风暴; 单播discovery.zen.ping.unicast.hosts,建议写入集群内所有的节点及端口,如果新实例加入集群,新实例只需要写入当前集群的实例,即可自动加入到当前集群,之后再处理原实例的配置即可,新实例加入集群,不需要重启原有实例; 节点zen相关配置: discovery.zen.ping_timeout:判断master选举过程中,发现其他node存活的超时设置,主要影响选举的耗时,参数仅在加入或者选举 master 主节点的时候才起作用 discovery.zen.join_timeout:节点确定加入到集群中,向主节点发送加入请求的超时时间,默认为3s discovery.zen.minimum_master_nodes:参与master选举的最小节点数,当集群能够被选为master的节点数量小于最小数量时,集群将无法正常选举。

故障检测( fault detection )

两种情况下回进行故障检测,第一种是由master向集群的所有其他节点发起ping,验证节点是否处于活动状态;第二种是:集群每个节点向master发起ping,判断master是否存活,是否需要发起选举。 故障检测需要配置以下设置使用 形如: discovery.zen.fd.ping_interval 节点被ping的频率,默认为1s。 discovery.zen.fd.ping_timeout 等待ping响应的时间,默认为 30s,运行的集群中,master 检测所有节点,以及节点检测 master 是否正常。 discovery.zen.fd.ping_retries ping失败/超时多少导致节点被视为失败,默认为3。

https://www.elastic.co/guide/en/elasticsearch/reference/6.x/modules-discovery-zen.html

队列数量

不建议盲目加大es的队列数量,如果是偶发的因为数据突增,导致队列阻塞,加大队列size可以使用内存来缓存数据,如果是持续性的数据阻塞在队列,加大队列size除了加大内存占用,并不能有效提高数据写入速率,反而可能加大es宕机时候,在内存中可能丢失的上数据量。 哪些情况下,加大队列size呢?GET /_cat/thread_pool,观察api中返回的queue和rejected,如果确实存在队列拒绝或者是持续的queue,可以酌情调整队列size。

https://www.elastic.co/guide/en/elasticsearch/reference/6.x/modules-threadpool.html

内存使用

设置indices的内存熔断相关参数,根据实际情况进行调整,防止写入或查询压力过高导致OOM, indices.breaker.total.limit: 50%,集群级别的断路器,默认为jvm堆的70%; indices.breaker.request.limit: 10%,单个request的断路器限制,默认为jvm堆的60%; indices.breaker.fielddata.limit: 10%,fielddata breaker限制,默认为jvm堆的60%。

https://www.elastic.co/guide/en/elasticsearch/reference/6.x/circuit-breaker.html

根据实际情况调整查询占用cache,避免查询cache占用过多的jvm内存,参数为静态的,需要在每个数据节点配置。 indices.queries.cache.size: 5%,控制过滤器缓存的内存大小,默认为10%。接受百分比值,5%或者精确值,例如512mb。

https://www.elastic.co/guide/en/elasticsearch/reference/6.x/query-cache.html

创建shard

如果集群规模较大,可以阻止新建shard时扫描集群内全部shard的元数据,提升shard分配速度。 cluster.routing.allocation.disk.include_relocations: false,默认为true。

https://www.elastic.co/guide/en/elasticsearch/reference/6.x/disk-allocator.html

二、系统层面调优

jdk版本

当前根据官方建议,选择匹配的jdk版本;

jdk内存配置

首先,-Xms和-Xmx设置为相同的值,避免在运行过程中再进行内存分配,同时,如果系统内存小于64G,建议设置略小于机器内存的一半,剩余留给系统使用。 同时,jvm heap建议不要超过32G(不同jdk版本具体的值会略有不同),否则jvm会因为内存指针压缩导致内存浪费,详见: https://www.elastic.co/guide/cn/elasticsearch/guide/current/heap-sizing.html

交换分区

关闭交换分区,防止内存发生交换导致性能下降(部分情况下,宁死勿慢) swapoff -a

文件句柄

Lucene 使用了 大量的 文件。 同时,Elasticsearch 在节点和 HTTP 客户端之间进行通信也使用了大量的套接字,所有这一切都需要足够的文件描述符,默认情况下,linux默认运行单个进程打开1024个文件句柄,这显然是不够的,故需要加大文件句柄数 ulimit -n 65536

https://www.elastic.co/guide/en/elasticsearch/reference/6.5/setting-system-settings.html

mmap

Elasticsearch 对各种文件混合使用了 NioFs( 注:非阻塞文件系统)和 MMapFs ( 注:内存映射文件系统)。请确保你配置的最大映射数量,以便有足够的虚拟内存可用于 mmapped 文件。这可以暂时设置: sysctl -w vm.max_map_count=262144 或者你可以在 /etc/sysctl.conf 通过修改 vm.max_map_count 永久设置它。

https://www.elastic.co/guide/cn/elasticsearch/guide/current/_file_descriptors_and_mmap.html

磁盘

如果你正在使用 SSDs,确保你的系统 I/O 调度程序是配置正确的。 当你向硬盘写数据,I/O 调度程序决定何时把数据实际发送到硬盘。 大多数默认 nix 发行版下的调度程序都叫做 cfq(完全公平队列)。但它是为旋转介质优化的: 机械硬盘的固有特性意味着它写入数据到基于物理布局的硬盘会更高效。 这对 SSD 来说是低效的,尽管这里没有涉及到机械硬盘。但是,deadline 或者 noop 应该被使用。deadline 调度程序基于写入等待时间进行优化, noop 只是一个简单的 FIFO 队列。 echo noop > /sys/block/sd/queue/scheduler

磁盘挂载

mount -o noatime,data=writeback,barrier=0,nobh /dev/sd* /esdata* 其中,noatime,禁止记录访问时间戳;data=writeback,不记录journal;barrier=0,因为关闭了journal,所以同步关闭barrier; nobh,关闭buffer_head,防止内核影响数据IO

磁盘其他注意事项

使用 RAID 0。条带化 RAID 会提高磁盘I/O,代价显然就是当一块硬盘故障时整个就故障了,不要使用镜像或者奇偶校验 RAID 因为副本已经提供了这个功能。 另外,使用多块硬盘,并允许 Elasticsearch 通过多个 path.data 目录配置把数据条带化分配到它们上面。 不要使用远程挂载的存储,比如 NFS 或者 SMB/CIFS。这个引入的延迟对性能来说完全是背道而驰的。

三、elasticsearch使用方式调优

当elasticsearch本身的配置没有明显的问题之后,发现es使用还是非常慢,这个时候,就需要我们去定位es本身的问题了,首先祭出定位问题的第一个命令:

hot_threads

GET /_nodes/hot_threads&interval=30s

抓取30s的节点上占用资源的热线程,并通过排查占用资源最多的TOP线程来判断对应的资源消耗是否正常,一般情况下,bulk,search类的线程占用资源都可能是业务造成的,但是如果是merge线程占用了大量的资源,就应该考虑是不是创建index或者刷磁盘间隔太小,批量写入size太小造成的。

https://www.elastic.co/guide/en/elasticsearch/reference/6.x/cluster-nodes-hot-threads.html

pending_tasks

GET /_cluster/pending_tasks

有一些任务只能由主节点去处理,比如创建一个新的 索引或者在集群中移动分片,由于一个集群中只能有一个主节点,所以只有这一master节点可以处理集群级别的元数据变动。在99.9999%的时间里,这不会有什么问题,元数据变动的队列基本上保持为零。在一些罕见的集群里,元数据变动的次数比主节点能处理的还快,这会导致等待中的操作会累积成队列。这个时候可以通过pending_tasks api分析当前什么操作阻塞了es的队列,比如,集群异常时,会有大量的shard在recovery,如果集群在大量创建新字段,会出现大量的put_mappings的操作,所以正常情况下,需要禁用动态mapping。

https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-pending.html

字段存储

当前es主要有doc_values,fielddata,storefield三种类型,大部分情况下,并不需要三种类型都存储,可根据实际场景进行调整: 当前用得最多的就是doc_values,列存储,对于不需要进行分词的字段,都可以开启doc_values来进行存储(且只保留keyword字段),节约内存,当然,开启doc_values会对查询性能有一定的影响,但是,这个性能损耗是比较小的,而且是值得的;

fielddata构建和管理 100% 在内存中,常驻于 JVM 内存堆,所以可用于快速查询,但是这也意味着它本质上是不可扩展的,有很多边缘情况下要提防,如果对于字段没有分析需求,可以关闭fielddata;

storefield主要用于_source字段,默认情况下,数据在写入es的时候,es会将doc数据存储为_source字段,查询时可以通过_source字段快速获取doc的原始结构,如果没有update,reindex等需求,可以将_source字段disable;

_all,ES在6.x以前的版本,默认将写入的字段拼接成一个大的字符串,并对该字段进行分词,用于支持整个doc的全文检索,在知道doc字段名称的情况下,建议关闭掉该字段,节约存储空间,也避免不带字段key的全文检索;

norms:搜索时进行评分,日志场景一般不需要评分,建议关闭;

tranlog

Elasticsearch 2.0之后为了保证不丢数据,每次 index、bulk、delete、update 完成的时候,一定触发刷新 translog 到磁盘上,才给请求返回 200 OK。这个改变在提高数据安全性的同时当然也降低了一点性能。 如果你不在意这点可能性,还是希望性能优先,可以在 index template 里设置如下参数:

{
    "index.translog.durability": "async"
}

index.translog.sync_interval:对于一些大容量的偶尔丢失几秒数据问题也并不严重的集群,使用异步的 fsync 还是比较有益的。比如,写入的数据被缓存到内存中,再每5秒执行一次 fsync ,默认为5s。小于的值100ms是不允许的。 index.translog.flush_threshold_size:translog存储尚未安全保存在Lucene中的所有操作。虽然这些操作可用于读取,但如果要关闭并且必须恢复,则需要重新编制索引。此设置控制这些操作的最大总大小,以防止恢复时间过长。达到设置的最大size后,将发生刷新,生成新的Lucene提交点,默认为512mb。

refresh_interval

执行刷新操作的频率,这会使索引的最近更改对搜索可见,默认为1s,可以设置-1为禁用刷新,对于写入速率要求较高的场景,可以适当的加大对应的时长,减小磁盘io和segment的生成;

禁止动态mapping

动态mapping的坏处: 1.造成集群元数据一直变更,导致集群不稳定; 2.可能造成数据类型与实际类型不一致; 3.对于一些异常字段或者是扫描类的字段,也会频繁的修改mapping,导致业务不可控。 动态mapping配置的可选值及含义如下: true:支持动态扩展,新增数据有新的字段属性时,自动添加对于的mapping,数据写入成功 false:不支持动态扩展,新增数据有新的字段属性时,直接忽略,数据写入成功 strict:不支持动态扩展,新增数据有新的字段时,报错,数据写入失败

批量写入

批量请求显然会大大提升写入速率,且这个速率是可以量化的,官方建议每次批量的数据物理字节数5-15MB是一个比较不错的起点,注意这里说的是物理字节数大小。文档计数对批量大小来说不是一个好指标。比如说,如果你每次批量索引 1000 个文档,记住下面的事实: 1000 个 1 KB 大小的文档加起来是 1 MB 大。 1000 个 100 KB 大小的文档加起来是 100 MB 大。 这可是完完全全不一样的批量大小了。批量请求需要在协调节点上加载进内存,所以批量请求的物理大小比文档计数重要得多。 从 5–15 MB 开始测试批量请求大小,缓慢增加这个数字,直到你看不到性能提升为止。然后开始增加你的批量写入的并发度(多线程等等办法)。 用iostat 、 top 和 ps 等工具监控你的节点,观察资源什么时候达到瓶颈。如果你开始收到 EsRejectedExecutionException ,你的集群没办法再继续了:至少有一种资源到瓶颈了。或者减少并发数,或者提供更多的受限资源(比如从机械磁盘换成 SSD),或者添加更多节点。

索引和shard

es的索引,shard都会有对应的元数据,且因为es的元数据都是保存在master节点,且元数据的更新是要hang住集群向所有节点同步的,当es的新建字段或者新建索引的时候,都会要获取集群元数据,并对元数据进行变更及同步,此时会影响集群的响应,所以需要关注集群的index和shard数量,建议如下: 1.使用shrink和rollover api,相对生成合适的数据shard数; 2.根据数据量级及对应的性能需求,选择创建index的名称,形如:按月生成索引:test-YYYYMM,按天生成索引:test-YYYYMMDD; 3.控制单个shard的size,正常情况下,日志场景,建议单个shard不大于50GB,线上业务场景,建议单个shard不超过20GB;

segment merge

段合并的计算量庞大, 而且还要吃掉大量磁盘 I/O。合并在后台定期操作,因为他们可能要很长时间才能完成,尤其是比较大的段。这个通常来说都没问题,因为大规模段合并的概率是很小的。 如果发现merge占用了大量的资源,可以设置: index.merge.scheduler.max_thread_count: 1 特别是机械磁盘在并发 I/O 支持方面比较差,所以我们需要降低每个索引并发访问磁盘的线程数。这个设置允许 max_thread_count + 2 个线程同时进行磁盘操作,也就是设置为 1 允许三个线程。 对于 SSD,你可以忽略这个设置,默认是 Math.min(3, Runtime.getRuntime().availableProcessors() / 2) ,对 SSD 来说运行的很好。 业务低峰期通过force_merge强制合并segment,降低segment的数量,减小内存消耗; 关闭冷索引,业务需要的时候再进行开启,如果一直不使用的索引,可以定期删除,或者备份到hadoop集群;

自动生成_id

当写入端使用特定的id将数据写入es时,es会去检查对应的index下是否存在相同的id,这个操作会随着文档数量的增加而消耗越来越大,所以如果业务上没有强需求,建议使用es自动生成的id,加快写入速率。

routing

对于数据量较大的业务查询场景,es侧一般会创建多个shard,并将shard分配到集群中的多个实例来分摊压力,正常情况下,一个查询会遍历查询所有的shard,然后将查询到的结果进行merge之后,再返回给查询端。此时,写入的时候设置routing,可以避免每次查询都遍历全量shard,而是查询的时候也指定对应的routingkey,这种情况下,es会只去查询对应的shard,可以大幅度降低合并数据和调度全量shard的开销。

使用alias

生产提供服务的索引,切记使用别名提供服务,而不是直接暴露索引名称,避免后续因为业务变更或者索引数据需要reindex等情况造成业务中断。

避免宽表

在索引中定义太多字段是一种可能导致映射爆炸的情况,这可能导致内存不足错误和难以恢复的情况,这个问题可能比预期更常见,index.mapping.total_fields.limit ,默认值是1000

避免稀疏索引

因为索引稀疏之后,对应的相邻文档id的delta值会很大,lucene基于文档id做delta编码压缩导致压缩率降低,从而导致索引文件增大,同时,es的keyword,数组类型采用doc_values结构,每个文档都会占用一定的空间,即使字段是空值,所以稀疏索引会造成磁盘size增大,导致查询和写入效率降低。

继续阅读 »

因为总是看到很多同学在说elasticsearch性能不够好,集群不够稳定,询问关于elasticsearch的调优,但是每次都是一个个点的单独讲,很多时候都是case by case的解答,今天简单梳理下日常的elasticsearch使用调优,以下仅为自己日常经验之谈,如有疏漏,还请大家帮忙指正。

一、配置文件调优

elasticsearch.yml

内存锁定

bootstrap.memory_lock:true 允许 JVM 锁住内存,禁止操作系统交换出去。

zen.discovery

Elasticsearch 默认被配置为使用单播发现,以防止节点无意中加入集群。组播发现应该永远不被使用在生产环境了,否则你得到的结果就是一个节点意外的加入到了你的生产环境,仅仅是因为他们收到了一个错误的组播信号。 ES是一个P2P类型的分布式系统,使用gossip协议,集群的任意请求都可以发送到集群的任一节点,然后es内部会找到需要转发的节点,并且与之进行通信。 在es1.x的版本,es默认是开启组播,启动es之后,可以快速将局域网内集群名称,默认端口的相同实例加入到一个大的集群,后续再es2.x之后,都调整成了单播,避免安全问题和网络风暴; 单播discovery.zen.ping.unicast.hosts,建议写入集群内所有的节点及端口,如果新实例加入集群,新实例只需要写入当前集群的实例,即可自动加入到当前集群,之后再处理原实例的配置即可,新实例加入集群,不需要重启原有实例; 节点zen相关配置: discovery.zen.ping_timeout:判断master选举过程中,发现其他node存活的超时设置,主要影响选举的耗时,参数仅在加入或者选举 master 主节点的时候才起作用 discovery.zen.join_timeout:节点确定加入到集群中,向主节点发送加入请求的超时时间,默认为3s discovery.zen.minimum_master_nodes:参与master选举的最小节点数,当集群能够被选为master的节点数量小于最小数量时,集群将无法正常选举。

故障检测( fault detection )

两种情况下回进行故障检测,第一种是由master向集群的所有其他节点发起ping,验证节点是否处于活动状态;第二种是:集群每个节点向master发起ping,判断master是否存活,是否需要发起选举。 故障检测需要配置以下设置使用 形如: discovery.zen.fd.ping_interval 节点被ping的频率,默认为1s。 discovery.zen.fd.ping_timeout 等待ping响应的时间,默认为 30s,运行的集群中,master 检测所有节点,以及节点检测 master 是否正常。 discovery.zen.fd.ping_retries ping失败/超时多少导致节点被视为失败,默认为3。

https://www.elastic.co/guide/en/elasticsearch/reference/6.x/modules-discovery-zen.html

队列数量

不建议盲目加大es的队列数量,如果是偶发的因为数据突增,导致队列阻塞,加大队列size可以使用内存来缓存数据,如果是持续性的数据阻塞在队列,加大队列size除了加大内存占用,并不能有效提高数据写入速率,反而可能加大es宕机时候,在内存中可能丢失的上数据量。 哪些情况下,加大队列size呢?GET /_cat/thread_pool,观察api中返回的queue和rejected,如果确实存在队列拒绝或者是持续的queue,可以酌情调整队列size。

https://www.elastic.co/guide/en/elasticsearch/reference/6.x/modules-threadpool.html

内存使用

设置indices的内存熔断相关参数,根据实际情况进行调整,防止写入或查询压力过高导致OOM, indices.breaker.total.limit: 50%,集群级别的断路器,默认为jvm堆的70%; indices.breaker.request.limit: 10%,单个request的断路器限制,默认为jvm堆的60%; indices.breaker.fielddata.limit: 10%,fielddata breaker限制,默认为jvm堆的60%。

https://www.elastic.co/guide/en/elasticsearch/reference/6.x/circuit-breaker.html

根据实际情况调整查询占用cache,避免查询cache占用过多的jvm内存,参数为静态的,需要在每个数据节点配置。 indices.queries.cache.size: 5%,控制过滤器缓存的内存大小,默认为10%。接受百分比值,5%或者精确值,例如512mb。

https://www.elastic.co/guide/en/elasticsearch/reference/6.x/query-cache.html

创建shard

如果集群规模较大,可以阻止新建shard时扫描集群内全部shard的元数据,提升shard分配速度。 cluster.routing.allocation.disk.include_relocations: false,默认为true。

https://www.elastic.co/guide/en/elasticsearch/reference/6.x/disk-allocator.html

二、系统层面调优

jdk版本

当前根据官方建议,选择匹配的jdk版本;

jdk内存配置

首先,-Xms和-Xmx设置为相同的值,避免在运行过程中再进行内存分配,同时,如果系统内存小于64G,建议设置略小于机器内存的一半,剩余留给系统使用。 同时,jvm heap建议不要超过32G(不同jdk版本具体的值会略有不同),否则jvm会因为内存指针压缩导致内存浪费,详见: https://www.elastic.co/guide/cn/elasticsearch/guide/current/heap-sizing.html

交换分区

关闭交换分区,防止内存发生交换导致性能下降(部分情况下,宁死勿慢) swapoff -a

文件句柄

Lucene 使用了 大量的 文件。 同时,Elasticsearch 在节点和 HTTP 客户端之间进行通信也使用了大量的套接字,所有这一切都需要足够的文件描述符,默认情况下,linux默认运行单个进程打开1024个文件句柄,这显然是不够的,故需要加大文件句柄数 ulimit -n 65536

https://www.elastic.co/guide/en/elasticsearch/reference/6.5/setting-system-settings.html

mmap

Elasticsearch 对各种文件混合使用了 NioFs( 注:非阻塞文件系统)和 MMapFs ( 注:内存映射文件系统)。请确保你配置的最大映射数量,以便有足够的虚拟内存可用于 mmapped 文件。这可以暂时设置: sysctl -w vm.max_map_count=262144 或者你可以在 /etc/sysctl.conf 通过修改 vm.max_map_count 永久设置它。

https://www.elastic.co/guide/cn/elasticsearch/guide/current/_file_descriptors_and_mmap.html

磁盘

如果你正在使用 SSDs,确保你的系统 I/O 调度程序是配置正确的。 当你向硬盘写数据,I/O 调度程序决定何时把数据实际发送到硬盘。 大多数默认 nix 发行版下的调度程序都叫做 cfq(完全公平队列)。但它是为旋转介质优化的: 机械硬盘的固有特性意味着它写入数据到基于物理布局的硬盘会更高效。 这对 SSD 来说是低效的,尽管这里没有涉及到机械硬盘。但是,deadline 或者 noop 应该被使用。deadline 调度程序基于写入等待时间进行优化, noop 只是一个简单的 FIFO 队列。 echo noop > /sys/block/sd/queue/scheduler

磁盘挂载

mount -o noatime,data=writeback,barrier=0,nobh /dev/sd* /esdata* 其中,noatime,禁止记录访问时间戳;data=writeback,不记录journal;barrier=0,因为关闭了journal,所以同步关闭barrier; nobh,关闭buffer_head,防止内核影响数据IO

磁盘其他注意事项

使用 RAID 0。条带化 RAID 会提高磁盘I/O,代价显然就是当一块硬盘故障时整个就故障了,不要使用镜像或者奇偶校验 RAID 因为副本已经提供了这个功能。 另外,使用多块硬盘,并允许 Elasticsearch 通过多个 path.data 目录配置把数据条带化分配到它们上面。 不要使用远程挂载的存储,比如 NFS 或者 SMB/CIFS。这个引入的延迟对性能来说完全是背道而驰的。

三、elasticsearch使用方式调优

当elasticsearch本身的配置没有明显的问题之后,发现es使用还是非常慢,这个时候,就需要我们去定位es本身的问题了,首先祭出定位问题的第一个命令:

hot_threads

GET /_nodes/hot_threads&interval=30s

抓取30s的节点上占用资源的热线程,并通过排查占用资源最多的TOP线程来判断对应的资源消耗是否正常,一般情况下,bulk,search类的线程占用资源都可能是业务造成的,但是如果是merge线程占用了大量的资源,就应该考虑是不是创建index或者刷磁盘间隔太小,批量写入size太小造成的。

https://www.elastic.co/guide/en/elasticsearch/reference/6.x/cluster-nodes-hot-threads.html

pending_tasks

GET /_cluster/pending_tasks

有一些任务只能由主节点去处理,比如创建一个新的 索引或者在集群中移动分片,由于一个集群中只能有一个主节点,所以只有这一master节点可以处理集群级别的元数据变动。在99.9999%的时间里,这不会有什么问题,元数据变动的队列基本上保持为零。在一些罕见的集群里,元数据变动的次数比主节点能处理的还快,这会导致等待中的操作会累积成队列。这个时候可以通过pending_tasks api分析当前什么操作阻塞了es的队列,比如,集群异常时,会有大量的shard在recovery,如果集群在大量创建新字段,会出现大量的put_mappings的操作,所以正常情况下,需要禁用动态mapping。

https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-pending.html

字段存储

当前es主要有doc_values,fielddata,storefield三种类型,大部分情况下,并不需要三种类型都存储,可根据实际场景进行调整: 当前用得最多的就是doc_values,列存储,对于不需要进行分词的字段,都可以开启doc_values来进行存储(且只保留keyword字段),节约内存,当然,开启doc_values会对查询性能有一定的影响,但是,这个性能损耗是比较小的,而且是值得的;

fielddata构建和管理 100% 在内存中,常驻于 JVM 内存堆,所以可用于快速查询,但是这也意味着它本质上是不可扩展的,有很多边缘情况下要提防,如果对于字段没有分析需求,可以关闭fielddata;

storefield主要用于_source字段,默认情况下,数据在写入es的时候,es会将doc数据存储为_source字段,查询时可以通过_source字段快速获取doc的原始结构,如果没有update,reindex等需求,可以将_source字段disable;

_all,ES在6.x以前的版本,默认将写入的字段拼接成一个大的字符串,并对该字段进行分词,用于支持整个doc的全文检索,在知道doc字段名称的情况下,建议关闭掉该字段,节约存储空间,也避免不带字段key的全文检索;

norms:搜索时进行评分,日志场景一般不需要评分,建议关闭;

tranlog

Elasticsearch 2.0之后为了保证不丢数据,每次 index、bulk、delete、update 完成的时候,一定触发刷新 translog 到磁盘上,才给请求返回 200 OK。这个改变在提高数据安全性的同时当然也降低了一点性能。 如果你不在意这点可能性,还是希望性能优先,可以在 index template 里设置如下参数:

{
    "index.translog.durability": "async"
}

index.translog.sync_interval:对于一些大容量的偶尔丢失几秒数据问题也并不严重的集群,使用异步的 fsync 还是比较有益的。比如,写入的数据被缓存到内存中,再每5秒执行一次 fsync ,默认为5s。小于的值100ms是不允许的。 index.translog.flush_threshold_size:translog存储尚未安全保存在Lucene中的所有操作。虽然这些操作可用于读取,但如果要关闭并且必须恢复,则需要重新编制索引。此设置控制这些操作的最大总大小,以防止恢复时间过长。达到设置的最大size后,将发生刷新,生成新的Lucene提交点,默认为512mb。

refresh_interval

执行刷新操作的频率,这会使索引的最近更改对搜索可见,默认为1s,可以设置-1为禁用刷新,对于写入速率要求较高的场景,可以适当的加大对应的时长,减小磁盘io和segment的生成;

禁止动态mapping

动态mapping的坏处: 1.造成集群元数据一直变更,导致集群不稳定; 2.可能造成数据类型与实际类型不一致; 3.对于一些异常字段或者是扫描类的字段,也会频繁的修改mapping,导致业务不可控。 动态mapping配置的可选值及含义如下: true:支持动态扩展,新增数据有新的字段属性时,自动添加对于的mapping,数据写入成功 false:不支持动态扩展,新增数据有新的字段属性时,直接忽略,数据写入成功 strict:不支持动态扩展,新增数据有新的字段时,报错,数据写入失败

批量写入

批量请求显然会大大提升写入速率,且这个速率是可以量化的,官方建议每次批量的数据物理字节数5-15MB是一个比较不错的起点,注意这里说的是物理字节数大小。文档计数对批量大小来说不是一个好指标。比如说,如果你每次批量索引 1000 个文档,记住下面的事实: 1000 个 1 KB 大小的文档加起来是 1 MB 大。 1000 个 100 KB 大小的文档加起来是 100 MB 大。 这可是完完全全不一样的批量大小了。批量请求需要在协调节点上加载进内存,所以批量请求的物理大小比文档计数重要得多。 从 5–15 MB 开始测试批量请求大小,缓慢增加这个数字,直到你看不到性能提升为止。然后开始增加你的批量写入的并发度(多线程等等办法)。 用iostat 、 top 和 ps 等工具监控你的节点,观察资源什么时候达到瓶颈。如果你开始收到 EsRejectedExecutionException ,你的集群没办法再继续了:至少有一种资源到瓶颈了。或者减少并发数,或者提供更多的受限资源(比如从机械磁盘换成 SSD),或者添加更多节点。

索引和shard

es的索引,shard都会有对应的元数据,且因为es的元数据都是保存在master节点,且元数据的更新是要hang住集群向所有节点同步的,当es的新建字段或者新建索引的时候,都会要获取集群元数据,并对元数据进行变更及同步,此时会影响集群的响应,所以需要关注集群的index和shard数量,建议如下: 1.使用shrink和rollover api,相对生成合适的数据shard数; 2.根据数据量级及对应的性能需求,选择创建index的名称,形如:按月生成索引:test-YYYYMM,按天生成索引:test-YYYYMMDD; 3.控制单个shard的size,正常情况下,日志场景,建议单个shard不大于50GB,线上业务场景,建议单个shard不超过20GB;

segment merge

段合并的计算量庞大, 而且还要吃掉大量磁盘 I/O。合并在后台定期操作,因为他们可能要很长时间才能完成,尤其是比较大的段。这个通常来说都没问题,因为大规模段合并的概率是很小的。 如果发现merge占用了大量的资源,可以设置: index.merge.scheduler.max_thread_count: 1 特别是机械磁盘在并发 I/O 支持方面比较差,所以我们需要降低每个索引并发访问磁盘的线程数。这个设置允许 max_thread_count + 2 个线程同时进行磁盘操作,也就是设置为 1 允许三个线程。 对于 SSD,你可以忽略这个设置,默认是 Math.min(3, Runtime.getRuntime().availableProcessors() / 2) ,对 SSD 来说运行的很好。 业务低峰期通过force_merge强制合并segment,降低segment的数量,减小内存消耗; 关闭冷索引,业务需要的时候再进行开启,如果一直不使用的索引,可以定期删除,或者备份到hadoop集群;

自动生成_id

当写入端使用特定的id将数据写入es时,es会去检查对应的index下是否存在相同的id,这个操作会随着文档数量的增加而消耗越来越大,所以如果业务上没有强需求,建议使用es自动生成的id,加快写入速率。

routing

对于数据量较大的业务查询场景,es侧一般会创建多个shard,并将shard分配到集群中的多个实例来分摊压力,正常情况下,一个查询会遍历查询所有的shard,然后将查询到的结果进行merge之后,再返回给查询端。此时,写入的时候设置routing,可以避免每次查询都遍历全量shard,而是查询的时候也指定对应的routingkey,这种情况下,es会只去查询对应的shard,可以大幅度降低合并数据和调度全量shard的开销。

使用alias

生产提供服务的索引,切记使用别名提供服务,而不是直接暴露索引名称,避免后续因为业务变更或者索引数据需要reindex等情况造成业务中断。

避免宽表

在索引中定义太多字段是一种可能导致映射爆炸的情况,这可能导致内存不足错误和难以恢复的情况,这个问题可能比预期更常见,index.mapping.total_fields.limit ,默认值是1000

避免稀疏索引

因为索引稀疏之后,对应的相邻文档id的delta值会很大,lucene基于文档id做delta编码压缩导致压缩率降低,从而导致索引文件增大,同时,es的keyword,数组类型采用doc_values结构,每个文档都会占用一定的空间,即使字段是空值,所以稀疏索引会造成磁盘size增大,导致查询和写入效率降低。

收起阅读 »

社区日报 第480期 (2018-12-16)

1.Elasticsearch Ingest Node 与Logstash性能对比。
http://t.cn/EUQQ0EQ
2.方法:如何将rsyslog与Kafka和Logstash集成。
http://t.cn/EUQ8Lyy
3.(自备梯子)iPhone的黄金时代即将结束。
http://t.cn/EUQ86dB

编辑:至尊宝
归档:https://elasticsearch.cn/article/6201
订阅:https://tinyletter.com/elastic-daily 
继续阅读 »
1.Elasticsearch Ingest Node 与Logstash性能对比。
http://t.cn/EUQQ0EQ
2.方法:如何将rsyslog与Kafka和Logstash集成。
http://t.cn/EUQ8Lyy
3.(自备梯子)iPhone的黄金时代即将结束。
http://t.cn/EUQ86dB

编辑:至尊宝
归档:https://elasticsearch.cn/article/6201
订阅:https://tinyletter.com/elastic-daily  收起阅读 »

社区日报 第479期 (2018-12-15)

1.滴滴Elasticsearch多集群架构实践。 http://t.cn/EUNLkNU

  1. 用es作为存储进行机器学习的python库。 http://t.cn/EUWeDoI

  2. 一周热点:职场寒冬,给你讲四个小故事 http://t.cn/Eydyut9
继续阅读 »

1.滴滴Elasticsearch多集群架构实践。 http://t.cn/EUNLkNU

  1. 用es作为存储进行机器学习的python库。 http://t.cn/EUWeDoI

  2. 一周热点:职场寒冬,给你讲四个小故事 http://t.cn/Eydyut9
收起阅读 »

Day 15 - 基于海量公司分词ES中文分词插件

介绍

本次想和大家分享一款Elasticsearch分词插件,该插件是基于天津海量信息股份有限公司的中文分词核心开发的。海量分词针对大数据检索场景专门做了定制和优化,更贴近搜索需求,整体分词的性能也是非常高效。

本文章有广告成分。但希望将公司研究成果分享出来,给大家实际工作中多一种选择...

海量分词检索优化点

  • 地名方面海量分词5.0可以识别并检索出关于地名后缀的结果

    可以通过搜索“河南”得到“河南省”的结果,搜索“天津”得到“天津市”的搜索结果,而不是简单河南、天津的识别。

  • 著名人物的人名识别更精准,如刘翔、傅莹等

    部分分词器处理中文分词只有两种方式:一种是单字(unigrams)形式,即简单粗暴的将中文的每一个汉字作为一个词(token)分开;另一种是两字(bigrams)的,也就是任意相邻的两个汉字作为一个词分开。这种简单粗暴的切分方式无法实现时效性较新的人名识别,如刘翔、傅莹等会被识别为单字切开。

  • 外国人名识别方面海量可以将人名识别智能识别

    “玛利亚 凯利”、“乔治·史密斯”、“玛丽·戴维斯”将完整的外国人名识别出姓氏和名,如“乔治·史密斯”可以被识别为“乔治”和 “史密斯”。

  • 常见词的品牌名称识别方面,海量分词5.0识别的结果中包含实际意义的品牌名称

    如“乐高”,“吉米作为简单的词,可以被识别,但是词放在文档语境中有其品牌的属性,海量分词识别的结果中可以准确搜索出品牌的结果。

  • 机构名识别方面

    海量分词5.0可以识别完整的机构名称,如“天津海量信息技术股份有限公司”,可以完整的识别出全称。

海量分词性能评测

评测用例

本次评测选取的语料一共三个。一个是2MB的海量测试语料,一个是4MB的北大语料(新版旧版各2MB),一个是9.4GB海量的线上实际数据

评测指标

本次评测是在开源评测程序上修改而来,评测指标有分词速度、行数完美率、字数完美率(该指标仅供参考)、内存消耗

评测结果

2MB海量测试语料

分词器 分词模式 分词速度(字符/毫秒) 行数完美率 字数完美率 占用内存(MB)
海量 / 1049.0212 74.11% 65.97% 85
ltp / 33.748833 55.68% 45.23% 201
IctClass 普通分词 208.69612 48.77% 37.10% 51
IctClass 细粒度分词 691.5951 38.33% 27.95% 51
Jieba SEARCH分词 592.697 47.64% 36.25% 236
FudanNLP / 121.7537 42.99% 31.59% 99
HanLP 标准分词 212.74121 45.30% 34.00% 63
HanLP NLP分词 378.23676 44.09% 32.55% 71
HanLP N-最短路径分词 189.29959 44.19% 32.22% 60
HanLP 最短路径分词 415.63605 43.19% 31.28% 59
HanLP 极速词典分词 6735.1934 36.78% 25.10% 18
THULAC / 0.20857348 54.49% 43.79% 110
Stanford CTB 0.13520464 44.43% 33.25% 1101
Stanford PKU 0.12508623 45.15% 34.01% 1065

可以看到海量分词的行数完美率是最高的,而且速度十分优异;仅有的一个比海量分词速度快的算法是一个追求极限性能舍弃准确率的算法

4MB北大语料

词器 分词模式 分词速度(字符/毫秒) 行数完美率 字数完美率 占用内存(MB)
海量 / 1121.7269 85.94% 48.28% 85
ltp / 35.81329 87.37% 49.37% 201
IctClass 普通分词 226.11554 78.55% 42.04% 51
IctClass 细粒度分词 756.5135 59.06% 30.61% 51
Jieba SEARCH分词 957.52826 47.07% 20.01% 236
FudanNLP / 126.09879 58.54% 27.78% 99
HanLP 标准分词 369.66 65.46% 35.04% 63
HanLP NLP分词 439.75632 61.93% 31.37% 71
HanLP N-最短路径分词 223.30482 69.20% 35.07% 60
HanLP 最短路径分词 440.72244 67.74% 33.83% 59
HanLP 极速词典分词 7522.581 58.09% 27.82% 18

(注:THULAC和stanford由于速度问题,不纳入评测)

可以看到海量的速度和行数完美率都很优异而且达到了兼顾,行数完美率只落后更高的ltp算法1.4个百分点,速度却是它的三十多倍

9.4GB线上数据

分词器 分词模式 分词速度(字符/毫秒)
ltp / 33.592
海量 / 960.611
IctClass 普通分词 198.094
HanLP N-最短路径分词 201.735
HanLP 最短路径分词 425.482
HanLP 标准分词 473.400
HanLP NLP分词 361.842
IctClass 细粒度分词 689.183
FudanNLP / 120.860
HanLP 极速词典分词 6238.916
Jieba SEARCH分词 568.262

(注:THULAC和stanford由于速度问题,不纳入评测)

本表格中分词顺序按(4MB北大语料的)行数完美率进行排序,越靠前的(4MB北大语料的)行数完美率越高

可以看出海量的分词速度十分优秀,分词速度拉开了大多数分词数倍,相比于行数完美率小幅领先的ltp要快几十倍

海量分词插件使用方法

安装使用

  • 下载安装 - 地址: https://github.com/HylandaOpen/elasticsearch-analysis-hlseg/releases

    unzip plugin to folder `your-es-root/plugins/`
  • 使用 elasticsearch-plugin 安装

    ./bin/elasticsearch-plugin install https://github.com/HylandaOpen/elasticsearch-analysis-hlseg/releases/download/v6.4.2/elasticsearch-analysis-hlseg-6.4.2.zip
  • 重启es集群

实例(借用github-ik分词插件的实例)

1.创建index

curl -XPUT http://localhost:9200/hylanda_seg

2.配置mapping

curl -XPOST http://localhost:9200/hylanda_seg/data/_mapping -H 'Content-Type:application/json' -d'
{
  "properties": {
    "msg": {
      "type": "text",
      "analyzer": "hlseg_search"
    }
  }
}'

3.插入测试数据

curl -XPOST http://localhost:9200/hylanda_seg/data/1 -H 'Content-Type:application/json' -d'
{"content":"美国留给伊拉克的是个烂摊子吗"}
'
curl -XPOST http://localhost:9200/hylanda_seg/data/2 -H 'Content-Type:application/json' -d'
{"content":"公安部:各地校车将享最高路权"}
'
curl -XPOST http://localhost:9200/hylanda_seg/data/3 -H 'Content-Type:application/json' -d'
{"content":"中韩渔警冲突调查:韩警平均每天扣1艘中国渔船"}
'
curl -XPOST http://localhost:9200/hylanda_seg/data/4 -H 'Content-Type:application/json' -d'
{"content":"中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"}
'

4.查询

curl -XPOST http://localhost:9200/hylanda_seg/data/_search  -H 'Content-Type:application/json' -d'
{
  "query": {
    "match": {
      "content": "中国"
    }
  },
  "highlight": {
    "fields": {
      "content": {}
    }
  }
}
'

返回结果

{
  "took" : 11,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 0.5754429,
    "hits" : [
      {
        "_index" : "hylanda_seg",
        "_type" : "data",
        "_id" : "4",
        "_score" : 0.5754429,
        "_source" : {
          "content" : "中韩渔警冲突调查:韩警平均每天扣1艘中国渔船"
        },
        "highlight" : {
          "content" : [
            "中韩渔警冲突调查:韩警平均每天扣1艘<em>中国</em>渔船"
          ]
        }
      },
      {
        "_index" : "hylanda_seg",
        "_type" : "data",
        "_id" : "5",
        "_score" : 0.2876821,
        "_source" : {
          "content" : "中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"
        },
        "highlight" : {
          "content" : [
            "<em>中国</em>驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"
          ]
        }
      }
    ]
  }
}

字典配置

海量分词分为基础词词典CoreDict.dat和自定义词典userDict_utf8.txt。基础词词典在dictionary目录下,需要将CoreDict.zip解压后放在config目录下,可以通过修改config下的userDict_utf8.txt来更新自定义词典

自定义词典格式如下


1.用户自定义词典采用文本格式,utf-8编码,每行一个词

2.每个词包含三列属性,分别是词串、词的属性以及idf值的加权等级,并以Tab作为分隔,其中除了词串必填外,其他列可以不填,不填写则系统采用默认值

3.“#”表示注释,会在加载时被忽略

4.词的属性以西文逗号分隔,可以是词性、停止词标志或者自定义属性

5.词性标记参考北大标准,用于词性标注时参考,该项不填则默认为名词

6.停止词标志为:stopword,由SegOption.outputStopWord来控制是否输出停止词

7.自定义属性不参与分词过程,分词结果中若Token.userTag不为空,则可以获取到该词的自定义属性。

8.idf值的加权分5级,从低到高的定义是idf-lv1 — idf-lv5,等级越高则该词在关键词计算时的权重会越大,若不填写该值则系统默认是idf-lv3(中等权重)
继续阅读 »

介绍

本次想和大家分享一款Elasticsearch分词插件,该插件是基于天津海量信息股份有限公司的中文分词核心开发的。海量分词针对大数据检索场景专门做了定制和优化,更贴近搜索需求,整体分词的性能也是非常高效。

本文章有广告成分。但希望将公司研究成果分享出来,给大家实际工作中多一种选择...

海量分词检索优化点

  • 地名方面海量分词5.0可以识别并检索出关于地名后缀的结果

    可以通过搜索“河南”得到“河南省”的结果,搜索“天津”得到“天津市”的搜索结果,而不是简单河南、天津的识别。

  • 著名人物的人名识别更精准,如刘翔、傅莹等

    部分分词器处理中文分词只有两种方式:一种是单字(unigrams)形式,即简单粗暴的将中文的每一个汉字作为一个词(token)分开;另一种是两字(bigrams)的,也就是任意相邻的两个汉字作为一个词分开。这种简单粗暴的切分方式无法实现时效性较新的人名识别,如刘翔、傅莹等会被识别为单字切开。

  • 外国人名识别方面海量可以将人名识别智能识别

    “玛利亚 凯利”、“乔治·史密斯”、“玛丽·戴维斯”将完整的外国人名识别出姓氏和名,如“乔治·史密斯”可以被识别为“乔治”和 “史密斯”。

  • 常见词的品牌名称识别方面,海量分词5.0识别的结果中包含实际意义的品牌名称

    如“乐高”,“吉米作为简单的词,可以被识别,但是词放在文档语境中有其品牌的属性,海量分词识别的结果中可以准确搜索出品牌的结果。

  • 机构名识别方面

    海量分词5.0可以识别完整的机构名称,如“天津海量信息技术股份有限公司”,可以完整的识别出全称。

海量分词性能评测

评测用例

本次评测选取的语料一共三个。一个是2MB的海量测试语料,一个是4MB的北大语料(新版旧版各2MB),一个是9.4GB海量的线上实际数据

评测指标

本次评测是在开源评测程序上修改而来,评测指标有分词速度、行数完美率、字数完美率(该指标仅供参考)、内存消耗

评测结果

2MB海量测试语料

分词器 分词模式 分词速度(字符/毫秒) 行数完美率 字数完美率 占用内存(MB)
海量 / 1049.0212 74.11% 65.97% 85
ltp / 33.748833 55.68% 45.23% 201
IctClass 普通分词 208.69612 48.77% 37.10% 51
IctClass 细粒度分词 691.5951 38.33% 27.95% 51
Jieba SEARCH分词 592.697 47.64% 36.25% 236
FudanNLP / 121.7537 42.99% 31.59% 99
HanLP 标准分词 212.74121 45.30% 34.00% 63
HanLP NLP分词 378.23676 44.09% 32.55% 71
HanLP N-最短路径分词 189.29959 44.19% 32.22% 60
HanLP 最短路径分词 415.63605 43.19% 31.28% 59
HanLP 极速词典分词 6735.1934 36.78% 25.10% 18
THULAC / 0.20857348 54.49% 43.79% 110
Stanford CTB 0.13520464 44.43% 33.25% 1101
Stanford PKU 0.12508623 45.15% 34.01% 1065

可以看到海量分词的行数完美率是最高的,而且速度十分优异;仅有的一个比海量分词速度快的算法是一个追求极限性能舍弃准确率的算法

4MB北大语料

词器 分词模式 分词速度(字符/毫秒) 行数完美率 字数完美率 占用内存(MB)
海量 / 1121.7269 85.94% 48.28% 85
ltp / 35.81329 87.37% 49.37% 201
IctClass 普通分词 226.11554 78.55% 42.04% 51
IctClass 细粒度分词 756.5135 59.06% 30.61% 51
Jieba SEARCH分词 957.52826 47.07% 20.01% 236
FudanNLP / 126.09879 58.54% 27.78% 99
HanLP 标准分词 369.66 65.46% 35.04% 63
HanLP NLP分词 439.75632 61.93% 31.37% 71
HanLP N-最短路径分词 223.30482 69.20% 35.07% 60
HanLP 最短路径分词 440.72244 67.74% 33.83% 59
HanLP 极速词典分词 7522.581 58.09% 27.82% 18

(注:THULAC和stanford由于速度问题,不纳入评测)

可以看到海量的速度和行数完美率都很优异而且达到了兼顾,行数完美率只落后更高的ltp算法1.4个百分点,速度却是它的三十多倍

9.4GB线上数据

分词器 分词模式 分词速度(字符/毫秒)
ltp / 33.592
海量 / 960.611
IctClass 普通分词 198.094
HanLP N-最短路径分词 201.735
HanLP 最短路径分词 425.482
HanLP 标准分词 473.400
HanLP NLP分词 361.842
IctClass 细粒度分词 689.183
FudanNLP / 120.860
HanLP 极速词典分词 6238.916
Jieba SEARCH分词 568.262

(注:THULAC和stanford由于速度问题,不纳入评测)

本表格中分词顺序按(4MB北大语料的)行数完美率进行排序,越靠前的(4MB北大语料的)行数完美率越高

可以看出海量的分词速度十分优秀,分词速度拉开了大多数分词数倍,相比于行数完美率小幅领先的ltp要快几十倍

海量分词插件使用方法

安装使用

  • 下载安装 - 地址: https://github.com/HylandaOpen/elasticsearch-analysis-hlseg/releases

    unzip plugin to folder `your-es-root/plugins/`
  • 使用 elasticsearch-plugin 安装

    ./bin/elasticsearch-plugin install https://github.com/HylandaOpen/elasticsearch-analysis-hlseg/releases/download/v6.4.2/elasticsearch-analysis-hlseg-6.4.2.zip
  • 重启es集群

实例(借用github-ik分词插件的实例)

1.创建index

curl -XPUT http://localhost:9200/hylanda_seg

2.配置mapping

curl -XPOST http://localhost:9200/hylanda_seg/data/_mapping -H 'Content-Type:application/json' -d'
{
  "properties": {
    "msg": {
      "type": "text",
      "analyzer": "hlseg_search"
    }
  }
}'

3.插入测试数据

curl -XPOST http://localhost:9200/hylanda_seg/data/1 -H 'Content-Type:application/json' -d'
{"content":"美国留给伊拉克的是个烂摊子吗"}
'
curl -XPOST http://localhost:9200/hylanda_seg/data/2 -H 'Content-Type:application/json' -d'
{"content":"公安部:各地校车将享最高路权"}
'
curl -XPOST http://localhost:9200/hylanda_seg/data/3 -H 'Content-Type:application/json' -d'
{"content":"中韩渔警冲突调查:韩警平均每天扣1艘中国渔船"}
'
curl -XPOST http://localhost:9200/hylanda_seg/data/4 -H 'Content-Type:application/json' -d'
{"content":"中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"}
'

4.查询

curl -XPOST http://localhost:9200/hylanda_seg/data/_search  -H 'Content-Type:application/json' -d'
{
  "query": {
    "match": {
      "content": "中国"
    }
  },
  "highlight": {
    "fields": {
      "content": {}
    }
  }
}
'

返回结果

{
  "took" : 11,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 0.5754429,
    "hits" : [
      {
        "_index" : "hylanda_seg",
        "_type" : "data",
        "_id" : "4",
        "_score" : 0.5754429,
        "_source" : {
          "content" : "中韩渔警冲突调查:韩警平均每天扣1艘中国渔船"
        },
        "highlight" : {
          "content" : [
            "中韩渔警冲突调查:韩警平均每天扣1艘<em>中国</em>渔船"
          ]
        }
      },
      {
        "_index" : "hylanda_seg",
        "_type" : "data",
        "_id" : "5",
        "_score" : 0.2876821,
        "_source" : {
          "content" : "中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"
        },
        "highlight" : {
          "content" : [
            "<em>中国</em>驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"
          ]
        }
      }
    ]
  }
}

字典配置

海量分词分为基础词词典CoreDict.dat和自定义词典userDict_utf8.txt。基础词词典在dictionary目录下,需要将CoreDict.zip解压后放在config目录下,可以通过修改config下的userDict_utf8.txt来更新自定义词典

自定义词典格式如下


1.用户自定义词典采用文本格式,utf-8编码,每行一个词

2.每个词包含三列属性,分别是词串、词的属性以及idf值的加权等级,并以Tab作为分隔,其中除了词串必填外,其他列可以不填,不填写则系统采用默认值

3.“#”表示注释,会在加载时被忽略

4.词的属性以西文逗号分隔,可以是词性、停止词标志或者自定义属性

5.词性标记参考北大标准,用于词性标注时参考,该项不填则默认为名词

6.停止词标志为:stopword,由SegOption.outputStopWord来控制是否输出停止词

7.自定义属性不参与分词过程,分词结果中若Token.userTag不为空,则可以获取到该词的自定义属性。

8.idf值的加权分5级,从低到高的定义是idf-lv1 — idf-lv5,等级越高则该词在关键词计算时的权重会越大,若不填写该值则系统默认是idf-lv3(中等权重)
收起阅读 »

JDBC with ESQL


elsh.png

 
 https://github.com/unimassystem/elasticsearch-jdbc
	BasicDataSource basicDataSource = new BasicDataSource();
// 创建连接池 一次性创建多个连接池

// 连接池 创建连接 ---需要四个参数
basicDataSource.setDriverClassName("com.elasticsearch.jdbc.ElasticsearchDriver");
basicDataSource.setUrl("jdbc:elasticsearch://127.0.0.1:5000");

// 从连接池中获取连接
Connection conn = basicDataSource.getConnection();
String sql = "select SRC_IP,SRC_PORT from \"my_test-*\"";
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
System.out.println(rs.getString("SRC_IP"));
}
basicDataSource.close();


String sql = "select SRC_IP,SRC_PORT from my_test* where SRC_PORT between 10 and 100 limit 1000";
String url = "jdbc:elasticsearch://127.0.0.1:5000";
Connection connection = DriverManager.getConnection(url, "test", null);
Statement statement = connection.createStatement();
ResultSet rs = statement.executeQuery(sql);
ResultSetMetaData meta = rs.getMetaData();
String columns = "|";
for (int i = 0; i < meta.getColumnCount(); i++) {
columns += meta.getColumnLabel(i) + " | ";
}
System.out.println(columns);
while (rs.next()) {
String row = "|";
for (int i = 0; i < meta.getColumnCount(); i++) {
row += rs.getString(i) + " | ";
}
System.out.println(row);
}
继续阅读 »

elsh.png

 
 https://github.com/unimassystem/elasticsearch-jdbc
	BasicDataSource basicDataSource = new BasicDataSource();
// 创建连接池 一次性创建多个连接池

// 连接池 创建连接 ---需要四个参数
basicDataSource.setDriverClassName("com.elasticsearch.jdbc.ElasticsearchDriver");
basicDataSource.setUrl("jdbc:elasticsearch://127.0.0.1:5000");

// 从连接池中获取连接
Connection conn = basicDataSource.getConnection();
String sql = "select SRC_IP,SRC_PORT from \"my_test-*\"";
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
System.out.println(rs.getString("SRC_IP"));
}
basicDataSource.close();


String sql = "select SRC_IP,SRC_PORT from my_test* where SRC_PORT between 10 and 100 limit 1000";
String url = "jdbc:elasticsearch://127.0.0.1:5000";
Connection connection = DriverManager.getConnection(url, "test", null);
Statement statement = connection.createStatement();
ResultSet rs = statement.executeQuery(sql);
ResultSetMetaData meta = rs.getMetaData();
String columns = "|";
for (int i = 0; i < meta.getColumnCount(); i++) {
columns += meta.getColumnLabel(i) + " | ";
}
System.out.println(columns);
while (rs.next()) {
String row = "|";
for (int i = 0; i < meta.getColumnCount(); i++) {
row += rs.getString(i) + " | ";
}
System.out.println(row);
}
收起阅读 »

Day 14 - 订单中心基于elasticsearch 的解决方案

       ElasticSearch分布式搜索储存集群的引入,主要是为了解决订单数据的存储与搜索的问题。

项目背景:
      15年去哪儿网酒店日均订单量达到30w+,随着多平台订单的聚合日均订单能达到100w左右。原来采用的热表分库方式,即将最近6个月的订单的放置在一张表中,将历史订单放在在history表中。history表存储全量的数据,当用户查询的下单时间跨度超过6个月即查询历史订单表,此分表方式热表的数据量为4000w左右,当时能解决的问题。但是显然不能满足携程艺龙订单接入的需求。如果继续按照热表方式,数据量将超过1亿条。全量数据表保存2年的可能就超过4亿的数据量。所以寻找有效途径解决此问题迫在眉睫。由于对这预计4亿的数据量还需按照预定日期、入住日期、离店日期、订单号、联系人姓名、电话、酒店名称、订单状态……等多个条件查询。所以简单按照某一个维度进行分表操作没有意义。ElasticSearch分布式搜索储存集群的引入,就是为了解决订单数据的存储与搜索的问题。

具体解决方案:

1、系统性能
        对订单模型进行抽象和分类,将常用搜索字段和基础属性字段剥离DB做分库分表。存储订单详情,ElasticSearch存储搜素字段。订单复杂查询直接走ElasticSearch。如下图:

elasticsearch1.png

       通用数据存储模型

elasticsearch2.png


关键字段
    ■ 业务核心字段,用于查询过滤
系统字段
    ■ version 避免高并发操作导致数据覆盖
大字段
    ■ order_data订单详情数据(JSON)
    ■ 可灵活需要索引的字段返回的字段

 
 
2、系统可用性
     系统可用性保障:双机房高可用如下图。

       
elasticsearch3.png

     数据可用性保障:
            一、异步多写保证数据一致性。

                
二、数据补充机制:
  1、每天凌晨task扫描数据库热表数据与es数据版本进行比较。
  2、将第三方推送过来数据中的,订单号即时插入订单同步队列表中。如果数据模型解析转换、持久化成功。删除队列中订单号。同时设置1分钟一次的task 扫描队列表。
  3、推送第三方的数据也采用同样的方式。保证给第三方数据的准确性。


elasticsearch4.png


3、系统伸缩性
      elasticSearch中索引设置了8个分片,目前Es单个索引的文档达到到1.4亿,合计达到2亿条数据占磁盘大小64G,集群机器磁盘容量240G。

 
 
 
继续阅读 »
       ElasticSearch分布式搜索储存集群的引入,主要是为了解决订单数据的存储与搜索的问题。

项目背景:
      15年去哪儿网酒店日均订单量达到30w+,随着多平台订单的聚合日均订单能达到100w左右。原来采用的热表分库方式,即将最近6个月的订单的放置在一张表中,将历史订单放在在history表中。history表存储全量的数据,当用户查询的下单时间跨度超过6个月即查询历史订单表,此分表方式热表的数据量为4000w左右,当时能解决的问题。但是显然不能满足携程艺龙订单接入的需求。如果继续按照热表方式,数据量将超过1亿条。全量数据表保存2年的可能就超过4亿的数据量。所以寻找有效途径解决此问题迫在眉睫。由于对这预计4亿的数据量还需按照预定日期、入住日期、离店日期、订单号、联系人姓名、电话、酒店名称、订单状态……等多个条件查询。所以简单按照某一个维度进行分表操作没有意义。ElasticSearch分布式搜索储存集群的引入,就是为了解决订单数据的存储与搜索的问题。

具体解决方案:

1、系统性能
        对订单模型进行抽象和分类,将常用搜索字段和基础属性字段剥离DB做分库分表。存储订单详情,ElasticSearch存储搜素字段。订单复杂查询直接走ElasticSearch。如下图:

elasticsearch1.png

       通用数据存储模型

elasticsearch2.png


关键字段
    ■ 业务核心字段,用于查询过滤
系统字段
    ■ version 避免高并发操作导致数据覆盖
大字段
    ■ order_data订单详情数据(JSON)
    ■ 可灵活需要索引的字段返回的字段

 
 
2、系统可用性
     系统可用性保障:双机房高可用如下图。

       
elasticsearch3.png

     数据可用性保障:
            一、异步多写保证数据一致性。

                
二、数据补充机制:
  1、每天凌晨task扫描数据库热表数据与es数据版本进行比较。
  2、将第三方推送过来数据中的,订单号即时插入订单同步队列表中。如果数据模型解析转换、持久化成功。删除队列中订单号。同时设置1分钟一次的task 扫描队列表。
  3、推送第三方的数据也采用同样的方式。保证给第三方数据的准确性。


elasticsearch4.png


3、系统伸缩性
      elasticSearch中索引设置了8个分片,目前Es单个索引的文档达到到1.4亿,合计达到2亿条数据占磁盘大小64G,集群机器磁盘容量240G。

 
 
  收起阅读 »

社区日报 第478期 (2018-12-14)

1、Elasticsearch日志系统搭建
http://t.cn/EyOugcQ
2、Elasticsearch最佳实践之核心概念与原理
http://t.cn/EUJa22D
3、Elasticsearch和Hive比较
http://t.cn/EUJaPGa

编辑:铭毅天下
归档:https://elasticsearch.cn/article/6196
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1、Elasticsearch日志系统搭建
http://t.cn/EyOugcQ
2、Elasticsearch最佳实践之核心概念与原理
http://t.cn/EUJa22D
3、Elasticsearch和Hive比较
http://t.cn/EUJaPGa

编辑:铭毅天下
归档:https://elasticsearch.cn/article/6196
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第477期 (2018-12-13)

1.如何在Elasticsearch中查找和删除重复文档
http://t.cn/EUxkESo
2.Elasticsearch线程池分析
http://t.cn/EUxkDNg
3.Elasticsearch Pipeline Aggregations指南
http://t.cn/EUxFzZX

编辑:金桥
归档:https://elasticsearch.cn/article/6195
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1.如何在Elasticsearch中查找和删除重复文档
http://t.cn/EUxkESo
2.Elasticsearch线程池分析
http://t.cn/EUxkDNg
3.Elasticsearch Pipeline Aggregations指南
http://t.cn/EUxFzZX

编辑:金桥
归档:https://elasticsearch.cn/article/6195
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

Day 13 - Elasticsearch-Hadoop打通Elasticsearch和Hadoop

ES-Hadoop打通Elasticsearch和Hadoop

介绍

Elasticsearch作为强大的搜索引擎,Hadoop HDFS是分布式文件系统。

ES-Hadoop是一个深度集成Hadoop和ElasticSearch的项目,也是ES官方来维护的一个子项目。Elasticsearch可以将自身的Document导入到HDFS中用作备份;同时也可以将存储在HDFS上的结构化文件导入为ES中的Document,通过实现Hadoop和ES之间的输入输出,可以在Hadoop里面对ES集群的数据进行读取和写入,充分发挥Map-Reduce并行处理的优势,为Hadoop数据带来实时搜索的可能。

ES-Hadoop插件支持Map-Reduce、Cascading、Hive、Pig、Spark、Storm、yarn等组件。

ES-Hadoop整个数据流转图如下:

ES-Hadoop.jpg

环境配置

  • Elasticsearch 5.0.2
  • Centos 7
  • elasticsearch-hadoop 5.0.2
  • repository-hdfs-5.0.2

Elasticsearch备份数据到HDFS

介绍

Elasticsearch副本提供了数据高可靠性,在部分节点丢失的情况下不中断服务;但是副本并不提供对灾难性故障的保护,同时在运维人员误操作情况下也不能保障数据的可恢复性。对于这种情况,我们需要对Elasticsearch集群数据的真正备份。

通过快照的方式,将Elasticsearch集群中的数据备份到HDFS上,这样数据既存在于Elasticsearch集群中,有存在于HDFS上。当ES集群出现不可恢复的故障时,可以将数据从HDFS上快速恢复。

操作步骤

备份与恢复

  • 构建一个仓库

    PUT http://192.168.10.74:9200/_snapshot/backup
    {  
    "type": "hdfs",  
      "settings": {  
              "uri": "hdfs://192.168.10.170:9000",  
              "path": "/es",  
              "conf_location": "/usr/local/hadoop/etc/hadoop/hdfs-site.xml"  
      }
    }
  • 备份快照

    PUT http://192.168.10.74:9200/_snapshot/backup/snapshot_users?wait_for_completion=true
    {
    "indices": "users",  //备份users的index,注意不设置这个属性,默认是备份所有index
    "ignore_unavailable": true,
    "include_global_state": false
    }
  • 恢复快照

    POST http://192.168.10.74:9200/_snapshot/backup/snapshot_users/_restore
    {
    "indices": "users",    //指定索引恢复,不指定就是所有
    "ignore_unavailable": true,     //忽略恢复时异常索引
    "include_global_state": false    //是否存储全局转态信息,fasle代表有一个或几个失败,不会导致整个任务失败
    }

整合Spark与Elasticsearch

整体思路

  • 数据首先存储在HDFS上,可以通过Spark SQL直接导入到ES中
  • Spark SQL可以直接通过建立Dataframe或者临时表连接ES,达到搜索优化、减少数据量和数据筛选的目的,此时数据只在ES内存中而不再Spark SQL中
  • 筛选后的数据重新导入到Spark SQL中进行查询

引入依赖

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>5.0.2</version>
</dependency>

具体流程

  • 数据在HDFS上,数据存储在HDFS的每个DataNode的block上

image-20181211115144840.png

  • 数据加载到Spark SQL

    • 数据从HDFS加载到Spark SQL中,以RDD形式存储
    JavaRDD<String> textFile = spark.read().textFile("hdfs://192.168.10.170:9000/csv/user.csv")
    • 添加数据结构信息转换为新的RDD
    JavaRDD<UserItem> dataSplits = textFile.map(line -> {
        String records = line.toString().trim();
        String record = records.substring(0,records.length() - 1).trim();
        String[] parts = record.split("\\|");
        UserItem u = new UserItem();
        u.setName(parts[0]);
        u.setAge(parts[1]);
        u.setHeight(parts[2]);
        return u;
    });
    • 根据新的RDD创建DataFrame
    DataSet<Row> ds = spark.createDataFrame(dataSplits, UserItem.class);

image-20181211120610726.png

  • 由Dataset创建索引,并写入ES

    JavaEsSparkSQL.saveToEs(ds, "es_spark/users");
  • 数据在ES中建立索引

image-20181211140747391.png

  • Spark SQL通过索引对ES中的数据进行查询

    SparkSession spark = SparkSession.builder().appName("es-spark").master("local").config("es.index.auto.create", true).getOrCreate();
    Map<String, String> options = new HashMap<>();
    options.put("pushdown", "true");
    options.put("es.nodes","192.168.10.74:9200");
    
    Dataset<Row> df = spark.read().options(options).format("org.elasticsearch.spark.sql").load("es_spark/users");
    df.createOrReplaceTempView("users");
    
    Dataset<Row> userSet = spark.sql("SELECT name FORM users WHERE age >=10 AND age <= 20");
    userSet.show();

结束

ES-Hadoop无缝打通了ES和Hadoop两个非常优秀的框架,从而让ES的强大检索性能帮助我们快速分析海量数据。

继续阅读 »

ES-Hadoop打通Elasticsearch和Hadoop

介绍

Elasticsearch作为强大的搜索引擎,Hadoop HDFS是分布式文件系统。

ES-Hadoop是一个深度集成Hadoop和ElasticSearch的项目,也是ES官方来维护的一个子项目。Elasticsearch可以将自身的Document导入到HDFS中用作备份;同时也可以将存储在HDFS上的结构化文件导入为ES中的Document,通过实现Hadoop和ES之间的输入输出,可以在Hadoop里面对ES集群的数据进行读取和写入,充分发挥Map-Reduce并行处理的优势,为Hadoop数据带来实时搜索的可能。

ES-Hadoop插件支持Map-Reduce、Cascading、Hive、Pig、Spark、Storm、yarn等组件。

ES-Hadoop整个数据流转图如下:

ES-Hadoop.jpg

环境配置

  • Elasticsearch 5.0.2
  • Centos 7
  • elasticsearch-hadoop 5.0.2
  • repository-hdfs-5.0.2

Elasticsearch备份数据到HDFS

介绍

Elasticsearch副本提供了数据高可靠性,在部分节点丢失的情况下不中断服务;但是副本并不提供对灾难性故障的保护,同时在运维人员误操作情况下也不能保障数据的可恢复性。对于这种情况,我们需要对Elasticsearch集群数据的真正备份。

通过快照的方式,将Elasticsearch集群中的数据备份到HDFS上,这样数据既存在于Elasticsearch集群中,有存在于HDFS上。当ES集群出现不可恢复的故障时,可以将数据从HDFS上快速恢复。

操作步骤

备份与恢复

  • 构建一个仓库

    PUT http://192.168.10.74:9200/_snapshot/backup
    {  
    "type": "hdfs",  
      "settings": {  
              "uri": "hdfs://192.168.10.170:9000",  
              "path": "/es",  
              "conf_location": "/usr/local/hadoop/etc/hadoop/hdfs-site.xml"  
      }
    }
  • 备份快照

    PUT http://192.168.10.74:9200/_snapshot/backup/snapshot_users?wait_for_completion=true
    {
    "indices": "users",  //备份users的index,注意不设置这个属性,默认是备份所有index
    "ignore_unavailable": true,
    "include_global_state": false
    }
  • 恢复快照

    POST http://192.168.10.74:9200/_snapshot/backup/snapshot_users/_restore
    {
    "indices": "users",    //指定索引恢复,不指定就是所有
    "ignore_unavailable": true,     //忽略恢复时异常索引
    "include_global_state": false    //是否存储全局转态信息,fasle代表有一个或几个失败,不会导致整个任务失败
    }

整合Spark与Elasticsearch

整体思路

  • 数据首先存储在HDFS上,可以通过Spark SQL直接导入到ES中
  • Spark SQL可以直接通过建立Dataframe或者临时表连接ES,达到搜索优化、减少数据量和数据筛选的目的,此时数据只在ES内存中而不再Spark SQL中
  • 筛选后的数据重新导入到Spark SQL中进行查询

引入依赖

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>5.0.2</version>
</dependency>

具体流程

  • 数据在HDFS上,数据存储在HDFS的每个DataNode的block上

image-20181211115144840.png

  • 数据加载到Spark SQL

    • 数据从HDFS加载到Spark SQL中,以RDD形式存储
    JavaRDD<String> textFile = spark.read().textFile("hdfs://192.168.10.170:9000/csv/user.csv")
    • 添加数据结构信息转换为新的RDD
    JavaRDD<UserItem> dataSplits = textFile.map(line -> {
        String records = line.toString().trim();
        String record = records.substring(0,records.length() - 1).trim();
        String[] parts = record.split("\\|");
        UserItem u = new UserItem();
        u.setName(parts[0]);
        u.setAge(parts[1]);
        u.setHeight(parts[2]);
        return u;
    });
    • 根据新的RDD创建DataFrame
    DataSet<Row> ds = spark.createDataFrame(dataSplits, UserItem.class);

image-20181211120610726.png

  • 由Dataset创建索引,并写入ES

    JavaEsSparkSQL.saveToEs(ds, "es_spark/users");
  • 数据在ES中建立索引

image-20181211140747391.png

  • Spark SQL通过索引对ES中的数据进行查询

    SparkSession spark = SparkSession.builder().appName("es-spark").master("local").config("es.index.auto.create", true).getOrCreate();
    Map<String, String> options = new HashMap<>();
    options.put("pushdown", "true");
    options.put("es.nodes","192.168.10.74:9200");
    
    Dataset<Row> df = spark.read().options(options).format("org.elasticsearch.spark.sql").load("es_spark/users");
    df.createOrReplaceTempView("users");
    
    Dataset<Row> userSet = spark.sql("SELECT name FORM users WHERE age >=10 AND age <= 20");
    userSet.show();

结束

ES-Hadoop无缝打通了ES和Hadoop两个非常优秀的框架,从而让ES的强大检索性能帮助我们快速分析海量数据。

收起阅读 »

社区日报 第476期 (2018-12-12)

1. 快来学习下如何便捷地记录文档插入 es 的时间?
http://t.cn/EUfESt7
2. 6.5.3发布了,还没升级到6的同学又多了一个新的选择!
http://t.cn/EUfEHvA
3. (自备梯子)如何不停机修改索引的 mapping?
http://t.cn/EUfErLr

编辑:rockybean
归档:https://elasticsearch.cn/article/6193
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1. 快来学习下如何便捷地记录文档插入 es 的时间?
http://t.cn/EUfESt7
2. 6.5.3发布了,还没升级到6的同学又多了一个新的选择!
http://t.cn/EUfEHvA
3. (自备梯子)如何不停机修改索引的 mapping?
http://t.cn/EUfErLr

编辑:rockybean
归档:https://elasticsearch.cn/article/6193
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

logstash filter如何判断字段是够为空或者null

为什么我的数据中没有updateTime 和 createTime 字段的;  理论上是不会执行if 里面的代码才对的;  但是为什么看日志输出好像是执行了if代码块的代码呢 
 
下面的是数据源, 并没有time字段的
{
"仓ku": "华南",
"originName": "",
"Code": "23248",
"BrandName": "",
"originCode": null,
"CategoryName": "原厂"
}
继续阅读 »
为什么我的数据中没有updateTime 和 createTime 字段的;  理论上是不会执行if 里面的代码才对的;  但是为什么看日志输出好像是执行了if代码块的代码呢 
 
下面的是数据源, 并没有time字段的
{
"仓ku": "华南",
"originName": "",
"Code": "23248",
"BrandName": "",
"originCode": null,
"CategoryName": "原厂"
}
收起阅读 »

Day 12 - Elasticsearch日志场景最佳实践

1. 背景

Elasticsearch可广泛应用于日志分析、全文检索、结构化数据分析等多种场景,大幅度降低维护多套专用系统的成本,在开源社区非常受欢迎。然而Elasticsearch为满足多种不同的使用场景,底层组合使用了多种数据结构,部分数据结构对具体的用户使用场景可能是冗余的,从而导致默认情况下无法达到性能和成本最优化。 幸运的是,Elasticsearch提供非常灵活的模板配置能力,用户可以按需进行优化。多数情况下,用户结合使用场景进行优化后,Elasticsearch的性能都会有数倍的提升,成本也对应有倍数级别的下降。本文主要介绍不同日志使用场景下的调优经验。

2. 日志处理基本流程

日志处理的基本流程包含:日志采集 -> 数据清洗 -> 存储 -> 可视化分析。Elastic Stack提供完整的日志解决方案,帮助用户完成对日志处理全链路的管理,推荐大家使用。每个流程的处理如下:

  • 日志采集:从业务所在的机器上,较实时的采集日志传递给下游。常用开源组件如Beats、Logstash、Fluentd等。
  • 数据清洗:利用正则解析等机制,完成日志从文本数据到结构化数据的转换。用户可使用Logstash 或 Elasticsearch Ingest模块等完成数据清洗。
  • 存储:使用Elasticsearch对数据进行持久存储,并提供全文搜索和分析能力。
  • 可视化分析:通过图形界面,完成对日志的搜索分析,常用的开源组件如Kibana、Grafana。

使用Elastic Stack处理日志的详细过程,用户可参考官方文章Getting started with the Elastic Stack,这里不展开介绍。

3. 日志场景调优

       对于Elasticsearch的通用调优,之前分享的文章Elasticsearch调优实践,详细介绍了Elasticsearch在性能、稳定性方面的调优经验。而对于日志场景,不同的场景使用方式差别较大,这里主要介绍常见使用方式下,性能和成本的优化思路。

3.1 基础场景

对于多数简单日志使用场景,用户一般只要求存储原始日志,并提供按关键字搜索日志记录的能力。对于此类场景,用户可跳过数据清洗阶段,并参考如下方式进行优化:

  • 建议打开最优压缩,一般可降低40%存储。
  • 设置原始日志字段(message)为text,去除keyword类型子字段,提供全文搜索能力,降低存储。
  • 关闭_all索引,前面已通过message提供全文搜索能力。
  • 对于其他字符串字段,统一设置为keyword类型,避免默认情况下字符串字段同时存储text、keyword两种类型的数据。
  • 使用开源组件(如Beats)上报数据时会包含较多辅助信息,用户可通过修改组件配置文件进行裁剪。

这样去除message的keyword子字段、_all等冗余信息后,再加上最优压缩,可以保证数据相对精简。下面给出这类场景的常用模板,供用户参考:

{
    "order": 5,
    "template": "my_log_*",
    "settings": {
        "translog.durability": "async",
        "translog.sync_interval": "5s",
        "index.refresh_interval": "30s",
        "index.codec": "best_compression"    # 最优压缩
    },
    "mappings": {
        "_default_": {
            "_all": {                        # 关闭_all索引
                "enabled": false
            },
            "dynamic_templates": [
                {
                    "log": {                 # 原始日志字段,分词建立索引
                        "match": "message",
                        "mapping": {
                            "type": "text"
                        }
                    }
                },
                {
                    "strings": {             # 其他字符串字段,统一设置为keyword类型
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "keyword"
                        }
                    }
                }
            ]
        }
    }
}

3.2 精准搜索场景

对于部分用户,普通的全文检索并不能满足需求,希望精准搜索日志中的某部分,例如每条日志中包含程序运行时多个阶段的耗时数据,对具体一个阶段的耗时进行搜索就比较麻烦。对于此类场景,用户可基于基础场景,进行如下调整:

  • 清洗过程中,可仅解析出需要精准搜索的部分作为独立字段,用于精准搜索。
  • 对于精准搜索字段,如果无排序/聚合需求,可以关闭doc_values;对于字符串,一般使用keyword,可按需考虑使用text。

下面给出这类场景的常用模板,供用户参考:

{
    "order": 5,
    "template": "my_log_*",
    "settings": {
        "translog.durability": "async",
        "translog.sync_interval": "5s",
        "index.refresh_interval": "30s",
        "index.codec": "best_compression"    # 最优压缩
    },
    "mappings": {
        "_default_": {
            "_all": {                        # 关闭_all索引
                "enabled": false
            },
            "dynamic_templates": [
                {
                    "log": {                 # 原始日志字段,分词建立索引
                        "match": "message",
                        "mapping": {
                            "type": "text"
                        }
                    }
                },
                {
                    "precise_fieldx": {       # 精准搜索字段
                        "match": "fieldx",
                        "mapping": {
                            "type": "keyword",
                            "doc_values": false
                        }
                    }
                },
                {
                    "strings": {             # 其他字符串字段,统一设置为keyword类型
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "keyword"
                        }
                    }
                }
            ]
        }
    }
}

3.3 统计分析场景

对于某些场景,日志包含的主要是程序运行时输出的统计信息,用户通常会完全解析日志进行精确查询、统计分析,而是否保存原始日志关系不大。对于此类场景,用户可进行如下调整:

  • 清洗过程中,解析出所有需要的数据作为独立字段;原始日志非必要时,建议去除。
  • 如果有强需求保留原始日志,可以设置该字段enabled属性为false,只存储不索引。
  • 多数字段保持默认即可,会自动建立索引、打开doc_values,可用于查询、排序、聚合。
  • 对部分无排序/聚合需求、开销高的字段,可以关闭doc_values。

下面给出这类场景的常用模板,供用户参考:

{
    "order": 5,
    "template": "my_log_*",
    "settings": {
        "translog.durability": "async",
        "translog.sync_interval": "5s",
        "index.refresh_interval": "30s",
        "index.codec": "best_compression"    # 最优压缩
    },
    "mappings": {
        "_default_": {
            "_all": {                        # 关闭_all索引
                "enabled": false
            },
            "dynamic_templates": [
                {
                    "log": {                 # 原始日志字段,关闭索引
                        "match": "message",
                        "mapping": {
                            "enabled": false
                        }
                    }
                },
                {
                    "index_only_fieldx": {   # 仅索引的字段,无排序/聚合需求
                        "match": "fieldx",
                        "mapping": {
                            "type": "keyword",
                            "doc_values": false
                        }
                    }
                },
                {
                    "strings": {             # 其他字符串字段,统一设置为keyword类型
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "keyword"
                        }
                    }
                }
            ]
        }
    }
}

ES 5.1及之后的版本,支持关键字查询时自动选择目标字段,用户没有必要再使用原始日志字段提供不指定字段进行查询的能力。

4. 小结

日志的使用方式比较灵活,本文结合常见的客户使用方式,从整体上对性能、成本进行优化。用户也可结合自身业务场景,参考文章Elasticsearch调优实践进行更细致的优化。

继续阅读 »

1. 背景

Elasticsearch可广泛应用于日志分析、全文检索、结构化数据分析等多种场景,大幅度降低维护多套专用系统的成本,在开源社区非常受欢迎。然而Elasticsearch为满足多种不同的使用场景,底层组合使用了多种数据结构,部分数据结构对具体的用户使用场景可能是冗余的,从而导致默认情况下无法达到性能和成本最优化。 幸运的是,Elasticsearch提供非常灵活的模板配置能力,用户可以按需进行优化。多数情况下,用户结合使用场景进行优化后,Elasticsearch的性能都会有数倍的提升,成本也对应有倍数级别的下降。本文主要介绍不同日志使用场景下的调优经验。

2. 日志处理基本流程

日志处理的基本流程包含:日志采集 -> 数据清洗 -> 存储 -> 可视化分析。Elastic Stack提供完整的日志解决方案,帮助用户完成对日志处理全链路的管理,推荐大家使用。每个流程的处理如下:

  • 日志采集:从业务所在的机器上,较实时的采集日志传递给下游。常用开源组件如Beats、Logstash、Fluentd等。
  • 数据清洗:利用正则解析等机制,完成日志从文本数据到结构化数据的转换。用户可使用Logstash 或 Elasticsearch Ingest模块等完成数据清洗。
  • 存储:使用Elasticsearch对数据进行持久存储,并提供全文搜索和分析能力。
  • 可视化分析:通过图形界面,完成对日志的搜索分析,常用的开源组件如Kibana、Grafana。

使用Elastic Stack处理日志的详细过程,用户可参考官方文章Getting started with the Elastic Stack,这里不展开介绍。

3. 日志场景调优

       对于Elasticsearch的通用调优,之前分享的文章Elasticsearch调优实践,详细介绍了Elasticsearch在性能、稳定性方面的调优经验。而对于日志场景,不同的场景使用方式差别较大,这里主要介绍常见使用方式下,性能和成本的优化思路。

3.1 基础场景

对于多数简单日志使用场景,用户一般只要求存储原始日志,并提供按关键字搜索日志记录的能力。对于此类场景,用户可跳过数据清洗阶段,并参考如下方式进行优化:

  • 建议打开最优压缩,一般可降低40%存储。
  • 设置原始日志字段(message)为text,去除keyword类型子字段,提供全文搜索能力,降低存储。
  • 关闭_all索引,前面已通过message提供全文搜索能力。
  • 对于其他字符串字段,统一设置为keyword类型,避免默认情况下字符串字段同时存储text、keyword两种类型的数据。
  • 使用开源组件(如Beats)上报数据时会包含较多辅助信息,用户可通过修改组件配置文件进行裁剪。

这样去除message的keyword子字段、_all等冗余信息后,再加上最优压缩,可以保证数据相对精简。下面给出这类场景的常用模板,供用户参考:

{
    "order": 5,
    "template": "my_log_*",
    "settings": {
        "translog.durability": "async",
        "translog.sync_interval": "5s",
        "index.refresh_interval": "30s",
        "index.codec": "best_compression"    # 最优压缩
    },
    "mappings": {
        "_default_": {
            "_all": {                        # 关闭_all索引
                "enabled": false
            },
            "dynamic_templates": [
                {
                    "log": {                 # 原始日志字段,分词建立索引
                        "match": "message",
                        "mapping": {
                            "type": "text"
                        }
                    }
                },
                {
                    "strings": {             # 其他字符串字段,统一设置为keyword类型
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "keyword"
                        }
                    }
                }
            ]
        }
    }
}

3.2 精准搜索场景

对于部分用户,普通的全文检索并不能满足需求,希望精准搜索日志中的某部分,例如每条日志中包含程序运行时多个阶段的耗时数据,对具体一个阶段的耗时进行搜索就比较麻烦。对于此类场景,用户可基于基础场景,进行如下调整:

  • 清洗过程中,可仅解析出需要精准搜索的部分作为独立字段,用于精准搜索。
  • 对于精准搜索字段,如果无排序/聚合需求,可以关闭doc_values;对于字符串,一般使用keyword,可按需考虑使用text。

下面给出这类场景的常用模板,供用户参考:

{
    "order": 5,
    "template": "my_log_*",
    "settings": {
        "translog.durability": "async",
        "translog.sync_interval": "5s",
        "index.refresh_interval": "30s",
        "index.codec": "best_compression"    # 最优压缩
    },
    "mappings": {
        "_default_": {
            "_all": {                        # 关闭_all索引
                "enabled": false
            },
            "dynamic_templates": [
                {
                    "log": {                 # 原始日志字段,分词建立索引
                        "match": "message",
                        "mapping": {
                            "type": "text"
                        }
                    }
                },
                {
                    "precise_fieldx": {       # 精准搜索字段
                        "match": "fieldx",
                        "mapping": {
                            "type": "keyword",
                            "doc_values": false
                        }
                    }
                },
                {
                    "strings": {             # 其他字符串字段,统一设置为keyword类型
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "keyword"
                        }
                    }
                }
            ]
        }
    }
}

3.3 统计分析场景

对于某些场景,日志包含的主要是程序运行时输出的统计信息,用户通常会完全解析日志进行精确查询、统计分析,而是否保存原始日志关系不大。对于此类场景,用户可进行如下调整:

  • 清洗过程中,解析出所有需要的数据作为独立字段;原始日志非必要时,建议去除。
  • 如果有强需求保留原始日志,可以设置该字段enabled属性为false,只存储不索引。
  • 多数字段保持默认即可,会自动建立索引、打开doc_values,可用于查询、排序、聚合。
  • 对部分无排序/聚合需求、开销高的字段,可以关闭doc_values。

下面给出这类场景的常用模板,供用户参考:

{
    "order": 5,
    "template": "my_log_*",
    "settings": {
        "translog.durability": "async",
        "translog.sync_interval": "5s",
        "index.refresh_interval": "30s",
        "index.codec": "best_compression"    # 最优压缩
    },
    "mappings": {
        "_default_": {
            "_all": {                        # 关闭_all索引
                "enabled": false
            },
            "dynamic_templates": [
                {
                    "log": {                 # 原始日志字段,关闭索引
                        "match": "message",
                        "mapping": {
                            "enabled": false
                        }
                    }
                },
                {
                    "index_only_fieldx": {   # 仅索引的字段,无排序/聚合需求
                        "match": "fieldx",
                        "mapping": {
                            "type": "keyword",
                            "doc_values": false
                        }
                    }
                },
                {
                    "strings": {             # 其他字符串字段,统一设置为keyword类型
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "keyword"
                        }
                    }
                }
            ]
        }
    }
}

ES 5.1及之后的版本,支持关键字查询时自动选择目标字段,用户没有必要再使用原始日志字段提供不指定字段进行查询的能力。

4. 小结

日志的使用方式比较灵活,本文结合常见的客户使用方式,从整体上对性能、成本进行优化。用户也可结合自身业务场景,参考文章Elasticsearch调优实践进行更细致的优化。

收起阅读 »

用elasitc stack监控kafka

当我们搭建elasitc stack集群时,大多数时候会在我们的架构中加入kafka作为消息缓冲区,即从beats -> kafka -> logstash -> elasticsearch这样的一个消息流。使用kafka可以给我们带来很多便利,但是也让我们需要额外多维护一套组件,elasitc stack本身已经提供了monitoring的功能,我们可以方便的从kibana上监控各个组件中各节点的可用性,吞吐和性能等各种指标,但kafka作为架构中的组件之一却游离在监控之外,相当不合理。

幸而elastic真的是迭代的相当快,在metricbeat上很早就有了对kafka的监控,但一直没有一个直观的dashboard,终于在6.5版本上,上新了kafka dashboard。我们来看一下吧。

安装和配置metricbeat

安装包下载地址,下载后,自己安装。 然后,将/etc/metricbeat/modules.d/kafka.yml.disable文件重命名为/etc/metricbeat/modules.d/kafka.yml。(即打开kafka的监控)。稍微修改一下文件内容, 注意,这里需填入所有你需要监控的kafka服务器的地址

# Module: kafka
# Docs: https://www.elastic.co/guide/en/beats/metricbeat/6.4/metricbeat-module-kafka.html

- module: kafka
  metricsets:
    - partition
    - consumergroup
  period: 20s
  hosts: ["10.*.*.*:9092","10.*.*.*:9092","10.*.*.*:9092","10.*.*.*:9092"]

  #client_id: metricbeat
  #retries: 3
  #backoff: 250ms

  # List of Topics to query metadata for. If empty, all topics will be queried.
  #topics: []

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

  # SASL authentication
  #username: ""
  #password: ""

运行metricbeat,这里,一定要注意enable kibana dashboard。

然后就可以在kibana里面看到:

20181212112635653.png
2018121211265464.png

这样,我们就可以通过sentinl等类似的插件,自动做kafka的告警等功能了

继续阅读 »

当我们搭建elasitc stack集群时,大多数时候会在我们的架构中加入kafka作为消息缓冲区,即从beats -> kafka -> logstash -> elasticsearch这样的一个消息流。使用kafka可以给我们带来很多便利,但是也让我们需要额外多维护一套组件,elasitc stack本身已经提供了monitoring的功能,我们可以方便的从kibana上监控各个组件中各节点的可用性,吞吐和性能等各种指标,但kafka作为架构中的组件之一却游离在监控之外,相当不合理。

幸而elastic真的是迭代的相当快,在metricbeat上很早就有了对kafka的监控,但一直没有一个直观的dashboard,终于在6.5版本上,上新了kafka dashboard。我们来看一下吧。

安装和配置metricbeat

安装包下载地址,下载后,自己安装。 然后,将/etc/metricbeat/modules.d/kafka.yml.disable文件重命名为/etc/metricbeat/modules.d/kafka.yml。(即打开kafka的监控)。稍微修改一下文件内容, 注意,这里需填入所有你需要监控的kafka服务器的地址

# Module: kafka
# Docs: https://www.elastic.co/guide/en/beats/metricbeat/6.4/metricbeat-module-kafka.html

- module: kafka
  metricsets:
    - partition
    - consumergroup
  period: 20s
  hosts: ["10.*.*.*:9092","10.*.*.*:9092","10.*.*.*:9092","10.*.*.*:9092"]

  #client_id: metricbeat
  #retries: 3
  #backoff: 250ms

  # List of Topics to query metadata for. If empty, all topics will be queried.
  #topics: []

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

  # SASL authentication
  #username: ""
  #password: ""

运行metricbeat,这里,一定要注意enable kibana dashboard。

然后就可以在kibana里面看到:

20181212112635653.png
2018121211265464.png

这样,我们就可以通过sentinl等类似的插件,自动做kafka的告警等功能了

收起阅读 »

搭建Elasitc stack集群需要注意的日志问题

@[toc] 搭建Elasitc stack集群时,我们往往把大部分注意力放在集群的搭建,索引的优化,分片的设置上等具体的调优参数上,很少有人会去关心Elasitc stack的日志配置的问题,大概是觉得,日志应该是一个公共的问题,默认的配置应该已经为我们处理好了。但很不幸,在不同的机器配置或者不同的运营策略下,如果采用默认的配置,会给我们带来麻烦。

默认配置带来的麻烦

以下例子是默认情况下,当Elasitc stack集群运行超过3个月之后的情况:

elasticsearch

elasticsearch默认情况下会每天rolling一个文件,当到达2G的时候,才开始清除超出的部分,当一个文件只有几十K的时候,文件会一直累计下来。

20181210163659128.png

logstash

一直增长的gc文件和不停增多的rolling日志文件

20181210164358500.png

kibana

默认日志输出到kibana.out文件当中,这个文件会变得越来越大

20181210163521285.png

kafka

这里提到kafka是因为在大部分的架构当中,我们都会用到kafka作为中间件数据缓冲区,因此不得不维护kafka集群。同样,如果不做特定的配置,也会遇到日志的问题:不停增多的rolling日志文件

20181210164731512.png

原因是kafka的默认log4j配置是使用DailyRollingFileAppender每隔一个小时生成一个文件 '.'yyyy-MM-dd-HH

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

解决方案

因此,对于我们需要维护的这几个组件,需要配置合理的日志rotate策略。一个比较常用的策略就是时间+size,每天rotate一个日志文件或者每当日志文件大小超过256M,rotate一个新的日志文件,并且最多保留7天之内的日志文件。

elasticsearch 

通过修改log4j2.properties文件来解决。该文件在/etc/elasticsesarch目录下(或者config目录)。 默认配置是:

appender.rolling.type = RollingFile 
appender.rolling.name = rolling
appender.rolling.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}.log 
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c{1.}] [%node_name]%marker %.-10000m%n
appender.rolling.filePattern = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}-%d{yyyy-MM-dd}-%i.log.gz 
appender.rolling.policies.type = Policies
appender.rolling.policies.time.type = TimeBasedTriggeringPolicy 
appender.rolling.policies.time.interval = 1 
appender.rolling.policies.time.modulate = true 
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy 
appender.rolling.policies.size.size = 256MB 
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.fileIndex = nomax
appender.rolling.strategy.action.type = Delete 
appender.rolling.strategy.action.basepath = ${sys:es.logs.base_path}
appender.rolling.strategy.action.condition.type = IfFileName 
appender.rolling.strategy.action.condition.glob = ${sys:es.logs.cluster_name}-* 
appender.rolling.strategy.action.condition.nested_condition.type = IfAccumulatedFileSize 
appender.rolling.strategy.action.condition.nested_condition.exceeds = 2GB 

以上默认配置,会保存2GB的日志,只有累计的日志大小超过2GB的时候,才会删除旧的日志文件。 建议改为如下配置,仅保留最近7天的日志

appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.action.type = Delete
appender.rolling.strategy.action.basepath = ${sys:es.logs.base_path}
appender.rolling.strategy.action.condition.type = IfFileName
appender.rolling.strategy.action.condition.glob = ${sys:es.logs.cluster_name}-*
appender.rolling.strategy.action.condition.nested_condition.type = IfLastModified
appender.rolling.strategy.action.condition.nested_condition.age = 7D

这里必须注意,log4j2会因为末尾的空格导致无法识别配置

logstash

与elasticsearch类似,通过修改log4j2.properties文件来解决。该文件在/etc/logstash目录下(或者config目录)。 默认配置是不会删除历史日志的:

status = error
name = LogstashPropertiesConfig

appender.console.type = Console
appender.console.name = plain_console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %m%n

appender.json_console.type = Console
appender.json_console.name = json_console
appender.json_console.layout.type = JSONLayout
appender.json_console.layout.compact = true
appender.json_console.layout.eventEol = true

appender.rolling.type = RollingFile
appender.rolling.name = plain_rolling
appender.rolling.fileName = ${sys:ls.logs}/logstash-${sys:ls.log.format}.log
appender.rolling.filePattern = ${sys:ls.logs}/logstash-${sys:ls.log.format}-%d{yyyy-MM-dd}.log
appender.rolling.policies.type = Policies
appender.rolling.policies.time.type = TimeBasedTriggeringPolicy
appender.rolling.policies.time.interval = 1
appender.rolling.policies.time.modulate = true
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %-.10000m%n

需手动加上:

appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.action.type = Delete
appender.rolling.strategy.action.basepath = ${sys:ls.logs}
appender.rolling.strategy.action.condition.type = IfFileName
appender.rolling.strategy.action.condition.glob = ${sys:ls.logs}/logstash-${sys:ls.log.format}
appender.rolling.strategy.action.condition.nested_condition.type = IfLastModified
appender.rolling.strategy.action.condition.nested_condition.age = 7D

kibana

在kibana的配置文件中,只有以下几个选项:

logging.dest:
Default: stdout Enables you specify a file where Kibana stores log output.
logging.quiet:
Default: false Set the value of this setting to true to suppress all logging output other than error messages.
logging.silent:
Default: false Set the value of this setting to true to suppress all logging output.
logging.verbose:
Default: false Set the value of this setting to true to log all events, including system usage information and all requests. Supported on Elastic Cloud Enterprise.
logging.timezone
Default: UTC Set to the canonical timezone id (e.g. US/Pacific) to log events using that timezone. A list of timezones can be referenced at https://en.wikipedia.org/wiki/List_of_tz_database_time_zones.

我们可以指定输出的日志文件与日志内容,但是却不可以配置日志的rotate。这时,我们需要使用logrotate,这个linux默认安装的工具。 首先,我们要在配置文件里面指定生成pid文件:

pid.file: "pid.log"

然后,修改/etc/logrotate.conf:

/var/log/kibana {
    missingok
    notifempty
    sharedscripts
    daily
    rotate 7
    copytruncate
    /bin/kill -HUP $(cat /usr/share/kibana/pid.log 2>/dev/null) 2>/dev/null
    endscript
}

kafka

如果不想写脚本清理过多的文件的话,需要修改config/log4j.properties文件。使用RollingFileAppender代替DailyRollingFileAppender,同时设置MaxFileSizeMaxBackupIndex。即修改为:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
log4j.appender.kafkaAppender.MaxFileSize=10MB
log4j.appender.kafkaAppender.MaxBackupIndex=10
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.stateChangeAppender=org.apache.log4j.RollingFileAppender
log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
log4j.appender.stateChangeAppender.MaxFileSize=10M
log4j.appender.stateChangeAppender.MaxBackupIndex=10
log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.requestAppender=org.apache.log4j.RollingFileAppender
log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
log4j.appender.requestAppender.MaxFileSize=10MB
log4j.appender.requestAppender.MaxBackupIndex=10
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.cleanerAppender=org.apache.log4j.RollingFileAppender
log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
log4j.appender.cleanerAppender.MaxFileSize=10MB
log4j.appender.cleanerAppender.MaxBackupIndex=10
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.controllerAppender=org.apache.log4j.RollingFileAppender
log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
log4j.appender.controllerAppender.MaxFileSize=10MB
log4j.appender.controllerAppender.MaxBackupIndex=10
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.authorizerAppender=org.apache.log4j.RollingFileAppender
log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
log4j.appender.authorizerAppender.MaxFileSize=10MB
log4j.appender.authorizerAppender.MaxBackupIndex=10
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

# Turn on all our debugging info
#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
#log4j.logger.kafka.perf=DEBUG, kafkaAppender
#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
log4j.logger.kafka=INFO, kafkaAppender

log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
log4j.additivity.kafka.network.RequestChannel$=false

#log4j.logger.kafka.network.Processor=TRACE, requestAppender
#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
#log4j.additivity.kafka.server.KafkaApis=false
log4j.logger.kafka.request.logger=WARN, requestAppender
log4j.additivity.kafka.request.logger=false

log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=false

log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
log4j.additivity.kafka.log.LogCleaner=false

log4j.logger.state.change.logger=TRACE, stateChangeAppender
log4j.additivity.state.change.logger=false

#Change this to debug to get the actual audit log for authorizer.
log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false
继续阅读 »

@[toc] 搭建Elasitc stack集群时,我们往往把大部分注意力放在集群的搭建,索引的优化,分片的设置上等具体的调优参数上,很少有人会去关心Elasitc stack的日志配置的问题,大概是觉得,日志应该是一个公共的问题,默认的配置应该已经为我们处理好了。但很不幸,在不同的机器配置或者不同的运营策略下,如果采用默认的配置,会给我们带来麻烦。

默认配置带来的麻烦

以下例子是默认情况下,当Elasitc stack集群运行超过3个月之后的情况:

elasticsearch

elasticsearch默认情况下会每天rolling一个文件,当到达2G的时候,才开始清除超出的部分,当一个文件只有几十K的时候,文件会一直累计下来。

20181210163659128.png

logstash

一直增长的gc文件和不停增多的rolling日志文件

20181210164358500.png

kibana

默认日志输出到kibana.out文件当中,这个文件会变得越来越大

20181210163521285.png

kafka

这里提到kafka是因为在大部分的架构当中,我们都会用到kafka作为中间件数据缓冲区,因此不得不维护kafka集群。同样,如果不做特定的配置,也会遇到日志的问题:不停增多的rolling日志文件

20181210164731512.png

原因是kafka的默认log4j配置是使用DailyRollingFileAppender每隔一个小时生成一个文件 '.'yyyy-MM-dd-HH

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

解决方案

因此,对于我们需要维护的这几个组件,需要配置合理的日志rotate策略。一个比较常用的策略就是时间+size,每天rotate一个日志文件或者每当日志文件大小超过256M,rotate一个新的日志文件,并且最多保留7天之内的日志文件。

elasticsearch 

通过修改log4j2.properties文件来解决。该文件在/etc/elasticsesarch目录下(或者config目录)。 默认配置是:

appender.rolling.type = RollingFile 
appender.rolling.name = rolling
appender.rolling.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}.log 
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c{1.}] [%node_name]%marker %.-10000m%n
appender.rolling.filePattern = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}-%d{yyyy-MM-dd}-%i.log.gz 
appender.rolling.policies.type = Policies
appender.rolling.policies.time.type = TimeBasedTriggeringPolicy 
appender.rolling.policies.time.interval = 1 
appender.rolling.policies.time.modulate = true 
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy 
appender.rolling.policies.size.size = 256MB 
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.fileIndex = nomax
appender.rolling.strategy.action.type = Delete 
appender.rolling.strategy.action.basepath = ${sys:es.logs.base_path}
appender.rolling.strategy.action.condition.type = IfFileName 
appender.rolling.strategy.action.condition.glob = ${sys:es.logs.cluster_name}-* 
appender.rolling.strategy.action.condition.nested_condition.type = IfAccumulatedFileSize 
appender.rolling.strategy.action.condition.nested_condition.exceeds = 2GB 

以上默认配置,会保存2GB的日志,只有累计的日志大小超过2GB的时候,才会删除旧的日志文件。 建议改为如下配置,仅保留最近7天的日志

appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.action.type = Delete
appender.rolling.strategy.action.basepath = ${sys:es.logs.base_path}
appender.rolling.strategy.action.condition.type = IfFileName
appender.rolling.strategy.action.condition.glob = ${sys:es.logs.cluster_name}-*
appender.rolling.strategy.action.condition.nested_condition.type = IfLastModified
appender.rolling.strategy.action.condition.nested_condition.age = 7D

这里必须注意,log4j2会因为末尾的空格导致无法识别配置

logstash

与elasticsearch类似,通过修改log4j2.properties文件来解决。该文件在/etc/logstash目录下(或者config目录)。 默认配置是不会删除历史日志的:

status = error
name = LogstashPropertiesConfig

appender.console.type = Console
appender.console.name = plain_console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %m%n

appender.json_console.type = Console
appender.json_console.name = json_console
appender.json_console.layout.type = JSONLayout
appender.json_console.layout.compact = true
appender.json_console.layout.eventEol = true

appender.rolling.type = RollingFile
appender.rolling.name = plain_rolling
appender.rolling.fileName = ${sys:ls.logs}/logstash-${sys:ls.log.format}.log
appender.rolling.filePattern = ${sys:ls.logs}/logstash-${sys:ls.log.format}-%d{yyyy-MM-dd}.log
appender.rolling.policies.type = Policies
appender.rolling.policies.time.type = TimeBasedTriggeringPolicy
appender.rolling.policies.time.interval = 1
appender.rolling.policies.time.modulate = true
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %-.10000m%n

需手动加上:

appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.action.type = Delete
appender.rolling.strategy.action.basepath = ${sys:ls.logs}
appender.rolling.strategy.action.condition.type = IfFileName
appender.rolling.strategy.action.condition.glob = ${sys:ls.logs}/logstash-${sys:ls.log.format}
appender.rolling.strategy.action.condition.nested_condition.type = IfLastModified
appender.rolling.strategy.action.condition.nested_condition.age = 7D

kibana

在kibana的配置文件中,只有以下几个选项:

logging.dest:
Default: stdout Enables you specify a file where Kibana stores log output.
logging.quiet:
Default: false Set the value of this setting to true to suppress all logging output other than error messages.
logging.silent:
Default: false Set the value of this setting to true to suppress all logging output.
logging.verbose:
Default: false Set the value of this setting to true to log all events, including system usage information and all requests. Supported on Elastic Cloud Enterprise.
logging.timezone
Default: UTC Set to the canonical timezone id (e.g. US/Pacific) to log events using that timezone. A list of timezones can be referenced at https://en.wikipedia.org/wiki/List_of_tz_database_time_zones.

我们可以指定输出的日志文件与日志内容,但是却不可以配置日志的rotate。这时,我们需要使用logrotate,这个linux默认安装的工具。 首先,我们要在配置文件里面指定生成pid文件:

pid.file: "pid.log"

然后,修改/etc/logrotate.conf:

/var/log/kibana {
    missingok
    notifempty
    sharedscripts
    daily
    rotate 7
    copytruncate
    /bin/kill -HUP $(cat /usr/share/kibana/pid.log 2>/dev/null) 2>/dev/null
    endscript
}

kafka

如果不想写脚本清理过多的文件的话,需要修改config/log4j.properties文件。使用RollingFileAppender代替DailyRollingFileAppender,同时设置MaxFileSizeMaxBackupIndex。即修改为:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
log4j.appender.kafkaAppender.MaxFileSize=10MB
log4j.appender.kafkaAppender.MaxBackupIndex=10
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.stateChangeAppender=org.apache.log4j.RollingFileAppender
log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
log4j.appender.stateChangeAppender.MaxFileSize=10M
log4j.appender.stateChangeAppender.MaxBackupIndex=10
log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.requestAppender=org.apache.log4j.RollingFileAppender
log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
log4j.appender.requestAppender.MaxFileSize=10MB
log4j.appender.requestAppender.MaxBackupIndex=10
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.cleanerAppender=org.apache.log4j.RollingFileAppender
log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
log4j.appender.cleanerAppender.MaxFileSize=10MB
log4j.appender.cleanerAppender.MaxBackupIndex=10
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.controllerAppender=org.apache.log4j.RollingFileAppender
log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
log4j.appender.controllerAppender.MaxFileSize=10MB
log4j.appender.controllerAppender.MaxBackupIndex=10
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.authorizerAppender=org.apache.log4j.RollingFileAppender
log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
log4j.appender.authorizerAppender.MaxFileSize=10MB
log4j.appender.authorizerAppender.MaxBackupIndex=10
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

# Turn on all our debugging info
#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
#log4j.logger.kafka.perf=DEBUG, kafkaAppender
#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
log4j.logger.kafka=INFO, kafkaAppender

log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
log4j.additivity.kafka.network.RequestChannel$=false

#log4j.logger.kafka.network.Processor=TRACE, requestAppender
#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
#log4j.additivity.kafka.server.KafkaApis=false
log4j.logger.kafka.request.logger=WARN, requestAppender
log4j.additivity.kafka.request.logger=false

log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=false

log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
log4j.additivity.kafka.log.LogCleaner=false

log4j.logger.state.change.logger=TRACE, stateChangeAppender
log4j.additivity.state.change.logger=false

#Change this to debug to get the actual audit log for authorizer.
log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false
收起阅读 »