您现在的位置是:首页 > 博客日记 > Search搜索 Search搜索

配置 logstash 将mysql多表数据全量增量同步到es

2019-07-12 11:31:23

Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。

首先下载与es版本对应的logstash,解压到/usr/local/logstash-6.4.0

1.jdbc-connector的jar包

maven 仓库下载 mysql-connector-java

mysql-connector-java-6.0.6.jar包放在

  1. /usr/local/logstash-6.4.0/lib/mysql-connector-java-6.0.6.jar

2.mysql文件下创建文件

/usr/local/logstash-6.4.0/bin 目录下创建mysql文件夹

1.执行mysql操作的.sql文件 jdbc.conf

  1. input {
  2. stdin {
  3. }
  4. jdbc {
  5. # mysql 数据库链接
  6. jdbc_connection_string => "jdbc:mysql://127.0.0.1/video?characterEncoding=UTF-8&useSSL=false&autoReconnect=true&serverTimezone=Asia/Shanghai"
  7. # 用户名和密码
  8. jdbc_user => "root"
  9. jdbc_password => "root"
  10. # 驱动
  11. jdbc_driver_library => "/usr/local/logstash-6.4.0/lib/mysql-connector-java-6.0.6.jar"
  12. # 驱动类名
  13. jdbc_driver_class => "com.mysql.jdbc.Driver"
  14. jdbc_paging_enabled => "true"
  15. jdbc_page_size => "50000"
  16. # 执行的sql 文件路径+名称
  17. statement_filepath => "mysql/jdbc.sql"
  18. # 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
  19. schedule => "* * * * *"
  20. #schedule => "0 1 * * *"
  21. #处理中文乱码问题
  22. codec => plain { charset => "UTF-8"}
  23. # 索引类型
  24. type => "product"
  25. lowercase_column_names => false
  26. #是否记录最后一次运行内容
  27. record_last_run => true
  28. # 是否适用列元素
  29. use_column_value => true
  30. # 追踪的元素名,对应保存到es上面的字段名而不是数据库字段名
  31. tracking_column => "updated_at"
  32. # 默认为number,如果为日期必须声明为timestamp
  33. tracking_column_type => "timestamp"
  34. # 设置记录的路径
  35. last_run_metadata_path => "mysql/prodcut_last_time"
  36. # 每次运行是否清除
  37. clean_run => false
  38. }
  39. }
  40. filter {
  41. json {
  42. source => "message"
  43. remove_field => ["message"]
  44. }
  45. }
  46. output {
  47. if [type]=="product"{
  48. elasticsearch {
  49. hosts => ["211.147.6.230:9200"]
  50. # 索引名称
  51. index => "collect"
  52. # type名称
  53. document_type => "product"
  54. # 文档id,inquiryId为sql文件中查询出的字段名
  55. document_id => "%{id}"
  56. }
  57. stdout {
  58. # JSON格式输出
  59. codec => json_lines
  60. }
  61. }
  62. }

2.jdbc.sql 配置文件.conf

  1. SELECT `a`.`id`,`a`.`catid`,`a`.`catids`,`a`.`title`,`f`.`name`, `a`.`status`,a.updated_at FROM `c_product` `a` LEFT JOIN `c_company` `f` ON f.id = a.cid WHERE a.updated_at > :sql_last_value order by a.updated_at
启动logstash
  1. ./logstash -f ./mysql/jdbc.conf


关注TinyMeng博客,更多精彩分享,敬请期待!