使用 shuf 来打乱一个文件中的行或是选择文件中一个随机的行。
mysql_2_elasticsearch

mysql_2_elasticsearch

logstash5.5.0版本同步mysql数据到es5.5.0版本,怎么秒级别的同步?

Logstashrochy 回复了问题 • 2 人关注 • 1 个回复 • 2494 次浏览 • 2018-11-13 20:59 • 来自相关话题

elasticsearch同步性能问题

Elasticsearchkennywu76 回复了问题 • 5 人关注 • 3 个回复 • 3707 次浏览 • 2018-01-16 12:06 • 来自相关话题

用logstash-input-jdbc从MySQL向ES导入数据,发现mysql中timestamp的格式都被改变了,如何解决?

默认分类rommr 回复了问题 • 4 人关注 • 2 个回复 • 4333 次浏览 • 2017-12-13 10:53 • 来自相关话题

logstash-input-jdbc 数据已经导入完成 不退出一直循环导入

LogstashAoseala 回复了问题 • 4 人关注 • 4 个回复 • 7570 次浏览 • 2017-06-21 18:04 • 来自相关话题

elasticsearch-jdbc从mysql导入的读取顺序是??

Elasticsearchlaoyang360 回复了问题 • 3 人关注 • 2 个回复 • 4219 次浏览 • 2017-03-31 07:12 • 来自相关话题

Elasticsearch jdbc从mysql导入数据,到了一半shell脚本中断。

ElasticsearchXargin 回复了问题 • 3 人关注 • 1 个回复 • 5502 次浏览 • 2017-01-10 22:01 • 来自相关话题

可定制的 elasticsearch 数据导入工具 ——mysql_2_elasticsearch

Elasticsearchparksben 发表了文章 • 7 个评论 • 7005 次浏览 • 2016-12-13 23:55 • 来自相关话题

A customizable importer from mysql to elasticsearch. 可定制的 elasticsearch 数据导入工具 ——基于 elasticsearch 的 JS API     【github 项目地址】 https://github.com/parksben/mysql_2_elasticsearch  【主要功能】 1. 完全使用 JS 实现数据从 MySQL 到 elasticsearch 的迁移; 2. 可批量导入多张 MySQL 表; 3. 可自定义的数据迁移规则(数据表/字段关系、字段过滤、使用正则进行数据处理); 4. 可自定义的异步分片导入方式,数据导入效率更高。 【一键安装】
npm install mysql_2_elasticsearch
【快速开始(简单用例)】
var esMysqlRiver = require('mysql_2_elasticsearch');

var river_config = {
  mysql: {
    host: '127.0.0.1',
    user: 'root',
    password: 'root',
    database: 'users',
    port: 3306
  },
  elasticsearch: {
    host_config: {               // es客户端的配置参数
      host: 'localhost:9200',
      // log: 'trace'
    },
    index: 'myIndex'
  },
  riverMap: {
    'users => users': {}         // 将数据表 users 导入到 es 类型: /myIndex/users
  }
};


/*
** 以下代码内容:
** 通过 esMysqlRiver 方法进行数据传输,方法的回调参数(一个JSON对象) obj 包含此次数据传输的结果
** 其中:
** 1. obj.total    => 需要传输的数据表数量
** 2. obj.success  => 传输成功的数据表数量
** 3. obj.failed   => 传输失败的数据表数量
** 4. obj.result   => 本次数据传输的结论
*/

esMysqlRiver(river_config, function(obj) {
  /* 将传输结果打印到终端 */
  console.log('\n---------------------------------');
  console.log('总传送:' + obj.total + '项');
  console.log('成功:' + obj.success + '项');
  console.log('失败:' + obj.failed + '项');
  if (obj.result == 'success') {
    console.log('\n结论:全部数据传送完成!');
  } else {
    console.log('\n结论:传送未成功...');
  }
  console.log('---------------------------------');
  console.log('\n(使用 Ctrl + C 退出进程)');
  /* 将传输结果打印到终端 */
});
【最佳实现(完整用例)】
var esMysqlRiver = require('mysql_2_elasticsearch');

/*
** mysql_2_elasticsearch 的相关参数配置(详情见注释)
*/

