高h调教女m强制高潮,欧洲无码一区二区三区在线观看,天天摸天天做天天爽水多,人妻用嘴含精大口吞精

您的位置:首頁(yè) >資訊 > 正文

環(huán)球速遞!大數(shù)據(jù)NiFi(十八):離線同步MySQL數(shù)據(jù)到HDFS

來(lái)源:騰訊云2023-02-21 20:16:57

?離線同步MySQL數(shù)據(jù)到HDFS

案例:使用NiFi將MySQL中數(shù)據(jù)導(dǎo)入到HDFS中。


【資料圖】

以上案例用到的處理器有“QueryDatabaseTable”、“ConvertAvroToJSON”、“SplitJson”、“PutHDFS”四個(gè)處理器。

一、配置“QueryDatabaseTable”處理器

該處理器主要使用提供的SQL語(yǔ)句或者生成SQL語(yǔ)句來(lái)查詢MySQL中的數(shù)據(jù),查詢結(jié)果轉(zhuǎn)換成Avro格式。該處理器只能運(yùn)行在主節(jié)點(diǎn)上。

關(guān)于“QueryDatabaseTable”處理器的“Properties”配置的說(shuō)明如下:

配置項(xiàng)

默認(rèn)值

允許值

描述

Database Connection Pooling Service(數(shù)據(jù)庫(kù)連接池服務(wù))

用于獲得與數(shù)據(jù)庫(kù)的連接的Controller Service。

Database Type(數(shù)據(jù)庫(kù)類(lèi)型)

Generic

選擇數(shù)據(jù)庫(kù)類(lèi)型。Generic 通用類(lèi)型OracleOracle 12+MS SQL 2012+MS SQL 2008MySQLPostgreSQL

Table Name(表名)

查詢數(shù)據(jù)庫(kù)的表名,當(dāng)使用“Custom Query”時(shí),此為查詢結(jié)果的別名,并作為FlowFile中的屬性。

Columns to Return(返回的列)

查詢返回的列,多個(gè)列使用逗號(hào)分隔。如果列中有特殊名稱(chēng)需要加引號(hào),則所有列都需要加引號(hào)處理。

Additional WHERE clause(where條件)

在構(gòu)建SQL查詢時(shí)添加到WHERE條件中的自定義子句。

Custom Query(自定義SQL查詢)

自定義的SQL語(yǔ)句。該查詢被構(gòu)建成子查詢,設(shè)置后不會(huì)從其他屬性構(gòu)建SQL查詢。自定義SQL不支持Order by查詢。

Maximum-value Columns(最大值列)

指定增量查詢獲取最大值的列,多列使用逗號(hào)分開(kāi)。指定后,這個(gè)處理器只能檢索到添加/更新的行。不能設(shè)置無(wú)法比較大小的列,例如:boolean/bit。如果不指定,則參照表中所有的列來(lái)查詢?nèi)繑?shù)據(jù),這會(huì)對(duì)性能產(chǎn)生影響。

Max Wait Time(最大超時(shí)時(shí)間)

0 seconds

SQL查詢最大時(shí)長(zhǎng),默認(rèn)為0沒(méi)有限制,設(shè)置小于0的時(shí)間默認(rèn)為0。

Fetch Size(拉取數(shù)據(jù)量)

0

每次從查詢結(jié)果中拉取的數(shù)據(jù)量。

Max Rows Per Flow File(每個(gè)FlowFile行數(shù))

0

在一個(gè)FlowFile文件中的數(shù)據(jù)行數(shù)。通過(guò)這個(gè)參數(shù)可以將很大的結(jié)果集分到多個(gè)FlowFile中。默認(rèn)設(shè)置為0,所有結(jié)果存入一個(gè)FlowFile。

Output Batch Size(數(shù)據(jù)輸出批次量)

0

