好的想法是十分钱一打,真正无价的是能够实现这些想法的人。

ClusterBlockException SERVICE_UNAVAILABLE/1/state

截图.PNG
图中为_cluster/health 和_cat/pending_tasks的执行结果,现在ES节点全部故障了,请教下如何恢复?   gateway.expected_nodes配置的为节点个数的80% 

 Caused by: org.elasticsearch.discovery.MasterNotDiscoveredException: ClusterBlockException[blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction$4.onTimeout(TransportMasterNodeAction.java:213) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.ClusterStateObserver$ContextPreservingListener.onTimeout(ClusterStateObserver.java:317) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.ClusterStateObserver$ObserverClusterStateListener.onTimeout(ClusterStateObserver.java:244) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.service.ClusterApplierService$NotifyTimeout.run(ClusterApplierService.java:578) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:568) [elasticsearch-6.1.3.jar:6.1.3]
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
                at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
        Caused by: org.elasticsearch.cluster.block.ClusterBlockException: blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];
                at org.elasticsearch.cluster.block.ClusterBlocks.indexBlockedException(ClusterBlocks.java:182) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction.checkBlock(TransportCreateIndexAction.java:64) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction.checkBlock(TransportCreateIndexAction.java:39) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction.doStart(TransportMasterNodeAction.java:135) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction.start(TransportMasterNodeAction.java:127) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction.doExecute(TransportMasterNodeAction.java:105) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction.doExecute(TransportMasterNodeAction.java:55) ~[elasticsearch-6.1.3.jar:6.1.3]
继续阅读 »

截图.PNG
图中为_cluster/health 和_cat/pending_tasks的执行结果,现在ES节点全部故障了,请教下如何恢复?   gateway.expected_nodes配置的为节点个数的80% 

 Caused by: org.elasticsearch.discovery.MasterNotDiscoveredException: ClusterBlockException[blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction$4.onTimeout(TransportMasterNodeAction.java:213) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.ClusterStateObserver$ContextPreservingListener.onTimeout(ClusterStateObserver.java:317) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.ClusterStateObserver$ObserverClusterStateListener.onTimeout(ClusterStateObserver.java:244) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.service.ClusterApplierService$NotifyTimeout.run(ClusterApplierService.java:578) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:568) [elasticsearch-6.1.3.jar:6.1.3]
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
                at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
        Caused by: org.elasticsearch.cluster.block.ClusterBlockException: blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];
                at org.elasticsearch.cluster.block.ClusterBlocks.indexBlockedException(ClusterBlocks.java:182) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction.checkBlock(TransportCreateIndexAction.java:64) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction.checkBlock(TransportCreateIndexAction.java:39) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction.doStart(TransportMasterNodeAction.java:135) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction.start(TransportMasterNodeAction.java:127) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction.doExecute(TransportMasterNodeAction.java:105) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction.doExecute(TransportMasterNodeAction.java:55) ~[elasticsearch-6.1.3.jar:6.1.3]
收起阅读 »

社区日报 第529期 (2019-02-03)

1.在Elasticsearch数据库上公开2400万个信用和抵押记录。
http://t.cn/EtdTYAs
2.使用Elasticsearch构建长期安全运营平台。
http://t.cn/EtdHXAV
3.(自备梯子)飞行汽车比您想象的更接近现实。
http://t.cn/EtdQtfV

编辑:至尊宝
归档:https://elasticsearch.cn/article/6356
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1.在Elasticsearch数据库上公开2400万个信用和抵押记录。
http://t.cn/EtdTYAs
2.使用Elasticsearch构建长期安全运营平台。
http://t.cn/EtdHXAV
3.(自备梯子)飞行汽车比您想象的更接近现实。
http://t.cn/EtdQtfV

编辑:至尊宝
归档:https://elasticsearch.cn/article/6356
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第528期 (2019-02-02)

  1. 使用ELK实时展示python日志(自备梯子) http://t.cn/EtnXpxL

2.使用logstash的grok自定义数据格式(自备梯子) http://t.cn/Etna8mi

  1. docker部署ELK监控日志 http://t.cn/EtnTn3O
继续阅读 »
  1. 使用ELK实时展示python日志(自备梯子) http://t.cn/EtnXpxL