var river_config = {

  /* [必需] MySQL数据库的相关参数(根据实际情况进行修改) */
  mysql: {
    host: '127.0.0.1',
    user: 'root',
    password: 'root',
    database: 'users',
    port: 3306
  },

  /* [必需] es 相关参数(根据实际情况进行修改) */
  elasticsearch: {
    host_config: {               // [必需] host_config 即 es客户端的配置参数,详细配置参考 es官方文档
      host: 'localhost:9200',
      log: 'trace',
      // Other options...
    },
    index: 'myIndex',            // [必需] es 索引名
    chunkSize: 8000,             // [非必需] 单分片最大数据量,默认为 5000 (条数据)
    timeout: '2m'                // [非必需] 单次分片请求的超时时间,默认为 1m
    //(注意:此 timeout 并非es客户端请求的timeout,后者请在 host_config 中设置)
  },

  /* [必需] 数据传送的规则 */
  riverMap: {
    'users => users': {            // [必需] 'a => b' 表示将 mysql数据库中名为 'a' 的 table 的所有数据 输送到 es中名为 'b' 的 type 中去
      filter_out: [                // [非必需] 需要过滤的字段名,即 filter_out 中的设置的所有字段将不会被导入 elasticsearch 的数据中
        'password',
        'age'
      ],
      exception_handler: {           // [非必需] 异常处理器,使用JS正则表达式处理异常数据,避免 es 入库时由于类型不合法造成数据缺失
        'birthday': [                // [示例] 对 users 表的 birthday 字段的异常数据进行处理
          {
            match: /NaN/gi,          // [示例] 正则条件(此例匹配字段值为 "NaN" 的情况)
            writeAs: null            // [示例] 将 "NaN" 重写为 null
          },
          {
            match: /(\d{4})年/gi,    // [示例] 正则表达式(此例匹配字段值为形如 "2016年" 的情况)
            writeAs: '$1.1'          // [示例] 将 "2015年" 样式的数据重写为 "2016.1" 样式的数据
          }
        ]
      }
    },
    // Other fields' options...
  }

};


/*
** 将传输结果打印到终端
*/

esMysqlRiver(river_config, function(obj) {
  console.log('\n---------------------------------');
  console.log('总传送:' + obj.total + '项');
  console.log('成功:' + obj.success + '项');
  console.log('失败:' + obj.failed + '项');
  if (obj.result == 'success') {
    console.log('\n结论:全部数据传送完成!');
  } else {
    console.log('\n结论:传送未成功...');
  }
  console.log('---------------------------------');
  console.log('\n(使用 Ctrl + C 退出进程)');
});
【注意事项及参考】 1. elasticsearch数据导入前请先配置好数据的 mapping; 2. "host_config" 更多参数设置详见 [es官方API文档] https://www.elastic.co/guide/e ... .html 3. mysql 表的自增 id 将自动替换为 "表名+_id" 的格式,如:"users_id"; 4. 如出现数据缺失情况,请注意查看 elasticsearch 终端进程或日志,找出未成功导入的数据,通过设置 exception_handler 参数处理它。

logstash5.5.0版本同步mysql数据到es5.5.0版本,怎么秒级别的同步?

回复

Logstashrochy 回复了问题 • 2 人关注 • 1 个回复 • 2494 次浏览 • 2018-11-13 20:59 • 来自相关话题

elasticsearch同步性能问题

回复

Elasticsearchkennywu76 回复了问题 • 5 人关注 • 3 个回复 • 3707 次浏览 • 2018-01-16 12:06 • 来自相关话题

用logstash-input-jdbc从MySQL向ES导入数据,发现mysql中timestamp的格式都被改变了,如何解决?

回复

默认分类rommr 回复了问题 • 4 人关注 • 2 个回复 • 4333 次浏览 • 2017-12-13 10:53 • 来自相关话题

logstash-input-jdbc 数据已经导入完成 不退出一直循环导入

回复

LogstashAoseala 回复了问题 • 4 人关注 • 4 个回复 • 7570 次浏览 • 2017-06-21 18:04 • 来自相关话题

elasticsearch-jdbc从mysql导入的读取顺序是??

回复

Elasticsearchlaoyang360 回复了问题 • 3 人关注 • 2 个回复 • 4219 次浏览 • 2017-03-31 07:12 • 来自相关话题

Elasticsearch jdbc从mysql导入数据,到了一半shell脚本中断。

回复

ElasticsearchXargin 回复了问题 • 3 人关注 • 1 个回复 • 5502 次浏览 • 2017-01-10 22:01 • 来自相关话题

可定制的 elasticsearch 数据导入工具 ——mysql_2_elasticsearch

Elasticsearchparksben 发表了文章 • 7 个评论 • 7005 次浏览 • 2016-12-13 23:55 • 来自相关话题

A customizable importer from mysql to elasticsearch. 可定制的 elasticsearch 数据导入工具 ——基于 elasticsearch 的 JS API     【github 项目地址】 https://github.com/parksben/mysql_2_elasticsearch  【主要功能】 1. 完全使用 JS 实现数据从 MySQL 到 elasticsearch 的迁移; 2. 可批量导入多张 MySQL 表; 3. 可自定义的数据迁移规则(数据表/字段关系、字段过滤、使用正则进行数据处理); 4. 可自定义的异步分片导入方式,数据导入效率更高。 【一键安装】
npm install mysql_2_elasticsearch
【快速开始(简单用例)】
var esMysqlRiver = require('mysql_2_elasticsearch');

