我的第一个Flink程序 - 三种WordCount方式
发布人:shili8
发布时间:2025-03-09 06:18
阅读次数:0
**我的第一个Flink程序 - 三种WordCount方式**
作为一名新手,刚接触到Apache Flink这个流式处理框架,我自然会先尝试着写一个简单的WordCount程序。这种程序是流式处理的一个经典例子,它可以帮助我们理解如何使用Flink来处理数据流。
在本文中,我们将介绍三种不同的WordCount方式:第一种是使用Flink提供的`DataSet` API,第二种是使用`DataStream` API,第三种是使用`Table` API。每种方式都有其特点和优势。
### 第一种方式:使用DataSet API首先,让我们来看一下如何使用Flink提供的`DataSet` API来实现WordCount程序。
java// WordCount.javaimport org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; public class WordCount { public static void main(String[] args) throws Exception { // 创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //读取数据源(假设是文本文件) DataSettext = env.readTextFile("input.txt"); // 将文本转换为单词 DataSet words = text.map(new MapFunction () { @Override public String map(String value) throws Exception { return value.toLowerCase(); } }); // 统计每个单词的出现次数 DataSet > wordCounts = words.groupBy(0).aggregate( new ReduceFunction () { @Override public Long reduce(Long aLong, Long aLong1) throws Exception { return aLong + aLong1; } }, new GroupReduceFunction >() { @Override public void reduce(Iterable values, Collector > out) throws Exception { long sum =0L; for (Long value : values) { sum += value; } out.collect(new Tuple2<>("", sum)); } }); // 输出结果 wordCounts.print(); } }
在这个例子中,我们首先读取一个文本文件,然后将其转换为单词。接着,我们使用`groupBy`和`aggregate`方法来统计每个单词的出现次数。
### 第二种方式:使用DataStream API接下来,让我们看一下如何使用Flink提供的`DataStream` API来实现WordCount程序。
java// WordCount.javaimport org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class WordCount { public static void main(String[] args) throws Exception { // 创建流执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //读取数据源(假设是文本文件) DataStreamtext = env.readTextFile("input.txt"); // 将文本转换为单词 DataStream words = text.map(new MapFunction () { @Override public String map(String value) throws Exception { return value.toLowerCase(); } }); // 统计每个单词的出现次数 DataStream > wordCounts = words.keyBy(0).sum(1); // 输出结果 wordCounts.print(); } }
在这个例子中,我们首先读取一个文本文件,然后将其转换为单词。接着,我们使用`keyBy`和`sum`方法来统计每个单词的出现次数。
### 第三种方式:使用Table API最后,让我们看一下如何使用Flink提供的`Table` API来实现WordCount程序。
java// WordCount.javaimport org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; public class WordCount { public static void main(String[] args) throws Exception { // 创建表执行环境 TableEnvironment env = TableEnvironment.create(); //读取数据源(假设是文本文件) Table text = env.readTable("input.txt"); // 将文本转换为单词 Table words = text.map(new MapFunction() { @Override public String map(String value) throws Exception { return value.toLowerCase(); } }); // 统计每个单词的出现次数 Table wordCounts = words.groupBy(0).aggregate( new ReduceFunction () { @Override public Long reduce(Long aLong, Long aLong1) throws Exception { return aLong + aLong1; } }, new GroupReduceFunction () { @Override public void reduce(Iterable values, Collector out) throws Exception { long sum =0L; for (Long value : values) { sum += value; } out.collect(new Table() { @Override public String toString() { return "word=" + "" + ", count=" + sum; } }); } }); // 输出结果 wordCounts.print(); } }
在这个例子中,我们首先读取一个文本文件,然后将其转换为单词。接着,我们使用`groupBy`和`aggregate`方法来统计每个单词的出现次数。
通过以上三个例子,我们可以看到Flink提供了多种方式来实现WordCount程序,每种方式都有其特点和优势。
其他信息其他资源
Top最新文章热门标签欢迎提供和下载各类你熟悉的实例,感谢您对"实例吧"的支持,诚心接受各类问题反馈。