亲,只收二进制

logstash input插件开发

logstash作为一个数据管道中间件,支持对各种类型数据的采集与转换,并将数据发送到各种类型的存储库,比如实现消费kafka数据并且写入到Elasticsearch, 日志文件同步到对象存储S3等,mysql数据同步到Elasticsearch等。

logstash内部主要包含三个模块:

* input: 从数据源获取数据
* filter: 过滤、转换数据
* output: 输出数据

不同类型的数据都可以通过对应的input-plugin, output-plugin完成数据的输入与输出。如需要消费kafka中的数据并写入到Elasticsearch中,则需要使用logstash的kafka-input-plugin完成数据输入,logstash-output-elasticsearch完成数据输出。如果需要对输入数据进行过滤或者转换,比如根据关键词过滤掉不需要的内容,或者时间字段的格式转换,就需要又filter-plugin完成了。

logstash的input插件目前已经有几十种了,支持大多数比较通用或开源的数据源的输入。但如果公司内部开发的数据库或其它存储类的服务不能和开源产品在接口协议上兼容,比如腾讯自研的消息队列服务CMQ不依赖于其它的开源消息队列产品,所以不能直接使用logstash的logstash-input-kafka或logstash-input-rabbitmq同步CMQ中的数据;腾讯云对象存储服务COS, 在鉴权方式上和AWS的S3存在差异,也不能直接使用logstash-input-s3插件从COS中读取数据,对于这种情况,就需要自己开发logstash的input插件了。

本文以开发logstash的cos input插件为例,介绍如何开发logstash的input插件。

logstash官方提供了有个简单的input plugin example可供参考: https://github.com/logstash-plugins/logstash-input-example/

环境准备

logstash使用jruby开发,首先要配置jruby环境:

  1. 安装rvm:

    rvm是一个ruby管理器,可以安装并管理ruby环境,也可以通过命令行切换到不同的ruby版本。

    gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3 7D2BAF1CF37B13E2069D6956105BD0E739499BDB
    \curl -sSL https://get.rvm.io | bash -s stable
    source /etc/profile.d/rvm.sh
  2. 安装jruby

    rvm install jruby
    rvm use jruby
  3. 安装包管理工具bundle和测试工具rspec

    gem install bundle
    gem install rspec

从example开始

  1. clone logstash-input-example

    git clone https://github.com/logstash-plugins/logstash-input-example.git
  2. 将clone出来的logstash-input-example源码copy到logstash-input-cos目录,并删除.git文件夹,目的是以logstash-input-example的源码为参考进行开发,同时把需要改动名称的地方修改一下:

    mv logstash-input-example.gemspec logstash-input-cos.gemspec
    mv lib/logstash/inputs/example.rb lib/logstash/inputs/cos.rb
    mv spec/inputs/example_spec.rb spec/inputs/cos_spec.rb
  3. 建立的源码目录结构如图所示:

其中,重要文件的作用说明如下:

  • cos.rb: 主文件,在该文件中编写logstash配置文件的读写与源数据获取的代码,需要继承LogStash::Inputs::Base基类
  • cos_spec.rb: 单元测试文件,通过rspec可以对cos.rb中的代码进行测试
  • logstash-input-cos.gemspec: 类似于maven中的pom.xml文件,配置工程的版本、名称、licene,包依赖等,通过bundle命令可以下载依赖包

配置并下载依赖

因为腾讯云COS服务没有ruby sdk, 因为只能依赖其Java sdk进行开发,首先添加对cos java sdk的依赖。在logstash-input-cos.gemspec中Gem dependencies配置栏中增加以下内容:

# Gem dependencies
  s.requirements << "jar 'com.qcloud:cos_api', '5.4.4'"
  s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
  s.add_runtime_dependency 'logstash-codec-plain'
  s.add_runtime_dependency 'stud', '>= 0.0.22'
  s.add_runtime_dependency 'jar-dependencies'
  s.add_development_dependency 'logstash-devutils', '1.3.6'

相比logstash-input-example.gemspec,增加了对com.qcloud:cos_api包以及jar-dependencies包的依赖,jar-dependencies用于在ruby环境中管理jar包,并且可以跟踪jar包的加载状态。

然后,在logstash-input-cos.gemspec中增加配置:

s.platform = 'java'

这样可以成功下载java依赖包,并且可以在ruby代码中直接调用java代码。

最后,执行以下命令下载依赖:

bundle install

编写代码

logstash-input-cos的代码逻辑其实比较简单,主要是通过执行定时任务,调用cos java sdk中的listObjects方法,获取到指定bucket里的数据,并在每次定时任务执行结束后设置marker保存在本地,再次执行时从marker位置获取数据,以实现数据的增量同步。

jar包的引用

因为要调用cos java sdk中的代码,先引用该jar包:

require 'cos_api-5.4.4.jar'
java_import com.qcloud.cos.COSClient;
java_import com.qcloud.cos.ClientConfig;
java_import com.qcloud.cos.auth.BasicCOSCredentials;
java_import com.qcloud.cos.auth.COSCredentials;
java_import com.qcloud.cos.exception.CosClientException;
java_import com.qcloud.cos.exception.CosServiceException;
java_import com.qcloud.cos.model.COSObjectSummary;
java_import com.qcloud.cos.model.ListObjectsRequest;
java_import com.qcloud.cos.model.ObjectListing;
java_import com.qcloud.cos.region.Region;

读取配置文件

logstash配置文件读取的代码如图所示:

config_name为cos,其它的配置项读取代码按照ruby的代码规范编写,添加类型校验与默认值,就可以从以下配置文件中读取配置项:

input {
    cos {
        "endpoint" => "cos.ap-guangzhou.myqcloud.com"
        "access_key_id" => "*****"
        "access_key_secret" => "****"
        "bucket" => "******"
        "region" => "ap-guangzhou"
        "appId" => "**********"
        "interval" => 60
    }
}

output {
    stdout {
        codec=>rubydebug
    }
}

实现register方法

logstash input插件必须实现另个方法:register 和run

register方法类似于初始化方法,在该方法中可以直接使用从配置文件读取并赋值的变量,完成cos client的初始化,代码如下:

    # 1 初始化用户身份信息(appid, secretId, secretKey)
    cred = com.qcloud.cos.auth.BasicCOSCredentials.new(@access_key_id, @access_key_secret)
    # 2 设置bucket的区域, COS地域的简称请参照 https://www.qcloud.com/document/product/436/6224
    clientConfig = com.qcloud.cos.ClientConfig.new(com.qcloud.cos.region.Region.new(@region))
    # 3 生成cos客户端
    @cosclient = com.qcloud.cos.COSClient.new(cred, clientConfig)
    # bucket名称, 需包含appid
    bucketName = @bucket + "-"+ @appId
    @bucketName = bucketName

    @listObjectsRequest = com.qcloud.cos.model.ListObjectsRequest.new()
    # 设置bucket名称
    @listObjectsRequest.setBucketName(bucketName)
    # prefix表示列出的object的key以prefix开始
    @listObjectsRequest.setPrefix(@prefix)
    # 设置最大遍历出多少个对象, 一次listobject最大支持1000
    @listObjectsRequest.setMaxKeys(1000)
    @listObjectsRequest.setMarker(@markerConfig.getMarker)

示例代码中设置了@cosclient和@listObjectRequest为全局变量, 因为在run方法中会用到这两个变量。

注意在ruby中调用java代码的方式:没有变量描述符;不能直接new Object(),而只能Object.new().

实现run方法

run方法获取数据并将数据流转换成event事件

最简单的run方法为:

def run(queue)
    Stud.interval(@interval) do
      event = LogStash::Event.new("message" => @message, "host" => @host)
      decorate(event)
      queue << event
    end # loop
  end # def run

代码说明:

  • 通过Stud ruby模块执行定时任务,interval可自定义,从配置文件中读取
  • 生成event, 示例代码生成了一个包含两个字段数据的event
  • 调用decorate()方法, 给该event打上tag,如果配置的话
  • queue<<event, 将event插入到数据管道中,发送给filter处理

logstash-input-cos的run方法实现为:

def run(queue)
    @current_thread = Thread.current
    Stud.interval(@interval) do
      process(queue)
    end
end

def process(queue)
    @logger.info('Marker from: ' + @markerConfig.getMarker)

    objectListing = @cosclient.listObjects(@listObjectsRequest)
    nextMarker = objectListing.getNextMarker()
    cosObjectSummaries = objectListing.getObjectSummaries()
    cosObjectSummaries.each do |obj|
       # 文件的路径key
       key = obj.getKey()

       if stop?
         @logger.info("stop while attempting to read log file")
         break
       end
       # 根据key获取内容
       getObject(key) { |log|
         # 发送消息
         @codec.decode(log) do |event|
           decorate(event)
           queue << event
         end
       }

       #记录 marker
       @markerConfig.setMarker(key)
       @logger.info('Marker end: ' + @markerConfig.getMarker)
    end
  end

  # 获取下载输入流
 def getObject(key, &block)
    getObjectRequest = com.qcloud.cos.model.GetObjectRequest.new(@bucketName, key)
    cosObject = @cosclient.getObject(getObjectRequest)
    cosObjectInput = cosObject.getObjectContent()
    buffered =BufferedReader.new(InputStreamReader.new(cosObjectInput))
    while (line = buffered.readLine())
      block.call(line)
    end
  end

测试代码

在spec/inputs/cos_spec.rb中增加如下测试代码:

# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/cos"

describe LogStash::Inputs::Cos do

  it_behaves_like "an interruptible input plugin" do
    let(:config) { {
        "endpoint" => 'cos.ap-guangzhou.myqcloud.com',
        "access_key_id" => '*',
        "access_key_secret" => '*',
        "bucket" => '*',
         "region" => 'ap-guangzhou',
         "appId" => '*',
        "interval" => 60 } }
  end
end

rspec是一个ruby测试库,通过bundle命令执行rspec:

bundle exec rspec

如果cos.rb中的代码没有语法或运行时错误,则会出现如果信息表明测试成功:

Finished in 0.8022 seconds (files took 3.45 seconds to load)
1 example, 0 failures

构建并测试input-plugin-cos

build

使用gem对input-plugin-cos插件源码进行build:

gem build logstash-input-cos.gemspec

构建完成后会生成一个名为logstash-input-cos-0.0.1-java.gem的文件

test

在logstash的解压目录下,执行一下命令安装logstash-input-cos plugin:

./bin/logstash-plugin install /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem

执行结果为:

Validating /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem
Installing logstash-input-cos
Installation successful

另外,可以通过./bin/logstash-plugin list命令查看logstash已经安装的所有input/output/filter/codec插件。

生成配置文件cos.logstash.conf,内容为:

input {
    cos {
        "endpoint" => "cos.ap-guangzhou.myqcloud.com"
        "access_key_id" => "*****"
        "access_key_secret" => "****"
        "bucket" => "******"
        "region" => "ap-guangzhou"
        "appId" => "**********"
        "interval" => 60
    }
}

output {
    stdout {
        codec=>rubydebug
    }
}

该配置文件使用腾讯云官网账号的secret_id和secret_key进行权限验证,拉取指定bucket里的数据,为了测试,将output设置为标准输出。

执行logstash:

./bin/logstash -f cos.logstash.conf

输出结果为:

