Learning Big Data with Docker: Flink

Note: this post was written on WSL2; if you cannot reproduce the results, check for environment differences. If you run Docker in a virtual machine or some other setup, pay attention to the hostname issue mentioned below.
WSL2 installation:
This post focuses on the hands-on experiment; for the underlying theory, see:
1. Flink Cluster Setup
1.1 Pulling the image
First, pull the Flink image:
docker pull flink   # pull the image
docker images       # list downloaded images
1.2 Building the cluster
We can run the JobManager and TaskManager as individual containers with docker run, or bring up the whole cluster with docker-compose.
The docker run commands are:
docker run --name flink_jobmanager -d -t flink jobmanager    # JobManager
docker run --name flink_taskmanager -d -t flink taskmanager  # TaskManager
Here we will instead run a cluster with docker-compose:
First create a folder to hold the yml file. Here I create a docker-flink folder under the WSL2 home directory and, inside it, a docker-compose.yml file with the following content:
version: "2.1"
services:
  jobmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
For example, my file looks like this:

version: "2.1"
services:
  jobmanager:
    image: flink
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
Once the file is created, start the cluster from that directory with:
docker-compose up -d
Open localhost:8081 in a browser to see the Flink web UI:
We can also inspect the cluster's containers directly; by default there is one JobManager and one TaskManager. To add more TaskManagers, scale the service:
docker-compose scale taskmanager=<N>   # <N> = desired number of TaskManagers
2. Java Programming
2.1 Maven dependencies
<properties>
    <flink.version>1.11.1</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
2.2 Java code
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @author Creek
 */
@SuppressWarnings("serial")
public class WordCount {

    public static void main(String[] args) throws Exception {
        // the host and the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            // Note: the WSL2 hostname is not localhost; run ifconfig inside WSL2 to get it
            //hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            hostname = params.has("hostname") ? params.get("hostname") : "172.31.61.151";
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount " +
                    "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                    "and port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
                    "type the input text into the command line");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // ------------------------------------------------------------------------

    /**
     * Data type for words with count.
     */
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
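To make the pipeline concrete, here is a plain-Java sketch (no Flink involved; WindowCountSketch and countWindow are names I made up for illustration) of what flatMap → keyBy("word") → reduce computes over the lines that fall into one 5-second window:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowCountSketch {
    // What one window's flatMap + keyBy("word") + reduce boils down to:
    // split each line on whitespace, then sum the per-word counts.
    static Map<String, Long> countWindow(List<String> lines) {
        Map<String, Long> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split("\\s")) {  // same split as the job
                counts.merge(word, 1L, Long::sum);   // the reduce: a.count + b.count
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countWindow(List.of("hello world", "hello flink"));
        // The job prints one "word : count" pair per key for each window
        System.out.println("hello : " + counts.get("hello"));
        System.out.println("flink : " + counts.get("flink"));
    }
}
```

The difference in the real job is that Flink maintains this state per key across parallel instances and emits it when the window fires, rather than collecting lines into a list first.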
Here we configure the program arguments, e.g. --port 9000:
2.3 Listening on port 9000
In WSL2, run:
netcat -l 9000   # equivalently: nc -l 9000
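If netcat is unavailable, the same kind of one-line-per-record text server can be sketched in plain Java (TinyTextServer and serve are hypothetical names, not part of Flink): it accepts a single client and forwards each input line over the socket, which is exactly the format env.socketTextStream consumes.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class TinyTextServer {
    // Accept one client on the given server socket and forward each line
    // from `input` to it, newline-terminated (what `netcat -l` does with stdin).
    static void serve(ServerSocket server, BufferedReader input) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
            String line;
            while ((line = input.readLine()) != null) {
                out.println(line); // one line = one record for socketTextStream
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9000;
        try (ServerSocket server = new ServerSocket(port)) {
            System.out.println("listening on " + port);
            serve(server, new BufferedReader(new InputStreamReader(System.in)));
        }
    }
}
```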
Run the main function and type a few words into the terminal:
The result looks like this:
Note that the results you see at this point are not produced by the Docker cluster: the program runs in a local Flink environment started by the IDE, so even if you shut down the Docker Flink cluster, you will still see output.
Next we package the code into a jar and run it on the cluster to see the effect.
3. Building the Jar and Submitting to the Cluster
Package the project with IDEA, then locate the jar (right-click → open file location).
Submit it in the Flink web UI.
Fill in the entry class and the program arguments,
and you will see the job running:
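Besides the web UI, the JobManager exposes a monitoring REST API on the same port (8081); its /jobs endpoint lists job IDs and statuses, which is a handy way to confirm the submitted job is RUNNING from code. A minimal sketch using only the JDK's HttpClient (FlinkJobsProbe and fetchJobs are names I made up; the endpoint path is from Flink's REST API):

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class FlinkJobsProbe {
    // GET <baseUrl>/jobs from the JobManager's monitoring REST API
    // and return the raw JSON body, e.g. {"jobs":[{"id":"...","status":"RUNNING"}]}
    static String fetchJobs(String baseUrl) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/jobs"))
                .GET()
                .build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) throws Exception {
        // Point this at the JobManager mapped in docker-compose (port 8081)
        System.out.println(fetchJobs("http://localhost:8081"));
    }
}
```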