愚者求师之过,智者从师之长。

filebeat采集mysql的slowlog时, 缺少 use [schema] 行时, 怎么从历史行中继承字段值?

Beats | 作者 tithonus | 发布于2024年01月10日 | 阅读数:3406

慢日志文本如下:
 
# Time: 2023-12-28T16:16:08.106986Z
# User@Host: user1[user1] @  [10.71.27.29]  Id: 2701621
# Query_time: 5.304431  Lock_time: 0.000389 Rows_sent: 1  Rows_examined: 11091847
use dbname1;
SET timestamp=1703780168;
SELECT count(0) FROM MYTABNAME_1 o where  o.name = 'abc';
# Time: 2023-12-28T16:18:34.248019Z
# User@Host: user1[user1] @  [10.71.47.234]  Id: 2701740
# Query_time: 5.105161  Lock_time: 0.000380 Rows_sent: 1  Rows_examined: 11093904
SET timestamp=1703780314;
SELECT count(0) FROM MYTABNAME_1 o where  o.name = 'xyz';

# Time: 2023-12-28T16:21:52.562178Z
# User@Host: user2[user2]  @  [10.71.125.229]  Id: 2705774
# Query_time: 1.255726  Lock_time: 0.000936 Rows_sent: 1  Rows_examined: 1133817
use dbname2;
SET timestamp=1703780512;
SELECT count(0) FROM MYTABNAME_2 o where  o.job = 'AAA';
# Time: 2023-12-28T16:25:52.562178Z
# User@Host: user1[user1] @  [10.71.125.229]  Id: 2705774
# Query_time: 1.255726  Lock_time: 0.000936 Rows_sent: 1  Rows_examined: 1133817
SET timestamp=1703794321;
SELECT count(0) FROM MYTABNAME_2 o where  o.job = 'BBB';

# Time: 2023-12-28T16:30:08.106986Z
# User@Host: user1[user1] @  [10.71.27.29]  Id: 2701621
# Query_time: 5.304431  Lock_time: 0.000389 Rows_sent: 1  Rows_examined: 11091847
use dbname1;
SET timestamp=1703824321;
SELECT count(0) FROM MYTABNAME_1 o where  o.name = 'abc';
# Time: 2023-12-28T16:35:08.106986Z
# User@Host: user1[user1] @  [10.71.27.29]  Id: 2701621
# Query_time: 5.304431  Lock_time: 0.000389 Rows_sent: 1  Rows_examined: 11091847
SET timestamp=1703824321;
SELECT count(0) FROM MYTABNAME_1 o where  o.name = 'xyz';
 
采集架构
filebeat  mysql module 采集多台mysql主机的slowlog --> kafka --> logstash --> es(ingest/pipeline)解析字段后存入ES索引
 
以 # Time 为多行分割
 