2.使用logstash的grok自定义数据格式(自备梯子) http://t.cn/Etna8mi

  1. docker部署ELK监控日志 http://t.cn/EtnTn3O
收起阅读 »

社区日报 第527期 (2019-02-01)

1、Spring Boot  + Elasticsearch开源实现
http://t.cn/EtRjzdC
2、docker一键搭建elasticsearch6.5.0集群
http://t.cn/EtRjy4M
3、深入研究Elasticsearch聚合的性能
http://t.cn/EtRjtEe

编辑:铭毅天下
归档:https://elasticsearch.cn/article/6354
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1、Spring Boot  + Elasticsearch开源实现
http://t.cn/EtRjzdC
2、docker一键搭建elasticsearch6.5.0集群
http://t.cn/EtRjy4M
3、深入研究Elasticsearch聚合的性能
http://t.cn/EtRjtEe

编辑:铭毅天下
归档:https://elasticsearch.cn/article/6354
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第526期 (2019-01-31)

1.Elastic Stack 6.6.0发布
https://www.elastic.co/blog/el ... eased
2.基于Lucene查询原理分析Elasticsearch的性能
http://t.cn/EwZO5to
3.基于ELK的uber实时预测引擎
http://t.cn/Et0qg2x

编辑:金桥
归档:https://elasticsearch.cn/article/6353
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1.Elastic Stack 6.6.0发布
https://www.elastic.co/blog/el ... eased
2.基于Lucene查询原理分析Elasticsearch的性能
http://t.cn/EwZO5to
3.基于ELK的uber实时预测引擎
http://t.cn/Et0qg2x

编辑:金桥
归档:https://elasticsearch.cn/article/6353
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第525期 (2019-01-30)

1.Elasticsearch DSL 的 Rust 封装
http://t.cn/EtpIMiG
2.Elasticsearch & Hadoop
http://t.cn/EtpImun
3.Lucene索引结构漫谈
http://t.cn/Re1Dp6g

编辑:江水
归档:https://elasticsearch.cn/article/6352
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1.Elasticsearch DSL 的 Rust 封装
http://t.cn/EtpIMiG
2.Elasticsearch & Hadoop
http://t.cn/EtpImun
3.Lucene索引结构漫谈
http://t.cn/Re1Dp6g

编辑:江水
归档:https://elasticsearch.cn/article/6352
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第524期 (2019-01-29)

1、Elasticsearch分布式一致性原理分析之元数据。
http://t.cn/EtbgJxx
2、使用Amazon Elasticsearch Service和AWS Lambda对PCI-DSS感知进行警报、监控和报告。
http://t.cn/Etbgorf
3、Node.js和浏览器官方Elasticsearch客户端介绍。
http://t.cn/Etbg04v

编辑:叮咚光军
归档:https://elasticsearch.cn/article/6351
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1、Elasticsearch分布式一致性原理分析之元数据。
http://t.cn/EtbgJxx
2、使用Amazon Elasticsearch Service和AWS Lambda对PCI-DSS感知进行警报、监控和报告。
http://t.cn/Etbgorf
3、Node.js和浏览器官方Elasticsearch客户端介绍。
http://t.cn/Etbg04v

编辑:叮咚光军
归档:https://elasticsearch.cn/article/6351
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第523期 (2019-01-28)

1.大数据搜索选开源还是商业软件?ElasticSearch 对比 Splunk
http://t.cn/EtVI8va
2.ELK做数据挖掘的优缺点
http://t.cn/EtVMwlq
3.导入CSV 和 日志文件到ES中,并使用可视化展示
http://t.cn/EtVM6TN

编辑:cyberdak
归档:https://elasticsearch.cn/article/6350
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1.大数据搜索选开源还是商业软件?ElasticSearch 对比 Splunk
http://t.cn/EtVI8va
2.ELK做数据挖掘的优缺点
http://t.cn/EtVMwlq
3.导入CSV 和 日志文件到ES中,并使用可视化展示
http://t.cn/EtVM6TN

编辑:cyberdak
归档:https://elasticsearch.cn/article/6350
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

Hive 与 ElasticSearch 的数据交互

本文将详细介绍利用 ES 与 Hive 直接的数据交互;通过 Hive 外部表的方式,可以快速将 ES 索引数据映射到 Hive 中,使用易于上手的 Hive SQL 实现对数据的进一步加工。