Sending Logstash's logs to /root/logstash-5.6.4/logs which is now configured via log4j2.properties
[2018-07-30T19:26:17,039][WARN ][logstash.runner          ] --config.debug was specified, but log.level was not set to 'debug'! No config info will be logged.
[2018-07-30T19:26:17,048][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/root/logstash-5.6.4/modules/netflow/configuration"}
[2018-07-30T19:26:17,049][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/root/logstash-5.6.4/modules/fb_apache/configuration"}
[2018-07-30T19:26:17,252][INFO ][logstash.inputs.cos      ] Using version 0.1.x input plugin 'cos'. This plugin isn't well supported by the community and likely has no maintainer.
[2018-07-30T19:26:17,341][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2018-07-30T19:26:17,362][INFO ][logstash.inputs.cos      ] Registering cos input {:bucket=>"bellengao", :region=>"ap-guangzhou"}
[2018-07-30T19:26:17,528][INFO ][logstash.pipeline        ] Pipeline main started
[2018-07-30T19:26:17,530][INFO ][logstash.inputs.cos      ] Marker from:
log4j:WARN No appenders could be found for logger (org.apache.http.client.protocol.RequestAddCookies).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[2018-07-30T19:26:17,574][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-07-30T19:26:17,714][INFO ][logstash.inputs.cos      ] Marker end: access.log
{
       "message" => "77.179.66.156 - - [25/Oct/2016:14:49:33 +0200] \"GET / HTTP/1.1\" 200 612 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",
      "@version" => "1",
    "@timestamp" => 2018-07-30T11:26:17.710Z
}
{
       "message" => "77.179.66.156 - - [25/Oct/2016:14:49:34 +0200] \"GET /favicon.ico HTTP/1.1\" 404 571 \"http://localhost:8080/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",
      "@version" => "1",
    "@timestamp" => 2018-07-30T11:26:17.711Z
}

在cos中的bucket里上传了名为access.log的nginx日志,上述输出结果中最后打印出来的每个json结构体构成一个event, 其中message消息即为access.log中每一条日志。

继续阅读 »

logstash作为一个数据管道中间件,支持对各种类型数据的采集与转换,并将数据发送到各种类型的存储库,比如实现消费kafka数据并且写入到Elasticsearch, 日志文件同步到对象存储S3等,mysql数据同步到Elasticsearch等。

logstash内部主要包含三个模块:

* input: 从数据源获取数据
* filter: 过滤、转换数据
* output: 输出数据

不同类型的数据都可以通过对应的input-plugin, output-plugin完成数据的输入与输出。如需要消费kafka中的数据并写入到Elasticsearch中,则需要使用logstash的kafka-input-plugin完成数据输入,logstash-output-elasticsearch完成数据输出。如果需要对输入数据进行过滤或者转换,比如根据关键词过滤掉不需要的内容,或者时间字段的格式转换,就需要又filter-plugin完成了。

logstash的input插件目前已经有几十种了,支持大多数比较通用或开源的数据源的输入。但如果公司内部开发的数据库或其它存储类的服务不能和开源产品在接口协议上兼容,比如腾讯自研的消息队列服务CMQ不依赖于其它的开源消息队列产品,所以不能直接使用logstash的logstash-input-kafka或logstash-input-rabbitmq同步CMQ中的数据;腾讯云对象存储服务COS, 在鉴权方式上和AWS的S3存在差异,也不能直接使用logstash-input-s3插件从COS中读取数据,对于这种情况,就需要自己开发logstash的input插件了。

本文以开发logstash的cos input插件为例,介绍如何开发logstash的input插件。

logstash官方提供了有个简单的input plugin example可供参考: https://github.com/logstash-plugins/logstash-input-example/

环境准备

logstash使用jruby开发,首先要配置jruby环境:

  1. 安装rvm:

    rvm是一个ruby管理器,可以安装并管理ruby环境,也可以通过命令行切换到不同的ruby版本。

    gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3 7D2BAF1CF37B13E2069D6956105BD0E739499BDB
    \curl -sSL https://get.rvm.io | bash -s stable
    source /etc/profile.d/rvm.sh
  2. 安装jruby

    rvm install jruby
    rvm use jruby
  3. 安装包管理工具bundle和测试工具rspec

    gem install bundle
    gem install rspec

从example开始

  1. clone logstash-input-example

    git clone https://github.com/logstash-plugins/logstash-input-example.git
  2. 将clone出来的logstash-input-example源码copy到logstash-input-cos目录,并删除.git文件夹,目的是以logstash-input-example的源码为参考进行开发,同时把需要改动名称的地方修改一下:

    mv logstash-input-example.gemspec logstash-input-cos.gemspec
    mv lib/logstash/inputs/example.rb lib/logstash/inputs/cos.rb
    mv spec/inputs/example_spec.rb spec/inputs/cos_spec.rb
  3. 建立的源码目录结构如图所示:

其中,重要文件的作用说明如下:

  • cos.rb: 主文件,在该文件中编写logstash配置文件的读写与源数据获取的代码,需要继承LogStash::Inputs::Base基类
  • cos_spec.rb: 单元测试文件,通过rspec可以对cos.rb中的代码进行测试
  • logstash-input-cos.gemspec: 类似于maven中的pom.xml文件,配置工程的版本、名称、licene,包依赖等,通过bundle命令可以下载依赖包

配置并下载依赖

因为腾讯云COS服务没有ruby sdk, 因为只能依赖其Java sdk进行开发,首先添加对cos java sdk的依赖。在logstash-input-cos.gemspec中Gem dependencies配置栏中增加以下内容:

# Gem dependencies
  s.requirements << "jar 'com.qcloud:cos_api', '5.4.4'"
  s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
  s.add_runtime_dependency 'logstash-codec-plain'
  s.add_runtime_dependency 'stud', '>= 0.0.22'
  s.add_runtime_dependency 'jar-dependencies'
  s.add_development_dependency 'logstash-devutils', '1.3.6'

相比logstash-input-example.gemspec,增加了对com.qcloud:cos_api包以及jar-dependencies包的依赖,jar-dependencies用于在ruby环境中管理jar包,并且可以跟踪jar包的加载状态。

然后,在logstash-input-cos.gemspec中增加配置:

s.platform = 'java'

这样可以成功下载java依赖包,并且可以在ruby代码中直接调用java代码。

最后,执行以下命令下载依赖:

bundle install

编写代码

logstash-input-cos的代码逻辑其实比较简单,主要是通过执行定时任务,调用cos java sdk中的listObjects方法,获取到指定bucket里的数据,并在每次定时任务执行结束后设置marker保存在本地,再次执行时从marker位置获取数据,以实现数据的增量同步。

jar包的引用

因为要调用cos java sdk中的代码,先引用该jar包:

require 'cos_api-5.4.4.jar'
java_import com.qcloud.cos.COSClient;
java_import com.qcloud.cos.ClientConfig;
java_import com.qcloud.cos.auth.BasicCOSCredentials;
java_import com.qcloud.cos.auth.COSCredentials;
java_import com.qcloud.cos.exception.CosClientException;
java_import com.qcloud.cos.exception.CosServiceException;
java_import com.qcloud.cos.model.COSObjectSummary;
java_import com.qcloud.cos.model.ListObjectsRequest;
java_import com.qcloud.cos.model.ObjectListing;
java_import com.qcloud.cos.region.Region;

读取配置文件

logstash配置文件读取的代码如图所示:

config_name为cos,其它的配置项读取代码按照ruby的代码规范编写,添加类型校验与默认值,就可以从以下配置文件中读取配置项:

input {
    cos {
        "endpoint" => "cos.ap-guangzhou.myqcloud.com"
        "access_key_id" => "*****"
        "access_key_secret" => "****"
        "bucket" => "******"
        "region" => "ap-guangzhou"
        "appId" => "**********"
        "interval" => 60
    }
}

output {
    stdout {
        codec=>rubydebug
    }
}

实现register方法

logstash input插件必须实现另个方法:register 和run

register方法类似于初始化方法,在该方法中可以直接使用从配置文件读取并赋值的变量,完成cos client的初始化,代码如下:

    # 1 初始化用户身份信息(appid, secretId, secretKey)
    cred = com.qcloud.cos.auth.BasicCOSCredentials.new(@access_key_id, @access_key_secret)
    # 2 设置bucket的区域, COS地域的简称请参照 https://www.qcloud.com/document/product/436/6224
    clientConfig = com.qcloud.cos.ClientConfig.new(com.qcloud.cos.region.Region.new(@region))
    # 3 生成cos客户端
    @cosclient = com.qcloud.cos.COSClient.new(cred, clientConfig)
    # bucket名称, 需包含appid
    bucketName = @bucket + "-"+ @appId
    @bucketName = bucketName

    @listObjectsRequest = com.qcloud.cos.model.ListObjectsRequest.new()
    # 设置bucket名称
    @listObjectsRequest.setBucketName(bucketName)
    # prefix表示列出的object的key以prefix开始
    @listObjectsRequest.setPrefix(@prefix)
    # 设置最大遍历出多少个对象, 一次listobject最大支持1000
    @listObjectsRequest.setMaxKeys(1000)
    @listObjectsRequest.setMarker(@markerConfig.getMarker)

示例代码中设置了@cosclient和@listObjectRequest为全局变量, 因为在run方法中会用到这两个变量。

注意在ruby中调用java代码的方式:没有变量描述符;不能直接new Object(),而只能Object.new().

实现run方法

run方法获取数据并将数据流转换成event事件

最简单的run方法为:

def run(queue)
    Stud.interval(@interval) do
      event = LogStash::Event.new("message" => @message, "host" => @host)
      decorate(event)
      queue << event
    end # loop
  end # def run

代码说明:

  • 通过Stud ruby模块执行定时任务,interval可自定义,从配置文件中读取
  • 生成event, 示例代码生成了一个包含两个字段数据的event
  • 调用decorate()方法, 给该event打上tag,如果配置的话
  • queue<<event, 将event插入到数据管道中,发送给filter处理

logstash-input-cos的run方法实现为:

def run(queue)
    @current_thread = Thread.current
    Stud.interval(@interval) do
      process(queue)
    end
end

def process(queue)
    @logger.info('Marker from: ' + @markerConfig.getMarker)

    objectListing = @cosclient.listObjects(@listObjectsRequest)
    nextMarker = objectListing.getNextMarker()
    cosObjectSummaries = objectListing.getObjectSummaries()
    cosObjectSummaries.each do |obj|
       # 文件的路径key
       key = obj.getKey()

       if stop?
         @logger.info("stop while attempting to read log file")
         break
       end
       # 根据key获取内容
       getObject(key) { |log|
         # 发送消息
         @codec.decode(log) do |event|
           decorate(event)
           queue << event
         end
       }

       #记录 marker
       @markerConfig.setMarker(key)
       @logger.info('Marker end: ' + @markerConfig.getMarker)
    end
  end

  # 获取下载输入流
 def getObject(key, &block)
    getObjectRequest = com.qcloud.cos.model.GetObjectRequest.new(@bucketName, key)
    cosObject = @cosclient.getObject(getObjectRequest)
    cosObjectInput = cosObject.getObjectContent()
    buffered =BufferedReader.new(InputStreamReader.new(cosObjectInput))
    while (line = buffered.readLine())
      block.call(line)
    end
  end

测试代码

在spec/inputs/cos_spec.rb中增加如下测试代码:

# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/cos"

describe LogStash::Inputs::Cos do

  it_behaves_like "an interruptible input plugin" do
    let(:config) { {
        "endpoint" => 'cos.ap-guangzhou.myqcloud.com',
        "access_key_id" => '*',
        "access_key_secret" => '*',
        "bucket" => '*',
         "region" => 'ap-guangzhou',
         "appId" => '*',
        "interval" => 60 } }
  end
end

rspec是一个ruby测试库,通过bundle命令执行rspec:

bundle exec rspec

如果cos.rb中的代码没有语法或运行时错误,则会出现如果信息表明测试成功:

Finished in 0.8022 seconds (files took 3.45 seconds to load)
1 example, 0 failures

构建并测试input-plugin-cos

build

使用gem对input-plugin-cos插件源码进行build:

gem build logstash-input-cos.gemspec

构建完成后会生成一个名为logstash-input-cos-0.0.1-java.gem的文件

test

在logstash的解压目录下,执行一下命令安装logstash-input-cos plugin:

./bin/logstash-plugin install /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem

执行结果为:

Validating /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem
Installing logstash-input-cos
Installation successful

另外,可以通过./bin/logstash-plugin list命令查看logstash已经安装的所有input/output/filter/codec插件。

生成配置文件cos.logstash.conf,内容为:

input {
    cos {
        "endpoint" => "cos.ap-guangzhou.myqcloud.com"
        "access_key_id" => "*****"
        "access_key_secret" => "****"
        "bucket" => "******"
        "region" => "ap-guangzhou"
        "appId" => "**********"
        "interval" => 60
    }
}

output {
    stdout {
        codec=>rubydebug
    }
}

该配置文件使用腾讯云官网账号的secret_id和secret_key进行权限验证,拉取指定bucket里的数据,为了测试,将output设置为标准输出。

执行logstash:

./bin/logstash -f cos.logstash.conf

输出结果为:

Sending Logstash's logs to /root/logstash-5.6.4/logs which is now configured via log4j2.properties
[2018-07-30T19:26:17,039][WARN ][logstash.runner          ] --config.debug was specified, but log.level was not set to 'debug'! No config info will be logged.
[2018-07-30T19:26:17,048][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/root/logstash-5.6.4/modules/netflow/configuration"}
[2018-07-30T19:26:17,049][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/root/logstash-5.6.4/modules/fb_apache/configuration"}
[2018-07-30T19:26:17,252][INFO ][logstash.inputs.cos      ] Using version 0.1.x input plugin 'cos'. This plugin isn't well supported by the community and likely has no maintainer.
[2018-07-30T19:26:17,341][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2018-07-30T19:26:17,362][INFO ][logstash.inputs.cos      ] Registering cos input {:bucket=>"bellengao", :region=>"ap-guangzhou"}
[2018-07-30T19:26:17,528][INFO ][logstash.pipeline        ] Pipeline main started
[2018-07-30T19:26:17,530][INFO ][logstash.inputs.cos      ] Marker from:
log4j:WARN No appenders could be found for logger (org.apache.http.client.protocol.RequestAddCookies).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[2018-07-30T19:26:17,574][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-07-30T19:26:17,714][INFO ][logstash.inputs.cos      ] Marker end: access.log
{
       "message" => "77.179.66.156 - - [25/Oct/2016:14:49:33 +0200] \"GET / HTTP/1.1\" 200 612 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",
      "@version" => "1",
    "@timestamp" => 2018-07-30T11:26:17.710Z
}
{
       "message" => "77.179.66.156 - - [25/Oct/2016:14:49:34 +0200] \"GET /favicon.ico HTTP/1.1\" 404 571 \"http://localhost:8080/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",
      "@version" => "1",
    "@timestamp" => 2018-07-30T11:26:17.711Z
}

在cos中的bucket里上传了名为access.log的nginx日志,上述输出结果中最后打印出来的每个json结构体构成一个event, 其中message消息即为access.log中每一条日志。

收起阅读 »

Day 10 - Elasticsearch 分片恢复并发数过大引发的bug分析

       大家好,今天为大家分享一次 ES 的填坑经验。主要是关于集群恢复过程中,分片恢复并发数调整过大导致集群 hang 死的问题。

场景描述

       废话不多说,先来描述场景。某日,腾讯云线上某 ES 集群,15个节点,2700+ 索引,15000+ 分片,数十 TB 数据。由于机器故障,某个节点被重启,此时集群有大量的 unassigned 分片,集群处于 yellow 状态。为了加快集群恢复的速度,手动调整分片恢复并发数,原本想将默认值为2的 node_concurrent_recoveries 调整为10,结果手一抖多加了一个0,设定了如下参数:

curl -X PUT "localhost:9200/_cluster/settings" -H 'Content-Type: application/json' -d'
{
    "persistent": {
        "cluster.routing.allocation.node_concurrent_recoveries": 100,
        "indices.recovery.max_bytes_per_sec": "40mb"
    }
}
'

       设定之后,观察集群 unassigned 分片,一开始下降的速度很快。大约几分钟后,数量维持在一个固定值不变了,然后,然后就没有然后了,集群所有节点 generic 线程池卡死,虽然已存在的索引读写没问题,但是新建索引以及所有涉及 generic 线程池的操作全部卡住。立马修改分片恢复并发数到10,通过管控平台一把重启了全部节点,约15分钟后集群恢复正常。接下来会先介绍一些基本的概念,然后再重现这个问题并做详细分析。

基本概念

ES 线程池(thread pool)

ES 中每个节点有多种线程池,各有用途。重要的有:

  • generic :通用线程池,后台的 node discovery,上述的分片恢复(node recovery)等等一些通用后台的操作都会用到该线程池。该线程池线程数量默认为配置的处理器数量(processors)* 4,最小128,最大512。
  • index :index/delete 等索引操作会用到该线程池,包括自动创建索引等。默认线程数量为配置的处理器数量,默认队列大小:200.
  • search :查询请求处理线程池。默认线程数量:int((# of available_processors * 3) / 2) + 1,默认队列大小:1000.
  • get :get 请求处理线程池。默认线程数量为配置的处理器数量,默认队列大小:1000.
  • write :单个文档的 index/delete/update 以及 bulk 请求处理线程。默认线程数量为配置的处理器数量,默认队列大小:200,在写多的日志场景我们一般会将队列调大。 还有其它线程池,例如备份回档(snapshot)、analyze、refresh 等,这里就不一一介绍了。详细可参考官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-threadpool.html

集群恢复之分片恢复

       我们知道 ES 集群状态分为三种,green、yellow、red。green 状态表示所有分片包括主副本均正常被分配;yellow 状态表示所有主分片已分配,但是有部分副本分片未分配;red 表示有部分主分片未分配。 一般当集群中某个节点因故障失联或者重启之后,如果集群索引有副本的场景,集群将进入分片恢复阶段(recovery)。此时一般是 master 节点发起更新集群元数据任务,分片的分配策略由 master 决定,具体分配策略可以参考腾讯云+社区的这篇文章了解细节:https://cloud.tencent.com/developer/article/1334743 。各节点收到集群元数据更新请求,检查分片状态并触发分片恢复流程,根据分片数据所在的位置,有多种恢复的方式,主要有以下几种:

  • EXISTING_STORE : 数据在节点本地存在,从本地节点恢复。
  • PEER :本地数据不可用或不存在,从远端节点(源分片,一般是主分片)恢复。
  • SNAPSHOT : 数据从备份仓库恢复。
  • LOCAL_SHARDS : 分片合并(shrink)场景,从本地别的分片恢复。

PEER 场景分片恢复并发数主要由如下参数控制:

  • cluster.routing.allocation.node_concurrent_incoming_recoveries:节点上最大接受的分片恢复并发数。一般指分片从其它节点恢复至本节点。
  • cluster.routing.allocation.node_concurrent_outgoing_recoveries :节点上最大发送的分片恢复并发数。一般指分片从本节点恢复至其它节点。
  • cluster.routing.allocation.node_concurrent_recoveries :该参数同时设置上述接受发送分片恢复并发数为相同的值。 详细参数可参考官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/current/shards-allocation.html

       集群卡住的主要原因就是从远端节点恢复(PEER Recovery)的并发数过多,导致 generic 线程池被用完。涉及目标节点(target)和源节点(source)的恢复交互流程,后面分析问题时我们再来详细讨论。

问题复现与剖析

       为了便于描述,我用 ES 6.4.3版本重新搭建了一个三节点的集群。单节点 1 core,2GB memory。新建了300个 index, 单个 index 5个分片一个副本,共 3000 个 shard。每个 index 插入大约100条数据。 先设定分片恢复并发数,为了夸张一点,我直接调整到200,如下所示:

curl -X PUT "localhost:9200/_cluster/settings" -H 'Content-Type: application/json' -d'
{
    "persistent": {
        "cluster.routing.allocation.node_concurrent_recoveries": 200 // 设定分片恢复并发数
    }
}
'

  接下来停掉某节点,模拟机器挂掉场景。几分钟后,观察集群分片恢复数量,卡在固定数值不再变化:

分片恢复统计信息.png

  通过 allocation explain 查看分片分配状态,未分配的原因是受到最大恢复并发数的限制:

分片恢复限制.png
  观察线程池的数量,generic 线程池打满128.

线程池统计.png

此时查询或写入已有索引不受影响,但是新建索引这种涉及到 generic 线程池的操作都会卡住。 通过堆栈分析,128 个 generic 线程全部卡在 PEER recovery 阶段。

堆栈分析.png
         现象有了,我们来分析一下这种场景,远程分片恢复(PEER Recovery)流程为什么会导致集群卡住。

       当集群中有分片的状态发生变更时,master 节点会发起集群元数据更新(cluster state update)请求给所有节点。其它节点收到该请求后,感知到分片状态的变更,启动分片恢复流程。部分分片需要从其它节点恢复,代码层面,涉及分片分配的目标节点(target)和源节点(source)的交互流程如下:

远端恢复时序分析.png
 

       6.x 版本之后引入了 seqNo,恢复会涉及到 seqNo+translog,这也是6.x提升恢复速度的一大改进。我们重点关注流程中第 2、4、5、7、10、12 步骤中的远程调用,他们的作用分别是:

  • 第2步:分片分配的目标节点向源节点(一般是主分片)发起分片恢复请求,携带起始 seqNo 和 syncId。
  • 第4步:发送数据文件信息,告知目标节点待接收的文件清单。
  • 第5步:发送 diff 数据文件给目标节点。
  • 第7步:源节点发送 prepare translog 请求给目标节点,等目标节点打开 shard level 引擎,准备接受 translog。
  • 第10步:源节点发送指定范围的 translog 快照给目标节点。
  • 第12步:结束恢复流程。

       我们可以看到除第5步发送数据文件外,多次远程交互 submitRequest 都会调用 txGet,这个调用底层用的是基于 AQS 改造过的 sync 对象,是一个同步调用。 如果一端 generic 线程池被这些请求打满,发出的请求等待对端返回,而发出的这些请求由于对端 generic 线程池同样的原因被打满,只能 pending 在队列中,这样两边的线程池都满了而且相互等待对端队列中的线程返回,就出现了分布式死锁现象。

问题处理

       为了避免改动太大带来不确定的 side effect,针对腾讯云 ES 集群我们目前先在 rest 层拒掉了并发数超过一定值的参数设定请求并提醒用户。与此同时,我们向官方提交了 issue:https://github.com/elastic/elasticsearch/issues/36195 进行跟踪。

总结

       本文旨在描述集群恢复过程出现的集群卡死场景,避免更多的 ES 用户踩坑,没有对整体分片恢复做详细的分析,大家想了解详细的分片恢复流程可以参考腾讯云+社区 Elasticsearch 专栏相关的文章:https://cloud.tencent.com/developer/column/2428

完结,谢谢!

继续阅读 »

       大家好,今天为大家分享一次 ES 的填坑经验。主要是关于集群恢复过程中,分片恢复并发数调整过大导致集群 hang 死的问题。

场景描述

       废话不多说,先来描述场景。某日,腾讯云线上某 ES 集群,15个节点,2700+ 索引,15000+ 分片,数十 TB 数据。由于机器故障,某个节点被重启,此时集群有大量的 unassigned 分片,集群处于 yellow 状态。为了加快集群恢复的速度,手动调整分片恢复并发数,原本想将默认值为2的 node_concurrent_recoveries 调整为10,结果手一抖多加了一个0,设定了如下参数:

curl -X PUT "localhost:9200/_cluster/settings" -H 'Content-Type: application/json' -d'
{
    "persistent": {
        "cluster.routing.allocation.node_concurrent_recoveries": 100,
        "indices.recovery.max_bytes_per_sec": "40mb"
    }
}
'

       设定之后,观察集群 unassigned 分片,一开始下降的速度很快。大约几分钟后,数量维持在一个固定值不变了,然后,然后就没有然后了,集群所有节点 generic 线程池卡死,虽然已存在的索引读写没问题,但是新建索引以及所有涉及 generic 线程池的操作全部卡住。立马修改分片恢复并发数到10,通过管控平台一把重启了全部节点,约15分钟后集群恢复正常。接下来会先介绍一些基本的概念,然后再重现这个问题并做详细分析。

基本概念

ES 线程池(thread pool)

ES 中每个节点有多种线程池,各有用途。重要的有:

  • generic :通用线程池,后台的 node discovery,上述的分片恢复(node recovery)等等一些通用后台的操作都会用到该线程池。该线程池线程数量默认为配置的处理器数量(processors)* 4,最小128,最大512。
  • index :index/delete 等索引操作会用到该线程池,包括自动创建索引等。默认线程数量为配置的处理器数量,默认队列大小:200.
  • search :查询请求处理线程池。默认线程数量:int((# of available_processors * 3) / 2) + 1,默认队列大小:1000.
  • get :get 请求处理线程池。默认线程数量为配置的处理器数量,默认队列大小:1000.
  • write :单个文档的 index/delete/update 以及 bulk 请求处理线程。默认线程数量为配置的处理器数量,默认队列大小:200,在写多的日志场景我们一般会将队列调大。 还有其它线程池,例如备份回档(snapshot)、analyze、refresh 等,这里就不一一介绍了。详细可参考官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-threadpool.html

集群恢复之分片恢复

       我们知道 ES 集群状态分为三种,green、yellow、red。green 状态表示所有分片包括主副本均正常被分配;yellow 状态表示所有主分片已分配,但是有部分副本分片未分配;red 表示有部分主分片未分配。 一般当集群中某个节点因故障失联或者重启之后,如果集群索引有副本的场景,集群将进入分片恢复阶段(recovery)。此时一般是 master 节点发起更新集群元数据任务,分片的分配策略由 master 决定,具体分配策略可以参考腾讯云+社区的这篇文章了解细节:https://cloud.tencent.com/developer/article/1334743 。各节点收到集群元数据更新请求,检查分片状态并触发分片恢复流程,根据分片数据所在的位置,有多种恢复的方式,主要有以下几种:

  • EXISTING_STORE : 数据在节点本地存在,从本地节点恢复。
  • PEER :本地数据不可用或不存在,从远端节点(源分片,一般是主分片)恢复。
  • SNAPSHOT : 数据从备份仓库恢复。
  • LOCAL_SHARDS : 分片合并(shrink)场景,从本地别的分片恢复。

PEER 场景分片恢复并发数主要由如下参数控制:

  • cluster.routing.allocation.node_concurrent_incoming_recoveries:节点上最大接受的分片恢复并发数。一般指分片从其它节点恢复至本节点。
  • cluster.routing.allocation.node_concurrent_outgoing_recoveries :节点上最大发送的分片恢复并发数。一般指分片从本节点恢复至其它节点。
  • cluster.routing.allocation.node_concurrent_recoveries :该参数同时设置上述接受发送分片恢复并发数为相同的值。 详细参数可参考官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/current/shards-allocation.html

       集群卡住的主要原因就是从远端节点恢复(PEER Recovery)的并发数过多,导致 generic 线程池被用完。涉及目标节点(target)和源节点(source)的恢复交互流程,后面分析问题时我们再来详细讨论。

问题复现与剖析

       为了便于描述,我用 ES 6.4.3版本重新搭建了一个三节点的集群。单节点 1 core,2GB memory。新建了300个 index, 单个 index 5个分片一个副本,共 3000 个 shard。每个 index 插入大约100条数据。 先设定分片恢复并发数,为了夸张一点,我直接调整到200,如下所示:

curl -X PUT "localhost:9200/_cluster/settings" -H 'Content-Type: application/json' -d'
{
    "persistent": {
        "cluster.routing.allocation.node_concurrent_recoveries": 200 // 设定分片恢复并发数
    }
}
'

  接下来停掉某节点,模拟机器挂掉场景。几分钟后,观察集群分片恢复数量,卡在固定数值不再变化:

分片恢复统计信息.png

  通过 allocation explain 查看分片分配状态,未分配的原因是受到最大恢复并发数的限制:

分片恢复限制.png
  观察线程池的数量,generic 线程池打满128.

线程池统计.png

此时查询或写入已有索引不受影响,但是新建索引这种涉及到 generic 线程池的操作都会卡住。 通过堆栈分析,128 个 generic 线程全部卡在 PEER recovery 阶段。

堆栈分析.png
         现象有了,我们来分析一下这种场景,远程分片恢复(PEER Recovery)流程为什么会导致集群卡住。

       当集群中有分片的状态发生变更时,master 节点会发起集群元数据更新(cluster state update)请求给所有节点。其它节点收到该请求后,感知到分片状态的变更,启动分片恢复流程。部分分片需要从其它节点恢复,代码层面,涉及分片分配的目标节点(target)和源节点(source)的交互流程如下:

远端恢复时序分析.png
 

       6.x 版本之后引入了 seqNo,恢复会涉及到 seqNo+translog,这也是6.x提升恢复速度的一大改进。我们重点关注流程中第 2、4、5、7、10、12 步骤中的远程调用,他们的作用分别是:

  • 第2步:分片分配的目标节点向源节点(一般是主分片)发起分片恢复请求,携带起始 seqNo 和 syncId。
  • 第4步:发送数据文件信息,告知目标节点待接收的文件清单。
  • 第5步:发送 diff 数据文件给目标节点。
  • 第7步:源节点发送 prepare translog 请求给目标节点,等目标节点打开 shard level 引擎,准备接受 translog。
  • 第10步:源节点发送指定范围的 translog 快照给目标节点。
  • 第12步:结束恢复流程。

       我们可以看到除第5步发送数据文件外,多次远程交互 submitRequest 都会调用 txGet,这个调用底层用的是基于 AQS 改造过的 sync 对象,是一个同步调用。 如果一端 generic 线程池被这些请求打满,发出的请求等待对端返回,而发出的这些请求由于对端 generic 线程池同样的原因被打满,只能 pending 在队列中,这样两边的线程池都满了而且相互等待对端队列中的线程返回,就出现了分布式死锁现象。

问题处理

       为了避免改动太大带来不确定的 side effect,针对腾讯云 ES 集群我们目前先在 rest 层拒掉了并发数超过一定值的参数设定请求并提醒用户。与此同时,我们向官方提交了 issue:https://github.com/elastic/elasticsearch/issues/36195 进行跟踪。

总结

       本文旨在描述集群恢复过程出现的集群卡死场景,避免更多的 ES 用户踩坑,没有对整体分片恢复做详细的分析,大家想了解详细的分片恢复流程可以参考腾讯云+社区 Elasticsearch 专栏相关的文章:https://cloud.tencent.com/developer/column/2428

完结,谢谢!

收起阅读 »

社区日报 第474期 (2018-12-10)

1. 如何设置kibana的堆大小避免oom
http://t.cn/Ey1omYc

2. Es 另外一款web管理UI,包含导入,查看,编辑等功能
http://t.cn/Ey1dKAj

3. 使用elasticseach 搜索emoji表情
http://t.cn/Rf5r848

编辑:cyberdak
归档:https://elasticsearch.cn/article/6183
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1. 如何设置kibana的堆大小避免oom
http://t.cn/Ey1omYc

2. Es 另外一款web管理UI,包含导入,查看,编辑等功能
http://t.cn/Ey1dKAj

3. 使用elasticseach 搜索emoji表情
http://t.cn/Rf5r848

编辑:cyberdak
归档:https://elasticsearch.cn/article/6183
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

Day 9 - 动手实现一个自定义beat

参考

https://elasticsearch.cn/article/113

https://www.elastic.co/blog/build-your-own-beat

介绍

公司内部有统一的log收集系统,并且实现了定制的filebeat进行log收集。为了实现实时报警和监控,自定义的beat并没有直接把输出发给elasticsearch后端,而是中间会经过storm或者durid进行实时分析,然后落入es或者hdfs。同时由于是统一log收集,所以目前还没有针对具体的不同应用进行log的内容的切分,导致所有的log都是以一行为单位落入后端存储。于是需要针对不同的业务部门定制不同的beat。 本文初步尝试定制一个可以在beat端解析hdfs audit log的beat,限于篇幅,只实现了基本的文件解析功能。下面会从环境配置,代码实现,运行测试三个方面进行讲解。

环境配置

go version go1.9.4 linux/amd64

python version: 2.7.9

不得不吐槽下python的安装,各种坑。因为系统默认的python版本是2.7.5,而cookiecutter建议使用2.7.9

下面的工具会提供python本身需要依赖的一些native包

yum install openssl -y
yum install openssl-devel -y
yum install zlib-devel -y
yum install zlib -y

安装python

wget https://www.python.org/ftp/python/2.7.9/Python-2.7.9.tgz
tar -zxvf Python-2.7.9.tgz
cd ~/python/Python-2.7.9
./configure --prefix=/usr/local/python-2.7.9
make
make install

rm -f /bin/python
ln -s /usr/local/python-2.7.9/bin/python /bin/python

安装工具包 distribute, setuptools, pip

cd ~/python/setuptools-19.6 && python setup.py install
cd ~/python/pip-1.5.4  && python setup.py install
cd ~/python/distribute-0.7.3  && python setup.py install

安装cookiecutter

pip install --user cookiecutter

安装cookiecutter所依赖的工具

pip install backports.functools-lru-cache
pip install six
pip install virtualenv

*** virtualenv 安装好了之后,所在目录是在python的目录里面 (/usr/local/python-2.7.9/bin/virtualenv),需要配置好PATH,这个工具稍后会被beat的Makefile用到

代码实现

需要实现的功能比较简单,目标是打开hdfs-audit.log文件,然后逐行读取,同时解析出必要的信息,组装成event,然后发送出去,如果对接的es的话,需要同时支持自动在es端创建正确的mapping

使用官方提供的beat模板创建自己的beat

  • 需要设置好环境变量$GOPATH,本例子中GOPATH=/root/go
$ go get github.com/elastic/beats
$ cd $GOPATH/src/github.com/elastic/beats
$ git checkout 5.1

[root@minikube-2830379 suxingfate]# cookiecutter /root/go/src/github.com/elastic/beats/generate/beat
project_name [Examplebeat]: hdfsauditbeat
github_name [your-github-name]: suxingfate
beat [hdfsauditbeat]:
beat_path [github.com/suxingfate]:
full_name [Firstname Lastname]: xinglong

make setup

到这里,模板就生成了,然后就是定制需要的东西。

  • 1 _meta/beat.yml # 配置模板文件,定义我们的beat会接受哪些配置项
  • 2 config/config.go #使用go的struct定义整个config对象,包含所有的配置项
  • 3 beater/hdfsauditbeat.go # 核心逻辑代码
  • 4 _meta/fields.yml #这里是跟es对接的时候给es定义的mapping

1 _meta/beat.yml

这里增加了path,为后面配置hdfs-audit.log文件的位置留好坑

[root@minikube-2830379 hdfsauditbeat]# cat _meta/beat.yml
################### Hdfsauditbeat Configuration Example #########################

############################# Hdfsauditbeat ######################################

hdfsauditbeat:
  # Defines how often an event is sent to the output
  period: 1s
  path: "."

2 config/config.go

这里把path定义到struct里面,后面核心代码就可以从config对象获得path了

[root@minikube-2830379 hdfsauditbeat]# cat config/config.go
// Config is put into a different package to prevent cyclic imports in case
// it is needed in several locations

package config

import "time"

type Config struct {
    Period time.Duration `config:"period"`
        Path   string        `config:"path"`
}

var DefaultConfig = Config{
    Period: 1 * time.Second,
        Path:   ".",
}

3 beater/hdfsauditbeat.go

这里需要改动的地方是: 3.1 定义了一个catAudit函数来解析目标文件的每一行,同时生成自定义的event,然后发送出去 3.2 Run函数调用自定义的catAudit函数,从而把我们的功能嵌入

[root@minikube-2830379 hdfsauditbeat]# cat beater/hdfsauditbeat.go
package beater

import (
    "fmt"
    "time"
        "os"
        "io"
        "bufio"
        "strings"
    "github.com/elastic/beats/libbeat/beat"
    "github.com/elastic/beats/libbeat/common"
    "github.com/elastic/beats/libbeat/logp"
    "github.com/elastic/beats/libbeat/publisher"

    "github.com/suxingfate/hdfsauditbeat/config"
)

type Hdfsauditbeat struct {
    done   chan struct{}
    config config.Config
    client publisher.Client
}

// Creates beater
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
    config := config.DefaultConfig
    if err := cfg.Unpack(&config); err != nil {
        return nil, fmt.Errorf("Error reading config file: %v", err)
    }

    bt := &Hdfsauditbeat{
        done: make(chan struct{}),
        config: config,
    }
    return bt, nil
}

func (bt *Hdfsauditbeat) Run(b *beat.Beat) error {
    logp.Info("hdfsauditbeat is running! Hit CTRL-C to stop it.")

    bt.client = b.Publisher.Connect()
    ticker := time.NewTicker(bt.config.Period)
    counter := 1
    for {
        select {
        case <-bt.done:
            return nil
        case <-ticker.C:
        }

        bt.catAudit(bt.config.Path)

        logp.Info("Event sent")
        counter++
    }
}

func (bt *Hdfsauditbeat) Stop() {
    bt.client.Close()
    close(bt.done)
}

func (bt *Hdfsauditbeat) catAudit(auditFile string) {
    file, err := os.OpenFile(auditFile, os.O_RDWR, 0666)
    if err != nil {
        //fmt.Println("Open file error!", err)
        return
    }
    defer file.Close()

    buf := bufio.NewReader(file)
    for {
        line, err := buf.ReadString('\n')
        line = strings.TrimSpace(line)
        if line == "" {
            return
        }

    timeEnd := strings.Index(line, ",")
        timeString := line[0 :timeEnd]
        tm, _ := time.Parse("2006-01-02 03:04:05", timeString)

        ugiStart := strings.Index(line, "ugi=") + 4
        ugiEnd := strings.Index(line, " (auth")
        ugi := line[ugiStart :ugiEnd]

        cmdStart := strings.Index(line, "cmd=") + 4
        line = line[cmdStart:len(line)]
        cmdEnd := strings.Index(line, " ")
        cmd := line[0 : cmdEnd]

        srcStart := strings.Index(line, "src=") + 4
        line = line[srcStart:len(line)]
        srcEnd := strings.Index(line, " ")
        src := line[0:srcEnd]

        dstStart := strings.Index(line, "dst=") + 4
        line = line[dstStart:len(line)]
        dstEnd := strings.Index(line, " ")
        dst := line[0:dstEnd]

        event := common.MapStr{
                "@timestamp": common.Time(time.Unix(tm.Unix(), 0)),
                "ugi":       ugi,
                "cmd":       cmd,
                "src":    src,
                "dst":   dst,
            }
            bt.client.PublishEvent(event)

        if err != nil {
            if err == io.EOF {
                //fmt.Println("File read ok!")
                break
            } else {
                //fmt.Println("Read file error!", err)
                return
            }
        }
    }
}

4 _meat/fields.yml

[root@minikube-2830379 hdfsauditbeat]# less _meta/fields.yml
- key: hdfsauditbeat
  title: hdfsauditbeat
  description:
  fields:
    - name: counter
      type: long
      required: true
      description: >
        PLEASE UPDATE DOCUMENTATION
    #new fiels added hdfsaudit
    - name: entrytime
      type: date
    - name: ugi
      type: keyword
    - name: cmd
      type: keyword
    - name: src
      type: keyword
    - name: dst
      type: keyword

测试

首先编译好项目

make update
make

然后会发现生成了一个hdfsauditbeat文件,这个就是二进制的可执行文件。下面进行测试,这里偷了个懒,没有发给es,而是吐到console进行观察。 修改了一下配置文件,需要指定正确的需要消费的audit log文件的路径,另外就是修改了output为console

[root@minikube-2830379 hdfsauditbeat]# cat hdfsauditbeat.yml
################### Hdfsauditbeat Configuration Example #########################

############################# Hdfsauditbeat ######################################

hdfsauditbeat:
  # Defines how often an event is sent to the output
  period: 1s
  path: "/root/go/hdfs-audit.log"

#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
#  env: staging

#================================ Outputs =====================================

# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.
#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
#  hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------
#output.logstash:
  # The Logstash hosts
  #hosts: ["localhost:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

output.console:
  pretty: true
#================================ Logging =====================================

# Sets log level. The default log level is info.
# Available log levels are: critical, error, warning, info, debug
#logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]

开始执行

[root@minikube-2830379 hdfsauditbeat]# ./hdfsauditbeat
{
  "@timestamp": "2018-12-09T03:00:00.000Z",
  "beat": {
    "hostname": "minikube-2830379.lvs02.dev.abc.com",
    "name": "minikube-2830379.lvs02.dev.abc.com",
    "version": "5.1.3"
  },
  "cmd": "create",
  "dst": "null",
  "src": "/app-logs/app/logs/application_1540949675029_717305/lvsdpehdc25dn0444.stratus.lvs.abc.com_8042.tmp",
  "ugi": "appmon@APD.ABC.COM"
}
{
  "@timestamp": "2018-12-09T03:00:00.000Z",
  "beat": {
    "hostname": "minikube-2830379.lvs02.dev.abc.com",
    "name": "minikube-2830379.lvs02.dev.abc.com",
    "version": "5.1.3"
  },
  "cmd": "create",
  "dst": "null",
  "src": "/app-logs/appmon/logs/application_1540949675029_717305/lvsdpehdc25dn0444.stratus.lvs.abc.com_8042.tmp",
  "ugi": "appmon@APD.ABC.COM"
}

结束

使用自定义beat给我们提供了很大的灵活性,虽然pipline或者logstash也可以做到,但是使用场景还是有很大差别的。如果是调用特殊的命令获得输出,或者是本文的场景都更适合定制化beat。

继续阅读 »

参考

https://elasticsearch.cn/article/113

https://www.elastic.co/blog/build-your-own-beat

介绍

公司内部有统一的log收集系统,并且实现了定制的filebeat进行log收集。为了实现实时报警和监控,自定义的beat并没有直接把输出发给elasticsearch后端,而是中间会经过storm或者durid进行实时分析,然后落入es或者hdfs。同时由于是统一log收集,所以目前还没有针对具体的不同应用进行log的内容的切分,导致所有的log都是以一行为单位落入后端存储。于是需要针对不同的业务部门定制不同的beat。 本文初步尝试定制一个可以在beat端解析hdfs audit log的beat,限于篇幅,只实现了基本的文件解析功能。下面会从环境配置,代码实现,运行测试三个方面进行讲解。

环境配置

go version go1.9.4 linux/amd64

python version: 2.7.9

不得不吐槽下python的安装,各种坑。因为系统默认的python版本是2.7.5,而cookiecutter建议使用2.7.9

下面的工具会提供python本身需要依赖的一些native包

yum install openssl -y
yum install openssl-devel -y
yum install zlib-devel -y
yum install zlib -y

安装python

wget https://www.python.org/ftp/python/2.7.9/Python-2.7.9.tgz
tar -zxvf Python-2.7.9.tgz
cd ~/python/Python-2.7.9
./configure --prefix=/usr/local/python-2.7.9
make
make install

rm -f /bin/python
ln -s /usr/local/python-2.7.9/bin/python /bin/python

安装工具包 distribute, setuptools, pip

cd ~/python/setuptools-19.6 && python setup.py install
cd ~/python/pip-1.5.4  && python setup.py install
cd ~/python/distribute-0.7.3  && python setup.py install

安装cookiecutter

pip install --user cookiecutter

安装cookiecutter所依赖的工具

pip install backports.functools-lru-cache
pip install six
pip install virtualenv

*** virtualenv 安装好了之后,所在目录是在python的目录里面 (/usr/local/python-2.7.9/bin/virtualenv),需要配置好PATH,这个工具稍后会被beat的Makefile用到

代码实现

需要实现的功能比较简单,目标是打开hdfs-audit.log文件,然后逐行读取,同时解析出必要的信息,组装成event,然后发送出去,如果对接的es的话,需要同时支持自动在es端创建正确的mapping

使用官方提供的beat模板创建自己的beat

  • 需要设置好环境变量$GOPATH,本例子中GOPATH=/root/go
$ go get github.com/elastic/beats
$ cd $GOPATH/src/github.com/elastic/beats
$ git checkout 5.1

[root@minikube-2830379 suxingfate]# cookiecutter /root/go/src/github.com/elastic/beats/generate/beat
project_name [Examplebeat]: hdfsauditbeat
github_name [your-github-name]: suxingfate
beat [hdfsauditbeat]:
beat_path [github.com/suxingfate]:
full_name [Firstname Lastname]: xinglong

make setup

到这里,模板就生成了,然后就是定制需要的东西。

  • 1 _meta/beat.yml # 配置模板文件,定义我们的beat会接受哪些配置项
  • 2 config/config.go #使用go的struct定义整个config对象,包含所有的配置项
  • 3 beater/hdfsauditbeat.go # 核心逻辑代码
  • 4 _meta/fields.yml #这里是跟es对接的时候给es定义的mapping

1 _meta/beat.yml

这里增加了path,为后面配置hdfs-audit.log文件的位置留好坑

[root@minikube-2830379 hdfsauditbeat]# cat _meta/beat.yml
################### Hdfsauditbeat Configuration Example #########################

############################# Hdfsauditbeat ######################################

hdfsauditbeat:
  # Defines how often an event is sent to the output
  period: 1s
  path: "."

2 config/config.go

这里把path定义到struct里面,后面核心代码就可以从config对象获得path了

[root@minikube-2830379 hdfsauditbeat]# cat config/config.go
// Config is put into a different package to prevent cyclic imports in case
// it is needed in several locations

package config

import "time"

type Config struct {
    Period time.Duration `config:"period"`
        Path   string        `config:"path"`
}

var DefaultConfig = Config{
    Period: 1 * time.Second,
        Path:   ".",
}

3 beater/hdfsauditbeat.go

这里需要改动的地方是: 3.1 定义了一个catAudit函数来解析目标文件的每一行,同时生成自定义的event,然后发送出去 3.2 Run函数调用自定义的catAudit函数,从而把我们的功能嵌入

[root@minikube-2830379 hdfsauditbeat]# cat beater/hdfsauditbeat.go
package beater

import (
    "fmt"
    "time"
        "os"
        "io"
        "bufio"
        "strings"
    "github.com/elastic/beats/libbeat/beat"
    "github.com/elastic/beats/libbeat/common"
    "github.com/elastic/beats/libbeat/logp"
    "github.com/elastic/beats/libbeat/publisher"

    "github.com/suxingfate/hdfsauditbeat/config"
)

type Hdfsauditbeat struct {
    done   chan struct{}
    config config.Config
    client publisher.Client
}

// Creates beater
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
    config := config.DefaultConfig
    if err := cfg.Unpack(&config); err != nil {
        return nil, fmt.Errorf("Error reading config file: %v", err)
    }

    bt := &Hdfsauditbeat{
        done: make(chan struct{}),
        config: config,
    }
    return bt, nil
}

func (bt *Hdfsauditbeat) Run(b *beat.Beat) error {
    logp.Info("hdfsauditbeat is running! Hit CTRL-C to stop it.")

    bt.client = b.Publisher.Connect()
    ticker := time.NewTicker(bt.config.Period)
    counter := 1
    for {
        select {
        case <-bt.done:
            return nil
        case <-ticker.C:
        }

        bt.catAudit(bt.config.Path)

        logp.Info("Event sent")
        counter++
    }
}

func (bt *Hdfsauditbeat) Stop() {
    bt.client.Close()
    close(bt.done)
}

func (bt *Hdfsauditbeat) catAudit(auditFile string) {
    file, err := os.OpenFile(auditFile, os.O_RDWR, 0666)
    if err != nil {
        //fmt.Println("Open file error!", err)
        return
    }
    defer file.Close()

    buf := bufio.NewReader(file)
    for {
        line, err := buf.ReadString('\n')
        line = strings.TrimSpace(line)
        if line == "" {
            return
        }

    timeEnd := strings.Index(line, ",")
        timeString := line[0 :timeEnd]
        tm, _ := time.Parse("2006-01-02 03:04:05", timeString)

        ugiStart := strings.Index(line, "ugi=") + 4
        ugiEnd := strings.Index(line, " (auth")
        ugi := line[ugiStart :ugiEnd]

        cmdStart := strings.Index(line, "cmd=") + 4
        line = line[cmdStart:len(line)]
        cmdEnd := strings.Index(line, " ")
        cmd := line[0 : cmdEnd]

        srcStart := strings.Index(line, "src=") + 4
        line = line[srcStart:len(line)]
        srcEnd := strings.Index(line, " ")
        src := line[0:srcEnd]

        dstStart := strings.Index(line, "dst=") + 4
        line = line[dstStart:len(line)]
        dstEnd := strings.Index(line, " ")
        dst := line[0:dstEnd]

        event := common.MapStr{
                "@timestamp": common.Time(time.Unix(tm.Unix(), 0)),
                "ugi":       ugi,
                "cmd":       cmd,
                "src":    src,
                "dst":   dst,
            }
            bt.client.PublishEvent(event)

        if err != nil {
            if err == io.EOF {
                //fmt.Println("File read ok!")
                break
            } else {
                //fmt.Println("Read file error!", err)
                return
            }
        }
    }
}

4 _meat/fields.yml

[root@minikube-2830379 hdfsauditbeat]# less _meta/fields.yml
- key: hdfsauditbeat
  title: hdfsauditbeat
  description:
  fields:
    - name: counter
      type: long
      required: true
      description: >
        PLEASE UPDATE DOCUMENTATION
    #new fiels added hdfsaudit
    - name: entrytime
      type: date
    - name: ugi
      type: keyword
    - name: cmd
      type: keyword
    - name: src
      type: keyword
    - name: dst
      type: keyword

测试

首先编译好项目

make update
make

然后会发现生成了一个hdfsauditbeat文件,这个就是二进制的可执行文件。下面进行测试,这里偷了个懒,没有发给es,而是吐到console进行观察。 修改了一下配置文件,需要指定正确的需要消费的audit log文件的路径,另外就是修改了output为console

[root@minikube-2830379 hdfsauditbeat]# cat hdfsauditbeat.yml
################### Hdfsauditbeat Configuration Example #########################

############################# Hdfsauditbeat ######################################

hdfsauditbeat:
  # Defines how often an event is sent to the output
  period: 1s
  path: "/root/go/hdfs-audit.log"

#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
#  env: staging

#================================ Outputs =====================================

# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.
#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
#  hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------
#output.logstash:
  # The Logstash hosts
  #hosts: ["localhost:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

output.console:
  pretty: true
#================================ Logging =====================================

# Sets log level. The default log level is info.
# Available log levels are: critical, error, warning, info, debug
#logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]

开始执行

[root@minikube-2830379 hdfsauditbeat]# ./hdfsauditbeat
{
  "@timestamp": "2018-12-09T03:00:00.000Z",
  "beat": {
    "hostname": "minikube-2830379.lvs02.dev.abc.com",
    "name": "minikube-2830379.lvs02.dev.abc.com",
    "version": "5.1.3"
  },
  "cmd": "create",
  "dst": "null",
  "src": "/app-logs/app/logs/application_1540949675029_717305/lvsdpehdc25dn0444.stratus.lvs.abc.com_8042.tmp",
  "ugi": "appmon@APD.ABC.COM"
}
{
  "@timestamp": "2018-12-09T03:00:00.000Z",
  "beat": {
    "hostname": "minikube-2830379.lvs02.dev.abc.com",
    "name": "minikube-2830379.lvs02.dev.abc.com",
    "version": "5.1.3"
  },
  "cmd": "create",
  "dst": "null",
  "src": "/app-logs/appmon/logs/application_1540949675029_717305/lvsdpehdc25dn0444.stratus.lvs.abc.com_8042.tmp",
  "ugi": "appmon@APD.ABC.COM"
}

结束

使用自定义beat给我们提供了很大的灵活性,虽然pipline或者logstash也可以做到,但是使用场景还是有很大差别的。如果是调用特殊的命令获得输出,或者是本文的场景都更适合定制化beat。

收起阅读 »

社区日报 第473期 (2018-12-09)

1.ElasticSearch连接:Has_Child,Has_parent查询。
http://t.cn/EyEJZGO
2.(自备梯子)将full-scale ELK栈部署到Kubernetes。
http://t.cn/EyEiOtk
3.(自备梯子)Facebook建立在不平等的基础之上。
http://t.cn/EyE6quM

编辑:至尊宝
归档:https://elasticsearch.cn/article/6181
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1.ElasticSearch连接:Has_Child,Has_parent查询。
http://t.cn/EyEJZGO
2.(自备梯子)将full-scale ELK栈部署到Kubernetes。
http://t.cn/EyEiOtk
3.(自备梯子)Facebook建立在不平等的基础之上。
http://t.cn/EyE6quM

编辑:至尊宝
归档:https://elasticsearch.cn/article/6181
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第472期 (2018-12-08)

  1. 用于ES数据警报的实时守护进程。 http://t.cn/EyQmiuE

  2. 最小的ibana Docker镜像。 http://t.cn/EyQbwGk

  3. Lucene列式存储格式DocValues详解。 http://t.cn/EyQ6pkK
继续阅读 »
  1. 用于ES数据警报的实时守护进程。 http://t.cn/EyQmiuE

  2. 最小的ibana Docker镜像。 http://t.cn/EyQbwGk

  3. Lucene列式存储格式DocValues详解。 http://t.cn/EyQ6pkK
收起阅读 »

Day 8 - 如何使用Spark快速将数据写入Elasticsearch

如何使用Spark快速将数据写入Elasticsearch

说到数据写入Elasticsearch,最先想到的肯定是Logstash。Logstash因为其简单上手、可扩展、可伸缩等优点被广大用户接受。但是尺有所短,寸有所长,Logstash肯定也有它无法适用的应用场景,比如:

  • 海量数据ETL
  • 海量数据聚合
  • 多源数据处理

为了满足这些场景,很多同学都会选择Spark,借助Spark算子进行数据处理,最后将处理结果写入Elasticsearch。

我们部门之前利用Spark对Nginx日志进行分析,统计我们的Web服务访问情况,将Nginx日志每分钟聚合一次最后将结果写入Elasticsearch,然后利用Kibana配置实时监控Dashboard。Elasticsearch和Kibana都很方便、实用,但是随着类似需求越来越多,如何快速通过Spark将数据写入Elasticsearch成为了我们的一大问题。

今天给大家推荐一款能够实现数据快速写入的黑科技——Waterdrop,一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在Spark之上,简单易用,灵活配置,无需开发。

wd.png

Kafka to Elasticsearch

和Logstash一样,Waterdrop同样支持多种类型的数据输入,这里我们以最常见的Kakfa作为输入源为例,讲解如何使用Waterdrop将数据快速写入Elasticsearch

Log Sample

原始日志格式如下:

127.0.0.1 elasticsearch.cn 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:21:54:32 +0800] "GET /article HTTP/1.1" 200 123 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)"

Elasticsearch Document

我们想要统计,一分钟每个域名的访问情况,聚合完的数据有以下字段:

domain String
hostname String
status int
datetime String
count int

Waterdrop with Elasticsearch

接下来会给大家详细介绍,我们如何通过Waterdrop读取Kafka中的数据,对数据进行解析以及聚合,最后将处理结果写入Elasticsearch中。

Waterdrop

Waterdrop同样拥有着非常丰富的插件,支持从Kafka、HDFS、Hive中读取数据,进行各种各样的数据处理,并将结果写入Elasticsearch、Kudu或者Kafka中。

Prerequisites

首先我们需要安装Waterdrop,安装十分简单,无需配置系统环境变量

  1. 准备Spark环境
  2. 安装Waterdrop
  3. 配置Waterdrop

以下是简易步骤,具体安装可以参照Quick Start

cd /usr/local
wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar -xvf https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
wget https://github.com/InterestingLab/waterdrop/releases/download/v1.1.1/waterdrop-1.1.1.zip
unzip waterdrop-1.1.1.zip
cd waterdrop-1.1.1

vim config/waterdrop-env.sh
# 指定Spark安装路径
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}

Waterdrop Pipeline

与Logstash一样,我们仅需要编写一个Waterdrop Pipeline的配置文件即可完成数据的导入,相信了解Logstash的朋友可以很快入手Waterdrop配置。

配置文件包括四个部分,分别是Spark、Input、filter和Output。

Spark

这一部分是Spark的相关配置,主要配置Spark执行时所需的资源大小。

spark {
  spark.app.name = "Waterdrop"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

Input

这一部分定义数据源,如下是从Kafka中读取数据的配置案例,

kafkaStream {
    topics = "waterdrop-es"
    consumer.bootstrap.servers = "localhost:9092"
    consumer.group.id = "waterdrop_es_group"
    consumer.rebalance.max.retries = 100
}

Filter

在Filter部分,这里我们配置一系列的转化,包括正则解析将日志进行拆分、时间转换将HTTPDATE转化为Elasticsearch支持的日期格式、对Number类型的字段进行类型转换以及通过SQL进行数据聚合

filter {
    # 使用正则解析原始日志
    # 最开始数据都在raw_message字段中
    grok {
        source_field = "raw_message"
        pattern = '%{NOTSPACE:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
   }
    # 将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为
    # Elasticsearch中支持的格式
    date {
        source_field = "timestamp"
        target_field = "datetime"
        source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
        target_time_format = "yyyy-MM-dd'T'HH:mm:ss.SSS+08:00"
    }
    ## 利用SQL对数据进行聚合
    sql {
        table_name = "access_log"
        sql = "select domain, hostname, int(status), datetime, count(*) from access_log group by domain, hostname, status, datetime"
    }
 }

Output

最后我们将处理好的结构化数据写入Elasticsearch。

output {
    elasticsearch {
        hosts = ["localhost:9200"]
        index = "waterdrop-${now}"
        es.batch.size.entries = 100000
        index_time_format = "yyyy.MM.dd"
    }
}

Running Waterdrop

我们将上述四部分配置组合成为我们的配置文件config/batch.conf

vim config/batch.conf
spark {
  spark.app.name = "Waterdrop"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}
input {
    kafkaStream {
        topics = "waterdrop-es"
        consumer.bootstrap.servers = "localhost:9092"
        consumer.group.id = "waterdrop_es_group"
        consumer.rebalance.max.retries = 100
    }
}
filter {
    # 使用正则解析原始日志
    # 最开始数据都在raw_message字段中
    grok {
        source_field = "raw_message"
        pattern = '%{IP:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
   }
    # 将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为
    # Elasticsearch中支持的格式
    date {
        source_field = "timestamp"
        target_field = "datetime"
        source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
        target_time_format = "yyyy-MM-dd'T'HH:mm:00.SSS+08:00"
    }
    ## 利用SQL对数据进行聚合
    sql {
        table_name = "access_log"
        sql = "select domain, hostname, status, datetime, count(*) from access_log group by domain, localhost, status, datetime"
    }
 }
output {
    elasticsearch {
        hosts = ["localhost:9200"]
        index = "waterdrop-${now}"
        es.batch.size.entries = 100000
        index_time_format = "yyyy.MM.dd"
    }
}

执行命令,指定配置文件,运行Waterdrop,即可将数据写入Elasticsearch。这里我们以本地模式为例。

./bin/start-waterdrop.sh --config config/batch.conf -e client -m 'local[2]'

最后,写入Elasticsearch中的数据如下,再配上Kibana就可以实现Web服务的实时监控了^_^.

"_source": {
    "domain": "elasticsearch.cn",
    "hostname": "localhost",
    "status": "200",
    "datetime": "2018-11-26T21:54:00.000+08:00",
    "count": 26
  }

Conclusion

在这篇文章中,我们介绍了如何通过Waterdrop将Kafka中的数据写入Elasticsearch中。仅仅通过一个配置文件便可快速运行一个Spark Application,完成数据的处理、写入,无需编写任何代码,十分简单。

当数据处理过程中有遇到Logstash无法支持的场景或者Logstah性能无法达到预期的情况下,都可以尝试使用Waterdrop解决问题。

希望了解Waterdrop与Elasticsearch、Kafka、Hadoop结合使用的更多功能和案例,可以直接进入项目主页https://github.com/InterestingLab/waterdrop

我们近期会再发布一篇《如何用Spark和Elasticsearch做交互式数据分析》,敬请期待.

Contract us

欢迎联系我们交流Spark和Elasticsearch:

Garyelephant: 微信: garyelephant

RickyHuo: 微信: chodomatte1994

继续阅读 »

如何使用Spark快速将数据写入Elasticsearch

说到数据写入Elasticsearch,最先想到的肯定是Logstash。Logstash因为其简单上手、可扩展、可伸缩等优点被广大用户接受。但是尺有所短,寸有所长,Logstash肯定也有它无法适用的应用场景,比如:

  • 海量数据ETL
  • 海量数据聚合
  • 多源数据处理

为了满足这些场景,很多同学都会选择Spark,借助Spark算子进行数据处理,最后将处理结果写入Elasticsearch。

我们部门之前利用Spark对Nginx日志进行分析,统计我们的Web服务访问情况,将Nginx日志每分钟聚合一次最后将结果写入Elasticsearch,然后利用Kibana配置实时监控Dashboard。Elasticsearch和Kibana都很方便、实用,但是随着类似需求越来越多,如何快速通过Spark将数据写入Elasticsearch成为了我们的一大问题。

今天给大家推荐一款能够实现数据快速写入的黑科技——Waterdrop,一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在Spark之上,简单易用,灵活配置,无需开发。

wd.png

Kafka to Elasticsearch

和Logstash一样,Waterdrop同样支持多种类型的数据输入,这里我们以最常见的Kakfa作为输入源为例,讲解如何使用Waterdrop将数据快速写入Elasticsearch

Log Sample

原始日志格式如下:

127.0.0.1 elasticsearch.cn 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:21:54:32 +0800] "GET /article HTTP/1.1" 200 123 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)"

Elasticsearch Document

我们想要统计,一分钟每个域名的访问情况,聚合完的数据有以下字段:

domain String
hostname String
status int
datetime String
count int

Waterdrop with Elasticsearch

接下来会给大家详细介绍,我们如何通过Waterdrop读取Kafka中的数据,对数据进行解析以及聚合,最后将处理结果写入Elasticsearch中。

Waterdrop

Waterdrop同样拥有着非常丰富的插件,支持从Kafka、HDFS、Hive中读取数据,进行各种各样的数据处理,并将结果写入Elasticsearch、Kudu或者Kafka中。

Prerequisites

首先我们需要安装Waterdrop,安装十分简单,无需配置系统环境变量

  1. 准备Spark环境
  2. 安装Waterdrop
  3. 配置Waterdrop

以下是简易步骤,具体安装可以参照Quick Start

cd /usr/local
wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar -xvf https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
wget https://github.com/InterestingLab/waterdrop/releases/download/v1.1.1/waterdrop-1.1.1.zip
unzip waterdrop-1.1.1.zip
cd waterdrop-1.1.1

vim config/waterdrop-env.sh
# 指定Spark安装路径
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}

Waterdrop Pipeline

与Logstash一样,我们仅需要编写一个Waterdrop Pipeline的配置文件即可完成数据的导入,相信了解Logstash的朋友可以很快入手Waterdrop配置。

配置文件包括四个部分,分别是Spark、Input、filter和Output。

Spark

这一部分是Spark的相关配置,主要配置Spark执行时所需的资源大小。

spark {
  spark.app.name = "Waterdrop"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

Input

这一部分定义数据源,如下是从Kafka中读取数据的配置案例,

kafkaStream {
    topics = "waterdrop-es"
    consumer.bootstrap.servers = "localhost:9092"
    consumer.group.id = "waterdrop_es_group"
    consumer.rebalance.max.retries = 100
}

Filter

在Filter部分,这里我们配置一系列的转化,包括正则解析将日志进行拆分、时间转换将HTTPDATE转化为Elasticsearch支持的日期格式、对Number类型的字段进行类型转换以及通过SQL进行数据聚合

filter {
    # 使用正则解析原始日志
    # 最开始数据都在raw_message字段中
    grok {
        source_field = "raw_message"
        pattern = '%{NOTSPACE:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
   }
    # 将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为
    # Elasticsearch中支持的格式
    date {
        source_field = "timestamp"
        target_field = "datetime"
        source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
        target_time_format = "yyyy-MM-dd'T'HH:mm:ss.SSS+08:00"
    }
    ## 利用SQL对数据进行聚合
    sql {
        table_name = "access_log"
        sql = "select domain, hostname, int(status), datetime, count(*) from access_log group by domain, hostname, status, datetime"
    }
 }

Output

最后我们将处理好的结构化数据写入Elasticsearch。

output {
    elasticsearch {
        hosts = ["localhost:9200"]
        index = "waterdrop-${now}"
        es.batch.size.entries = 100000
        index_time_format = "yyyy.MM.dd"
    }
}

Running Waterdrop

我们将上述四部分配置组合成为我们的配置文件config/batch.conf

vim config/batch.conf
spark {
  spark.app.name = "Waterdrop"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}
input {
    kafkaStream {
        topics = "waterdrop-es"
        consumer.bootstrap.servers = "localhost:9092"
        consumer.group.id = "waterdrop_es_group"
        consumer.rebalance.max.retries = 100
    }
}
filter {
    # 使用正则解析原始日志
    # 最开始数据都在raw_message字段中
    grok {
        source_field = "raw_message"
        pattern = '%{IP:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
   }
    # 将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为
    # Elasticsearch中支持的格式
    date {
        source_field = "timestamp"
        target_field = "datetime"
        source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
        target_time_format = "yyyy-MM-dd'T'HH:mm:00.SSS+08:00"
    }
    ## 利用SQL对数据进行聚合
    sql {
        table_name = "access_log"
        sql = "select domain, hostname, status, datetime, count(*) from access_log group by domain, localhost, status, datetime"
    }
 }
output {
    elasticsearch {
        hosts = ["localhost:9200"]
        index = "waterdrop-${now}"
        es.batch.size.entries = 100000
        index_time_format = "yyyy.MM.dd"
    }
}

执行命令,指定配置文件,运行Waterdrop,即可将数据写入Elasticsearch。这里我们以本地模式为例。

./bin/start-waterdrop.sh --config config/batch.conf -e client -m 'local[2]'

最后,写入Elasticsearch中的数据如下,再配上Kibana就可以实现Web服务的实时监控了^_^.

"_source": {
    "domain": "elasticsearch.cn",
    "hostname": "localhost",
    "status": "200",
    "datetime": "2018-11-26T21:54:00.000+08:00",
    "count": 26
  }

Conclusion

在这篇文章中,我们介绍了如何通过Waterdrop将Kafka中的数据写入Elasticsearch中。仅仅通过一个配置文件便可快速运行一个Spark Application,完成数据的处理、写入,无需编写任何代码,十分简单。

当数据处理过程中有遇到Logstash无法支持的场景或者Logstah性能无法达到预期的情况下,都可以尝试使用Waterdrop解决问题。

希望了解Waterdrop与Elasticsearch、Kafka、Hadoop结合使用的更多功能和案例,可以直接进入项目主页https://github.com/InterestingLab/waterdrop

我们近期会再发布一篇《如何用Spark和Elasticsearch做交互式数据分析》,敬请期待.

Contract us

欢迎联系我们交流Spark和Elasticsearch:

Garyelephant: 微信: garyelephant

RickyHuo: 微信: chodomatte1994

收起阅读 »

Day 7 - Elasticsearch中数据是如何存储的

前言

很多使用Elasticsearch的同学会关心数据存储在ES中的存储容量,会有这样的疑问:xxTB的数据入到ES会使用多少存储空间。这个问题其实很难直接回答的,只有数据写入ES后,才能观察到实际的存储空间。比如同样是1TB的数据,写入ES的存储空间可能差距会非常大,可能小到只有300~400GB,也可能多到6-7TB,为什么会造成这么大的差距呢?究其原因,我们来探究下Elasticsearch中的数据是如何存储。文章中我以Elasticsearch 2.3版本为示例,对应的lucene版本是5.5,Elasticsearch现在已经来到了6.5版本,数字类型、列存等存储结构有些变化,但基本的概念变化不多,文章中的内容依然适用。

Elasticsearch索引结构

Elasticsearch对外提供的是index的概念,可以类比为DB,用户查询是在index上完成的,每个index由若干个shard组成,以此来达到分布式可扩展的能力。比如下图是一个由10个shard组成的index。

elasticsearch_store_arc.png

shard是Elasticsearch数据存储的最小单位,index的存储容量为所有shard的存储容量之和。Elasticsearch集群的存储容量则为所有index存储容量之和。

一个shard就对应了一个lucene的library。对于一个shard,Elasticsearch增加了translog的功能,类似于HBase WAL,是数据写入过程中的中间数据,其余的数据都在lucene库中管理的。

所以Elasticsearch索引使用的存储内容主要取决于lucene中的数据存储。

lucene数据存储

下面我们主要看下lucene的文件内容,在了解lucene文件内容前,大家先了解些lucene的基本概念。

lucene基本概念

  • segment : lucene内部的数据是由一个个segment组成的,写入lucene的数据并不直接落盘,而是先写在内存中,经过了refresh间隔,lucene才将该时间段写入的全部数据refresh成一个segment,segment多了之后会进行merge成更大的segment。lucene查询时会遍历每个segment完成。由于lucene* 写入的数据是在内存中完成,所以写入效率非常高。但是也存在丢失数据的风险,所以Elasticsearch基于此现象实现了translog,只有在segment数据落盘后,Elasticsearch才会删除对应的translog。
  • doc : doc表示lucene中的一条记录
  • field :field表示记录中的字段概念,一个doc由若干个field组成。
  • term :term是lucene中索引的最小单位,某个field对应的内容如果是全文检索类型,会将内容进行分词,分词的结果就是由term组成的。如果是不分词的字段,那么该字段的内容就是一个term。
  • 倒排索引(inverted index): lucene索引的通用叫法,即实现了term到doc list的映射。
  • 正排数据:搜索引擎的通用叫法,即原始数据,可以理解为一个doc list。
  • docvalues :Elasticsearch中的列式存储的名称,Elasticsearch除了存储原始存储、倒排索引,还存储了一份docvalues,用作分析和排序。

lucene文件内容

lucene包的文件是由很多segment文件组成的,segments_xxx文件记录了lucene包下面的segment文件数量。每个segment会包含如下的文件。

Name Extension Brief Description
Segment Info .si segment的元数据文件
Compound File .cfs, .cfe 一个segment包含了如下表的各个文件,为减少打开文件的数量,在segment小的时候,segment的所有文件内容都保存在cfs文件中,cfe文件保存了lucene各文件在cfs文件的位置信息
Fields .fnm 保存了fields的相关信息
Field Index .fdx 正排存储文件的元数据信息
Field Data .fdt 存储了正排存储数据,写入的原文存储在这
Term Dictionary .tim 倒排索引的元数据信息
Term Index .tip 倒排索引文件,存储了所有的倒排索引数据
Frequencies .doc 保存了每个term的doc id列表和term在doc中的词频
Positions .pos Stores position information about where a term occurs in the index
全文索引的字段,会有该文件,保存了term在doc中的位置
Payloads .pay Stores additional per-position metadata information such as character offsets and user payloads
全文索引的字段,使用了一些像payloads的高级特性会有该文件,保存了term在doc中的一些高级特性
Norms .nvd, .nvm 文件保存索引字段加权数据
Per-Document Values .dvd, .dvm lucene的docvalues文件,即数据的列式存储,用作聚合和排序
Term Vector Data .tvx, .tvd, .tvf Stores offset into the document data file
保存索引字段的矢量信息,用在对term进行高亮,计算文本相关性中使用
Live Documents .liv 记录了segment中删除的doc

测试数据示例

下面我们以真实的数据作为示例,看看lucene中各类型数据的容量占比。

写100w数据,有一个uuid字段,写入的是长度为36位的uuid,字符串总为3600w字节,约为35M。

数据使用一个shard,不带副本,使用默认的压缩算法,写入完成后merge成一个segment方便观察。

使用线上默认的配置,uuid存为不分词的字符串类型。创建如下索引:

PUT test_field
{
  "settings": {
    "index": {
      "number_of_shards": "1",
      "number_of_replicas": "0",
      "refresh_interval": "30s"
    }
  },
  "mappings": {
    "type": {
      "_all": {
        "enabled": false
      }, 
      "properties": {
        "uuid": {
          "type": "string",
          "index": "not_analyzed"
        }
      }
    }
  }
}

首先写入100w不同的uuid,使用磁盘容量细节如下:


health status index      pri rep docs.count docs.deleted store.size pri.store.size 
green  open   test_field   1   0    1000000            0    122.7mb        122.7mb 

-rw-r--r--  1 weizijun  staff    41M Aug 19 21:23 _8.fdt
-rw-r--r--  1 weizijun  staff    17K Aug 19 21:23 _8.fdx
-rw-r--r--  1 weizijun  staff   688B Aug 19 21:23 _8.fnm
-rw-r--r--  1 weizijun  staff   494B Aug 19 21:23 _8.si
-rw-r--r--  1 weizijun  staff   265K Aug 19 21:23 _8_Lucene50_0.doc
-rw-r--r--  1 weizijun  staff    44M Aug 19 21:23 _8_Lucene50_0.tim
-rw-r--r--  1 weizijun  staff   340K Aug 19 21:23 _8_Lucene50_0.tip
-rw-r--r--  1 weizijun  staff    37M Aug 19 21:23 _8_Lucene54_0.dvd
-rw-r--r--  1 weizijun  staff   254B Aug 19 21:23 _8_Lucene54_0.dvm
-rw-r--r--  1 weizijun  staff   195B Aug 19 21:23 segments_2
-rw-r--r--  1 weizijun  staff     0B Aug 19 21:20 write.lock

可以看到正排数据、倒排索引数据,列存数据容量占比几乎相同,正排数据和倒排数据还会存储Elasticsearch的唯一id字段,所以容量会比列存多一些。

35M的uuid存入Elasticsearch后,数据膨胀了3倍,达到了122.7mb。Elasticsearch竟然这么消耗资源,不要着急下结论,接下来看另一个测试结果。

我们写入100w一样的uuid,然后看看Elasticsearch使用的容量。

health status index      pri rep docs.count docs.deleted store.size pri.store.size 
green  open   test_field   1   0    1000000            0     13.2mb         13.2mb 

-rw-r--r--  1 weizijun  staff   5.5M Aug 19 21:29 _6.fdt
-rw-r--r--  1 weizijun  staff    15K Aug 19 21:29 _6.fdx
-rw-r--r--  1 weizijun  staff   688B Aug 19 21:29 _6.fnm
-rw-r--r--  1 weizijun  staff   494B Aug 19 21:29 _6.si
-rw-r--r--  1 weizijun  staff   309K Aug 19 21:29 _6_Lucene50_0.doc
-rw-r--r--  1 weizijun  staff   7.0M Aug 19 21:29 _6_Lucene50_0.tim
-rw-r--r--  1 weizijun  staff   195K Aug 19 21:29 _6_Lucene50_0.tip
-rw-r--r--  1 weizijun  staff   244K Aug 19 21:29 _6_Lucene54_0.dvd
-rw-r--r--  1 weizijun  staff   252B Aug 19 21:29 _6_Lucene54_0.dvm
-rw-r--r--  1 weizijun  staff   195B Aug 19 21:29 segments_2
-rw-r--r--  1 weizijun  staff     0B Aug 19 21:26 write.lock

这回35M的数据Elasticsearch容量只有13.2mb,其中还有主要的占比还是Elasticsearch的唯一id,100w的uuid几乎不占存储容积。

所以在Elasticsearch中建立索引的字段如果基数越大(count distinct),越占用磁盘空间。

我们再看看存100w个不一样的整型会是如何。

health status index      pri rep docs.count docs.deleted store.size pri.store.size 
green  open   test_field   1   0    1000000            0     13.6mb         13.6mb 

-rw-r--r--  1 weizijun  staff   6.1M Aug 28 10:19 _42.fdt
-rw-r--r--  1 weizijun  staff    22K Aug 28 10:19 _42.fdx
-rw-r--r--  1 weizijun  staff   688B Aug 28 10:19 _42.fnm
-rw-r--r--  1 weizijun  staff   503B Aug 28 10:19 _42.si
-rw-r--r--  1 weizijun  staff   2.8M Aug 28 10:19 _42_Lucene50_0.doc
-rw-r--r--  1 weizijun  staff   2.2M Aug 28 10:19 _42_Lucene50_0.tim
-rw-r--r--  1 weizijun  staff    83K Aug 28 10:19 _42_Lucene50_0.tip
-rw-r--r--  1 weizijun  staff   2.5M Aug 28 10:19 _42_Lucene54_0.dvd
-rw-r--r--  1 weizijun  staff   228B Aug 28 10:19 _42_Lucene54_0.dvm
-rw-r--r--  1 weizijun  staff   196B Aug 28 10:19 segments_2
-rw-r--r--  1 weizijun  staff     0B Aug 28 10:16 write.lock

从结果可以看到,100w整型数据,Elasticsearch的存储开销为13.6mb。如果以int型计算100w数据的长度的话,为400w字节,大概是3.8mb数据。忽略Elasticsearch唯一id字段的影响,Elasticsearch实际存储容量跟整型数据长度差不多。

我们再看一下开启最佳压缩参数对存储空间的影响:

health status index      pri rep docs.count docs.deleted store.size pri.store.size 
green  open   test_field   1   0    1000000            0    107.2mb        107.2mb 

-rw-r--r--  1 weizijun  staff    25M Aug 20 12:30 _5.fdt
-rw-r--r--  1 weizijun  staff   6.0K Aug 20 12:30 _5.fdx
-rw-r--r--  1 weizijun  staff   688B Aug 20 12:31 _5.fnm
-rw-r--r--  1 weizijun  staff   500B Aug 20 12:31 _5.si
-rw-r--r--  1 weizijun  staff   265K Aug 20 12:31 _5_Lucene50_0.doc
-rw-r--r--  1 weizijun  staff    44M Aug 20 12:31 _5_Lucene50_0.tim
-rw-r--r--  1 weizijun  staff   322K Aug 20 12:31 _5_Lucene50_0.tip
-rw-r--r--  1 weizijun  staff    37M Aug 20 12:31 _5_Lucene54_0.dvd
-rw-r--r--  1 weizijun  staff   254B Aug 20 12:31 _5_Lucene54_0.dvm
-rw-r--r--  1 weizijun  staff   224B Aug 20 12:31 segments_4
-rw-r--r--  1 weizijun  staff     0B Aug 20 12:00 write.lock

结果中可以发现,只有正排数据会启动压缩,压缩能力确实强劲,不考虑唯一id字段,存储容量大概压缩到接近50%。

我们还做了一些实验,Elasticsearch默认是开启_all参数的,_all可以让用户传入的整体json数据作为全文检索的字段,可以更方便的检索,但在现实场景中已经使用的不多,相反会增加很多存储容量的开销,可以看下开启_all的磁盘空间使用情况:


health status index      pri rep docs.count docs.deleted store.size pri.store.size 
green  open   test_field   1   0    1000000            0    162.4mb        162.4mb 

-rw-r--r--  1 weizijun  staff    41M Aug 18 22:59 _20.fdt
-rw-r--r--  1 weizijun  staff    18K Aug 18 22:59 _20.fdx
-rw-r--r--  1 weizijun  staff   777B Aug 18 22:59 _20.fnm
-rw-r--r--  1 weizijun  staff    59B Aug 18 22:59 _20.nvd
-rw-r--r--  1 weizijun  staff    78B Aug 18 22:59 _20.nvm
-rw-r--r--  1 weizijun  staff   539B Aug 18 22:59 _20.si
-rw-r--r--  1 weizijun  staff   7.2M Aug 18 22:59 _20_Lucene50_0.doc
-rw-r--r--  1 weizijun  staff   4.2M Aug 18 22:59 _20_Lucene50_0.pos
-rw-r--r--  1 weizijun  staff    73M Aug 18 22:59 _20_Lucene50_0.tim
-rw-r--r--  1 weizijun  staff   832K Aug 18 22:59 _20_Lucene50_0.tip
-rw-r--r--  1 weizijun  staff    37M Aug 18 22:59 _20_Lucene54_0.dvd
-rw-r--r--  1 weizijun  staff   254B Aug 18 22:59 _20_Lucene54_0.dvm
-rw-r--r--  1 weizijun  staff   196B Aug 18 22:59 segments_2
-rw-r--r--  1 weizijun  staff     0B Aug 18 22:53 write.lock

开启_all比不开启多了40mb的存储空间,多的数据都在倒排索引上,大约会增加30%多的存储开销。所以线上都直接禁用。

然后我还做了其他几个尝试,为了验证存储容量是否和数据量成正比,写入1000w数据的uuid,发现存储容量基本为100w数据的10倍。我还验证了数据长度是否和数据量成正比,发现把uuid增长2倍、4倍,存储容量也响应的增加了2倍和4倍。在此就不一一列出数据了。

lucene各文件具体内容和实现

lucene数据元信息文件

文件名为:segments_xxx

该文件为lucene数据文件的元信息文件,记录所有segment的元数据信息。

该文件主要记录了目前有多少segment,每个segment有一些基本信息,更新这些信息定位到每个segment的元信息文件。

lucene元信息文件还支持记录userData,Elasticsearch可以在此记录translog的一些相关信息。

文件示例

elasticsearch_store_segments.png

具体实现类

public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo> {
  // generation是segment的版本的概念,从文件名中提取出来,实例中为:2t/101
  private long generation;     // generation of the "segments_N" for the next commit

  private long lastGeneration; // generation of the "segments_N" file we last successfully read
                               // or wrote; this is normally the same as generation except if
                               // there was an IOException that had interrupted a commit

  /** Id for this commit; only written starting with Lucene 5.0 */
  private byte[] id;

  /** Which Lucene version wrote this commit, or null if this commit is pre-5.3. */
  private Version luceneVersion;

  /** Counts how often the index has been changed.  */
  public long version;

  /** Used to name new segments. */
  // TODO: should this be a long ...?
  public int counter;

  /** Version of the oldest segment in the index, or null if there are no segments. */
  private Version minSegmentLuceneVersion;

  private List<SegmentCommitInfo> segments = new ArrayList<>();

  /** Opaque Map&lt;String, String&gt; that user can specify during IndexWriter.commit */
  public Map<String,String> userData = Collections.emptyMap();
}

/** Embeds a [read-only] SegmentInfo and adds per-commit
 *  fields.
 *
 *  @lucene.experimental */
public class SegmentCommitInfo {

  /** The {@link SegmentInfo} that we wrap. */
  public final SegmentInfo info;

  // How many deleted docs in the segment:
  private int delCount;

  // Generation number of the live docs file (-1 if there
  // are no deletes yet):
  private long delGen;

  // Normally 1+delGen, unless an exception was hit on last
  // attempt to write:
  private long nextWriteDelGen;

  // Generation number of the FieldInfos (-1 if there are no updates)
  private long fieldInfosGen;

  // Normally 1+fieldInfosGen, unless an exception was hit on last attempt to
  // write
  private long nextWriteFieldInfosGen; //fieldInfosGen == -1 ? 1 : fieldInfosGen + 1;

  // Generation number of the DocValues (-1 if there are no updates)
  private long docValuesGen;

  // Normally 1+dvGen, unless an exception was hit on last attempt to
  // write
  private long nextWriteDocValuesGen; //docValuesGen == -1 ? 1 : docValuesGen + 1;

  // TODO should we add .files() to FieldInfosFormat, like we have on
  // LiveDocsFormat?
  // track the fieldInfos update files
  private final Set<String> fieldInfosFiles = new HashSet<>();

  // Track the per-field DocValues update files
  private final Map<Integer,Set<String>> dvUpdatesFiles = new HashMap<>();

  // Track the per-generation updates files
  @Deprecated
  private final Map<Long,Set<String>> genUpdatesFiles = new HashMap<>();

  private volatile long sizeInBytes = -1;
}

segment的元信息文件

文件后缀:.si

每个segment都有一个.si文件,记录了该segment的元信息。

segment元信息文件中记录了segment的文档数量,segment对应的文件列表等信息。

文件示例

elasticsearch_store_si.png

具体实现类

/**
 * Information about a segment such as its name, directory, and files related
 * to the segment.
 *
 * @lucene.experimental
 */
public final class SegmentInfo {

  // _bl
  public final String name;

  /** Where this segment resides. */
  public final Directory dir;

  /** Id that uniquely identifies this segment. */
  private final byte[] id;

  private Codec codec;

  // Tracks the Lucene version this segment was created with, since 3.1. Null
  // indicates an older than 3.0 index, and it's used to detect a too old index.
  // The format expected is "x.y" - "2.x" for pre-3.0 indexes (or null), and
  // specific versions afterwards ("3.0.0", "3.1.0" etc.).
  // see o.a.l.util.Version.
  private Version version;

  private int maxDoc;         // number of docs in seg

  private boolean isCompoundFile;

  private Map<String,String> diagnostics;

  private Set<String> setFiles;

  private final Map<String,String> attributes;
}

fields信息文件

文件后缀:.fnm

该文件存储了fields的基本信息。

fields信息中包括field的数量,field的类型,以及IndexOpetions,包括是否存储、是否索引,是否分词,是否需要列存等等。

文件示例

elasticsearch_store_fnm.png

具体实现类

/**
 *  Access to the Field Info file that describes document fields and whether or
 *  not they are indexed. Each segment has a separate Field Info file. Objects
 *  of this class are thread-safe for multiple readers, but only one thread can
 *  be adding documents at a time, with no other reader or writer threads
 *  accessing this object.
 **/
public final class FieldInfo {
  /** Field's name */
  public final String name;

  /** Internal field number */
  //field在内部的编号
  public final int number;

  //field docvalues的类型
  private DocValuesType docValuesType = DocValuesType.NONE;

  // True if any document indexed term vectors
  private boolean storeTermVector;

  private boolean omitNorms; // omit norms associated with indexed fields 

  //index的配置项
  private IndexOptions indexOptions = IndexOptions.NONE;

  private boolean storePayloads; // whether this field stores payloads together with term positions 

  private final Map<String,String> attributes;

  // docvalues的generation
  private long dvGen;
}

数据存储文件

文件后缀:.fdx, .fdt

索引文件为.fdx,数据文件为.fdt,数据存储文件功能为根据自动的文档id,得到文档的内容,搜索引擎的术语习惯称之为正排数据,即doc_id -> content,es的_source数据就存在这

索引文件记录了快速定位文档数据的索引信息,数据文件记录了所有文档id的具体内容。

文件示例

elasticsearch_store_fdt.png

具体实现类

/**
 * Random-access reader for {@link CompressingStoredFieldsIndexWriter}.
 * @lucene.internal
 */
public final class CompressingStoredFieldsIndexReader implements Cloneable, Accountable {
  private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(CompressingStoredFieldsIndexReader.class);

  final int maxDoc;

  //docid索引,快速定位某个docid的数组坐标
  final int[] docBases;

  //快速定位某个docid所在的文件offset的startPointer
  final long[] startPointers;

  //平均一个chunk的文档数
  final int[] avgChunkDocs;

  //平均一个chunk的size
  final long[] avgChunkSizes;

  final PackedInts.Reader[] docBasesDeltas; // delta from the avg

  final PackedInts.Reader[] startPointersDeltas; // delta from the avg
}

/**
 * {@link StoredFieldsReader} impl for {@link CompressingStoredFieldsFormat}.
 * @lucene.experimental
 */
public final class CompressingStoredFieldsReader extends StoredFieldsReader {

  //从fdt正排索引文件中获得
  private final int version;

  // field的基本信息
  private final FieldInfos fieldInfos;

  //fdt正排索引文件reader
  private final CompressingStoredFieldsIndexReader indexReader;

  //从fdt正排索引文件中获得,用于指向fdx数据文件的末端,指向numChunks地址4
  private final long maxPointer;

  //fdx正排数据文件句柄
  private final IndexInput fieldsStream;

  //块大小
  private final int chunkSize;

  private final int packedIntsVersion;

  //压缩类型
  private final CompressionMode compressionMode;

  //解压缩处理对象
  private final Decompressor decompressor;

  //文档数量,从segment元数据中获得
  private final int numDocs;

  //是否正在merge,默认为false
  private final boolean merging;

  //初始化时new了一个BlockState,BlockState记录下当前正排文件读取的状态信息
  private final BlockState state;
  //chunk的数量
  private final long numChunks; // number of compressed blocks written

  //dirty chunk的数量
  private final long numDirtyChunks; // number of incomplete compressed blocks written

  //是否close,默认为false
  private boolean closed;
}

倒排索引文件

索引后缀:.tip,.tim

倒排索引也包含索引文件和数据文件,.tip为索引文件,.tim为数据文件,索引文件包含了每个字段的索引元信息,数据文件有具体的索引内容。

5.5.0版本的倒排索引实现为FST tree,FST tree的最大优势就是内存空间占用非常低 ,具体可以参看下这篇文章:http://www.cnblogs.com/bonelee/p/6226185.html

http://examples.mikemccandless.com/fst.py?terms=&cmd=Build+it 为FST图实例,可以根据输入的数据构造出FST图

输入到 FST 中的数据为:
String inputValues[] = {"mop","moth","pop","star","stop","top"};
long outputValues[] = {0,1,2,3,4,5};

生成的 FST 图为:

elasticsearch_store_tip1.png

elasticsearch_store_tip2.png

文件示例

elasticsearch_store_tip3.png

具体实现类

public final class BlockTreeTermsReader extends FieldsProducer {
  // Open input to the main terms dict file (_X.tib)
  final IndexInput termsIn;
  // Reads the terms dict entries, to gather state to
  // produce DocsEnum on demand
  final PostingsReaderBase postingsReader;
  private final TreeMap<String,FieldReader> fields = new TreeMap<>();

  /** File offset where the directory starts in the terms file. */
  /索引数据文件tim的数据的尾部的元数据的地址
  private long dirOffset;
  /** File offset where the directory starts in the index file. */

  //索引文件tip的数据的尾部的元数据的地址
  private long indexDirOffset;

  //semgent的名称
  final String segment;

  //版本号
  final int version;

  //5.3.x index, we record up front if we may have written any auto-prefix terms,示例中记录的是false
  final boolean anyAutoPrefixTerms;
}

/**
 * BlockTree's implementation of {@link Terms}.
 * @lucene.internal
 */
public final class FieldReader extends Terms implements Accountable {

  //term的数量
  final long numTerms;

  //field信息
  final FieldInfo fieldInfo;

  final long sumTotalTermFreq;

  //总的文档频率
  final long sumDocFreq;

  //文档数量
  final int docCount;

  //字段在索引文件tip中的起始位置
  final long indexStartFP;

  final long rootBlockFP;

  final BytesRef rootCode;

  final BytesRef minTerm;

  final BytesRef maxTerm;

  //longs:metadata buffer, holding monotonic values
  final int longsSize;

  final BlockTreeTermsReader parent;

  final FST<BytesRef> index;
}

倒排链文件

文件后缀:.doc, .pos, .pay

.doc保存了每个term的doc id列表和term在doc中的词频

全文索引的字段,会有.pos文件,保存了term在doc中的位置

全文索引的字段,使用了一些像payloads的高级特性才会有.pay文件,保存了term在doc中的一些高级特性

文件示例

elasticsearch_store_doc.png

具体实现类

/**
 * Concrete class that reads docId(maybe frq,pos,offset,payloads) list
 * with postings format.
 *
 * @lucene.experimental
 */
public final class Lucene50PostingsReader extends PostingsReaderBase {
  private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Lucene50PostingsReader.class);
  private final IndexInput docIn;
  private final IndexInput posIn;
  private final IndexInput payIn;
  final ForUtil forUtil;
  private int version;

  //不分词的字段使用的是该对象,基于skiplist实现了倒排链
  final class BlockDocsEnum extends PostingsEnum {
  }

  //全文检索字段使用的是该对象
  final class BlockPostingsEnum extends PostingsEnum {
  }

  //包含高级特性的字段使用的是该对象
  final class EverythingEnum extends PostingsEnum {
  }
}

列存文件(docvalues)

文件后缀:.dvm, .dvd

索引文件为.dvm,数据文件为.dvd。

lucene实现的docvalues有如下类型:

  • 1、NONE 不开启docvalue时的状态
  • 2、NUMERIC 单个数值类型的docvalue主要包括(int,long,float,double)
  • 3、BINARY 二进制类型值对应不同的codes最大值可能超过32766字节,
  • 4、SORTED 有序增量字节存储,仅仅存储不同部分的值和偏移量指针,值必须小于等于32766字节
  • 5、SORTED_NUMERIC 存储数值类型的有序数组列表
  • 6、SORTED_SET 可以存储多值域的docvalue值,但返回时,仅仅只能返回多值域的第一个docvalue
  • 7、对应not_anaylized的string字段,使用的是SORTED_SET类型,number的类型是SORTED_NUMERIC类型

其中SORTED_SET 的 SORTED_SINGLE_VALUED类型包括了两类数据 : binary + numeric, binary是按ord排序的term的列表,numeric是doc到ord的映射。

文件示例

elasticsearch_store_dvd.png

具体实现类

/** reader for {@link Lucene54DocValuesFormat} */
final class Lucene54DocValuesProducer extends DocValuesProducer implements Closeable {
  //number类型的field的列存列表
  private final Map<String,NumericEntry> numerics = new HashMap<>();

  //字符串类型的field的列存列表
  private final Map<String,BinaryEntry> binaries = new HashMap<>();

  //有序字符串类型的field的列存列表
  private final Map<String,SortedSetEntry> sortedSets = new HashMap<>();

  //有序number类型的field的列存列表
  private final Map<String,SortedSetEntry> sortedNumerics = new HashMap<>();

  //字符串类型的field的ords列表
  private final Map<String,NumericEntry> ords = new HashMap<>();

  //docId -> address -> ord 中field的ords列表
  private final Map<String,NumericEntry> ordIndexes = new HashMap<>();

  //field的数量
  private final int numFields;

  //内存使用量
  private final AtomicLong ramBytesUsed;

  //数据源的文件句柄
  private final IndexInput data;

  //文档数
  private final int maxDoc;
  // memory-resident structures
  private final Map<String,MonotonicBlockPackedReader> addressInstances = new HashMap<>();
  private final Map<String,ReverseTermsIndex> reverseIndexInstances = new HashMap<>();
  private final Map<String,DirectMonotonicReader.Meta> directAddressesMeta = new HashMap<>();

  //是否正在merge
  private final boolean merging;
}

/** metadata entry for a numeric docvalues field */
  static class NumericEntry {
    private NumericEntry() {}
    /** offset to the bitset representing docsWithField, or -1 if no documents have missing values */
    long missingOffset;

    /** offset to the actual numeric values */
    //field的在数据文件中的起始地址
    public long offset;

    /** end offset to the actual numeric values */
    //field的在数据文件中的结尾地址
    public long endOffset;

    /** bits per value used to pack the numeric values */
    public int bitsPerValue;

    //format类型
    int format;
    /** count of values written */
    public long count;
    /** monotonic meta */
    public DirectMonotonicReader.Meta monotonicMeta;

    //最小的value
    long minValue;

    //Compressed by computing the GCD
    long gcd;

    //Compressed by giving IDs to unique values.
    long table[];
    /** for sparse compression */
    long numDocsWithValue;
    NumericEntry nonMissingValues;
    NumberType numberType;
  }

  /** metadata entry for a binary docvalues field */
  static class BinaryEntry {
    private BinaryEntry() {}
    /** offset to the bitset representing docsWithField, or -1 if no documents have missing values */
    long missingOffset;
    /** offset to the actual binary values */
    //field的在数据文件中的起始地址
    long offset;
    int format;
    /** count of values written */
    public long count;

    //最短字符串的长度
    int minLength;

    //最长字符串的长度
    int maxLength;
    /** offset to the addressing data that maps a value to its slice of the byte[] */
    public long addressesOffset, addressesEndOffset;
    /** meta data for addresses */
    public DirectMonotonicReader.Meta addressesMeta;
    /** offset to the reverse index */
    public long reverseIndexOffset;
    /** packed ints version used to encode addressing information */
    public int packedIntsVersion;
    /** packed ints blocksize */
    public int blockSize;
  }

参考资料

lucene source code

lucene document

lucene字典实现原理——FST

继续阅读 »

前言

很多使用Elasticsearch的同学会关心数据存储在ES中的存储容量,会有这样的疑问:xxTB的数据入到ES会使用多少存储空间。这个问题其实很难直接回答的,只有数据写入ES后,才能观察到实际的存储空间。比如同样是1TB的数据,写入ES的存储空间可能差距会非常大,可能小到只有300~400GB,也可能多到6-7TB,为什么会造成这么大的差距呢?究其原因,我们来探究下Elasticsearch中的数据是如何存储。文章中我以Elasticsearch 2.3版本为示例,对应的lucene版本是5.5,Elasticsearch现在已经来到了6.5版本,数字类型、列存等存储结构有些变化,但基本的概念变化不多,文章中的内容依然适用。

Elasticsearch索引结构

Elasticsearch对外提供的是index的概念,可以类比为DB,用户查询是在index上完成的,每个index由若干个shard组成,以此来达到分布式可扩展的能力。比如下图是一个由10个shard组成的index。

elasticsearch_store_arc.png

shard是Elasticsearch数据存储的最小单位,index的存储容量为所有shard的存储容量之和。Elasticsearch集群的存储容量则为所有index存储容量之和。

一个shard就对应了一个lucene的library。对于一个shard,Elasticsearch增加了translog的功能,类似于HBase WAL,是数据写入过程中的中间数据,其余的数据都在lucene库中管理的。

所以Elasticsearch索引使用的存储内容主要取决于lucene中的数据存储。

lucene数据存储

下面我们主要看下lucene的文件内容,在了解lucene文件内容前,大家先了解些lucene的基本概念。

lucene基本概念

  • segment : lucene内部的数据是由一个个segment组成的,写入lucene的数据并不直接落盘,而是先写在内存中,经过了refresh间隔,lucene才将该时间段写入的全部数据refresh成一个segment,segment多了之后会进行merge成更大的segment。lucene查询时会遍历每个segment完成。由于lucene* 写入的数据是在内存中完成,所以写入效率非常高。但是也存在丢失数据的风险,所以Elasticsearch基于此现象实现了translog,只有在segment数据落盘后,Elasticsearch才会删除对应的translog。
  • doc : doc表示lucene中的一条记录
  • field :field表示记录中的字段概念,一个doc由若干个field组成。
  • term :term是lucene中索引的最小单位,某个field对应的内容如果是全文检索类型,会将内容进行分词,分词的结果就是由term组成的。如果是不分词的字段,那么该字段的内容就是一个term。
  • 倒排索引(inverted index): lucene索引的通用叫法,即实现了term到doc list的映射。
  • 正排数据:搜索引擎的通用叫法,即原始数据,可以理解为一个doc list。
  • docvalues :Elasticsearch中的列式存储的名称,Elasticsearch除了存储原始存储、倒排索引,还存储了一份docvalues,用作分析和排序。

lucene文件内容

lucene包的文件是由很多segment文件组成的,segments_xxx文件记录了lucene包下面的segment文件数量。每个segment会包含如下的文件。

Name Extension Brief Description
Segment Info .si segment的元数据文件
Compound File .cfs, .cfe 一个segment包含了如下表的各个文件,为减少打开文件的数量,在segment小的时候,segment的所有文件内容都保存在cfs文件中,cfe文件保存了lucene各文件在cfs文件的位置信息
Fields .fnm 保存了fields的相关信息
Field Index .fdx 正排存储文件的元数据信息
Field Data .fdt 存储了正排存储数据,写入的原文存储在这
Term Dictionary .tim 倒排索引的元数据信息
Term Index .tip 倒排索引文件,存储了所有的倒排索引数据
Frequencies .doc 保存了每个term的doc id列表和term在doc中的词频
Positions .pos Stores position information about where a term occurs in the index
全文索引的字段,会有该文件,保存了term在doc中的位置
Payloads .pay Stores additional per-position metadata information such as character offsets and user payloads
全文索引的字段,使用了一些像payloads的高级特性会有该文件,保存了term在doc中的一些高级特性
Norms .nvd, .nvm 文件保存索引字段加权数据
Per-Document Values .dvd, .dvm lucene的docvalues文件,即数据的列式存储,用作聚合和排序
Term Vector Data .tvx, .tvd, .tvf Stores offset into the document data file
保存索引字段的矢量信息,用在对term进行高亮,计算文本相关性中使用
Live Documents .liv 记录了segment中删除的doc

测试数据示例

下面我们以真实的数据作为示例,看看lucene中各类型数据的容量占比。

写100w数据,有一个uuid字段,写入的是长度为36位的uuid,字符串总为3600w字节,约为35M。

数据使用一个shard,不带副本,使用默认的压缩算法,写入完成后merge成一个segment方便观察。

使用线上默认的配置,uuid存为不分词的字符串类型。创建如下索引:

PUT test_field
{
  "settings": {
    "index": {
      "number_of_shards": "1",
      "number_of_replicas": "0",
      "refresh_interval": "30s"
    }
  },
  "mappings": {
    "type": {
      "_all": {
        "enabled": false
      }, 
      "properties": {
        "uuid": {
          "type": "string",
          "index": "not_analyzed"
        }
      }
    }
  }
}

首先写入100w不同的uuid,使用磁盘容量细节如下:


health status index      pri rep docs.count docs.deleted store.size pri.store.size 
green  open   test_field   1   0    1000000            0    122.7mb        122.7mb 

-rw-r--r--  1 weizijun  staff    41M Aug 19 21:23 _8.fdt
-rw-r--r--  1 weizijun  staff    17K Aug 19 21:23 _8.fdx
-rw-r--r--  1 weizijun  staff   688B Aug 19 21:23 _8.fnm
-rw-r--r--  1 weizijun  staff   494B Aug 19 21:23 _8.si
-rw-r--r--  1 weizijun  staff   265K Aug 19 21:23 _8_Lucene50_0.doc
-rw-r--r--  1 weizijun  staff    44M Aug 19 21:23 _8_Lucene50_0.tim
-rw-r--r--  1 weizijun  staff   340K Aug 19 21:23 _8_Lucene50_0.tip
-rw-r--r--  1 weizijun  staff    37M Aug 19 21:23 _8_Lucene54_0.dvd
-rw-r--r--  1 weizijun  staff   254B Aug 19 21:23 _8_Lucene54_0.dvm
-rw-r--r--  1 weizijun  staff   195B Aug 19 21:23 segments_2
-rw-r--r--  1 weizijun  staff     0B Aug 19 21:20 write.lock

可以看到正排数据、倒排索引数据,列存数据容量占比几乎相同,正排数据和倒排数据还会存储Elasticsearch的唯一id字段,所以容量会比列存多一些。

35M的uuid存入Elasticsearch后,数据膨胀了3倍,达到了122.7mb。Elasticsearch竟然这么消耗资源,不要着急下结论,接下来看另一个测试结果。

我们写入100w一样的uuid,然后看看Elasticsearch使用的容量。

health status index      pri rep docs.count docs.deleted store.size pri.store.size 
green  open   test_field   1   0    1000000            0     13.2mb         13.2mb 

-rw-r--r--  1 weizijun  staff   5.5M Aug 19 21:29 _6.fdt
-rw-r--r--  1 weizijun  staff    15K Aug 19 21:29 _6.fdx
-rw-r--r--  1 weizijun  staff   688B Aug 19 21:29 _6.fnm
-rw-r--r--  1 weizijun  staff   494B Aug 19 21:29 _6.si
-rw-r--r--  1 weizijun  staff   309K Aug 19 21:29 _6_Lucene50_0.doc
-rw-r--r--  1 weizijun  staff   7.0M Aug 19 21:29 _6_Lucene50_0.tim
-rw-r--r--  1 weizijun  staff   195K Aug 19 21:29 _6_Lucene50_0.tip
-rw-r--r--  1 weizijun  staff   244K Aug 19 21:29 _6_Lucene54_0.dvd
-rw-r--r--  1 weizijun  staff   252B Aug 19 21:29 _6_Lucene54_0.dvm
-rw-r--r--  1 weizijun  staff   195B Aug 19 21:29 segments_2
-rw-r--r--  1 weizijun  staff     0B Aug 19 21:26 write.lock

这回35M的数据Elasticsearch容量只有13.2mb,其中还有主要的占比还是Elasticsearch的唯一id,100w的uuid几乎不占存储容积。

所以在Elasticsearch中建立索引的字段如果基数越大(count distinct),越占用磁盘空间。

我们再看看存100w个不一样的整型会是如何。

health status index      pri rep docs.count docs.deleted store.size pri.store.size 
green  open   test_field   1   0    1000000            0     13.6mb         13.6mb 

-rw-r--r--  1 weizijun  staff   6.1M Aug 28 10:19 _42.fdt
-rw-r--r--  1 weizijun  staff    22K Aug 28 10:19 _42.fdx
-rw-r--r--  1 weizijun  staff   688B Aug 28 10:19 _42.fnm
-rw-r--r--  1 weizijun  staff   503B Aug 28 10:19 _42.si
-rw-r--r--  1 weizijun  staff   2.8M Aug 28 10:19 _42_Lucene50_0.doc
-rw-r--r--  1 weizijun  staff   2.2M Aug 28 10:19 _42_Lucene50_0.tim
-rw-r--r--  1 weizijun  staff    83K Aug 28 10:19 _42_Lucene50_0.tip
-rw-r--r--  1 weizijun  staff   2.5M Aug 28 10:19 _42_Lucene54_0.dvd
-rw-r--r--  1 weizijun  staff   228B Aug 28 10:19 _42_Lucene54_0.dvm
-rw-r--r--  1 weizijun  staff   196B Aug 28 10:19 segments_2
-rw-r--r--  1 weizijun  staff     0B Aug 28 10:16 write.lock

从结果可以看到,100w整型数据,Elasticsearch的存储开销为13.6mb。如果以int型计算100w数据的长度的话,为400w字节,大概是3.8mb数据。忽略Elasticsearch唯一id字段的影响,Elasticsearch实际存储容量跟整型数据长度差不多。

我们再看一下开启最佳压缩参数对存储空间的影响:

health status index      pri rep docs.count docs.deleted store.size pri.store.size 
green  open   test_field   1   0    1000000            0    107.2mb        107.2mb 

-rw-r--r--  1 weizijun  staff    25M Aug 20 12:30 _5.fdt
-rw-r--r--  1 weizijun  staff   6.0K Aug 20 12:30 _5.fdx
-rw-r--r--  1 weizijun  staff   688B Aug 20 12:31 _5.fnm
-rw-r--r--  1 weizijun  staff   500B Aug 20 12:31 _5.si
-rw-r--r--  1 weizijun  staff   265K Aug 20 12:31 _5_Lucene50_0.doc
-rw-r--r--  1 weizijun  staff    44M Aug 20 12:31 _5_Lucene50_0.tim
-rw-r--r--  1 weizijun  staff   322K Aug 20 12:31 _5_Lucene50_0.tip
-rw-r--r--  1 weizijun  staff    37M Aug 20 12:31 _5_Lucene54_0.dvd
-rw-r--r--  1 weizijun  staff   254B Aug 20 12:31 _5_Lucene54_0.dvm
-rw-r--r--  1 weizijun  staff   224B Aug 20 12:31 segments_4
-rw-r--r--  1 weizijun  staff     0B Aug 20 12:00 write.lock

结果中可以发现,只有正排数据会启动压缩,压缩能力确实强劲,不考虑唯一id字段,存储容量大概压缩到接近50%。

我们还做了一些实验,Elasticsearch默认是开启_all参数的,_all可以让用户传入的整体json数据作为全文检索的字段,可以更方便的检索,但在现实场景中已经使用的不多,相反会增加很多存储容量的开销,可以看下开启_all的磁盘空间使用情况:


health status index      pri rep docs.count docs.deleted store.size pri.store.size 
green  open   test_field   1   0    1000000            0    162.4mb        162.4mb 

-rw-r--r--  1 weizijun  staff    41M Aug 18 22:59 _20.fdt
-rw-r--r--  1 weizijun  staff    18K Aug 18 22:59 _20.fdx
-rw-r--r--  1 weizijun  staff   777B Aug 18 22:59 _20.fnm
-rw-r--r--  1 weizijun  staff    59B Aug 18 22:59 _20.nvd
-rw-r--r--  1 weizijun  staff    78B Aug 18 22:59 _20.nvm
-rw-r--r--  1 weizijun  staff   539B Aug 18 22:59 _20.si
-rw-r--r--  1 weizijun  staff   7.2M Aug 18 22:59 _20_Lucene50_0.doc
-rw-r--r--  1 weizijun  staff   4.2M Aug 18 22:59 _20_Lucene50_0.pos
-rw-r--r--  1 weizijun  staff    73M Aug 18 22:59 _20_Lucene50_0.tim
-rw-r--r--  1 weizijun  staff   832K Aug 18 22:59 _20_Lucene50_0.tip
-rw-r--r--  1 weizijun  staff    37M Aug 18 22:59 _20_Lucene54_0.dvd
-rw-r--r--  1 weizijun  staff   254B Aug 18 22:59 _20_Lucene54_0.dvm
-rw-r--r--  1 weizijun  staff   196B Aug 18 22:59 segments_2
-rw-r--r--  1 weizijun  staff     0B Aug 18 22:53 write.lock

开启_all比不开启多了40mb的存储空间,多的数据都在倒排索引上,大约会增加30%多的存储开销。所以线上都直接禁用。

然后我还做了其他几个尝试,为了验证存储容量是否和数据量成正比,写入1000w数据的uuid,发现存储容量基本为100w数据的10倍。我还验证了数据长度是否和数据量成正比,发现把uuid增长2倍、4倍,存储容量也响应的增加了2倍和4倍。在此就不一一列出数据了。

lucene各文件具体内容和实现

lucene数据元信息文件

文件名为:segments_xxx

该文件为lucene数据文件的元信息文件,记录所有segment的元数据信息。

该文件主要记录了目前有多少segment,每个segment有一些基本信息,更新这些信息定位到每个segment的元信息文件。

lucene元信息文件还支持记录userData,Elasticsearch可以在此记录translog的一些相关信息。

文件示例

elasticsearch_store_segments.png

具体实现类

public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo> {
  // generation是segment的版本的概念,从文件名中提取出来,实例中为:2t/101
  private long generation;     // generation of the "segments_N" for the next commit

  private long lastGeneration; // generation of the "segments_N" file we last successfully read
                               // or wrote; this is normally the same as generation except if
                               // there was an IOException that had interrupted a commit

  /** Id for this commit; only written starting with Lucene 5.0 */
  private byte[] id;

  /** Which Lucene version wrote this commit, or null if this commit is pre-5.3. */
  private Version luceneVersion;

  /** Counts how often the index has been changed.  */
  public long version;

  /** Used to name new segments. */
  // TODO: should this be a long ...?
  public int counter;

  /** Version of the oldest segment in the index, or null if there are no segments. */
  private Version minSegmentLuceneVersion;

  private List<SegmentCommitInfo> segments = new ArrayList<>();

  /** Opaque Map&lt;String, String&gt; that user can specify during IndexWriter.commit */
  public Map<String,String> userData = Collections.emptyMap();
}

/** Embeds a [read-only] SegmentInfo and adds per-commit
 *  fields.
 *
 *  @lucene.experimental */
public class SegmentCommitInfo {

  /** The {@link SegmentInfo} that we wrap. */
  public final SegmentInfo info;

  // How many deleted docs in the segment:
  private int delCount;

  // Generation number of the live docs file (-1 if there
  // are no deletes yet):
  private long delGen;

  // Normally 1+delGen, unless an exception was hit on last
  // attempt to write:
  private long nextWriteDelGen;

  // Generation number of the FieldInfos (-1 if there are no updates)
  private long fieldInfosGen;

  // Normally 1+fieldInfosGen, unless an exception was hit on last attempt to
  // write
  private long nextWriteFieldInfosGen; //fieldInfosGen == -1 ? 1 : fieldInfosGen + 1;

  // Generation number of the DocValues (-1 if there are no updates)
  private long docValuesGen;

  // Normally 1+dvGen, unless an exception was hit on last attempt to
  // write
  private long nextWriteDocValuesGen; //docValuesGen == -1 ? 1 : docValuesGen + 1;

  // TODO should we add .files() to FieldInfosFormat, like we have on
  // LiveDocsFormat?
  // track the fieldInfos update files
  private final Set<String> fieldInfosFiles = new HashSet<>();

  // Track the per-field DocValues update files
  private final Map<Integer,Set<String>> dvUpdatesFiles = new HashMap<>();

  // Track the per-generation updates files
  @Deprecated
  private final Map<Long,Set<String>> genUpdatesFiles = new HashMap<>();

  private volatile long sizeInBytes = -1;
}

segment的元信息文件

文件后缀:.si

每个segment都有一个.si文件,记录了该segment的元信息。

segment元信息文件中记录了segment的文档数量,segment对应的文件列表等信息。

文件示例

elasticsearch_store_si.png

具体实现类

/**
 * Information about a segment such as its name, directory, and files related
 * to the segment.
 *
 * @lucene.experimental
 */
public final class SegmentInfo {

  // _bl
  public final String name;

  /** Where this segment resides. */
  public final Directory dir;

  /** Id that uniquely identifies this segment. */
  private final byte[] id;

  private Codec codec;

  // Tracks the Lucene version this segment was created with, since 3.1. Null
  // indicates an older than 3.0 index, and it's used to detect a too old index.
  // The format expected is "x.y" - "2.x" for pre-3.0 indexes (or null), and
  // specific versions afterwards ("3.0.0", "3.1.0" etc.).
  // see o.a.l.util.Version.
  private Version version;

  private int maxDoc;         // number of docs in seg

  private boolean isCompoundFile;

  private Map<String,String> diagnostics;

  private Set<String> setFiles;

  private final Map<String,String> attributes;
}

fields信息文件

文件后缀:.fnm

该文件存储了fields的基本信息。

fields信息中包括field的数量,field的类型,以及IndexOpetions,包括是否存储、是否索引,是否分词,是否需要列存等等。

文件示例

elasticsearch_store_fnm.png

具体实现类

/**
 *  Access to the Field Info file that describes document fields and whether or
 *  not they are indexed. Each segment has a separate Field Info file. Objects
 *  of this class are thread-safe for multiple readers, but only one thread can
 *  be adding documents at a time, with no other reader or writer threads
 *  accessing this object.
 **/
public final class FieldInfo {
  /** Field's name */
  public final String name;

  /** Internal field number */
  //field在内部的编号
  public final int number;

  //field docvalues的类型
  private DocValuesType docValuesType = DocValuesType.NONE;

  // True if any document indexed term vectors
  private boolean storeTermVector;

  private boolean omitNorms; // omit norms associated with indexed fields 

  //index的配置项
  private IndexOptions indexOptions = IndexOptions.NONE;

  private boolean storePayloads; // whether this field stores payloads together with term positions 

  private final Map<String,String> attributes;

  // docvalues的generation
  private long dvGen;
}

数据存储文件

文件后缀:.fdx, .fdt

索引文件为.fdx,数据文件为.fdt,数据存储文件功能为根据自动的文档id,得到文档的内容,搜索引擎的术语习惯称之为正排数据,即doc_id -> content,es的_source数据就存在这

索引文件记录了快速定位文档数据的索引信息,数据文件记录了所有文档id的具体内容。

文件示例

elasticsearch_store_fdt.png

具体实现类

/**
 * Random-access reader for {@link CompressingStoredFieldsIndexWriter}.
 * @lucene.internal
 */
public final class CompressingStoredFieldsIndexReader implements Cloneable, Accountable {
  private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(CompressingStoredFieldsIndexReader.class);

  final int maxDoc;

  //docid索引,快速定位某个docid的数组坐标
  final int[] docBases;

  //快速定位某个docid所在的文件offset的startPointer
  final long[] startPointers;

  //平均一个chunk的文档数
  final int[] avgChunkDocs;

  //平均一个chunk的size
  final long[] avgChunkSizes;

  final PackedInts.Reader[] docBasesDeltas; // delta from the avg

  final PackedInts.Reader[] startPointersDeltas; // delta from the avg
}

/**
 * {@link StoredFieldsReader} impl for {@link CompressingStoredFieldsFormat}.
 * @lucene.experimental
 */
public final class CompressingStoredFieldsReader extends StoredFieldsReader {

  //从fdt正排索引文件中获得
  private final int version;

  // field的基本信息
  private final FieldInfos fieldInfos;

  //fdt正排索引文件reader
  private final CompressingStoredFieldsIndexReader indexReader;

  //从fdt正排索引文件中获得,用于指向fdx数据文件的末端,指向numChunks地址4
  private final long maxPointer;

  //fdx正排数据文件句柄
  private final IndexInput fieldsStream;

  //块大小
  private final int chunkSize;

  private final int packedIntsVersion;

  //压缩类型
  private final CompressionMode compressionMode;

  //解压缩处理对象
  private final Decompressor decompressor;

  //文档数量,从segment元数据中获得
  private final int numDocs;

  //是否正在merge,默认为false
  private final boolean merging;

  //初始化时new了一个BlockState,BlockState记录下当前正排文件读取的状态信息
  private final BlockState state;
  //chunk的数量
  private final long numChunks; // number of compressed blocks written

  //dirty chunk的数量
  private final long numDirtyChunks; // number of incomplete compressed blocks written

  //是否close,默认为false
  private boolean closed;
}

倒排索引文件

索引后缀:.tip,.tim

倒排索引也包含索引文件和数据文件,.tip为索引文件,.tim为数据文件,索引文件包含了每个字段的索引元信息,数据文件有具体的索引内容。

5.5.0版本的倒排索引实现为FST tree,FST tree的最大优势就是内存空间占用非常低 ,具体可以参看下这篇文章:http://www.cnblogs.com/bonelee/p/6226185.html

http://examples.mikemccandless.com/fst.py?terms=&cmd=Build+it 为FST图实例,可以根据输入的数据构造出FST图

输入到 FST 中的数据为:
String inputValues[] = {"mop","moth","pop","star","stop","top"};
long outputValues[] = {0,1,2,3,4,5};

生成的 FST 图为:

elasticsearch_store_tip1.png

elasticsearch_store_tip2.png

文件示例

elasticsearch_store_tip3.png

具体实现类

public final class BlockTreeTermsReader extends FieldsProducer {
  // Open input to the main terms dict file (_X.tib)
  final IndexInput termsIn;
  // Reads the terms dict entries, to gather state to
  // produce DocsEnum on demand
  final PostingsReaderBase postingsReader;
  private final TreeMap<String,FieldReader> fields = new TreeMap<>();

  /** File offset where the directory starts in the terms file. */
  /索引数据文件tim的数据的尾部的元数据的地址
  private long dirOffset;
  /** File offset where the directory starts in the index file. */

  //索引文件tip的数据的尾部的元数据的地址
  private long indexDirOffset;

  //semgent的名称
  final String segment;

  //版本号
  final int version;

  //5.3.x index, we record up front if we may have written any auto-prefix terms,示例中记录的是false
  final boolean anyAutoPrefixTerms;
}

/**
 * BlockTree's implementation of {@link Terms}.
 * @lucene.internal
 */
public final class FieldReader extends Terms implements Accountable {

  //term的数量
  final long numTerms;

  //field信息
  final FieldInfo fieldInfo;

  final long sumTotalTermFreq;

  //总的文档频率
  final long sumDocFreq;

  //文档数量
  final int docCount;

  //字段在索引文件tip中的起始位置
  final long indexStartFP;

  final long rootBlockFP;

  final BytesRef rootCode;

  final BytesRef minTerm;

  final BytesRef maxTerm;

  //longs:metadata buffer, holding monotonic values
  final int longsSize;

  final BlockTreeTermsReader parent;

  final FST<BytesRef> index;
}

倒排链文件

文件后缀:.doc, .pos, .pay

.doc保存了每个term的doc id列表和term在doc中的词频

全文索引的字段,会有.pos文件,保存了term在doc中的位置

全文索引的字段,使用了一些像payloads的高级特性才会有.pay文件,保存了term在doc中的一些高级特性

文件示例

elasticsearch_store_doc.png

具体实现类

/**
 * Concrete class that reads docId(maybe frq,pos,offset,payloads) list
 * with postings format.
 *
 * @lucene.experimental
 */
public final class Lucene50PostingsReader extends PostingsReaderBase {
  private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Lucene50PostingsReader.class);
  private final IndexInput docIn;
  private final IndexInput posIn;
  private final IndexInput payIn;
  final ForUtil forUtil;
  private int version;

  //不分词的字段使用的是该对象,基于skiplist实现了倒排链
  final class BlockDocsEnum extends PostingsEnum {
  }

  //全文检索字段使用的是该对象
  final class BlockPostingsEnum extends PostingsEnum {
  }

  //包含高级特性的字段使用的是该对象
  final class EverythingEnum extends PostingsEnum {
  }
}

