      Google Dataflow (Apache Beam) JdbcIO bulk insert into MySQL database

                Question

                I'm using the Dataflow SDK 2.x Java API (Apache Beam SDK) to write data into MySQL. I built my pipeline following the Apache Beam SDK documentation for writing to MySQL from Dataflow. It inserts a single row at a time, whereas I need bulk inserts. I can't find any option in the official documentation to enable a bulk insert mode.

                Is it possible to enable a bulk insert mode in a Dataflow pipeline? If so, please let me know what I need to change in the code below.

                 .apply(JdbcIO.<KV<Integer, String>>write()
                      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
                          .withUsername("username")
                          .withPassword("password"))
                      .withStatement("insert into Person values(?, ?)")
                      .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
                        // Binds one element to the two placeholders of the insert statement.
                        @Override
                        public void setParameters(KV<Integer, String> element, PreparedStatement query)
                            throws Exception {
                          query.setInt(1, element.getKey());
                          query.setString(2, element.getValue());
                        }
                      }));
                

                Recommended answer

                EDIT 2018-01-27:

                It turns out that this issue is specific to the DirectRunner. If you run the same pipeline with the DataflowRunner, you should get batches of up to 1,000 records. The DirectRunner always creates bundles of size 1 after a grouping operation.
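The interaction between bundle size and batch size can be illustrated without Beam at all. The following is a minimal plain-Java simulation, not JdbcIO's actual code. It assumes a writer that flushes its buffer once 1,000 rows have accumulated or when the current bundle ends, whichever comes first. The class and method names are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation (no Beam dependency) of how bundle size caps the write batch size. */
final class BundleBatchingSketch {

    // Maximum batch size mentioned in the answer.
    static final int MAX_BATCH_SIZE = 1000;

    /**
     * Returns the sizes of the batches a writer would send, assuming it flushes
     * when MAX_BATCH_SIZE rows are buffered or when the current bundle ends.
     */
    static List<Integer> batchSizes(int totalRows, int bundleSize) {
        if (bundleSize < 1) {
            throw new IllegalArgumentException("bundleSize must be at least 1");
        }
        List<Integer> sizes = new ArrayList<>();
        int remaining = totalRows;
        while (remaining > 0) {
            int rowsInBundle = Math.min(bundleSize, remaining);
            remaining -= rowsInBundle;
            // Within one bundle, flush every MAX_BATCH_SIZE rows...
            while (rowsInBundle > MAX_BATCH_SIZE) {
                sizes.add(MAX_BATCH_SIZE);
                rowsInBundle -= MAX_BATCH_SIZE;
            }
            // ...and flush whatever is left when the bundle finishes.
            sizes.add(rowsInBundle);
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Bundles of one element: every batch holds a single row.
        System.out.println(batchSizes(5, 1));       // prints [1, 1, 1, 1, 1]
        // One large bundle: batches fill up to the 1,000-row limit.
        System.out.println(batchSizes(2500, 2500)); // prints [1000, 1000, 500]
    }
}
```

Under this assumption, a runner that hands the writer one element per bundle can never produce a batch larger than one row, which matches the single-row inserts described in the question.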

                Original answer:

                I've run into the same problem when writing to cloud databases with Apache Beam's JdbcIO. Although JdbcIO does support writing up to 1,000 records in one batch, I have never actually seen it write more than one row at a time (admittedly, this was always with the DirectRunner in a development environment).
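For readers unfamiliar with what "one batch" means at the JDBC level, here is a rough sketch using the standard `java.sql.PreparedStatement` batch methods (`addBatch` and `executeBatch`). It is not taken from JdbcIO's source. The helper name `writeInBatches`, its parameters, and the two-column row shape (borrowed from the `Person` insert in the question) are assumptions made for illustration.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;

/** Illustrative helper (hypothetical, not part of Beam) showing plain JDBC batching. */
final class JdbcBatchSketch {

    /**
     * Binds each row to the statement, queues it with addBatch(), and sends the
     * queued rows with executeBatch() every maxBatchSize rows and once more at
     * the end for any remainder. Returns the number of batches sent.
     */
    static int writeInBatches(PreparedStatement statement,
                              List<Map.Entry<Integer, String>> rows,
                              int maxBatchSize) throws SQLException {
        int batchesExecuted = 0;
        int buffered = 0;
        for (Map.Entry<Integer, String> row : rows) {
            statement.setInt(1, row.getKey());
            statement.setString(2, row.getValue());
            statement.addBatch();
            buffered++;
            if (buffered == maxBatchSize) {
                statement.executeBatch();
                batchesExecuted++;
                buffered = 0;
            }
        }
        // Flush the rows that did not fill a complete batch.
        if (buffered > 0) {
            statement.executeBatch();
            batchesExecuted++;
        }
        return batchesExecuted;
    }
}
```

The statement passed in would typically come from `connection.prepareStatement("insert into Person values(?, ?)")` on a real connection. With five rows and a `maxBatchSize` of 2, the helper sends three batches of sizes 2, 2, and 1.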

                I have therefore added a feature to JdbcIO that lets you control the batch size yourself: you group your data, and each group is written as one batch. Below is an example of how to use this feature, based on Apache Beam's original WordCount example.

                p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
                    // Count words in input file(s)
                    .apply(new CountWords())
                    // Format as text
                    .apply(MapElements.via(new FormatAsTextFn()))
                    // Make key-value pairs with the first letter as the key
                    .apply(ParDo.of(new FirstLetterAsKey()))
                    // Group the words by first letter
                    .apply(GroupByKey.<String, String> create())
                    // Get a PCollection of only the values, discarding the keys
                    .apply(ParDo.of(new GetValues()))
                    // Write the words to the database
                    .apply(JdbcIO.<String> writeIterable()
                            .withDataSourceConfiguration(
                                JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
                            .withStatement(INSERT_OR_UPDATE_SQL)
                            .withPreparedStatementSetter(new WordCountPreparedStatementSetter()));
                

                The difference from JdbcIO's normal write method is the new method writeIterable(), which takes a PCollection<Iterable<RowT>> as input instead of a PCollection<RowT>. Each Iterable is written to the database as one batch.
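To make the grouping step concrete, the sketch below mimics in plain Java what the pipeline's "key by first letter, group, keep only the values" stages produce: one collection of words per first letter, each of which would then be written as a single batch. It does not use Beam, and `FirstLetterGroupingSketch` and `groupByFirstLetter` are hypothetical names chosen for this illustration. Skipping empty strings is also an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java stand-in for the FirstLetterAsKey -> GroupByKey -> GetValues stages. */
final class FirstLetterGroupingSketch {

    /** Groups words by their first character and returns only the groups, ordered by that character. */
    static List<List<String>> groupByFirstLetter(List<String> words) {
        // TreeMap keeps the groups in a deterministic (sorted) order for this demo;
        // a real GroupByKey makes no such ordering promise.
        Map<Character, List<String>> groups = new TreeMap<>();
        for (String word : words) {
            if (word.isEmpty()) {
                continue; // assumption: empty strings have no first letter and are skipped
            }
            groups.computeIfAbsent(word.charAt(0), key -> new ArrayList<>()).add(word);
        }
        // Discard the keys; each remaining list corresponds to one batch.
        return new ArrayList<>(groups.values());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("apple", "avocado", "banana", "cherry", "cranberry");
        // prints [[apple, avocado], [banana], [cherry, cranberry]]
        System.out.println(groupByFirstLetter(words));
    }
}
```

Choosing the grouping key therefore determines the batch sizes: a key with few distinct values gives a few large batches, while a key with many distinct values gives many small ones.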

                The version of JdbcIO with this addition can be found here: https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java

                The complete example project containing the code above can be found here: https://github.com/olavloite/spanner-beam-example

                (There is also a pending pull request on Apache Beam to include this in the project.)