一、开发环境

1、组件版本

  • CDH 集群版本:6.0.1
  • ES 版本:6.5.1
  • Hive 版本:2.1.1
  • ES-Hadoop 版本:6.5.1

2、Hive 简介

Hive 在 Hadoop 生态系统中扮演着数据仓库的角色,借助 Hive 可以方便地进行数据汇总、即席查询以及分析存储在 Hadoop 文件系统中的大型数据集。

Hive 通过类 SQL 语言(HSQL)对 Hadoop 上的数据进行抽象,这样用户可以通过 SQL 语句对数据进行定义、组织、操作和分析;在 Hive 中,数据集是通过表(定义了数据类型相关信息)进行定义的,用户可以通过内置运算符或用户自定义函数(UDF)对数据进行加载、查询和转换。

3、Hive 安装 ES-Hadoop

官方推荐的安装方式:

使用 add jar

add jar /path/elasticsearch-hadoop.jar

使用 hive.aux.jars.path

$ bin/hive --auxpath=/path/elasticsearch-hadoop.jar

修改配置(hive-site.xml)

<property>
  <name>hive.aux.jars.path</name>
  <value>/path/elasticsearch-hadoop.jar</value>
  <description>A comma separated list (with no spaces) of the jar files</description>
</property>

CDH6.X 推荐的安装方法

elasticsearch-hadoop.jar 复制到 Hive 的 auxlib 目录中,然后重启 Hive 即可。

cp elasticsearch-hadoop.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/

二、Hive 与 ElasticSearch 的数据交互

1、数据类型对照表

请务必注意,ES 中的类型是 index/_mapping 中对应的数据类型,非 _source 里面数据的类型。

Hive type Elasticsearch type
void null
boolean boolean
tinyint byte
smallint short
int int
bigint long
double double
float float
string string
binary binary
timestamp date
struct map
map map
array array
union not supported (yet)
decimal string
date date
varchar string
char string

2、建立 Hive 外部表

CREATE EXTERNAL TABLE default.surface(
    water_type STRING,
    water_level STRING,
    monitor_time TIMESTAMP,
    sitecode STRING,
    p492 DOUBLE,
    p311 DOUBLE,
    status STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
    'es.resource'='ods_data_day_surface*/doc',
    'es.query'='?q=status:001'
    'es.nodes'='sky-01','es.port'='9200',
    'es.net.http.auth.user'='sky',
    'es.net.http.auth.pass'='jointsky',
    'es.date.format'='yyyy-MM-dd HH:mm:ss',
    'es.ser.reader.value.class'='com.jointsky.bigdata.hive.EsValueReader'
    'es.mapping.names'='waterType:water_type,monitortime:monitor_time'
);

3、配置项说明

es.resource

es.resource 用于设置 ES 资源的位置,默认该配置项同时设置了读和写的索引,当然也可以分别设置读写索引名称:

  • es.resource.read:设置读取位置;
  • es.resource.write:设置写入位置。

es.query

es.query 设置查询过滤条件,目前支持 uri queryquery dslexternal resource 三种设置方式。

# uri (or parameter) query
es.query = ?q=costinl

# query dsl
es.query = { "query" : { "term" : { "user" : "costinl" } } }

# external resource
es.query = org/mypackage/myquery.json

es.mapping.names

es.mapping.names 用于设置 Hive 与 ES 的字段映射关系,如果不设置,则默认字段名不发生变化(即为 data type 区域定义的字段名);此外该部分还用于定义 Hive 到 ES 的数据映射类型。

'es.mapping.names' = 'date:@timestamp , url:url_123 ')

其他通用字段的说明请参考文章:使用 ES-Hadoop 将 Spark Streaming 流数据写入 ES

4、自定义日期类型解析

目前将 ES 的 date 类型映射到 Hive 的 TIMESTAMP 类型时,ES-Hadoop 组件只能识别时间戳格式或者标准的 XSD 格式的日期字符串:

@Override
protected Object parseDate(Long value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(value)) : processLong(value));
}

@Override
protected Object parseDate(String value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(DatatypeConverter.parseDateTime(value).getTimeInMillis())) : parseString(value));
}