輸出的FlowFile批次數(shù)據(jù)大小,當(dāng)設(shè)置為0代表所有數(shù)據(jù)輸出到下游關(guān)系。如果數(shù)據(jù)量很大,則有可能下游很久沒(méi)有收到數(shù)據(jù),如果設(shè)置了,則每次達(dá)到該數(shù)據(jù)量就釋放數(shù)據(jù),傳輸?shù)较掠巍?/p>

Maximum Number of Fragments(最大片段數(shù))

0

設(shè)置返回的最大數(shù)據(jù)片段數(shù),設(shè)置0默認(rèn)將所有數(shù)據(jù)片段返回,如果表非常大,設(shè)置后可以防止OOM錯(cuò)誤。

Normalize Table/Column Names(標(biāo)準(zhǔn)表/列名)

false

truefalse

是否將列名中不兼容avro的字符修改為兼容avro的字符。例如,冒號(hào)和句點(diǎn)將被更改為下劃線,以構(gòu)建有效的Avro記錄。

Transaction Isolation Level

設(shè)置事務(wù)隔離級(jí)別。

Use Avro Logical Types(使用Avro邏輯類(lèi)型)

false

truefalse

是否對(duì)DECIMAL/NUMBER, DATE, TIME 和 TIMESTAMP 列使用Avro邏輯類(lèi)型。

Default Decimal Precision(Decimal數(shù)據(jù)類(lèi)型位數(shù))

10

當(dāng) DECIMAL/NUMBER 數(shù)據(jù)類(lèi)型轉(zhuǎn)換成Avro類(lèi)型數(shù)據(jù)時(shí),指定的數(shù)據(jù)位數(shù)。

Default Decimal Scale(Decimal 數(shù)據(jù)類(lèi)型小數(shù)位數(shù))

0

當(dāng) DECIMAL/NUMBER 數(shù)據(jù)類(lèi)型轉(zhuǎn)換成Avro類(lèi)型數(shù)據(jù)時(shí),指定的小數(shù)點(diǎn)后的位數(shù)。

Generic 通用類(lèi)型OracleOracle 12+MS SQL 2012+MS SQL 2008MySQLPostgreSQL