列存文件(docvalues)

文件后缀:.dvm, .dvd

索引文件为.dvm,数据文件为.dvd。

lucene实现的docvalues有如下类型:

  • 1、NONE 不开启docvalue时的状态
  • 2、NUMERIC 单个数值类型的docvalue主要包括(int,long,float,double)
  • 3、BINARY 二进制类型值对应不同的codes最大值可能超过32766字节,
  • 4、SORTED 有序增量字节存储,仅仅存储不同部分的值和偏移量指针,值必须小于等于32766字节
  • 5、SORTED_NUMERIC 存储数值类型的有序数组列表
  • 6、SORTED_SET 可以存储多值域的docvalue值,但返回时,仅仅只能返回多值域的第一个docvalue
  • 7、对应not_anaylized的string字段,使用的是SORTED_SET类型,number的类型是SORTED_NUMERIC类型

其中SORTED_SET 的 SORTED_SINGLE_VALUED类型包括了两类数据 : binary + numeric, binary是按ord排序的term的列表,numeric是doc到ord的映射。

文件示例

elasticsearch_store_dvd.png

具体实现类

/** reader for {@link Lucene54DocValuesFormat} */
final class Lucene54DocValuesProducer extends DocValuesProducer implements Closeable {
  //number类型的field的列存列表
  private final Map<String,NumericEntry> numerics = new HashMap<>();