关于 XSD(XML Schema Date/Time Datatypes)可用参考文章:https://www.w3schools.com/xml/schema_dtypes_date.asp

为了兼容自定义的日期格式,需要编写自定义的日期读取类:


import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.hive.HiveValueReader;

import java.sql.Timestamp;
import java.text.ParseException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;

public class EsValueReader extends HiveValueReader {
    private String dateFormat;
    private static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final String DEFAULT_DATE_FORMAT_MIN = "yyyy-MM-dd HH:mm";
    private static final String DEFAULT_DATE_FORMAT_HOUR = "yyyy-MM-dd HH";
    private static final String DEFAULT_DATE_FORMAT_DAY = "yyyy-MM-dd";

    @Override
    public void setSettings(Settings settings) {
        super.setSettings(settings);
        dateFormat = settings.getProperty("es.date.format");
    }

    @Override
    protected Object parseDate(String value, boolean richDate) {
        if (value != null && value.trim().length() > 0 && DEFAULT_DATE_FORMAT.equalsIgnoreCase(dateFormat)) {
            if (richDate){
                if (value.length() == 16){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_MIN).getTime()));
                }
                if (value.length() == 13){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_HOUR).getTime()));
                }
                if (value.length() == 10){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_DAY).getTime()));
                }
                return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT).getTime()));
            }
            return parseString(value);
        }
        return super.parseDate(value, richDate);
    }

    /**
     * 解析日期,根據指定的格式進行解析.<br>
     * 如果解析錯誤,則返回null
     * @param stringDate 日期字串
     * @param format 日期格式
     * @return 日期型別
     */
    private static Date parseDate(String stringDate, String format) {
        if (stringDate == null) {
            return null;
        }
        try {
            return parseDate(stringDate, new String[] { format });
        } catch (ParseException e) {
            return null;
        }
    }

    public static Date parseDate(String str, String... parsePatterns) throws ParseException {
        return parseDateWithLeniency(str, parsePatterns, true);
    }

    private static Date parseDateWithLeniency(
            String str, String[] parsePatterns, boolean lenient) throws ParseException {
        if (str == null || parsePatterns == null) {
            throw new IllegalArgumentException("Date and Patterns must not be null");
        }

        SimpleDateFormat parser = new SimpleDateFormat();
        parser.setLenient(lenient);
        ParsePosition pos = new ParsePosition(0);
        for (String parsePattern : parsePatterns) {
            String pattern = parsePattern;
            if (parsePattern.endsWith("ZZ")) {
                pattern = pattern.substring(0, pattern.length() - 1);
            }
            parser.applyPattern(pattern);
            pos.setIndex(0);
            String str2 = str;
            if (parsePattern.endsWith("ZZ")) {
                str2 = str.replaceAll("([-+][0-9][0-9]):([0-9][0-9])$", "$1$2");
            }
            Date date = parser.parse(str2, pos);
            if (date != null && pos.getIndex() == str2.length()) {
                return date;
            }
        }
        throw new ParseException("Unable to parse the date: " + str, -1);
    }
}

上述代码的 Maven 依赖

<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>6.5.4</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

自定义日期解析包的部署

代码编写完成后,将代码进行打包,然后将打包好的 jar 包放置到 Hive 的 auxlib 目录中,然后重启 Hive 即可;该步骤与 ES-Hadoop 的安装步骤一样。

在编写 Spark 程序从 Hive 中读取数据的时候,需要添加对该包的依赖以及对 ES-Hadoop 的依赖。

三、总结

经过上述的步骤,Hive 与 ES 的映射已经不成问题,如果想从 ES 中导出数据,可用借助 HSQL insert into table XXX select * from XXXXX; 的方式从 ES 中读取数据写入到 HDFS;当然通过更为复杂的 HSQL 可以将数据进行处理,并将数据重新写入到 ES 或者存储到 HDFS。

充分利用 ES 的查询、过滤和聚合,可以很好的去服务数据标准化、数据清洗、数据分布情况等 ETL 流程。


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

继续阅读 »

本文将详细介绍利用 ES 与 Hive 直接的数据交互;通过 Hive 外部表的方式,可以快速将 ES 索引数据映射到 Hive 中,使用易于上手的 Hive SQL 实现对数据的进一步加工。

一、开发环境