Table Name(表名)查詢數(shù)據(jù)庫(kù)的表名,當(dāng)使用“Custom Query”時(shí),此為查詢結(jié)果的別名,并作為FlowFile中的屬性。 Columns to Return (返回的列) 查詢返回的列,多個(gè)列使用逗號(hào)分隔。如果列中有特殊名稱(chēng)需要加引號(hào),則所有列都需要加引號(hào)處理。 Additional WHERE clause (where條件) 在構(gòu)建SQL查詢時(shí)添加到WHERE條件中的自定義子句。 Custom Query (自定義SQL查詢) 自定義的SQL語(yǔ)句。該查詢被構(gòu)建成子查詢,設(shè)置后不會(huì)從其他屬性構(gòu)建SQL查詢。自定義SQL不支持Order by查詢。 Maximum-value Columns (最大值列) 指定增量查詢獲取最大值的列,多列使用逗號(hào)分開(kāi)。指定后,這個(gè)處理器只能檢索到添加/更新的行。不能設(shè)置無(wú)法比較大小的列,例如:boolean/bit。如果不指定,則參照表中所有的列來(lái)查詢?nèi)繑?shù)據(jù),這會(huì)對(duì)性能產(chǎn)生影響。 Max Wait Time(最大超時(shí)時(shí)間)0 seconds SQL查詢最大時(shí)長(zhǎng),默認(rèn)為0沒(méi)有限制,設(shè)置小于0的時(shí)間默認(rèn)為0。 Fetch Size(拉取數(shù)據(jù)量)0 每次從查詢結(jié)果中拉取的數(shù)據(jù)量。 Max Rows Per Flow File(每個(gè)FlowFile行數(shù))0 在一個(gè)FlowFile文件中的數(shù)據(jù)行數(shù)。通過(guò)這個(gè)參數(shù)可以將很大的結(jié)果集分到多個(gè)FlowFile中。默認(rèn)設(shè)置為0,所有結(jié)果存入一個(gè)FlowFile。 Output Batch Size(數(shù)據(jù)輸出批次量)0 輸出的FlowFile批次數(shù)據(jù)大小,當(dāng)設(shè)置為0代表所有數(shù)據(jù)輸出到下游關(guān)系。如果數(shù)據(jù)量很大,則有可能下游很久沒(méi)有收到數(shù)據(jù),如果設(shè)置了,則每次達(dá)到該數(shù)據(jù)量就釋放數(shù)據(jù),傳輸?shù)较掠巍? Maximum Number of Fragments(最大片段數(shù))0 設(shè)置返回的最大數(shù)據(jù)片段數(shù),設(shè)置0默認(rèn)將所有數(shù)據(jù)片段返回,如果表非常大,設(shè)置后可以防止OOM錯(cuò)誤。 Normalize Table/Column Names(標(biāo)準(zhǔn)表/列名)false true false 是否將列名中不兼容avro的字符修改為兼容avro的字符。例如,冒號(hào)和句點(diǎn)將被更改為下劃線,以構(gòu)建有效的Avro記錄。 Transaction Isolation Level 設(shè)置事務(wù)隔離級(jí)別。 Use Avro Logical Types(使用Avro邏輯類(lèi)型)false true false 是否對(duì)DECIMAL/NUMBER, DATE, TIME 和 TIMESTAMP 列使用Avro邏輯類(lèi)型。 Default Decimal Precision(Decimal數(shù)據(jù)類(lèi)型位數(shù))10 當(dāng) DECIMAL/NUMBER 數(shù)據(jù)類(lèi)型轉(zhuǎn)換成Avro類(lèi)型數(shù)據(jù)時(shí),指定的數(shù)據(jù)位數(shù)。 Default Decimal Scale(Decimal 數(shù)據(jù)類(lèi)型小數(shù)位數(shù))0 當(dāng) DECIMAL/NUMBER 數(shù)據(jù)類(lèi)型轉(zhuǎn)換成Avro類(lèi)型數(shù)據(jù)時(shí),指定的小數(shù)點(diǎn)后的位數(shù)。

配置步驟如下:

1、新建“QueryDatabaseTable”處理器

2、配置“SCHEDULING”調(diào)度時(shí)間

這里調(diào)度時(shí)間配置為99999s,讀取數(shù)據(jù)庫(kù),這里讀取一次即可,默認(rèn)0會(huì)不間斷讀取數(shù)據(jù)庫(kù)會(huì)對(duì)服務(wù)器造成非常大壓力。執(zhí)行僅支持“Primary”主節(jié)點(diǎn)運(yùn)行。

3、配置“PROPERTIES”

配置“Database Connection Pooling Service”選擇創(chuàng)建,在彈出頁(yè)面中可以按照默認(rèn)選擇直接點(diǎn)擊“Create”。

點(diǎn)擊“->”繼續(xù)配置MySQL連接:

在彈出的頁(yè)面中填入:

連接MysqlURL:

jdbc:mysql://192.168.179.5:3306/mynifi?characterEncoding=UTF-8&useSSL=false

MySQL驅(qū)動(dòng)類(lèi):com.mysql.jdbc.DriverMySQL jar包路徑:需要提前在NiFI集群各個(gè)節(jié)點(diǎn)上創(chuàng)建對(duì)應(yīng)目錄并上傳jar包。連接mysql的用戶名和密碼。

通過(guò)以上配置好連接mysql如下:

配置其他屬性如下:

二、???????配置“ConvertAvroToJSON”處理器