  //字符串类型的field的列存列表
  private final Map<String,BinaryEntry> binaries = new HashMap<>();

  //有序字符串类型的field的列存列表
  private final Map<String,SortedSetEntry> sortedSets = new HashMap<>();

  //有序number类型的field的列存列表
  private final Map<String,SortedSetEntry> sortedNumerics = new HashMap<>();

  //字符串类型的field的ords列表
  private final Map<String,NumericEntry> ords = new HashMap<>();

  //docId -> address -> ord 中field的ords列表
  private final Map<String,NumericEntry> ordIndexes = new HashMap<>();

  //field的数量
  private final int numFields;

  //内存使用量
  private final AtomicLong ramBytesUsed;

  //数据源的文件句柄
  private final IndexInput data;

  //文档数
  private final int maxDoc;
  // memory-resident structures
  private final Map<String,MonotonicBlockPackedReader> addressInstances = new HashMap<>();
  private final Map<String,ReverseTermsIndex> reverseIndexInstances = new HashMap<>();
  private final Map<String,DirectMonotonicReader.Meta> directAddressesMeta = new HashMap<>();

  //是否正在merge
  private final boolean merging;
}

/** metadata entry for a numeric docvalues field */
  static class NumericEntry {
    private NumericEntry() {}
    /** offset to the bitset representing docsWithField, or -1 if no documents have missing values */
    long missingOffset;

    /** offset to the actual numeric values */
    //field的在数据文件中的起始地址
    public long offset;

    /** end offset to the actual numeric values */
    //field的在数据文件中的结尾地址
    public long endOffset;

    /** bits per value used to pack the numeric values */
    public int bitsPerValue;

    //format类型
    int format;
    /** count of values written */
    public long count;
    /** monotonic meta */
    public DirectMonotonicReader.Meta monotonicMeta;

    //最小的value
    long minValue;

    //Compressed by computing the GCD
    long gcd;

    //Compressed by giving IDs to unique values.
    long table[];
    /** for sparse compression */
    long numDocsWithValue;
    NumericEntry nonMissingValues;
    NumberType numberType;
  }

  /** metadata entry for a binary docvalues field */
  static class BinaryEntry {
    private BinaryEntry() {}
    /** offset to the bitset representing docsWithField, or -1 if no documents have missing values */
    long missingOffset;
    /** offset to the actual binary values */
    //field的在数据文件中的起始地址
    long offset;
    int format;
    /** count of values written */
    public long count;

    //最短字符串的长度
    int minLength;

    //最长字符串的长度
    int maxLength;
    /** offset to the addressing data that maps a value to its slice of the byte[] */
    public long addressesOffset, addressesEndOffset;
    /** meta data for addresses */
    public DirectMonotonicReader.Meta addressesMeta;
    /** offset to the reverse index */
    public long reverseIndexOffset;
    /** packed ints version used to encode addressing information */
    public int packedIntsVersion;
    /** packed ints blocksize */
    public int blockSize;
  }