1、组件版本

  • CDH 集群版本:6.0.1
  • ES 版本:6.5.1
  • Hive 版本:2.1.1
  • ES-Hadoop 版本:6.5.1

2、Hive 简介

Hive 在 Hadoop 生态系统中扮演着数据仓库的角色,借助 Hive 可以方便地进行数据汇总、即席查询以及分析存储在 Hadoop 文件系统中的大型数据集。

Hive 通过类 SQL 语言(HSQL)对 Hadoop 上的数据进行抽象,这样用户可以通过 SQL 语句对数据进行定义、组织、操作和分析;在 Hive 中,数据集是通过表(定义了数据类型相关信息)进行定义的,用户可以通过内置运算符或用户自定义函数(UDF)对数据进行加载、查询和转换。

3、Hive 安装 ES-Hadoop

官方推荐的安装方式:

使用 add jar

add jar /path/elasticsearch-hadoop.jar

使用 hive.aux.jars.path

$ bin/hive --auxpath=/path/elasticsearch-hadoop.jar

修改配置(hive-site.xml)

<property>
  <name>hive.aux.jars.path</name>
  <value>/path/elasticsearch-hadoop.jar</value>
  <description>A comma separated list (with no spaces) of the jar files</description>
</property>

CDH6.X 推荐的安装方法

elasticsearch-hadoop.jar 复制到 Hive 的 auxlib 目录中,然后重启 Hive 即可。

cp elasticsearch-hadoop.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/

二、Hive 与 ElasticSearch 的数据交互

1、数据类型对照表

请务必注意,ES 中的类型是 index/_mapping 中对应的数据类型,非 _source 里面数据的类型。

Hive type Elasticsearch type
void null
boolean boolean
tinyint byte
smallint short
int int
bigint long
double double
float float
string string
binary binary
timestamp date
struct map
map map
array array
union not supported (yet)
decimal string
date date
varchar string
char string

2、建立 Hive 外部表

CREATE EXTERNAL TABLE default.surface(
    water_type STRING,
    water_level STRING,
    monitor_time TIMESTAMP,
    sitecode STRING,
    p492 DOUBLE,
    p311 DOUBLE,
    status STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
    'es.resource'='ods_data_day_surface*/doc',
    'es.query'='?q=status:001'
    'es.nodes'='sky-01','es.port'='9200',
    'es.net.http.auth.user'='sky',
    'es.net.http.auth.pass'='jointsky',
    'es.date.format'='yyyy-MM-dd HH:mm:ss',
    'es.ser.reader.value.class'='com.jointsky.bigdata.hive.EsValueReader'
    'es.mapping.names'='waterType:water_type,monitortime:monitor_time'
);

3、配置项说明

es.resource

es.resource 用于设置 ES 资源的位置,默认该配置项同时设置了读和写的索引,当然也可以分别设置读写索引名称:

  • es.resource.read:设置读取位置;
  • es.resource.write:设置写入位置。

es.query

es.query 设置查询过滤条件,目前支持 uri queryquery dslexternal resource 三种设置方式。

# uri (or parameter) query
es.query = ?q=costinl

# query dsl
es.query = { "query" : { "term" : { "user" : "costinl" } } }

# external resource
es.query = org/mypackage/myquery.json

es.mapping.names

es.mapping.names 用于设置 Hive 与 ES 的字段映射关系,如果不设置,则默认字段名不发生变化(即为 data type 区域定义的字段名);此外该部分还用于定义 Hive 到 ES 的数据映射类型。

'es.mapping.names' = 'date:@timestamp , url:url_123 ')

其他通用字段的说明请参考文章:使用 ES-Hadoop 将 Spark Streaming 流数据写入 ES

4、自定义日期类型解析

目前将 ES 的 date 类型映射到 Hive 的 TIMESTAMP 类型时,ES-Hadoop 组件只能识别时间戳格式或者标准的 XSD 格式的日期字符串:

@Override
protected Object parseDate(Long value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(value)) : processLong(value));
}

@Override
protected Object parseDate(String value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(DatatypeConverter.parseDateTime(value).getTimeInMillis())) : parseString(value));
}

关于 XSD(XML Schema Date/Time Datatypes)可用参考文章:https://www.w3schools.com/xml/schema_dtypes_date.asp