filebeat 的 自带 mysql 模块做ingest/pipeline处理:
/usr/share/filebeat/module/mysql/slowlog/ingest/pipeline.json
其中grok部分内容如下: 
    {
      "grok": {
        "field": "message",
        "patterns": [
          "^# User@Host: %{USER:user.name}(\\[%{USER:mysql.slowlog.current_user}\\])? @ %{HOSTNAME:source.domain}? \\[%{IP:source.ip}?\\]%{METRICSPACE}(Id:%{SPACE}%{NUMBER:mysql.thread_id:long}%{METRICSPACE})?(Thread_id:%{SPACE}%{NUMBER:mysql.thread_id}%{METRICSPACE})?(Schema:%{SPACE}%{NOTSPACE:mysql.slowlog.schema}?%{METRICSPACE})?(Last_errno: %{NUMBER:mysql.slowlog.last_errno:long}%{METRICSPACE})?(Killed: %{NUMBER:mysql.slowlog.killed:long}%{METRICSPACE})?(QC_hit: %{WORD:mysql.slowlog.query_cache_hit}%{METRICSPACE})?(Query_time: %{NUMBER:temp.duration:float}%{METRICSPACE})?(Lock_time: %{NUMBER:mysql.slowlog.lock_time.sec:float}%{METRICSPACE})?(Rows_sent: %{NUMBER:mysql.slowlog.rows_sent:long}%{METRICSPACE})?(Rows_examined: %{NUMBER:mysql.slowlog.rows_examined:long}%{METRICSPACE})?(Rows_affected: %{NUMBER:mysql.slowlog.rows_affected:long}%{METRICSPACE})?(Thread_id: %{NUMBER:mysql.thread_id}%{METRICSPACE})?(Errno: %{NUMBER:mysql.slowlog.last_errno:long}%{METRICSPACE})?(Killed: %{NUMBER:mysql.slowlog.killed:long}%{METRICSPACE})?(Bytes_received: %{NUMBER:mysql.slowlog.bytes_received:long}%{METRICSPACE})?(Bytes_sent: %{NUMBER:mysql.slowlog.bytes_sent:long}%{METRICSPACE})?(Read_first: %{NUMBER:mysql.slowlog.read_first:long}%{METRICSPACE})?(Read_last: %{NUMBER:mysql.slowlog.read_last:long}%{METRICSPACE})?(Read_key: %{NUMBER:mysql.slowlog.read_key:long}%{METRICSPACE})?(Read_next: %{NUMBER:mysql.slowlog.read_next:long}%{METRICSPACE})?(Read_prev: %{NUMBER:mysql.slowlog.read_prev:long}%{METRICSPACE})?(Read_rnd: %{NUMBER:mysql.slowlog.read_rnd:long}%{METRICSPACE})?(Read_rnd_next: %{NUMBER:mysql.slowlog.read_rnd_next:long}%{METRICSPACE})?(Sort_merge_passes: %{NUMBER:mysql.slowlog.sort_merge_passes:long}%{METRICSPACE})?(Sort_range_count: %{NUMBER:mysql.slowlog.sort_range_count:long}%{METRICSPACE})?(Sort_rows: %{NUMBER:mysql.slowlog.sort_rows:long}%{METRICSPACE})?(Sort_scan_count: %{NUMBER:mysql.slowlog.sort_scan_count:long}%{METRICSPACE})?(Created_tmp_disk_tables: %{NUMBER:mysql.slowlog.tmp_disk_tables:long}%{METRICSPACE})?(Created_tmp_tables: %{NUMBER:mysql.slowlog.tmp_tables:long}%{METRICSPACE})?(Tmp_tables: %{NUMBER:mysql.slowlog.tmp_tables:long}%{METRICSPACE})?(Tmp_disk_tables: %{NUMBER:mysql.slowlog.tmp_disk_tables}%{METRICSPACE})?(Tmp_table_sizes: %{NUMBER:mysql.slowlog.tmp_table_sizes:long}%{METRICSPACE})?(Start: %{TIMESTAMP_ISO8601:event.start}%{METRICSPACE})?(End: %{TIMESTAMP_ISO8601:event.end}%{METRICSPACE})?(InnoDB_trx_id: %{WORD:mysql.slowlog.innodb.trx_id}%{METRICSPACE})?(QC_Hit: %{WORD:mysql.slowlog.query_cache_hit}%{METRICSPACE})?(Full_scan: %{WORD:mysql.slowlog.full_scan}%{METRICSPACE})?(Full_join: %{WORD:mysql.slowlog.full_join}%{METRICSPACE})?(Tmp_table: %{WORD:mysql.slowlog.tmp_table}%{METRICSPACE})?(Tmp_table_on_disk: %{WORD:mysql.slowlog.tmp_table_on_disk}%{METRICSPACE})?(Filesort: %{WORD:mysql.slowlog.filesort}%{METRICSPACE})?(Filesort_on_disk: %{WORD:mysql.slowlog.filesort_on_disk}%{METRICSPACE})?(Merge_passes: %{NUMBER:mysql.slowlog.merge_passes:long}%{METRICSPACE})?(Priority_queue: %{WORD:mysql.slowlog.priority_queue}%{METRICSPACE})?(No InnoDB statistics available for this query%{METRICSPACE})?(InnoDB_IO_r_ops: %{NUMBER:mysql.slowlog.innodb.io_r_ops:long}%{METRICSPACE})?(InnoDB_IO_r_bytes: %{NUMBER:mysql.slowlog.innodb.io_r_bytes:long}%{METRICSPACE})?(InnoDB_IO_r_wait: %{NUMBER:mysql.slowlog.innodb.io_r_wait.sec:float}%{METRICSPACE})?(InnoDB_rec_lock_wait: %{NUMBER:mysql.slowlog.innodb.rec_lock_wait.sec:float}%{METRICSPACE})?(InnoDB_queue_wait: %{NUMBER:mysql.slowlog.innodb.queue_wait.sec:float}%{METRICSPACE})?(InnoDB_pages_distinct: %{NUMBER:mysql.slowlog.innodb.pages_distinct:long}%{METRICSPACE})?(Log_slow_rate_type: %{WORD:mysql.slowlog.log_slow_rate_type}%{METRICSPACE})?(Log_slow_rate_limit: %{NUMBER:mysql.slowlog.log_slow_rate_limit:long}%{METRICSPACE})?%{EXPLAIN}(use %{NOTSPACE:mysql.slowlog.schema};\n)?SET timestamp=%{NUMBER:mysql.slowlog.timestamp:long};\n%{GREEDYMULTILINE:mysql.slowlog.query}"
        ],
        "pattern_definitions": {
          "GREEDYMULTILINE": "(.|\n)*",
          "METRICSPACE": "([ #\n]*)",
          "EXPLAIN": "(# explain:.*\n|#\\s*\n)*"
        },
        "ignore_missing": true
      }
    }
 
 
在处理 (use %{NOTSPACE:mysql.slowlog.schema};\n) 这行时, mysql的slowlog有多条记录时, 只有第一条记录包含"use dbname1;" 信息, 这时可以获取到 mysql.slowlog.schema 值, 第二条就不包含 use [schema] 记录了.
 
目的就是让filebeat采集时,当不包含 use [schema]  记录时, 能继承历史行中的值; 当前行存在use [schema]  记录时,则使用当前行
请大佬帮忙解答下, 如何实现, 感谢.
 
已邀请:

要回复问题请先登录注册