Logstash: listening for PostgreSQL changes in real time and sending messages to Kafka

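One Logstash pipeline does both jobs here: two jdbc inputs poll change-capture tables in PostgreSQL once a minute and publish new rows to Kafka, while a kafka input consumes the BookList and BookDuplicate topics and indexes their messages into Elasticsearch. Save the configuration as a single .conf file and start it with bin/logstash -f <file>.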

    # Logstash configuration for a two-way pipeline:
    # PostgreSQL -> Logstash -> Kafka, and Kafka -> Logstash -> Elasticsearch.
    input {
      kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["BookList", "BookDuplicate"]
        decorate_events => true  # add Kafka metadata (topic, consumer group, ...) to each event
      }
      jdbc {
        jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/folio"
        jdbc_user => "folio"
        jdbc_password => "folio123"  # replace with your own database username and password
        jdbc_driver_library => "/Users/yanziyu/logstash/logstash-7.7.1/bin/postgresql-42.2.14.jar"  # replace with your own driver path
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "300000"
        use_column_value => "true"
        tracking_column => "id"
        # The change-capture table is sketched below. The tracking column must be
        # selected and compared against :sql_last_value, otherwise every run
        # re-reads the whole table.
        statement => "select id, jsonb::TEXT from shlibrary_mod_shl_inventory.booklist_add where id > :sql_last_value"
        schedule => "* * * * *"  # poll once a minute
        type => "jdbc_booklist"
        jdbc_default_timezone => "Asia/Shanghai"
      }
      jdbc {
        jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/folio"
        jdbc_user => "folio"
        jdbc_password => "folio123"  # replace with your own database username and password
        jdbc_driver_library => "/Users/yanziyu/logstash/logstash-7.7.1/bin/postgresql-42.2.14.jar"  # replace with your own driver path
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "300000"
        use_column_value => "true"
        tracking_column => "id"
        statement => "select id, jsonb::TEXT from shlibrary_mod_shl_inventory.bookduplicate_add where id > :sql_last_value"
        schedule => "* * * * *"  # poll once a minute
        type => "jdbc_bookduplicate"
        jdbc_default_timezone => "Asia/Shanghai"
      }
    }
    filter {
      json {
        source => "message"  # split the JSON string in "message" into individual fields
                             # (events from the jdbc inputs carry no "message" field)
        #remove_field => ["message"]
      }
      mutate {
        remove_field => ["@version", "@timestamp"]  # drop fields not needed downstream
      }
    }
    output {
      stdout {
        codec => "rubydebug"
      }
      if [type] == "jdbc_booklist" {
        kafka {
          bootstrap_servers => "localhost:9092"  # Kafka broker address
          topic_id => "BookList_Add"
          batch_size => 5
          codec => "json"  # write events as JSON, the structure Logstash collected them in
        }
      }
      if [type] == "jdbc_bookduplicate" {
        kafka {
          bootstrap_servers => "localhost:9092"  # Kafka broker address
          topic_id => "BookDuplicate_Add"
          batch_size => 5
          codec => "json"  # write events as JSON, the structure Logstash collected them in
        }
      }
      if [@metadata][kafka][topic] == "BookList" {
        elasticsearch {
          hosts => ["http://localhost:9200"]
          index => "booklist"
          document_id => "%{id}"  # use the "id" parsed from "message" as the document _id
        }
      }
      if [@metadata][kafka][topic] == "BookDuplicate" {
        elasticsearch {
          hosts => ["http://localhost:9200"]
          index => "bookduplicate"
          document_id => "%{id}"  # use the "id" parsed from "message" as the document _id
        }
      }
    }
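The jdbc inputs expect a change-capture table with an auto-incrementing id (the tracking_column) and a jsonb payload. Below is a minimal sketch of what that table and a trigger to fill it could look like. Only the schema and the booklist_add table name come from the configuration above; the source table booklist, the function booklist_capture, and the trigger name are hypothetical placeholders, and the source table is assumed to keep its record in a column named jsonb.

    -- Hypothetical change-capture table matching the jdbc statement above:
    -- "id" is monotonically increasing (the tracking_column), "jsonb" holds the payload.
    create table shlibrary_mod_shl_inventory.booklist_add (
      id    bigserial primary key,
      jsonb jsonb not null
    );

    -- Hypothetical trigger: copy every inserted/updated row into the capture table.
    create or replace function shlibrary_mod_shl_inventory.booklist_capture()
    returns trigger as $$
    begin
      insert into shlibrary_mod_shl_inventory.booklist_add (jsonb)
      values (new.jsonb);
      return new;
    end;
    $$ language plpgsql;

    create trigger booklist_add_trigger
    after insert or update on shlibrary_mod_shl_inventory.booklist
    for each row execute procedure shlibrary_mod_shl_inventory.booklist_capture();

An equivalent bookduplicate_add table and trigger would back the second jdbc input.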