为了兼容自定义的日期格式,需要编写自定义的日期读取类:


import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.hive.HiveValueReader;

import java.sql.Timestamp;
import java.text.ParseException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;

public class EsValueReader extends HiveValueReader {
    private String dateFormat;
    private static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final String DEFAULT_DATE_FORMAT_MIN = "yyyy-MM-dd HH:mm";
    private static final String DEFAULT_DATE_FORMAT_HOUR = "yyyy-MM-dd HH";
    private static final String DEFAULT_DATE_FORMAT_DAY = "yyyy-MM-dd";

    @Override
    public void setSettings(Settings settings) {
        super.setSettings(settings);
        dateFormat = settings.getProperty("es.date.format");
    }

    @Override
    protected Object parseDate(String value, boolean richDate) {
        if (value != null && value.trim().length() > 0 && DEFAULT_DATE_FORMAT.equalsIgnoreCase(dateFormat)) {
            if (richDate){
                if (value.length() == 16){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_MIN).getTime()));
                }
                if (value.length() == 13){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_HOUR).getTime()));
                }
                if (value.length() == 10){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_DAY).getTime()));
                }
                return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT).getTime()));
            }
            return parseString(value);
        }
        return super.parseDate(value, richDate);
    }

    /**
     * 解析日期,根據指定的格式進行解析.<br>
     * 如果解析錯誤,則返回null
     * @param stringDate 日期字串
     * @param format 日期格式
     * @return 日期型別
     */
    private static Date parseDate(String stringDate, String format) {
        if (stringDate == null) {
            return null;
        }
        try {
            return parseDate(stringDate, new String[] { format });
        } catch (ParseException e) {
            return null;
        }
    }

    public static Date parseDate(String str, String... parsePatterns) throws ParseException {
        return parseDateWithLeniency(str, parsePatterns, true);
    }

    private static Date parseDateWithLeniency(
            String str, String[] parsePatterns, boolean lenient) throws ParseException {
        if (str == null || parsePatterns == null) {
            throw new IllegalArgumentException("Date and Patterns must not be null");
        }

        SimpleDateFormat parser = new SimpleDateFormat();
        parser.setLenient(lenient);
        ParsePosition pos = new ParsePosition(0);
        for (String parsePattern : parsePatterns) {
            String pattern = parsePattern;
            if (parsePattern.endsWith("ZZ")) {
                pattern = pattern.substring(0, pattern.length() - 1);
            }
            parser.applyPattern(pattern);
            pos.setIndex(0);
            String str2 = str;
            if (parsePattern.endsWith("ZZ")) {
                str2 = str.replaceAll("([-+][0-9][0-9]):([0-9][0-9])$", "$1$2");
            }
            Date date = parser.parse(str2, pos);
            if (date != null && pos.getIndex() == str2.length()) {
                return date;
            }
        }
        throw new ParseException("Unable to parse the date: " + str, -1);
    }
}

上述代码的 Maven 依赖

<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>6.5.4</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

自定义日期解析包的部署

代码编写完成后,将代码进行打包,然后将打包好的 jar 包放置到 Hive 的 auxlib 目录中,然后重启 Hive 即可;该步骤与 ES-Hadoop 的安装步骤一样。

在编写 Spark 程序从 Hive 中读取数据的时候,需要添加对该包的依赖以及对 ES-Hadoop 的依赖。

三、总结

经过上述的步骤,Hive 与 ES 的映射已经不成问题,如果想从 ES 中导出数据,可用借助 HSQL insert into table XXX select * from XXXXX; 的方式从 ES 中读取数据写入到 HDFS;当然通过更为复杂的 HSQL 可以将数据进行处理,并将数据重新写入到 ES 或者存储到 HDFS。

充分利用 ES 的查询、过滤和聚合,可以很好的去服务数据标准化、数据清洗、数据分布情况等 ETL 流程。


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

收起阅读 »

社区日报 第522期 (2019-01-27)

1.在现有Elasticsearch集群中添加或删除节点。
http://t.cn/EtUHo2i
2.运行400+节点Elasticsearch集群。
http://t.cn/EtUHqmF
3.(自备梯子)亚马逊变得比你意识到的更强大。
http://t.cn/EtUHBLe

