一、SpringBoot集成Flink
其實(shí)沒(méi)什么特別的,就把Flink依賴的包在pom引入就行了。只是FlinkTask的寫(xiě)法要小調(diào)整下,把相關(guān)依賴交給spring管理就行。
然后如果放棄Flink的Dashboard端監(jiān)控task執(zhí)行相關(guān)信息,那也可以在SpringBoot的啟動(dòng)類里調(diào)用就行,但是可能出現(xiàn)task的相關(guān)對(duì)象沒(méi)有注入,這種都是小問(wèn)題(實(shí)際就是springboot啟動(dòng)完成再調(diào)用,或者通過(guò)自動(dòng)任務(wù)調(diào)用。也可以在springBoot的入口類用@ComponentScan注解掃描flinkTask所在的目錄)。
實(shí)際更瀟灑一點(diǎn)的做法可以將flinkTask的信息存表,通過(guò)springBoot的接口調(diào)用restfullApi接口,接口里調(diào)用task,甚至可以做task的啟停、線程監(jiān)控(接口里開(kāi)線程調(diào)用task)。
二、FlinkTask寫(xiě)法調(diào)整
@Component
@Slf4j
public class JianGongStopCarTask {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<JGParkingLotInfo> dataStream = env.addSource(new JGParkingLotInfoSource());
// 獲取到數(shù)據(jù)之后轉(zhuǎn)換格式 此處不做轉(zhuǎn)換
SingleOutputStreamOperator<JGParkingLotInfo> jgParkingLotInfoSingleOutputStreamOperator = dataStream.map(jgParkingLotInfo -> jgParkingLotInfo);
jgParkingLotInfoSingleOutputStreamOperator.addSink(new SinkToMySQL());
env.execute();
}
}
PS:
實(shí)際就是@Component這個(gè)注解,然后這個(gè)示例里的JGParkingLotInfoSource、SinkToMySQL類都需要加這個(gè)注解。其他就沒(méi)有什么調(diào)整了,然后如果還有其他依賴沒(méi)有,使用hutool的SpringUtil進(jìn)行g(shù)et對(duì)象就行。
三、打包插件
<plugins>
<!-- 編譯插件 -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- spring boot 項(xiàng)目打包
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>-->
<!-- Flink打包方式一 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<archive>
<manifest>
<mainClass>com.easylinkin.dc.olap.JianGongStopCarTask</mainClass>
</manifest>
</archive>
<!-- 打包依賴 -->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- flink打包方式二
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>module-info.class</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
<resource>reference.conf</resource>
</transformer>
<transformer
implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
<resource>META-INF/spring.factories</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.easylinkin.dc.olap.JianGongStopCarTask</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
-->
<!-- flink打包方式三
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.easylinkin.dc.olap.JianGongStopCarTask</mainClass>
<useUniqueVersions>false</useUniqueVersions>
<addClasspath>true</addClasspath>
<classpathPrefix>./lib/</classpathPrefix>
</manifest>
</archive>
<excludes>
<exclude>module-info.class</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<excludeTransitive>false</excludeTransitive>
<stripVersion>false</stripVersion>
</configuration>
</execution>
</executions>
</plugin>-->
</plugins>
除了打成SpringBoot用springboot的插件打包,flinkTask的打包有3種方式,方式三適合lib包提前傳到task指定的依賴存儲(chǔ)目錄。這樣上傳flinkTask就很小。
方式二是官方推薦FlinkTask的打包方式,地址:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/configuration/maven/
說(shuō)一千道一萬(wàn)就是因?yàn)榇虬腗ETA-INF下的MANIFEST.MF文件的內(nèi)容有區(qū)別。springBoot項(xiàng)目的這個(gè)文件有自己的JarLauncher。
FlinkTask的jar這個(gè)文件內(nèi)容
四、Flink的上傳與運(yùn)行
1、上傳并命令運(yùn)行
配置好flink環(huán)境,命令就是
flink run WordCount.jar
2、Flink管理大屏上傳運(yùn)行
點(diǎn)擊“Submit”
總結(jié)
這樣處理應(yīng)該是最優(yōu)雅的了,task的寫(xiě)法改動(dòng)也小。
官方用的是方式二打包,實(shí)際我覺(jué)得要是依賴特多,用方式三打包,然后將依賴的jar傳到flink運(yùn)行環(huán)境flinkTask指定的目錄下應(yīng)該也不錯(cuò)(flinkTask的包就變小了)
flink的這個(gè)計(jì)算監(jiān)控真香
所以基本還是就當(dāng)springBoot集成flink的項(xiàng)目一樣開(kāi)發(fā)吧,在打包的時(shí)候注意切換插件就ok了。就分享到這里,up!
到此這篇關(guān)于SpringBoot超詳細(xì)講解集成Flink的部署與打包方法的文章就介紹到這了,更多相關(guān)SpringBoot Flink內(nèi)容請(qǐng)搜索html5模板網(wǎng)以前的文章希望大家以后多多支持html5模板網(wǎng)!