从头开始构建您的第一个 Flink 应用程序
教您如何从头开始构建您的第一个 Flink 应用程序。
开发环境准备
Flink可以运行在Linux、Max OS X或Windows上。 Flink应用程序的开发需要本机具备Java 8.x和maven环境。
如果您有Java 8环境,运行以下命令将输出以下版本信息:
$ java -version java version "1.8.0_65" Java(TM) SE Runtime Environment (build 1.8.0_65-b17) Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode) |
如果您有maven环境,运行以下命令将输出以下版本信息:
$ mvn -version Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00) Maven home: /Users/wuchong/dev/maven Java version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jre Default locale: zh_CN, platform encoding: UTF-8 OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac" |
此外,我们推荐ItelliJ IDEA(社区免费版Sufficient)作为Flink应用程序的开发IDE。尽管 Eclipse 是一个选项,但 Eclipse 在混合 Scala 和 Java 项目时存在已知问题,因此不推荐使用 Eclipse。在下一章中,我们将向您展示如何创建 Flink 项目并将其导入 ItelliJ IDEA。
创建 Maven 项目
我们将使用 Flink Maven 原型来创建项目的结构和一些初始默认依赖项。在工作目录下,运行以下命令创建项目:
mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-java \ -DarchetypeVersion=1.6.1 \ -DgroupId=my-flink-project \ -DartifactId=my-flink-project \ -Dversion=0.1 \ -Dpackage=myflink \ -DinteractiveMode=false |
上面的groupId、artifactId、package都可以编辑到想要的路径。使用上述参数,Maven 自动创建如下项目结构:
$ tree my-flink-project my-flink-project ├── pom.xml └── src └── main ├── java │ └── myflink │ ├── BatchJob.java │ └── StreamingJob.java └── resources └── log4j.properties |
我们的 pom.xml 文件已经包含必要的 Flink 依赖项,并且可以在 src/main/java 下找到几个示例框架。然后我们开始编写我们的第一个 Flink 程序。
编写Flink程序
启动IntelliJ IDEA,选择“导入项目”,选择my-flink-project根目录下的pom.xml文件。按照指南完成项目导入。
在src/main/java/myflink下创建文件SocketWindowWordCount.java
:
package myflink; public class SocketWindowWordCount { public static void main(String[] args) throws Exception { } } |
这个程序还是很基础的,我们一步一步把代码填好。请注意,我们不会签署导入语句,因为 IDE 会自动添加它们。我将在本节末尾向您展示完整的代码。如果您想跳过以下步骤,可以直接将整个代码粘贴到编辑器中。
Flink 程序的第一步是创建StreamExecutionEnvironment
。这是一个入口类,可用于设置参数、创建数据源、提交任务。因此,让我们在 main 函数中添加:
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); |
接下来,我们创建一个数据源,从本地端口 9000 上的套接字读取数据:
DataStream<String> text = env.socketTextStream("localhost", 9000, "\n"); |
这将创建一个字符串类型的 DataStream
。 DataStream
是Flink数据流处理的核心API,定义了一些常用操作(如过滤、转换、聚合、加窗、关联等)。在此示例中,我们感兴趣的是每个单词在给定时间窗口(例如 5 秒的窗口)中出现的次数。为此,我们首先需要将字符串数据(由 Tuple2
表示)解析为单词和时间。第一个字段是单词,第二个字段是数字,数字的初始值为1。为了分析,我们实现了一个flatmap
,因为一个单词可能包含多个单词。数据线。
DataStream<Tuple2<String, Integer>> wordCounts = text .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { for (String word : value.split("\\s")) { out.collect(Tuple2.of(word, 1)); } } }); |
然后我们根据字字段(即索引字段号0)对数据流进行分组。这里我们可以简单地使用 keyBy(int index)
方法来获取一个以 word 为键的 Tuple2。
流。然后我们可以在流上指定所需的窗口,并根据窗口中的数据计算结果。在我们的例子中,我们希望每 5 秒聚合一次字数,每个窗口从零开始: 。
DataStream<Tuple2<String, Integer>> windowCounts = wordCounts .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1); |
第二次调用.timeWindow()
指定我们需要 5 秒的陷阱窗口。第三次调用返回每个键和窗口的聚合函数 sum
。在我们的例子中,是根据编号字段(即索引字段编号1)添加的。生成的流每 5 秒输出一次这 5 秒内每个单词的出现次数。
最后一件事是将流打印到控制台并开始执行:
windowCounts.print().setParallelism(1); env.execute("Socket Window WordCount"); |
最后一次调用env.execute
需要启动实际的Flink任务。每个算子操作(例如源创建、聚合、打印)都简单地以图表形式表示了内部算子操作。仅当调用 execute()
时,才会在集群或本地计算机上执行。
以下是完整的代码,部分代码已经过简化(代码也可以在GitHub上找到):
package myflink; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class SocketWindowWordCount { public static void main(String[] args) throws Exception { // 创建 execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 通过连接 socket 获取输入数据,这里连接到本地9000端口,如果9000端口已被占用,请换一个端口 DataStream<String> text = env.socketTextStream("localhost", 9000, "\n"); // 解析数据,按 word 分组,开窗,聚合 DataStream<Tuple2<String, Integer>> windowCounts = text .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { for (String word : value.split("\\s")) { out.collect(Tuple2.of(word, 1)); } } }) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1); // 将结果打印到控制台,注意这里使用的是单线程打印,而非多线程 windowCounts.print().setParallelism(1); env.execute("Socket Window WordCount"); } } |
运行程序
要运行示例程序,首先启动netcat以获取输入流从终端:
nc -lk 9000 |
对于 Windows 平台,您可以从 https://nmap.org/ncat/ 安装 ncat 并运行:
ncat -lk 9000 |
然后直接运行 SocketWindowWordCount main 方法。
。
只需在netcat控制台中输入单词,就可以在SocketWindowWordCount
的输出控制台中看到每个单词的词频统计。要查看大于 1 的数字,请在 5 秒内重复键入相同的单词。
版权声明
本文仅代表作者观点,不代表Code前端网立场。
本文系作者Code前端网发表,如需转载,请注明页面地址。
发表评论:
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。