编辑:至尊宝
归档:https://elasticsearch.cn/article/6348
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1.在现有Elasticsearch集群中添加或删除节点。
http://t.cn/EtUHo2i
2.运行400+节点Elasticsearch集群。
http://t.cn/EtUHqmF
3.(自备梯子)亚马逊变得比你意识到的更强大。
http://t.cn/EtUHBLe

编辑:至尊宝
归档:https://elasticsearch.cn/article/6348
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第521期 (2019-01-26)

  1. ES使用中遇到的多种坑,以及解决方案。 http://t.cn/Etzlrca

2.使用Python ES Kibana构建的实时异常检测开源框架。 http://t.cn/R8Vu2q3

  1. Golang操作elasticsearch。 http://t.cn/EtzjhkJ
继续阅读 »
  1. ES使用中遇到的多种坑,以及解决方案。 http://t.cn/Etzlrca

2.使用Python ES Kibana构建的实时异常检测开源框架。 http://t.cn/R8Vu2q3

  1. Golang操作elasticsearch。 http://t.cn/EtzjhkJ
收起阅读 »

访谈:Elasticsearch在360企业安全集团的应用实践

欢迎来到Elastic社区电台的第十期节目,本期我们节目的嘉宾是来自于360企业安全集团的资深研发工程师张超和段军义,张超也是 Elasticsearch 相关新书《Elasticsearch 源码解析与优化实战》的作者,让我们一起走进360企业安全的大数据团队,了解他们是如何使用 Elasticsearch 来解决公司内部和外部客户的各类需求,以及在实践过程中的经验分享。

收听地址

WechatIMG120.jpeg

时间线

  • 00:20 - 嘉宾介绍
  • 02:10 - 张超的新书《Elasticsearch 源码解析与优化实战》
  • 04:21 - 360 与 Elasticsearch 的故事
  • 05:25 - Elasticsearch 在 360 的应用场景介绍
  • 07:30 - 产品选型与比较
  • 08:20 - 有关版本升级的经验分享
  • 12:00 - 典型场景的数据规模及配置情况
  • 15:00 - 360 基于 Elasticsearch 的源码优化
  • 21:00 - Frozen Index 与按需加载的需求
  • 22:00 - Rollup 数据上卷的功能介绍
  • 26:00 - 关于如何学习 Elasticsearch 的经验分享
  • 29:00 - Elasticsearch 踩坑故事分享
  • 37:00 - 关于入库与副本同步的改进问题讨论
  • 30:30 - 关于异地多活与 CCR 的介绍
  • 41:00 - 关于 Elasticsearch 相关职位的要求
  • 42:00 - 360 团队对于 Elastic 未来的期望
  • 43:50 - 有关目前 Elasticsearch 的痛点
  • 46:00 - 有关 Rollover 的功能介绍
  • 48:00 - 尾声

嘉宾

  • 张超,大数据平台内核资深研发工程师,《Elasticsearch源码解析与优化实战》作者,就职于360企业安全集团,在基础大数据平台部门负责 Elasticsearch 内核研发工作,喜欢研究底层原理与系统优化,尤其喜欢解决深层次的问题。

  • 段军义,大数据平台资深开发工程师,就职于360企业安全集团,主要负责Elasticsearch研发,喜欢研究Java、Linux、搜索相关技术。

主持人

Elastic 技术布道师,曾勇(Medcl)。

关于360企业安全

360企业安全创建于2012年,是360公司继个人安全市场后致力服务于政府企业机构网络安全与信息安全管理的安全产品业务线。作为中国互联网安全的领导者,360凭借在PC终端安全、移动终端安全、桌面安全管理、大数据分析、云安全等方面的深厚积累,成功推出各行业用户所急需的终端安全与管理一体化、未知高级威胁检测、移动终端安全管理、大数据安全分析等多项创新型企业安全产品。

关于Elastic社区电台

Elastic 开源社区举办的一款播客类节目, 邀请来自开源社区的用户,一起聊聊 Elastic 开源产品的使用案例、经验分享、架构变迁等等。

相关链接

继续阅读 »

欢迎来到Elastic社区电台的第十期节目,本期我们节目的嘉宾是来自于360企业安全集团的资深研发工程师张超和段军义,张超也是 Elasticsearch 相关新书《Elasticsearch 源码解析与优化实战》的作者,让我们一起走进360企业安全的大数据团队,了解他们是如何使用 Elasticsearch 来解决公司内部和外部客户的各类需求,以及在实践过程中的经验分享。

