pbootcms网站模板|日韩1区2区|织梦模板||网站源码|日韩1区2区|jquery建站特效-html5模板网

  1. <legend id='42AN6'><style id='42AN6'><dir id='42AN6'><q id='42AN6'></q></dir></style></legend>

        <bdo id='42AN6'></bdo><ul id='42AN6'></ul>
    1. <i id='42AN6'><tr id='42AN6'><dt id='42AN6'><q id='42AN6'><span id='42AN6'><b id='42AN6'><form id='42AN6'><ins id='42AN6'></ins><ul id='42AN6'></ul><sub id='42AN6'></sub></form><legend id='42AN6'></legend><bdo id='42AN6'><pre id='42AN6'><center id='42AN6'></center></pre></bdo></b><th id='42AN6'></th></span></q></dt></tr></i><div class="zlvhjvn" id='42AN6'><tfoot id='42AN6'></tfoot><dl id='42AN6'><fieldset id='42AN6'></fieldset></dl></div>

    2. <small id='42AN6'></small><noframes id='42AN6'>

      <tfoot id='42AN6'></tfoot>

      Google Dataflow (Apache beam) JdbcIO 批量插入 mysql 數據庫

      Google Dataflow (Apache beam) JdbcIO bulk insert into mysql database(Google Dataflow (Apache beam) JdbcIO 批量插入 mysql 數據庫)

      • <tfoot id='dIshH'></tfoot>

        <small id='dIshH'></small><noframes id='dIshH'>

        <legend id='dIshH'><style id='dIshH'><dir id='dIshH'><q id='dIshH'></q></dir></style></legend>

            <tbody id='dIshH'></tbody>

              <bdo id='dIshH'></bdo><ul id='dIshH'></ul>

                <i id='dIshH'><tr id='dIshH'><dt id='dIshH'><q id='dIshH'><span id='dIshH'><b id='dIshH'><form id='dIshH'><ins id='dIshH'></ins><ul id='dIshH'></ul><sub id='dIshH'></sub></form><legend id='dIshH'></legend><bdo id='dIshH'><pre id='dIshH'><center id='dIshH'></center></pre></bdo></b><th id='dIshH'></th></span></q></dt></tr></i><div class="njtnhzd" id='dIshH'><tfoot id='dIshH'></tfoot><dl id='dIshH'><fieldset id='dIshH'></fieldset></dl></div>
                本文介紹了Google Dataflow (Apache beam) JdbcIO 批量插入 mysql 數據庫的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!

                問題描述

                我正在使用 Dataflow SDK 2.X Java API (Apache Beam SDK) 將數據寫入 mysql.我已經基于 Apache Beam SDK 文檔 使用數據流將數據寫入 mysql.它一次插入單行,因為我需要實現批量插入.我在官方文檔中找不到任何啟用批量插入模式的選項.

                I'm using Dataflow SDK 2.X Java API ( Apache Beam SDK) to write data into mysql. I've created pipelines based on Apache Beam SDK documentation to write data into mysql using dataflow. It inserts single row at a time where as I need to implement bulk insert. I do not find any option in official documentation to enable bulk inset mode.

                想知道是否可以在數據流管道中設置批量插入模式?如果是,請讓我知道我需要在下面的代碼中更改什么.

                Wondering, if it's possible to set bulk insert mode in dataflow pipeline? If yes, please let me know what I need to change in below code.

                 .apply(JdbcIO.<KV<Integer, String>>write()
                      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
                          .withUsername("username")
                          .withPassword("password"))
                      .withStatement("insert into Person values(?, ?)")
                      .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
                        public void setParameters(KV<Integer, String> element, PreparedStatement query) {
                          query.setInt(1, kv.getKey());
                          query.setString(2, kv.getValue());
                        }
                      })
                

                推薦答案

                EDIT 2018-01-27:

                事實證明,這個問題與 DirectRunner 有關.如果您使用 DataflowRunner 運行相同的管道,您應該獲得實際上多達 1,000 條記錄的批次.DirectRunner 總是在分組操作后創建大小為 1 的包.

                It turns out that this issue is related to the DirectRunner. If you run the same pipeline using the DataflowRunner, you should get batches that are actually up to 1,000 records. The DirectRunner always creates bundles of size 1 after a grouping operation.

                原答案:

                我在使用 Apache Beam 的 JdbcIO 寫入云數據庫時遇到了同樣的問題.問題是,雖然 JdbcIO 確實支持批量寫入多達 1,000 條記錄,但我從未真正見過它一次寫入超過 1 行(我不得不承認:這總是在開發環境中使用 DirectRunner).

                I've run into the same problem when writing to cloud databases using Apache Beam's JdbcIO. The problem is that while JdbcIO does support writing up to 1,000 records in one batch, in I have never actually seen it write more than 1 row at a time (I have to admit: This was always using the DirectRunner in a development environment).

                因此,我在 JdbcIO 中添加了一個功能,您可以通過將數據分組在一起并將每個組寫為一個批次來自己控制批次的大小.下面是基于 Apache Beam 原始 WordCount 示例的如何使用此功能的示例.

                I have therefore added a feature to JdbcIO where you can control the size of the batches yourself by grouping your data together and writing each group as one batch. Below is an example of how to use this feature based on the original WordCount example of Apache Beam.

                p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
                    // Count words in input file(s)
                    .apply(new CountWords())
                    // Format as text
                    .apply(MapElements.via(new FormatAsTextFn()))
                    // Make key-value pairs with the first letter as the key
                    .apply(ParDo.of(new FirstLetterAsKey()))
                    // Group the words by first letter
                    .apply(GroupByKey.<String, String> create())
                    // Get a PCollection of only the values, discarding the keys
                    .apply(ParDo.of(new GetValues()))
                    // Write the words to the database
                    .apply(JdbcIO.<String> writeIterable()
                            .withDataSourceConfiguration(
                                JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
                            .withStatement(INSERT_OR_UPDATE_SQL)
                            .withPreparedStatementSetter(new WordCountPreparedStatementSetter()));
                

                與 JdbcIO 的普通寫入方法的不同之處在于新方法 writeIterable()PCollection> 作為輸入而不是 <代碼>PCollection.每個 Iterable 都作為一批寫入數據庫.

                The difference with the normal write-method of JdbcIO is the new method writeIterable() that takes a PCollection<Iterable<RowT>> as input instead of PCollection<RowT>. Each Iterable is written as one batch to the database.

                可以在此處找到具有此附加功能的 JdbcIO 版本:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java

                The version of JdbcIO with this addition can be found here: https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java

                可以在此處找到包含上述示例的整個示例項目:https://github.com/olavloite/spanner-beam-example

                The entire example project containing the example above can be found here: https://github.com/olavloite/spanner-beam-example

                (Apache Beam 上還有一個拉取請求未決,以將其包含在項目中)

                (There is also a pull request pending on Apache Beam to include this in the project)

                這篇關于Google Dataflow (Apache beam) JdbcIO 批量插入 mysql 數據庫的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,也希望大家多多支持html5模板網!

                【網站聲明】本站部分內容來源于互聯網,旨在幫助大家更快的解決問題,如果有圖片或者內容侵犯了您的權益,請聯系我們刪除處理,感謝您的支持!

                相關文檔推薦

                How to use windowing functions efficiently to decide next N number of rows based on N number of previous values(如何有效地使用窗口函數根據 N 個先前值來決定接下來的 N 個行)
                reuse the result of a select expression in the quot;GROUP BYquot; clause?(在“GROUP BY中重用選擇表達式的結果;條款?)
                Does ignore option of Pyspark DataFrameWriter jdbc function ignore entire transaction or just offending rows?(Pyspark DataFrameWriter jdbc 函數的 ignore 選項是忽略整個事務還是只是有問題的行?) - IT屋-程序員軟件開發技
                Error while using INSERT INTO table ON DUPLICATE KEY, using a for loop array(使用 INSERT INTO table ON DUPLICATE KEY 時出錯,使用 for 循環數組)
                pyspark mysql jdbc load An error occurred while calling o23.load No suitable driver(pyspark mysql jdbc load 調用 o23.load 時發生錯誤 沒有合適的驅動程序)
                How to integrate Apache Spark with MySQL for reading database tables as a spark dataframe?(如何將 Apache Spark 與 MySQL 集成以將數據庫表作為 Spark 數據幀讀取?)
                  <tbody id='OecXG'></tbody>
              • <tfoot id='OecXG'></tfoot>
              • <i id='OecXG'><tr id='OecXG'><dt id='OecXG'><q id='OecXG'><span id='OecXG'><b id='OecXG'><form id='OecXG'><ins id='OecXG'></ins><ul id='OecXG'></ul><sub id='OecXG'></sub></form><legend id='OecXG'></legend><bdo id='OecXG'><pre id='OecXG'><center id='OecXG'></center></pre></bdo></b><th id='OecXG'></th></span></q></dt></tr></i><div class="pf5rn7p" id='OecXG'><tfoot id='OecXG'></tfoot><dl id='OecXG'><fieldset id='OecXG'></fieldset></dl></div>

                      • <bdo id='OecXG'></bdo><ul id='OecXG'></ul>
                        <legend id='OecXG'><style id='OecXG'><dir id='OecXG'><q id='OecXG'></q></dir></style></legend>

                          <small id='OecXG'></small><noframes id='OecXG'>

                          主站蜘蛛池模板: 深圳市超时尚职业培训学校,培训:月嫂,育婴,养老,家政;化妆,美容,美发,美甲. | 宝鸡市人民医院| 食品级焦亚硫酸钠_工业级焦亚硫酸钠_焦亚硫酸钠-潍坊邦华化工有限公司 | 标准光源箱|对色灯箱|色差仪|光泽度仪|涂层测厚仪_HRC大品牌生产厂家 | 精密钢管,冷拔精密无缝钢管,精密钢管厂,精密钢管制造厂家,精密钢管生产厂家,山东精密钢管厂家 | PAS糖原染色-CBA流式多因子-明胶酶谱MMP-上海研谨生物科技有限公司 | 儋州在线-儋州招聘找工作、找房子、找对象,儋州综合生活信息门户! | 气动隔膜泵厂家-温州永嘉定远泵阀有限公司 | 南京种植牙医院【官方挂号】_南京治疗种植牙医院那个好_南京看种植牙哪里好_南京茀莱堡口腔医院 尼龙PA610树脂,尼龙PA612树脂,尼龙PA1010树脂,透明尼龙-谷骐科技【官网】 | 煤矿支护网片_矿用勾花菱形网_缝管式_管缝式锚杆-邯郸市永年区志涛工矿配件有限公司 | 除甲醛公司-甲醛检测-广西雅居环境科技有限公司 | 环氧乙烷灭菌器_压力蒸汽灭菌器_低温等离子过氧化氢灭菌器 _低温蒸汽甲醛灭菌器_清洗工作站_医用干燥柜_灭菌耗材-环氧乙烷灭菌器_脉动真空压力蒸汽灭菌器_低温等离子灭菌设备_河南省三强医疗器械有限责任公司 | 爆破器材运输车|烟花爆竹运输车|1-9类危险品厢式运输车|湖北江南专用特种汽车有限公司 | 天津热油泵_管道泵_天津高温热油泵-天津市金丰泰机械泵业有限公司【官方网站】 | 铆钉机|旋铆机|东莞旋铆机厂家|鸿佰专业生产气压/油压/自动铆钉机 | 中空玻璃生产线,玻璃加工设备,全自动封胶线,铝条折弯机,双组份打胶机,丁基胶/卧式/立式全自动涂布机,玻璃设备-山东昌盛数控设备有限公司 | 宽带办理,电信宽带,移动宽带,联通宽带,电信宽带办理,移动宽带办理,联通宽带办理 | 电动高尔夫球车|电动观光车|电动巡逻车|电动越野车厂家-绿友机械集团股份有限公司 | 注塑_注塑加工_注塑模具_塑胶模具_注塑加工厂家_深圳环科 | 合肥活动房_安徽活动板房_集成打包箱房厂家-安徽玉强钢结构集成房屋有限公司 | 旅游规划_旅游策划_乡村旅游规划_景区规划设计_旅游规划设计公司-北京绿道联合旅游规划设计有限公司 | 合肥汽车充电桩_安徽充电桩_电动交流充电桩厂家_安徽科帝新能源科技有限公司 | 天津拓展_天津团建_天津趣味运动会_天津活动策划公司-天津华天拓展培训中心 | 英思科GTD-3000EX(美国英思科气体检测仪MX4MX6)百科-北京嘉华众信科技有限公司 | 气动调节阀,电动调节阀,自力式压力调节阀,切断阀「厂家」-浙江利沃夫自控阀门 | 掺铥光纤放大器-C/L波段光纤放大器-小信号光纤放大器-合肥脉锐光电技术有限公司 | MES系统工业智能终端_生产管理看板/安灯/ESOP/静电监控_讯鹏科技 | 纸张环压仪-纸张平滑度仪-杭州纸邦自动化技术有限公司 | 流程管理|流程管理软件|企业流程管理|微宏科技-AlphaFlow_流程管理系统软件服务商 | 合肥防火门窗/隔断_合肥防火卷帘门厂家_安徽耐火窗_良万消防设备有限公司 | 氧氮氢联合测定仪-联测仪-氧氮氢元素分析仪-江苏品彦光电 | 破碎机_上海破碎机_破碎机设备_破碎机厂家-上海山卓重工机械有限公司 | 今日扫码_溯源二维码_产品防伪一物一码_红包墙营销方案 | 无缝钢管-聊城无缝钢管-小口径无缝钢管-大口径无缝钢管 - 聊城宽达钢管有限公司 | wika威卡压力表-wika压力变送器-德国wika代理-威卡总代-北京博朗宁科技 | 保健品OEM贴牌代加工厂家_德州健之源| 便携式表面粗糙度仪-彩屏硬度计-分体式粗糙度仪-北京凯达科仪科技有限公司 | 振动筛-交叉筛-螺旋筛-滚轴筛-正弦筛-方形摇摆筛「新乡振动筛厂家」 | 电主轴,车床电磨头,变频制动电机-博山鸿达特种电机 | ETFE膜结构_PTFE膜结构_空间钢结构_膜结构_张拉膜_浙江萬豪空间结构集团有限公司 | 瑞典Blueair空气净化器租赁服务中心-专注新装修办公室除醛去异味服务! |