此處理器是將二進(jìn)制Avro記錄轉(zhuǎn)換為JSON對(duì)象,提供了一個(gè)從Avro字段到JSON字段的直接映射,這樣得到的JSON將具有與Avro文檔相同的層次結(jié)構(gòu)。輸出的JSON編碼為UTF-8編碼,如果傳入的FlowFile包含多個(gè)Avro記錄,則轉(zhuǎn)換后的FlowFile是一個(gè)含有所有Avro記錄的JSON數(shù)組或一個(gè)JSON對(duì)象序列(每個(gè)Json對(duì)象單獨(dú)成行)。如果傳入的FlowFile不包含任何記錄,則輸出一個(gè)空J(rèn)SON對(duì)象。

關(guān)于“ConvertAvroToJSON”處理器的“Properties”配置的說(shuō)明如下:

配置項(xiàng)

默認(rèn)值

允許值

描述

JSON container options(Json選擇)

array

nonearray

如何解析Json對(duì)象,none:解析Json將每個(gè)Json對(duì)象寫(xiě)入新行。array:解析到的json存入JsonArray一個(gè)對(duì)象

Wrap Single Record(數(shù)據(jù)庫(kù)類(lèi)型)

false

truefalse

指定解析到的空記錄或者單條記錄是否按照“JSON container options”配置包裝對(duì)象。

Avro schema(表名)

如果Avro數(shù)據(jù)沒(méi)有Schema信息,需要配置。

配置步驟如下:

1、創(chuàng)建“ConvertAvroToJSON”處理器

2、配置“PROPERTIES”

3、連接“QueryDatabaseTable”處理器和“CovertAvroToJSON”處理器

連接好兩個(gè)處理器后,可以配置“Connection”為負(fù)載均衡方式傳遞數(shù)據(jù):

三、???????配置“SplitJson”處理器

該處理器使用JsonPath表達(dá)式指定需要的Json數(shù)組元素,將Json數(shù)組中的多個(gè)Json對(duì)象切分出來(lái),形成多個(gè)FlowFile。每個(gè)生成的FlowFile都由指定數(shù)組中的一個(gè)元素組成,并傳輸?shù)疥P(guān)系"split",原始文件傳輸?shù)疥P(guān)系"original"。如果沒(méi)有找到指定的JsonPath,或者沒(méi)有對(duì)數(shù)組元素求值,則將原始文件路由到"failure",不會(huì)生成任何文件。

關(guān)于“SplitJson”處理器的“Properties”配置的說(shuō)明如下:

配置項(xiàng)

默認(rèn)值

允許值

描述

JsonPath Expression(Json表達(dá)式)

一個(gè)JsonPath表達(dá)式,它指定用以分割的數(shù)組元素。

Null Value Representation(Null值表示)

empty string

empty stringthe string "null"

指定結(jié)果為空值時(shí)的表示形式。

配置步驟如下:

1、創(chuàng)建“SplitJson”處理器

2、配置“PROPERTIES”

3、連接“ConvertAvroToJSON”處理器和“SplitJson”處理器

連接后,連接關(guān)系選擇“success”:

同時(shí)配置“ConverAvroToJSON”處理失敗的數(shù)據(jù)自動(dòng)終止:

四、配置“PutHDFS”處理器

該處理器是將FlowFile數(shù)據(jù)寫(xiě)入到HDFS分布式文件系統(tǒng)中。關(guān)于“PutHDFS”處理器的“Properties”主要配置的說(shuō)明如下:

配置項(xiàng)

默認(rèn)值

允許值

描述

Hadoop Configuration Resources(Hadoop配置)

nonearray

HDFS配置文件,一個(gè)文件或者由逗號(hào)分隔的多個(gè)文件。不配置將在ClassPath中尋找‘core-site.xml’或者‘hdfs-site.xml’文件。

Directory(目錄)

需要寫(xiě)入文件的HDFS父目錄。如果目錄不存在,將創(chuàng)建該目錄。

Conflict Resolution Strategy(沖突解決)

fail

replaceignorefailappend

指示當(dāng)輸出目錄中已經(jīng)存在同名文件時(shí)如何處理。

配置步驟如下:

1、創(chuàng)建“PutHDFS”處理器