收听地址

WechatIMG120.jpeg

时间线

  • 00:20 - 嘉宾介绍
  • 02:10 - 张超的新书《Elasticsearch 源码解析与优化实战》
  • 04:21 - 360 与 Elasticsearch 的故事
  • 05:25 - Elasticsearch 在 360 的应用场景介绍
  • 07:30 - 产品选型与比较
  • 08:20 - 有关版本升级的经验分享
  • 12:00 - 典型场景的数据规模及配置情况
  • 15:00 - 360 基于 Elasticsearch 的源码优化
  • 21:00 - Frozen Index 与按需加载的需求
  • 22:00 - Rollup 数据上卷的功能介绍
  • 26:00 - 关于如何学习 Elasticsearch 的经验分享
  • 29:00 - Elasticsearch 踩坑故事分享
  • 37:00 - 关于入库与副本同步的改进问题讨论
  • 30:30 - 关于异地多活与 CCR 的介绍
  • 41:00 - 关于 Elasticsearch 相关职位的要求
  • 42:00 - 360 团队对于 Elastic 未来的期望
  • 43:50 - 有关目前 Elasticsearch 的痛点
  • 46:00 - 有关 Rollover 的功能介绍
  • 48:00 - 尾声

嘉宾

  • 张超,大数据平台内核资深研发工程师,《Elasticsearch源码解析与优化实战》作者,就职于360企业安全集团,在基础大数据平台部门负责 Elasticsearch 内核研发工作,喜欢研究底层原理与系统优化,尤其喜欢解决深层次的问题。

  • 段军义,大数据平台资深开发工程师,就职于360企业安全集团,主要负责Elasticsearch研发,喜欢研究Java、Linux、搜索相关技术。

主持人

Elastic 技术布道师,曾勇(Medcl)。

关于360企业安全

360企业安全创建于2012年,是360公司继个人安全市场后致力服务于政府企业机构网络安全与信息安全管理的安全产品业务线。作为中国互联网安全的领导者,360凭借在PC终端安全、移动终端安全、桌面安全管理、大数据分析、云安全等方面的深厚积累,成功推出各行业用户所急需的终端安全与管理一体化、未知高级威胁检测、移动终端安全管理、大数据安全分析等多项创新型企业安全产品。

关于Elastic社区电台

Elastic 开源社区举办的一款播客类节目, 邀请来自开源社区的用户,一起聊聊 Elastic 开源产品的使用案例、经验分享、架构变迁等等。

相关链接

收起阅读 »

社区日报 第520期 (2019-01-25)

1、帮助识别公开 Elasticsearch 服务器上的敏感信息开源项目
http://t.cn/EGszEDf
2、Elasticsearch + neo4j图搜索
http://t.cn/E53j9ZA
3、支持6.X版本的Elasticsearch PHP客户端
http://t.cn/R5IqIG6

编辑:铭毅天下
归档:https://elasticsearch.cn/article/6345
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1、帮助识别公开 Elasticsearch 服务器上的敏感信息开源项目
http://t.cn/EGszEDf
2、Elasticsearch + neo4j图搜索
http://t.cn/E53j9ZA
3、支持6.X版本的Elasticsearch PHP客户端
http://t.cn/R5IqIG6

编辑:铭毅天下
归档:https://elasticsearch.cn/article/6345
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第519期 (2019-01-24)

1.Elasticsearch搜索Suggest功能优化
http://t.cn/E5BoAmR
2.Elasticsearch translog文件介绍
http://t.cn/E5BoqXd
3.Elasticsearch结合百度地图实现区域查询检索
http://t.cn/E5Bofgd

编辑:金桥
归档:https://elasticsearch.cn/article/6344
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1.Elasticsearch搜索Suggest功能优化
http://t.cn/E5BoAmR
2.Elasticsearch translog文件介绍
http://t.cn/E5BoqXd
3.Elasticsearch结合百度地图实现区域查询检索
http://t.cn/E5Bofgd

编辑:金桥
归档:https://elasticsearch.cn/article/6344
订阅:https://tinyletter.com/elastic-daily 收起阅读 »