var river_config = {
  mysql: {
    host: '127.0.0.1',
    user: 'root',
    password: 'root',
    database: 'users',
    port: 3306
  },
  elasticsearch: {
    host_config: {               // es客户端的配置参数
      host: 'localhost:9200',
      // log: 'trace'
    },
    index: 'myIndex'
  },
  riverMap: {
    'users => users': {}         // 将数据表 users 导入到 es 类型: /myIndex/users
  }
};


/*
** 以下代码内容:
** 通过 esMysqlRiver 方法进行数据传输,方法的回调参数(一个JSON对象) obj 包含此次数据传输的结果
** 其中:
** 1. obj.total    => 需要传输的数据表数量
** 2. obj.success  => 传输成功的数据表数量
** 3. obj.failed   => 传输失败的数据表数量
** 4. obj.result   => 本次数据传输的结论
*/

esMysqlRiver(river_config, function(obj) {
  /* 将传输结果打印到终端 */
  console.log('\n---------------------------------');
  console.log('总传送:' + obj.total + '项');
  console.log('成功:' + obj.success + '项');
  console.log('失败:' + obj.failed + '项');
  if (obj.result == 'success') {
    console.log('\n结论:全部数据传送完成!');
  } else {
    console.log('\n结论:传送未成功...');
  }
  console.log('---------------------------------');
  console.log('\n(使用 Ctrl + C 退出进程)');
  /* 将传输结果打印到终端 */
});
【最佳实现(完整用例)】
var esMysqlRiver = require('mysql_2_elasticsearch');

/*
** mysql_2_elasticsearch 的相关参数配置(详情见注释)
*/

var river_config = {

  /* [必需] MySQL数据库的相关参数(根据实际情况进行修改) */
  mysql: {
    host: '127.0.0.1',
    user: 'root',
    password: 'root',
    database: 'users',
    port: 3306
  },

  /* [必需] es 相关参数(根据实际情况进行修改) */
  elasticsearch: {
    host_config: {               // [必需] host_config 即 es客户端的配置参数,详细配置参考 es官方文档
      host: 'localhost:9200',
      log: 'trace',
      // Other options...
    },
    index: 'myIndex',            // [必需] es 索引名
    chunkSize: 8000,             // [非必需] 单分片最大数据量,默认为 5000 (条数据)
    timeout: '2m'                // [非必需] 单次分片请求的超时时间,默认为 1m
    //(注意:此 timeout 并非es客户端请求的timeout,后者请在 host_config 中设置)
  },

  /* [必需] 数据传送的规则 */
  riverMap: {
    'users => users': {            // [必需] 'a => b' 表示将 mysql数据库中名为 'a' 的 table 的所有数据 输送到 es中名为 'b' 的 type 中去
      filter_out: [                // [非必需] 需要过滤的字段名,即 filter_out 中的设置的所有字段将不会被导入 elasticsearch 的数据中
        'password',
        'age'
      ],
      exception_handler: {           // [非必需] 异常处理器,使用JS正则表达式处理异常数据,避免 es 入库时由于类型不合法造成数据缺失
        'birthday': [                // [示例] 对 users 表的 birthday 字段的异常数据进行处理
          {
            match: /NaN/gi,          // [示例] 正则条件(此例匹配字段值为 "NaN" 的情况)
            writeAs: null            // [示例] 将 "NaN" 重写为 null
          },
          {
            match: /(\d{4})年/gi,    // [示例] 正则表达式(此例匹配字段值为形如 "2016年" 的情况)
            writeAs: '$1.1'          // [示例] 将 "2015年" 样式的数据重写为 "2016.1" 样式的数据
          }
        ]
      }
    },
    // Other fields' options...
  }

};


/*
** 将传输结果打印到终端
*/

esMysqlRiver(river_config, function(obj) {
  console.log('\n---------------------------------');
  console.log('总传送:' + obj.total + '项');
  console.log('成功:' + obj.success + '项');
  console.log('失败:' + obj.failed + '项');
  if (obj.result == 'success') {
    console.log('\n结论:全部数据传送完成!');
  } else {
    console.log('\n结论:传送未成功...');
  }
  console.log('---------------------------------');
  console.log('\n(使用 Ctrl + C 退出进程)');
});
【注意事项及参考】 1. elasticsearch数据导入前请先配置好数据的 mapping; 2. "host_config" 更多参数设置详见 [es官方API文档] https://www.elastic.co/guide/e ... .html 3. mysql 表的自增 id 将自动替换为 "表名+_id" 的格式,如:"users_id"; 4. 如出现数据缺失情况,请注意查看 elasticsearch 终端进程或日志,找出未成功导入的数据,通过设置 exception_handler 参数处理它。