How to pull the data from remote database in Apache Kafka?

                Question



                I want to build a real-time data pipeline in Apache Kafka. I have a database located at a remote site, and that database is continuously updated. Can anybody tell me which Kafka Connect API I should use to pull the data from the database and ingest it into a Kafka broker in real time? Later on I would use Kafka Streams and KSQL to run ad-hoc queries and compute metrics.

                Any help would be highly appreciated!

                Solution

                If you want to create a real-time data pipeline, you need to use a Change Data Capture (CDC) tool that is able to stream changes from MySQL. I would suggest Debezium, which is an open-source distributed platform for change data capture.

                Capturing Inserts

                When a new record is added to a table, a JSON similar to the one below will be produced:

                {  
                   "payload":{  
                      "before":null,
                      "after":{  
                         "id":1005,
                         "first_name":"Giorgos",
                         "last_name":"Myrianthous",
                         "email":"giorgos@abc.com"
                      },
                      "source":{  
                         "name":"dbserver1",
                         "server_id":223344,
                         "ts_sec":1500369632,
                         "gtid":null,
                         "file":"mysql-bin.000003",
                         "pos":364,
                         "row":0,
                         "snapshot":null,
                         "thread":13,
                         "db":"inventory",
                         "table":"customers"
                      },
                      "op":"c",
                      "ts_ms":1500369632095
                   }
                }
                

                The before object is null and the after object contains the newly inserted values. Note that the op attribute is c, indicating that this was a CREATE event.

                Capturing Updates

                Assuming that the email attribute has been updated, a JSON similar to the one below will be produced:

                { 
                    "payload":{  
                      "before":{  
                         "id":1005,
                         "first_name":"Giorgos",
                         "last_name":"Myrianthous",
                         "email":"giorgos@abc.com"
                      },
                      "after":{  
                         "id":1005,
                         "first_name":"Giorgos",
                         "last_name":"Myrianthous",
                         "email":"newEmail@abc.com"
                      },
                      "source":{  
                         "name":"dbserver1",
                         "server_id":223344,
                         "ts_sec":1500369929,
                         "gtid":null,
                         "file":"mysql-bin.000003",
                         "pos":673,
                         "row":0,
                         "snapshot":null,
                         "thread":13,
                         "db":"inventory",
                         "table":"customers"
                      },
                      "op":"u",
                      "ts_ms":1500369929464
                   }
                }
                

                Notice that op is now u, indicating that this was an UPDATE event. The before object shows the row state before the update, and the after object captures the current state of the updated row.
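
                Because both states are carried in the same event, downstream code can compute exactly which columns changed. Here is a minimal sketch in Python; the helper name is mine, and it assumes the event has already been JSON-parsed into a dict:

                def changed_columns(event):
                    """Return {column: (old_value, new_value)} for a Debezium UPDATE event."""
                    payload = event["payload"]
                    return {
                        col: (payload["before"].get(col), new)
                        for col, new in payload["after"].items()
                        if payload["before"].get(col) != new
                    }

                # For the update JSON above this returns:
                # {'email': ('giorgos@abc.com', 'newEmail@abc.com')}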

                Capturing Deletes

                Now assume that the row has been deleted:

                { 
                    "payload":{  
                      "before":{  
                         "id":1005,
                         "first_name":"Giorgos",
                         "last_name":"Myrianthous",
                         "email":"newEmail@abc.com"
                      },
                      "after":null,
                      "source":{  
                         "name":"dbserver1",
                         "server_id":223344,
                         "ts_sec":1500370394,
                         "gtid":null,
                         "file":"mysql-bin.000003",
                         "pos":1025,
                         "row":0,
                         "snapshot":null,
                         "thread":13,
                         "db":"inventory",
                         "table":"customers"
                      },
                      "op":"d",
                      "ts_ms":1500370394589
                   }
                }
                

                op is now equal to d, indicating a DELETE event. The after attribute is null, and the before object contains the row as it was before it got deleted.
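
                Putting the three event types together, a consumer can dispatch on the op field. The following is a minimal sketch, assuming the kafka-python client, a broker at localhost:9092, and Debezium's default topic naming of <server.name>.<db>.<table>, which for the example configuration further down gives fullfillment.inventory.customers:

                import json
                from kafka import KafkaConsumer

                consumer = KafkaConsumer(
                    "fullfillment.inventory.customers",      # <server.name>.<db>.<table>
                    bootstrap_servers="localhost:9092",
                    # Debezium follows each DELETE with a tombstone (null value),
                    # so the deserializer must tolerate empty messages.
                    value_deserializer=lambda v: json.loads(v) if v else None,
                )

                for message in consumer:
                    if message.value is None:                # skip tombstones
                        continue
                    payload = message.value["payload"]
                    if payload["op"] == "c":                 # CREATE: new row in "after"
                        print("inserted:", payload["after"])
                    elif payload["op"] == "u":               # UPDATE: old and new row states
                        print("updated:", payload["before"], "->", payload["after"])
                    elif payload["op"] == "d":               # DELETE: old row in "before"
                        print("deleted:", payload["before"])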

                You can also have a look at the extensive tutorial provided on their website.

                EDIT: Example configuration for a MySQL database

                {
                  "name": "inventory-connector",  (1)
                  "config": {
                    "connector.class": "io.debezium.connector.mysql.MySqlConnector", (2)
                    "database.hostname": "192.168.99.100", (3)
                    "database.port": "3306", (4)
                    "database.user": "debezium", (5)
                    "database.password": "dbz", (6)
                    "database.server.id": "184054", (7)
                    "database.server.name": "fullfillment", (8)
                    "database.whitelist": "inventory", (9)
                    "database.history.kafka.bootstrap.servers": "kafka:9092", (10)
                    "database.history.kafka.topic": "dbhistory.fullfillment" (11)
                    "include.schema.changes": "true" (12)
                  }
                }
                

                1 The name of our connector when we register it with a Kafka Connect service.
                2 The name of this MySQL connector class.
                3 The address of the MySQL server.
                4 The port number of the MySQL server.
                5 The name of the MySQL user that has the required privileges.
                6 The password for the MySQL user that has the required privileges.
                7 The connector’s identifier that must be unique within the MySQL cluster and similar to MySQL’s server-id configuration property.
                8 The logical name of the MySQL server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used.
                9 A list of all databases hosted by this server that this connector will monitor. This is optional, and there are other properties for listing the databases and tables to include or exclude from monitoring.
                10 The list of Kafka brokers that this connector will use to write and recover DDL statements to the database history topic.
                11 The name of the database history topic where the connector will write and recover DDL statements. This topic is for internal use only and should not be used by consumers.
                12 The flag specifying that the connector should generate events on the schema change topic named fullfillment containing the DDL changes, which can be consumed by downstream consumers.
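
                Once the (1)–(12) callout markers are stripped so the JSON is valid, it can be submitted to a Kafka Connect worker's REST interface to start the connector. A sketch using Python's requests library; the file name register-mysql.json and the worker address localhost:8083 are assumptions:

                import json
                import requests

                # Assumes the cleaned-up connector JSON above was saved as
                # register-mysql.json and that a Kafka Connect worker's REST API
                # is listening on localhost:8083.
                with open("register-mysql.json") as f:
                    connector = json.load(f)

                resp = requests.post("http://localhost:8083/connectors", json=connector)
                resp.raise_for_status()
                print(resp.json())  # Connect echoes back the connector it created

                After registration, the connector takes an initial snapshot of the inventory database and then streams binlog changes into the fullfillment.inventory.* topics.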



