从头开始构建您的第一个 Flink 应用程序
教您如何从头开始构建您的第一个 Flink 应用程序。
开发环境准备
Flink可以运行在Linux、Max OS X或Windows上。 Flink应用程序的开发需要本机具备Java 8.x和maven环境。
如果您有Java 8环境,运行以下命令将输出以下版本信息:
$ java -version java version "1.8.0_65" Java(TM) SE Runtime Environment (build 1.8.0_65-b17) Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode) |
如果您有maven环境,运行以下命令将输出以下版本信息:
$ mvn -version Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00) Maven home: /Users/wuchong/dev/maven Java version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jre Default locale: zh_CN, platform encoding: UTF-8 OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac" |
此外,我们推荐ItelliJ IDEA(社区免费版Sufficient)作为Flink应用程序的开发IDE。尽管 Eclipse 是一个选项,但 Eclipse 在混合 Scala 和 Java 项目时存在已知问题,因此不推荐使用 Eclipse。在下一章中,我们将向您展示如何创建 Flink 项目并将其导入 ItelliJ IDEA。
创建 Maven 项目
我们将使用 Flink Maven 原型来创建项目的结构和一些初始默认依赖项。在工作目录下,运行以下命令创建项目:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.6.1 \
-DgroupId=my-flink-project \
-DartifactId=my-flink-project \
-Dversion=0.1 \
-Dpackage=myflink \
-DinteractiveMode=false |
上面的groupId、artifactId、package都可以编辑到想要的路径。使用上述参数,Maven 自动创建如下项目结构:
$ tree my-flink-project
my-flink-project
├── pom.xml
└── src
└── main
├── java
│ └── myflink
│ ├── BatchJob.java
│ └── StreamingJob.java
└── resources
└── log4j.properties |
我们的 pom.xml 文件已经包含必要的 Flink 依赖项,并且可以在 src/main/java 下找到几个示例框架。然后我们开始编写我们的第一个 Flink 程序。
编写Flink程序
启动IntelliJ IDEA,选择“导入项目”,选择my-flink-project根目录下的pom.xml文件。按照指南完成项目导入。
在src/main/java/myflink下创建文件SocketWindowWordCount.java:
package myflink;
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
}
} |
这个程序还是很基础的,我们一步一步把代码填好。请注意,我们不会签署导入语句,因为 IDE 会自动添加它们。我将在本节末尾向您展示完整的代码。如果您想跳过以下步骤,可以直接将整个代码粘贴到编辑器中。
Flink 程序的第一步是创建StreamExecutionEnvironment。这是一个入口类,可用于设置参数、创建数据源、提交任务。因此,让我们在 main 函数中添加:
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); |
接下来,我们创建一个数据源,从本地端口 9000 上的套接字读取数据:
DataStream<String> text = env.socketTextStream("localhost", 9000, "\n"); |
这将创建一个字符串类型的 DataStream 。 DataStream是Flink数据流处理的核心API,定义了一些常用操作(如过滤、转换、聚合、加窗、关联等)。在此示例中,我们感兴趣的是每个单词在给定时间窗口(例如 5 秒的窗口)中出现的次数。为此,我们首先需要将字符串数据(由 Tuple2 表示)解析为单词和时间。第一个字段是单词,第二个字段是数字,数字的初始值为1。为了分析,我们实现了一个flatmap,因为一个单词可能包含多个单词。数据线。
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
for (String word : value.split("\\s")) {
out.collect(Tuple2.of(word, 1));
}
}
}); |
然后我们根据字字段(即索引字段号0)对数据流进行分组。这里我们可以简单地使用 keyBy(int index) 方法来获取一个以 word 为键的 Tuple2。 流。然后我们可以在流上指定所需的窗口,并根据窗口中的数据计算结果。在我们的例子中,我们希望每 5 秒聚合一次字数,每个窗口从零开始: 。
DataStream<Tuple2<String, Integer>> windowCounts = wordCounts
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1); |
第二次调用.timeWindow() 指定我们需要 5 秒的陷阱窗口。第三次调用返回每个键和窗口的聚合函数 sum。在我们的例子中,是根据编号字段(即索引字段编号1)添加的。生成的流每 5 秒输出一次这 5 秒内每个单词的出现次数。
最后一件事是将流打印到控制台并开始执行:
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount"); |
最后一次调用env.execute需要启动实际的Flink任务。每个算子操作(例如源创建、聚合、打印)都简单地以图表形式表示了内部算子操作。仅当调用 execute() 时,才会在集群或本地计算机上执行。
以下是完整的代码,部分代码已经过简化(代码也可以在GitHub上找到):
package myflink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
// 创建 execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 通过连接 socket 获取输入数据,这里连接到本地9000端口,如果9000端口已被占用,请换一个端口
DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");
// 解析数据,按 word 分组,开窗,聚合
DataStream<Tuple2<String, Integer>> windowCounts = text
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
for (String word : value.split("\\s")) {
out.collect(Tuple2.of(word, 1));
}
}
})
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
// 将结果打印到控制台,注意这里使用的是单线程打印,而非多线程
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
} |
运行程序
要运行示例程序,首先启动netcat以获取输入流从终端:
nc -lk 9000 |
对于 Windows 平台,您可以从 https://nmap.org/ncat/ 安装 ncat 并运行:
ncat -lk 9000 |
然后直接运行 SocketWindowWordCount main 方法。 。
只需在netcat控制台中输入单词,就可以在SocketWindowWordCount的输出控制台中看到每个单词的词频统计。要查看大于 1 的数字,请在 5 秒内重复键入相同的单词。
![]()
版权声明
本文仅代表作者观点,不代表Code前端网立场。
本文系作者Code前端网发表,如需转载,请注明页面地址。
code前端网