2、配置“PROPERTIES”

注意:以上需要在各個(gè)NiFi集群節(jié)點(diǎn)上創(chuàng)建“/root/test”目錄,并且在該目錄下上傳hdfs-site.xml和core-site.xml文件。

3、連接“SplitJson”處理器和“PutHDFS”處理器

同時(shí)設(shè)置“SplitJson”處理器中“failure”和“original”數(shù)據(jù)關(guān)系自動(dòng)終止。

設(shè)置“PutHDFS”處理器“success”和“failure”數(shù)據(jù)關(guān)系自動(dòng)終止:

配置好的連接關(guān)系如下:

五、??????????????運(yùn)行測(cè)試

1、在MySQL創(chuàng)建庫(kù)“mynifi”,并且創(chuàng)建表“test1”,向表中插入10條數(shù)據(jù)

mysql> create database mynifi;Query OK, 1 row affected (0.02 sec)mysql> use mynifi;Database changedmysql> create table test1(id int,name varchar(255),age int );Query OK, 0 rows affected (0.07 sec)mysql> insert into test1 values (1,"zs",18),(2,"ls",19),(3,"ww",20),(4,"ml",21),(5,"tt",22)

2、首先啟動(dòng)“QueryDatabaseTable”處理器觀察隊(duì)列數(shù)據(jù)

3、單獨(dú)啟動(dòng)“ConvertAvroToJson”處理器觀察隊(duì)列數(shù)據(jù)

4、單獨(dú)啟動(dòng)“SplitJson”處理器觀察隊(duì)列數(shù)據(jù)

5、單獨(dú)啟動(dòng)“PutHDFS”處理器觀察HDFS對(duì)應(yīng)目錄數(shù)據(jù)

查看數(shù)據(jù):

注意:

如果在“QueryDatabaseTable”處理器中設(shè)置增屬性“Maximum-value Columns”為id,那么每次查詢都是大于id的增量數(shù)據(jù)。如果想要存入HDFS文件為多行而不是一行,可以將“CovertAvroToJson”處理器屬性“JSON container options”設(shè)置為none,直接解析Avro文件得到一個(gè)個(gè)json數(shù)據(jù),然后直接連接“PutHDFS”處理器即可。

最近更新

人人妻人人澡人人爽| 国产精品视频一区二区三区不卡| 精品毛片乱码1区2区3区| 国产老熟妇精品观看| 粉嫩虎白扒开小泬| 亚洲欧洲∨国产一区二区三区| 女人做爰全过程免费观看美女| 好男人好资源影视在线| 2012国语高清完整版在线观看| 精品国品一二三产品区别在线观看 | 人妻夜夜爽天天爽三区麻豆av网站 | 久久久久国色av免费观看| 精品国产v无码大片在线观看 | 体验区试看120秒啪啪免费| 国产精品人成视频免费播放 | 老熟女草bx×| 亚洲色无码a片一区二区麻豆| 最近免费中文字幕大全高清mv| 国产成人亚洲精品无码h在线| 护士长在办公室躁bd| 日韩AV人人夜夜澡人人爽| 国产V片在线播放免费无码| 啊灬啊灬啊灬快灬深用力a片| 亚洲第一成人网站| 老熟女草bx×| 久久精品国产精品国产精品污| 裸体女模露出生殖部位| 无码免费视频aaaaaaaa片| 野花日本免费观看高清电影8| 无码A级毛片免费视频内谢5J| 亚洲熟悉妇女XXX妇女AV| 名门嫡姝-213大h慎入| 侯府荡女h叶凝欢h| 国产免费一区二区三区免费视频| 国产福利一区二区三区在线观看 | 被三个黑人折腾折惨叫| 亚洲精品久久无码老熟妇| 性裸交a片a∨天传媒公司| 狂躁美女大bbbbbb视频| 日韩精品无码人成视频手机| 98国产精品综合一区二区三区|