参考资料

lucene source code

lucene document

lucene字典实现原理——FST

收起阅读 »

社区日报 第471期 (2018-12-07)

1、使用Elasticsearch作为主数据存储实践
http://t.cn/EyS6wcL
2、Elasticsearch 6.x启动过程
http://t.cn/EyJgQ7U
3、Elasticsearch你知道多少?
http://t.cn/EyS6I4P

编辑:铭毅天下
归档: https://elasticsearch.cn/article/6177
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1、使用Elasticsearch作为主数据存储实践
http://t.cn/EyS6wcL
2、Elasticsearch 6.x启动过程
http://t.cn/EyJgQ7U
3、Elasticsearch你知道多少?
http://t.cn/EyS6I4P

编辑:铭毅天下
归档: https://elasticsearch.cn/article/6177
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

Day 6 - Logstash Pipeline-to-Pipeline 尝鲜

Logstash 在 6.0 推出了 multiple pipeline 的解决方案,即在一个 logstash 实例中可以同时进行多个独立数据流程的处理工作,如下图所示。

而在这之前用户只能通过在单机运行多个 logstash 实例或者在配置文件中增加大量 if-else 条件判断语句来解决。要使用 multiple pipeline 也很简单,只需要将不同的 pipeline 在 config/pipeline.yml中定义好即可,如下所示:

- pipeline.id: apache
  pipeline.batch.size: 125
  queue.type: persisted
  path.config: "/path/to/config/apache.cfg"
- pipeline.id: nginx
  path.config: "/path/to/config/nginx.cfg"

其中 apachenginx作为独立的 pipeline 执行,而且配置也可以独立设置,互不干扰。pipeline.yml的引入极大地简化了 logstash 的配置管理工作,使得新手也可以很快完成复杂的 ETL 配置。

在 6.3 版本中,Logstash 又增加了 Pipeline-to-Pipeline的管道机制(beta),即管道和管道之间可以连接在一起组成一个完成的数据处理流。熟悉 linux 的管道命令 |的同学应该可以很快明白这种模式的好处。这无疑使得 Logstash 的配置会更加灵活,今天我们就来了解下这种灵活自由的配置方式。

1. 上手

废话少说,快速上手。修改 config/pipeline.yml文件如下:

 - pipeline.id: upstream
   config.string: input { stdin {} } output { pipeline { send_to => [test_output] } }
 - pipeline.id: downstream
   config.string: input { pipeline { address => test_output } } output{ stdout{}}

然后运行 logstash,其中 -r 表示配置文件有改动时自动重新加载,方便我们调试。

bin/logstash -r

在终端随意输入字符(比如aaa)后回车,会看到屏幕输出了类似下面的内容,代表运行成功了。

{
    "@timestamp" => 2018-12-06T14:43:50.310Z,
    "@version" => "1",
    "message" => "aaa",
    "host" => "rockybean-MacBook-Pro.local"
}

我们再回头看下这个配置,upstreamoutput 使用了名为 pipeline 的 plugin,然后 send_to的输出对象test_output是在 downstreaminput pipeline plugin 中定义的。通过这个唯一的address(虚拟地址)就能够把不同的 pipeline 连接在一起组成一个更长的pipeline来处理数据。类似下图所示:

当数据由 upstream传递给 downstream时会进行一个复制操作,这也意味着在这两个 pipeline 中的数据是完全独立的,互不影响。有一点要注意的是:数据的复制会增加额外的性能开销,比如会加大 JVM Heap 的使用。

2. 使用场景

使用方法是不是很简单,接下来我们来看下官方为我们开的几个脑洞。

2.1 Distributor Pattern 分发者模式

该模式执行效果类似下图所示:

在一个 pipeline 处理输入,然后根据不同的数据类型再分发到对应的 Pipeline 去处理。这种模式的好处在于统一输入端口,隔离不同类型的处理配置文件,减少由于配置文件混合在一起带来的维护成本。大家可以想一想如果不用这种Pipeline-to-Pipeline的方式,我们如果轻松做到一个端口处理多个来源的数据呢?

这种模式的参考配置如下所示:

# config/pipelines.yml
- pipeline.id: beats-server
  config.string: |
    input { beats { port => 5044 } }
    output {
        if [type] == apache {
          pipeline { send_to => weblogs }
        } else if [type] == system {
          pipeline { send_to => syslog }
        } else {
          pipeline { send_to => fallback }
        }
    }
- pipeline.id: weblog-processing
  config.string: |
    input { pipeline { address => weblogs } }
    filter {
       # Weblog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_a_host] }
    }
- pipeline.id: syslog-processing
  config.string: |
    input { pipeline { address => syslog } }
    filter {
       # Syslog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_b_host] }
    }
- pipeline.id: fallback-processing
    config.string: |
    input { pipeline { address => fallback } }
    output { elasticsearch { hosts => [es_cluster_b_host] } }

2.2 Output Isolator Pattern 输出隔离模式

虽然 Logstash 的一个 pipeline 可以配置多个 output,但是这多个 output 会相依为命,一旦某一个 output 出问题,会导致另一个 output 也无法接收新数据。而通过这种模式可以完美解决这个问题。其运行方式如下图所示:

通过输出到两个独立的 pipeline,解除相互之间的影响,比如 http service 出问题的时候,es 依然可以正常接收数据,而且两个 pipeline 可以配置独立的队列来保障数据的完备性,其配置如下所示:

# config/pipelines.yml
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [es, http] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => es } }
    output { elasticsearch { } }
- pipeline.id: buffered-http
  queue.type: persisted
  config.string: |
    input { pipeline { address => http } }
    output { http { } }

2.3 Forked Path Pattern 克隆路径模式

这个模式类似 Output Isolator Pattern,只是在不同的 output pipeline 中可以配置不同的 filter 来完成各自输出的数据处理需求,这里就不展开讲了,可以参考如下的配置,其中不同 output pipeline 的 filter 是不同的,比如 partner 这个 pipeline 去掉了一些敏感数据:

# config/pipelines.yml
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => ["internal-es", "partner-s3"] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => "internal-es" } }
    # Index the full event
    output { elasticsearch { } }
- pipeline.id: partner
  queue.type: persisted
  config.string: |
    input { pipeline { address => "partner-s3" } }
    filter {
      # Remove the sensitive data
      mutate { remove_field => 'sensitive-data' }
    }
    output { s3 { } } # Output to partner's bucket

2.4 Collector Pattern 收集者模式

从名字可以看出,该模式是将所有 Pipeline 汇集于一处的处理模式,如下图所示:

其配置参考如下:

# config/pipelines.yml
- pipeline.id: beats
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: kafka
  config.string: |
    input { kafka { ... } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: partner
  # This common pipeline enforces the same logic whether data comes from Kafka or Beats
  config.string: |
    input { pipeline { address => commonOut } }
    filter {
      # Always remove sensitive data from all input sources
      mutate { remove_field => 'sensitive-data' }
    }
    output { elasticsearch { } }

3. 总结

本文简单给大家讲解了 Pipeline-to-Pipeline的使用方法及官方推荐的几种模式,希望可以给大家有所帮助。另外这个机制目前还处于 Beta 阶段,尝鲜需谨慎!

继续阅读 »

Logstash 在 6.0 推出了 multiple pipeline 的解决方案,即在一个 logstash 实例中可以同时进行多个独立数据流程的处理工作,如下图所示。

而在这之前用户只能通过在单机运行多个 logstash 实例或者在配置文件中增加大量 if-else 条件判断语句来解决。要使用 multiple pipeline 也很简单,只需要将不同的 pipeline 在 config/pipeline.yml中定义好即可,如下所示:

- pipeline.id: apache
  pipeline.batch.size: 125
  queue.type: persisted
  path.config: "/path/to/config/apache.cfg"
- pipeline.id: nginx
  path.config: "/path/to/config/nginx.cfg"

其中 apachenginx作为独立的 pipeline 执行,而且配置也可以独立设置,互不干扰。pipeline.yml的引入极大地简化了 logstash 的配置管理工作,使得新手也可以很快完成复杂的 ETL 配置。

在 6.3 版本中,Logstash 又增加了 Pipeline-to-Pipeline的管道机制(beta),即管道和管道之间可以连接在一起组成一个完成的数据处理流。熟悉 linux 的管道命令 |的同学应该可以很快明白这种模式的好处。这无疑使得 Logstash 的配置会更加灵活,今天我们就来了解下这种灵活自由的配置方式。

1. 上手

废话少说,快速上手。修改 config/pipeline.yml文件如下:

 - pipeline.id: upstream
   config.string: input { stdin {} } output { pipeline { send_to => [test_output] } }
 - pipeline.id: downstream
   config.string: input { pipeline { address => test_output } } output{ stdout{}}

然后运行 logstash,其中 -r 表示配置文件有改动时自动重新加载,方便我们调试。

bin/logstash -r

在终端随意输入字符(比如aaa)后回车,会看到屏幕输出了类似下面的内容,代表运行成功了。

{
    "@timestamp" => 2018-12-06T14:43:50.310Z,
    "@version" => "1",
    "message" => "aaa",
    "host" => "rockybean-MacBook-Pro.local"
}

我们再回头看下这个配置,upstreamoutput 使用了名为 pipeline 的 plugin,然后 send_to的输出对象test_output是在 downstreaminput pipeline plugin 中定义的。通过这个唯一的address(虚拟地址)就能够把不同的 pipeline 连接在一起组成一个更长的pipeline来处理数据。类似下图所示:

当数据由 upstream传递给 downstream时会进行一个复制操作,这也意味着在这两个 pipeline 中的数据是完全独立的,互不影响。有一点要注意的是:数据的复制会增加额外的性能开销,比如会加大 JVM Heap 的使用。

2. 使用场景

使用方法是不是很简单,接下来我们来看下官方为我们开的几个脑洞。

2.1 Distributor Pattern 分发者模式

该模式执行效果类似下图所示:

在一个 pipeline 处理输入,然后根据不同的数据类型再分发到对应的 Pipeline 去处理。这种模式的好处在于统一输入端口,隔离不同类型的处理配置文件,减少由于配置文件混合在一起带来的维护成本。大家可以想一想如果不用这种Pipeline-to-Pipeline的方式,我们如果轻松做到一个端口处理多个来源的数据呢?

这种模式的参考配置如下所示:

# config/pipelines.yml
- pipeline.id: beats-server
  config.string: |
    input { beats { port => 5044 } }
    output {
        if [type] == apache {
          pipeline { send_to => weblogs }
        } else if [type] == system {
          pipeline { send_to => syslog }
        } else {
          pipeline { send_to => fallback }
        }
    }
- pipeline.id: weblog-processing
  config.string: |
    input { pipeline { address => weblogs } }
    filter {
       # Weblog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_a_host] }
    }
- pipeline.id: syslog-processing
  config.string: |
    input { pipeline { address => syslog } }
    filter {
       # Syslog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_b_host] }
    }
- pipeline.id: fallback-processing
    config.string: |
    input { pipeline { address => fallback } }
    output { elasticsearch { hosts => [es_cluster_b_host] } }

2.2 Output Isolator Pattern 输出隔离模式

虽然 Logstash 的一个 pipeline 可以配置多个 output,但是这多个 output 会相依为命,一旦某一个 output 出问题,会导致另一个 output 也无法接收新数据。而通过这种模式可以完美解决这个问题。其运行方式如下图所示:

通过输出到两个独立的 pipeline,解除相互之间的影响,比如 http service 出问题的时候,es 依然可以正常接收数据,而且两个 pipeline 可以配置独立的队列来保障数据的完备性,其配置如下所示:

# config/pipelines.yml
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [es, http] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => es } }
    output { elasticsearch { } }
- pipeline.id: buffered-http
  queue.type: persisted
  config.string: |
    input { pipeline { address => http } }
    output { http { } }

2.3 Forked Path Pattern 克隆路径模式

这个模式类似 Output Isolator Pattern,只是在不同的 output pipeline 中可以配置不同的 filter 来完成各自输出的数据处理需求,这里就不展开讲了,可以参考如下的配置,其中不同 output pipeline 的 filter 是不同的,比如 partner 这个 pipeline 去掉了一些敏感数据:

# config/pipelines.yml
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => ["internal-es", "partner-s3"] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => "internal-es" } }
    # Index the full event
    output { elasticsearch { } }
- pipeline.id: partner
  queue.type: persisted
  config.string: |
    input { pipeline { address => "partner-s3" } }
    filter {
      # Remove the sensitive data
      mutate { remove_field => 'sensitive-data' }
    }
    output { s3 { } } # Output to partner's bucket

2.4 Collector Pattern 收集者模式

从名字可以看出,该模式是将所有 Pipeline 汇集于一处的处理模式,如下图所示:

其配置参考如下:

# config/pipelines.yml
- pipeline.id: beats
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: kafka
  config.string: |
    input { kafka { ... } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: partner
  # This common pipeline enforces the same logic whether data comes from Kafka or Beats
  config.string: |
    input { pipeline { address => commonOut } }
    filter {
      # Always remove sensitive data from all input sources
      mutate { remove_field => 'sensitive-data' }
    }
    output { elasticsearch { } }

3. 总结

本文简单给大家讲解了 Pipeline-to-Pipeline的使用方法及官方推荐的几种模式,希望可以给大家有所帮助。另外这个机制目前还处于 Beta 阶段,尝鲜需谨慎!

收起阅读 »

社区日报 第470期 (2018-12-06)

1.图解 Elasticsearch2.2.0 原理
http://t.cn/Eybu8a3
2.ElasticSearch 恢复类型之对等恢复
http://t.cn/EyaCYFV
3.使用带注释的文本插件搜索事物
http://t.cn/EyaCntg

编辑:金桥
归档:https://elasticsearch.cn/article/6175
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1.图解 Elasticsearch2.2.0 原理
http://t.cn/Eybu8a3
2.ElasticSearch 恢复类型之对等恢复
http://t.cn/EyaCYFV
3.使用带注释的文本插件搜索事物
http://t.cn/EyaCntg

编辑:金桥
归档:https://elasticsearch.cn/article/6175
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

es创建索引失败

#10001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/10001/logs/mstore.log
  fields:
    env: uat-10001-log
  include_lines: ['ERROR']
#10001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/10001/logs/catalina.out
  fields:
    env: uat-10001-catalina
  include_lines: ['ERROR']
#11001-log
- type: log
  enabled: true
  paths:
    - /mkt/tomcat/8.5.32/11001/logs/delivery.log
  include_lines: ['ERROR']
  fields:
    env: uat-11001-log
#11001-catalina.out
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/11001/logs/catalina.out
  fields:
    env: uat-11001-catalina
  include_lines: ['ERROR']
#12001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/12001/logs/mstore.log
  fields:
    env: uat-12001-log
  include_lines: ['ERROR']
#12001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/12001/logs/catalina.out
  fields:
    env: uat-12001-catalina
  include_lines: ['ERROR']
#13001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/13001/logs/pay-web-boss.log
  fields:
    env: uat-13001-log
    include_lines: ['ERROR']
#13001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/13001/logs/catalina.out
  fields:
    env: uat-13001-catalina
  include_lines: ['ERROR']
#14001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/14001/logs/pay-web-gateway.log
  fields:
    env: uat-14001-log
  include_lines: ['ERROR']
#14001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/14001/logs/catalina.out
  fields:
    env: uat-14001-catalina
  include_lines: ['ERROR']
#15001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/15001/logs/roncoo-pay-web-merchant.log
  fields:
    env: uat-15001-log
  include_lines: ['ERROR']
#15001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/15001/logs/catalina.out
  fields:
    env: uat-15001-catalina
  include_lines: ['ERROR']      每次创建索引会少uat-11001-log和uat-12001-log,filebeat读取了这个两个日志文件
继续阅读 »
#10001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/10001/logs/mstore.log
  fields:
    env: uat-10001-log
  include_lines: ['ERROR']
#10001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/10001/logs/catalina.out
  fields:
    env: uat-10001-catalina
  include_lines: ['ERROR']
#11001-log
- type: log
  enabled: true
  paths:
    - /mkt/tomcat/8.5.32/11001/logs/delivery.log
  include_lines: ['ERROR']
  fields:
    env: uat-11001-log
#11001-catalina.out
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/11001/logs/catalina.out
  fields:
    env: uat-11001-catalina
  include_lines: ['ERROR']
#12001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/12001/logs/mstore.log
  fields:
    env: uat-12001-log
  include_lines: ['ERROR']
#12001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/12001/logs/catalina.out
  fields:
    env: uat-12001-catalina
  include_lines: ['ERROR']
#13001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/13001/logs/pay-web-boss.log
  fields:
    env: uat-13001-log
    include_lines: ['ERROR']
#13001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/13001/logs/catalina.out
  fields:
    env: uat-13001-catalina
  include_lines: ['ERROR']
#14001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/14001/logs/pay-web-gateway.log
  fields:
    env: uat-14001-log
  include_lines: ['ERROR']
#14001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/14001/logs/catalina.out
  fields:
    env: uat-14001-catalina
  include_lines: ['ERROR']
#15001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/15001/logs/roncoo-pay-web-merchant.log
  fields:
    env: uat-15001-log
  include_lines: ['ERROR']
#15001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/15001/logs/catalina.out
  fields:
    env: uat-15001-catalina
  include_lines: ['ERROR']      每次创建索引会少uat-11001-log和uat-12001-log,filebeat读取了这个两个日志文件 收起阅读 »

社区日报 第469期 (2018-12-05)

1. Elasticsearch写入原理深入详解
http://t.cn/EyI2jxr
2.Lucene倒排索引简述 细说倒排索引构建
http://t.cn/EyI2eMN
3. 知乎如何基于开源Druid打造下一代数据平台
http://t.cn/E2Kmzj0
 
编辑:江水
归档:https://elasticsearch.cn/article/6173
订阅:https://tinyletter.com/elastic-daily
 
继续阅读 »
1. Elasticsearch写入原理深入详解
http://t.cn/EyI2jxr
2.Lucene倒排索引简述 细说倒排索引构建
http://t.cn/EyI2eMN
3. 知乎如何基于开源Druid打造下一代数据平台
http://t.cn/E2Kmzj0
 
编辑:江水
归档:https://elasticsearch.cn/article/6173
订阅:https://tinyletter.com/elastic-daily
  收起阅读 »

Day 5 - Elasticsearch 存储设备全解析

day5 - es存储设备全解析

Elastic Search 作为一个分布式系统,它的最小单元(shard)实现基于 lucene , lucene是一个io密集cpu密集的系统。cpu密集可以通过使用更多核,更快的cpu以及优化算法来解决。而io密集部分需要搭配高性能的存储设备以及存储策略来解决。

传统的服务器硬盘分为SATA,SAS硬盘以及现在最高性能的SSD硬盘,其中SSD硬盘又分为 SATA SSD,PCI-E SSD ,M.2 SSD(性能依次提升)。

两者的区别在于 SATA 最高可以提供 7200转的。著名的HADOOP集群中,一半都会选择企业级SATA盘来降低存储成本。而SATA盘容易损坏以及恢复速度的问题,则交给10g高速网卡以及三副本策略来解决。

如果是了解数据库领域的同学就会知道,MySQL 之类的数据库严重推荐使用SSD来做存储。TiDB这种新时代的分布式数据库甚至在安装过程中会见存储是否是高性能设备,当时低速设备时,安装将失败。

如何查看io压力

iostat -x 1 100

可以根据 iowait , ioutil 等值来综合判断. 当iowait长期接近100%基本代表io系统出现瓶颈了。这时候可以用iotop命令来诊断出具体是什么进程在消耗io资源。

如何测试硬盘性能

通过 fio 测试 顺序读/写,随机读/写性能。

顺序读 fio -name iops -rw=read -bs=4k -runtime=60 -iodepth 32 -filename /dev/sda -ioengine libaio -direct=1 随机读 fio -name iops -rw=randread -bs=4k -runtime=60 -iodepth 32 -filename /dev/sda -ioengine libaio -direct=1 顺序写 fio -name iops -rw=write -bs=4k -runtime=60 -iodepth 32 -filename /dev/sda -ioengine libaio -direct=1 随机写 fio -name iops -rw=randwrite -bs=4k -runtime=60 -iodepth 32 -filename /dev/sda -ioengine libaio -direct=1

更具体的测试可以参考磁盘性能指标--IOPS、吞吐量及测试

RAID

RAID 0

将数据分布在N块盘中,速度最快,可以享受磁盘的并行读取和写入;安全性最低,一块盘损坏,将导致所有数据丢失。

raid0.png

RAID 1

将数据同时保存在N块盘中,写入速度最慢(需要同时写多块盘)。安全性最高。

raid1.png

RAID 10 ?

将RAID 1 和 RAID 0 结合起来,获得高安全性和高性能。最常用的RAID策略。同时也是TiDB,MySQL等数据库推荐的RAID策略。

raid10.png

RAID 5

RAID 5 最低三块盘,存储数据的异或编码,在一块盘损坏时,可以提供编码恢复出数据。

raid5.png

ElasticSearch 使用低速设备的 Tips

修改index.merge.scheduler.max_thread_count参数为1;该参数影响lucene后台的合并线程数量,默认设置只适合SDD。多个合并线程可能导致io压力过大,触发 (linux 120s timeout)[https://cyberdak.github.io/es/2018/07/01/es-force-merge-cause-es-down].

存储策略

  1. 避免单机存储过多数据,如果单机故障,将导致集群需要大量数据,影响集群的吞吐量,特别是发生在高峰时候更会影响业务。千兆网卡每小时可以同步的数据为463gb,可以参考这个速度结合资深集群网卡以及存储来调节每个节点存储的数据量。
  2. 存储有条件使用RAID10,增加单节点性能以及避免单节点存储故障

RAID卡策略

根据服务器RAID卡的等级不同,高级的RAID卡可以使用 write-back 写策略,数据写入会直接写入到缓存中,随后刷新到硬盘上。当主机掉电时,由RAID卡带的电池来保证数据成功写入到硬盘中。write back的设置需要电池有电才能支持,而某些场景可以设置为force write-back(即使电池没电了,也要写缓存),从而提高写入性能。

继续阅读 »

day5 - es存储设备全解析

Elastic Search 作为一个分布式系统,它的最小单元(shard)实现基于 lucene , lucene是一个io密集cpu密集的系统。cpu密集可以通过使用更多核,更快的cpu以及优化算法来解决。而io密集部分需要搭配高性能的存储设备以及存储策略来解决。

传统的服务器硬盘分为SATA,SAS硬盘以及现在最高性能的SSD硬盘,其中SSD硬盘又分为 SATA SSD,PCI-E SSD ,M.2 SSD(性能依次提升)。

两者的区别在于 SATA 最高可以提供 7200转的。著名的HADOOP集群中,一半都会选择企业级SATA盘来降低存储成本。而SATA盘容易损坏以及恢复速度的问题,则交给10g高速网卡以及三副本策略来解决。

如果是了解数据库领域的同学就会知道,MySQL 之类的数据库严重推荐使用SSD来做存储。TiDB这种新时代的分布式数据库甚至在安装过程中会见存储是否是高性能设备,当时低速设备时,安装将失败。

如何查看io压力

iostat -x 1 100

可以根据 iowait , ioutil 等值来综合判断. 当iowait长期接近100%基本代表io系统出现瓶颈了。这时候可以用iotop命令来诊断出具体是什么进程在消耗io资源。

如何测试硬盘性能

通过 fio 测试 顺序读/写,随机读/写性能。

顺序读 fio -name iops -rw=read -bs=4k -runtime=60 -iodepth 32 -filename /dev/sda -ioengine libaio -direct=1 随机读 fio -name iops -rw=randread -bs=4k -runtime=60 -iodepth 32 -filename /dev/sda -ioengine libaio -direct=1 顺序写 fio -name iops -rw=write -bs=4k -runtime=60 -iodepth 32 -filename /dev/sda -ioengine libaio -direct=1 随机写 fio -name iops -rw=randwrite -bs=4k -runtime=60 -iodepth 32 -filename /dev/sda -ioengine libaio -direct=1

更具体的测试可以参考磁盘性能指标--IOPS、吞吐量及测试

RAID

RAID 0

将数据分布在N块盘中,速度最快,可以享受磁盘的并行读取和写入;安全性最低,一块盘损坏,将导致所有数据丢失。

raid0.png

RAID 1

将数据同时保存在N块盘中,写入速度最慢(需要同时写多块盘)。安全性最高。

raid1.png

RAID 10 ?

将RAID 1 和 RAID 0 结合起来,获得高安全性和高性能。最常用的RAID策略。同时也是TiDB,MySQL等数据库推荐的RAID策略。

raid10.png

RAID 5

RAID 5 最低三块盘,存储数据的异或编码,在一块盘损坏时,可以提供编码恢复出数据。

raid5.png

ElasticSearch 使用低速设备的 Tips

修改index.merge.scheduler.max_thread_count参数为1;该参数影响lucene后台的合并线程数量,默认设置只适合SDD。多个合并线程可能导致io压力过大,触发 (linux 120s timeout)[https://cyberdak.github.io/es/2018/07/01/es-force-merge-cause-es-down].

存储策略

  1. 避免单机存储过多数据,如果单机故障,将导致集群需要大量数据,影响集群的吞吐量,特别是发生在高峰时候更会影响业务。千兆网卡每小时可以同步的数据为463gb,可以参考这个速度结合资深集群网卡以及存储来调节每个节点存储的数据量。
  2. 存储有条件使用RAID10,增加单节点性能以及避免单节点存储故障

RAID卡策略

根据服务器RAID卡的等级不同,高级的RAID卡可以使用 write-back 写策略,数据写入会直接写入到缓存中,随后刷新到硬盘上。当主机掉电时,由RAID卡带的电池来保证数据成功写入到硬盘中。write back的设置需要电池有电才能支持,而某些场景可以设置为force write-back(即使电池没电了,也要写缓存),从而提高写入性能。

收起阅读 »

Day 4 - PB级规模数据的Elasticsearch分库分表实践

从2018年7月在开始在某阿里云数据中心部署Elasticsearch软件,到2018年12月共创建了15个集群,服务于客户的文档检索、交通视频检索、地理信息检索、日志安全审计等业务。其中数据规模最大的一个业务,共有800张表,7万亿条数据,每天新增500亿条记录,数据要求存储半年,单条记录大小1KB左右,存储规模约10PB,需要支持1000并发查询。

一、数据存储空间规划。

数据中心能用于搭建Elasticsearch集群的SSD盘共700TB,SATA盘共50PB。根据业务类型、时间范围划分热数据和冷数据,一部分重要数据存储在SSD盘的热数据集群,其它数据存储在SATA盘的冷数据集群。热数据集群主要存储各类实体信息,包括人员、物品、事件、地址、组织数据,以及最新轨迹数据。冷数据集群主要存储历史轨迹信息。热数据和冷数据按照业务拆分多个小集群,每个集群规模保持在50个节点左右,单个集群最大不超过200个节点。利用阿里云平台弹性伸缩的能力,每个Elasticsearch集群可以先从小规模创建,根据资源使用情况来弹性扩展节点规模。

Elasticsearch集群节点配置

pb001.jpg

二、索引设计。

1.索引别名(alias)。每类数据根据数据源表名建立索引(index),索引中只包含一个类型(type)。配置索引别名(alias),业务上根据别名写入、查询数据,索引重建等数据维护操作可以通过别名切换对业务透明。

2.按时间分表。轨迹类数据按时间(日/月)拆分,每个索引存储数据量保持在1TB(10亿)左右,索引名带上日期/月份后缀,拆分后的索引配置别名区分冷热数据。配置索引模板,指定索引分片数和副本数、字段类型、分词器。配置Linux crontab定时任务,通过shell脚本创建索引。

3.分片(shard)设置。索引按照单个分片10-40GB数据大小设计分片数,数据量少于10GB(1000万)的索引设置1个分片即可,数据量大于1TB(10亿)的索引设置分片数为集群节点数整数倍(例如50个节点的集群配置50个分片)。

4.副本(replica)设置。数据首次批量导入时索引副本数设置为0,快速写入数据。生产环境索引副本数设置为1,避免集群节点故障数据丢失。

三、索引mapping设计。

1.精心设计索引字段类型。在开发环境配置Elasticsearch允许自动创建索引,从数据源每张表取1000条记录批量写入Elasticsearch,自动创建索引mapping,然后再根据业务需要修改mapping配置合适的字段类型,指定字段索引分词器、是否存储、是否索引、是否合并至全文检索字段。 对于数据量大的表尤其要精心设计字段类型,尽量减少索引存储空间占用。在生产环境中建议配置不允许自动创建索引。

2.配置全文检索字段。如果业务需要全文检索,可以配置开启全文字段,同时需要占用更多存储空间;如果业务上只是按字段查询,可以配置禁用全文字段,减少存储空间。Elasticsearch5.X及之前的版本默认启用_all字段,合并所有字段的值。Elasticsearch6.X及之后的版本默认禁用_all字段,可以通过copy_to将多个字段值合并成一个全文字段。对于数据查全率要求高的业务场景,建议对全文字段配置cjk分词器(Elasticsearch和Lucene中自带,对中日韩文进行二元分词的分词器)。

3.通用字段统一命名。各个索引中的姓名、证件号码、时间(开始时间、结束时间)、地点(始发地、目的地)等常用字段统一命名。用户指定证件号、时间范围等精确字段查询条件时,可以使用统一的查询条件并行查询多个索引。

四、分词设置。

1.选择合适的分词器。Elasticsearch中内置了很多分词器:standard、cjk、nGram等,也可以安装ik、pinyin等开源分词器, 可以根据业务场景选择合适的分词器。 常用分词器: standard:Elasticsearch默认分词,英文按空格切分,中文按单个汉字切分。 cjk:根据二元索引(两个相邻的字作为一个词条)对中日韩文分词,可以保证查全率。 NGram:可以将英文按照字母切分,结合Elasticsearch的短语搜索(match_phrase)使用。 ik:比较热门的中文分词,能按照中文语义切分,可以自定义词典。 pinyin:可以让用户输入拼音,就能查找到相关的关键词。 对于查全率要求较高的场景,建议使用cjk分词,同时能支持比较快的响应速度。对于查准率要求较高的场景,建议使用ik分词。

CJK分词和IK分词对比(测试环境:Elasticsearch5.5.3,8335万条人员档案信息,10节点集群,单节点16核CPU、64G内存、2T SSD盘,1个线程批量写入,1个并发查询)

pb002.jpg

测试分词效果: curl -XPOST "http://localhost:9200/_analyze" -H 'Content-Type: application/json' -d' { "analyzer": "ik_max_word", "text": "南京市长江大桥" }'

2.NGram分词。对于像车牌号之类数字和字母连在一起的字符,默认会被切成一个完整词条,但是业务上又需要支持前缀、后缀模糊匹配,可以根据业务需求进行分词。车牌号建议增加一个分词字段,配置NGram分词器,切分1元至7元的组合。身份证号码建议增加分词字段,根据业务需要切分18位完整词条、前2位(省)、前4位(省市)、前6位(省市区县)、后4位、出生年月日、出生年份、出生年月、出生月日等组合。

3.单字分词。对于像姓名类字段,业务上需要支持完整匹配,又需要支持单字查询。可以配置1个keyword字段(不分词);1个text字段(分词),分词器选择Elasticsearch默认分词器standard,按单个汉字切分。

五、数据写入策略。

1.批量离线数据导入。各类业务数据源主要在数据仓库MaxCompute(原ODPS),为了把表数据从MaxCompute表导入到ElasticSearch集群中, 我们基于MaxCompute MapReduce开发了MaxCompute到ElasticSearch的数据导出作业,通过简单的配置就可以把数据导入到ElasticSearch中。 数据源在关系数据库RDS或者NoSQL的数据,可以通过配置DataWorks(dataX企业版)导入Elasticsearch集群。

2.实时数据导入。实时数据源主要是流式数据服务DataHub, 配置DataHub任务即可同步至Elasticsearch集群。也可以自己开发程序调用DataHub的SDK获取实时数据,经过业务处理后,调用ES Rest Client SDK批量写入Elasticsearch。

3.冷热数据自动迁移。轨迹类实时数据默认先写入热数据集群(SSD盘Elasticsearch集群),对于热数据集群过期的索引(例如1个月前的索引)需要迁移到冷数据集群(SATA盘Elasticsearch)。为了实现数据跨集群迁移,我们开发了snapshot插件将索引备份到对象存储服务OSS或分布式文件系统盘古。配置定时任务,将热数据集群索引备份后,从冷数据集群恢复,然后再删除热集群中的过期索引,保持热数据集群只存储较小规模数据。冷数据集群的索引如果超过半年,则关闭索引,减少JVM堆内存占用。

4.配置索引主键字段。为了保证Elasticsearch集群和数据源记录的一致性,建议所有索引配置主键字段,而不是让Elasticsearch自动生成主键。配置数据业务主键字段作为Elasticsearch主键字段。如果没有主键字段,则将原始数据能确定记录惟一性的几个字段合并为主键,或者将所有字段值合并起来计算MD5值作为主键。

5.配置写入路由。如果业务上需要经常根据某个字段查询,例如用户ID、车牌号等的字段,写入时可以指定路由字段。

6.写入参数调优。调整数据写入任务参数,避免写入操作占用过多磁盘IO和CPU。使用批量请求,配置合理的写入线程数,调大索引刷新时间间隔refresh interval,调整事务日志translog同步策略。

六、数据查询策略。

1.冷热库异步查询。用户输入关键词查询时,优先从热数据集群查询,有结果立即返回,并估算命中记录条数。热数据集群命中结果集不足时,再查询冷数据集群。

2.跨集群搜索。业务上需要多个Elasticsearch集群一起参与检索时,可以通过Cross Cluster Search同时对多个集群发起检索请求合并检索结果。单独创建一个5节点的Cross Cluster,设置远程集群节点信息,用于跨集群搜索,不存储业务数据。

3.快速返回和超时设置。查询请求中设置参数teminate_after指定每个分片(shard)最多匹配N条记录后返回(例如10000),设置查询超时时间timeout(例如10s),避免查询一些宽泛的条件时耗费过多系统资源。

4.查询语法解析。解析用户查询条件,识别用户的查询类型,例如用户输入车牌号、证件号、年龄段等条件时,查询条件改写为字段精确匹配,无法识别的查询条件默认从全文字段匹配。

5.查询条件调优。查询结果不需要相关度排序时使用过滤器(filter),尽量使用路由(routing),设置较少的查询读取记录条数和字段,避免前缀模糊匹配,设置search_after规避深度翻页性能问题。

七、数据写入、查询性能测试。

SSD盘集群写入性能测试(测试环境:Elasticsearch6.3.2集群,单节点16核CPU、64G内存、2T SSD盘,写入10亿条记录,单条记录1KB,副本数为0,1台写入服务器):

pb003.jpg

SSD盘集群查询性能测试

pb004.jpg

SATA盘集群写入性能测试(测试环境:Elasticsearch5.5.3集群,单节点56核CPU、128G内存、12块 6T SATA盘,分别写入1亿、3亿、5亿、30亿、300亿条记录,单条记录1KB,0副本,50台写入服务器):

pb005.jpg

SATA盘集群查询性能测试

pb006.jpg

参考文档:

  1. 阿里云Elasticsearch帮助文档 https://help.aliyun.com/product/57736.html
  2. Elasticsearch参考 https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html
  3. 《Elasticsearch: 权威指南》 https://www.elastic.co/guide/cn/elasticsearch/guide/current/index.html
  4. 《深入理解Elasticsearch》https://detail.tmall.com/item.htm?id=551001166567
  5. 《死磕Elasticsearch方法论》https://blog.csdn.net/laoyang360/article/details/79293493
  6. Elasticsearch索引别名和零停机 https://www.elastic.co/guide/cn/elasticsearch/guide/current/index-aliases.html
  7. Elasticsearch自动按天创建索引脚本 https://blog.csdn.net/reblue520/article/details/80553317
  8. Elasticsearch NGram分词器 https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-ngram-tokenizer.html
  9. Elasticsearch开源权限管理认证插件Search Guard https://github.com/floragunncom/search-guard
  10. Elasticsearch开源可视化管理插件cerebro https://github.com/lmenezes/cerebro
  11. Elasticsearch开源SQL插件 https://github.com/NLPchina/elasticsearch-sql
  12. Elasticsearch快照及恢复 https://help.aliyun.com/document_detail/65675.html

Elasticsearch技术交流钉钉群

dingdingpng.png

继续阅读 »

从2018年7月在开始在某阿里云数据中心部署Elasticsearch软件,到2018年12月共创建了15个集群,服务于客户的文档检索、交通视频检索、地理信息检索、日志安全审计等业务。其中数据规模最大的一个业务,共有800张表,7万亿条数据,每天新增500亿条记录,数据要求存储半年,单条记录大小1KB左右,存储规模约10PB,需要支持1000并发查询。

一、数据存储空间规划。

数据中心能用于搭建Elasticsearch集群的SSD盘共700TB,SATA盘共50PB。根据业务类型、时间范围划分热数据和冷数据,一部分重要数据存储在SSD盘的热数据集群,其它数据存储在SATA盘的冷数据集群。热数据集群主要存储各类实体信息,包括人员、物品、事件、地址、组织数据,以及最新轨迹数据。冷数据集群主要存储历史轨迹信息。热数据和冷数据按照业务拆分多个小集群,每个集群规模保持在50个节点左右,单个集群最大不超过200个节点。利用阿里云平台弹性伸缩的能力,每个Elasticsearch集群可以先从小规模创建,根据资源使用情况来弹性扩展节点规模。

Elasticsearch集群节点配置

pb001.jpg

二、索引设计。

1.索引别名(alias)。每类数据根据数据源表名建立索引(index),索引中只包含一个类型(type)。配置索引别名(alias),业务上根据别名写入、查询数据,索引重建等数据维护操作可以通过别名切换对业务透明。

2.按时间分表。轨迹类数据按时间(日/月)拆分,每个索引存储数据量保持在1TB(10亿)左右,索引名带上日期/月份后缀,拆分后的索引配置别名区分冷热数据。配置索引模板,指定索引分片数和副本数、字段类型、分词器。配置Linux crontab定时任务,通过shell脚本创建索引。

3.分片(shard)设置。索引按照单个分片10-40GB数据大小设计分片数,数据量少于10GB(1000万)的索引设置1个分片即可,数据量大于1TB(10亿)的索引设置分片数为集群节点数整数倍(例如50个节点的集群配置50个分片)。

4.副本(replica)设置。数据首次批量导入时索引副本数设置为0,快速写入数据。生产环境索引副本数设置为1,避免集群节点故障数据丢失。

三、索引mapping设计。

1.精心设计索引字段类型。在开发环境配置Elasticsearch允许自动创建索引,从数据源每张表取1000条记录批量写入Elasticsearch,自动创建索引mapping,然后再根据业务需要修改mapping配置合适的字段类型,指定字段索引分词器、是否存储、是否索引、是否合并至全文检索字段。 对于数据量大的表尤其要精心设计字段类型,尽量减少索引存储空间占用。在生产环境中建议配置不允许自动创建索引。

2.配置全文检索字段。如果业务需要全文检索,可以配置开启全文字段,同时需要占用更多存储空间;如果业务上只是按字段查询,可以配置禁用全文字段,减少存储空间。Elasticsearch5.X及之前的版本默认启用_all字段,合并所有字段的值。Elasticsearch6.X及之后的版本默认禁用_all字段,可以通过copy_to将多个字段值合并成一个全文字段。对于数据查全率要求高的业务场景,建议对全文字段配置cjk分词器(Elasticsearch和Lucene中自带,对中日韩文进行二元分词的分词器)。

3.通用字段统一命名。各个索引中的姓名、证件号码、时间(开始时间、结束时间)、地点(始发地、目的地)等常用字段统一命名。用户指定证件号、时间范围等精确字段查询条件时,可以使用统一的查询条件并行查询多个索引。

四、分词设置。

1.选择合适的分词器。Elasticsearch中内置了很多分词器:standard、cjk、nGram等,也可以安装ik、pinyin等开源分词器, 可以根据业务场景选择合适的分词器。 常用分词器: standard:Elasticsearch默认分词,英文按空格切分,中文按单个汉字切分。 cjk:根据二元索引(两个相邻的字作为一个词条)对中日韩文分词,可以保证查全率。 NGram:可以将英文按照字母切分,结合Elasticsearch的短语搜索(match_phrase)使用。 ik:比较热门的中文分词,能按照中文语义切分,可以自定义词典。 pinyin:可以让用户输入拼音,就能查找到相关的关键词。 对于查全率要求较高的场景,建议使用cjk分词,同时能支持比较快的响应速度。对于查准率要求较高的场景,建议使用ik分词。

CJK分词和IK分词对比(测试环境:Elasticsearch5.5.3,8335万条人员档案信息,10节点集群,单节点16核CPU、64G内存、2T SSD盘,1个线程批量写入,1个并发查询)

pb002.jpg

测试分词效果: curl -XPOST "http://localhost:9200/_analyze" -H 'Content-Type: application/json' -d' { "analyzer": "ik_max_word", "text": "南京市长江大桥" }'

2.NGram分词。对于像车牌号之类数字和字母连在一起的字符,默认会被切成一个完整词条,但是业务上又需要支持前缀、后缀模糊匹配,可以根据业务需求进行分词。车牌号建议增加一个分词字段,配置NGram分词器,切分1元至7元的组合。身份证号码建议增加分词字段,根据业务需要切分18位完整词条、前2位(省)、前4位(省市)、前6位(省市区县)、后4位、出生年月日、出生年份、出生年月、出生月日等组合。

3.单字分词。对于像姓名类字段,业务上需要支持完整匹配,又需要支持单字查询。可以配置1个keyword字段(不分词);1个text字段(分词),分词器选择Elasticsearch默认分词器standard,按单个汉字切分。

五、数据写入策略。

1.批量离线数据导入。各类业务数据源主要在数据仓库MaxCompute(原ODPS),为了把表数据从MaxCompute表导入到ElasticSearch集群中, 我们基于MaxCompute MapReduce开发了MaxCompute到ElasticSearch的数据导出作业,通过简单的配置就可以把数据导入到ElasticSearch中。 数据源在关系数据库RDS或者NoSQL的数据,可以通过配置DataWorks(dataX企业版)导入Elasticsearch集群。

2.实时数据导入。实时数据源主要是流式数据服务DataHub, 配置DataHub任务即可同步至Elasticsearch集群。也可以自己开发程序调用DataHub的SDK获取实时数据,经过业务处理后,调用ES Rest Client SDK批量写入Elasticsearch。

3.冷热数据自动迁移。轨迹类实时数据默认先写入热数据集群(SSD盘Elasticsearch集群),对于热数据集群过期的索引(例如1个月前的索引)需要迁移到冷数据集群(SATA盘Elasticsearch)。为了实现数据跨集群迁移,我们开发了snapshot插件将索引备份到对象存储服务OSS或分布式文件系统盘古。配置定时任务,将热数据集群索引备份后,从冷数据集群恢复,然后再删除热集群中的过期索引,保持热数据集群只存储较小规模数据。冷数据集群的索引如果超过半年,则关闭索引,减少JVM堆内存占用。

4.配置索引主键字段。为了保证Elasticsearch集群和数据源记录的一致性,建议所有索引配置主键字段,而不是让Elasticsearch自动生成主键。配置数据业务主键字段作为Elasticsearch主键字段。如果没有主键字段,则将原始数据能确定记录惟一性的几个字段合并为主键,或者将所有字段值合并起来计算MD5值作为主键。

5.配置写入路由。如果业务上需要经常根据某个字段查询,例如用户ID、车牌号等的字段,写入时可以指定路由字段。

6.写入参数调优。调整数据写入任务参数,避免写入操作占用过多磁盘IO和CPU。使用批量请求,配置合理的写入线程数,调大索引刷新时间间隔refresh interval,调整事务日志translog同步策略。

六、数据查询策略。

1.冷热库异步查询。用户输入关键词查询时,优先从热数据集群查询,有结果立即返回,并估算命中记录条数。热数据集群命中结果集不足时,再查询冷数据集群。

2.跨集群搜索。业务上需要多个Elasticsearch集群一起参与检索时,可以通过Cross Cluster Search同时对多个集群发起检索请求合并检索结果。单独创建一个5节点的Cross Cluster,设置远程集群节点信息,用于跨集群搜索,不存储业务数据。

3.快速返回和超时设置。查询请求中设置参数teminate_after指定每个分片(shard)最多匹配N条记录后返回(例如10000),设置查询超时时间timeout(例如10s),避免查询一些宽泛的条件时耗费过多系统资源。

4.查询语法解析。解析用户查询条件,识别用户的查询类型,例如用户输入车牌号、证件号、年龄段等条件时,查询条件改写为字段精确匹配,无法识别的查询条件默认从全文字段匹配。

5.查询条件调优。查询结果不需要相关度排序时使用过滤器(filter),尽量使用路由(routing),设置较少的查询读取记录条数和字段,避免前缀模糊匹配,设置search_after规避深度翻页性能问题。

七、数据写入、查询性能测试。

SSD盘集群写入性能测试(测试环境:Elasticsearch6.3.2集群,单节点16核CPU、64G内存、2T SSD盘,写入10亿条记录,单条记录1KB,副本数为0,1台写入服务器):

pb003.jpg

SSD盘集群查询性能测试

pb004.jpg

SATA盘集群写入性能测试(测试环境:Elasticsearch5.5.3集群,单节点56核CPU、128G内存、12块 6T SATA盘,分别写入1亿、3亿、5亿、30亿、300亿条记录,单条记录1KB,0副本,50台写入服务器):

pb005.jpg

SATA盘集群查询性能测试

pb006.jpg

参考文档:

  1. 阿里云Elasticsearch帮助文档 https://help.aliyun.com/product/57736.html
  2. Elasticsearch参考 https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html
  3. 《Elasticsearch: 权威指南》 https://www.elastic.co/guide/cn/elasticsearch/guide/current/index.html
  4. 《深入理解Elasticsearch》https://detail.tmall.com/item.htm?id=551001166567
  5. 《死磕Elasticsearch方法论》https://blog.csdn.net/laoyang360/article/details/79293493
  6. Elasticsearch索引别名和零停机 https://www.elastic.co/guide/cn/elasticsearch/guide/current/index-aliases.html
  7. Elasticsearch自动按天创建索引脚本 https://blog.csdn.net/reblue520/article/details/80553317
  8. Elasticsearch NGram分词器 https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-ngram-tokenizer.html
  9. Elasticsearch开源权限管理认证插件Search Guard https://github.com/floragunncom/search-guard
  10. Elasticsearch开源可视化管理插件cerebro https://github.com/lmenezes/cerebro
  11. Elasticsearch开源SQL插件 https://github.com/NLPchina/elasticsearch-sql
  12. Elasticsearch快照及恢复 https://help.aliyun.com/document_detail/65675.html

Elasticsearch技术交流钉钉群

dingdingpng.png

收起阅读 »