我的第一个Flink程序 - 三种WordCount方式
发布人:shili8
发布时间:2025-03-09 06:18
阅读次数:0
**我的第一个Flink程序 - 三种WordCount方式**
作为一名新手,刚接触到Apache Flink这个流式处理框架,我自然会先尝试着写一个简单的WordCount程序。这种程序是流式处理的一个经典例子,它可以帮助我们理解如何使用Flink来处理数据流。
在本文中,我们将介绍三种不同的WordCount方式:第一种是使用Flink提供的`DataSet` API,第二种是使用`DataStream` API,第三种是使用`Table` API。每种方式都有其特点和优势。
### 第一种方式:使用DataSet API首先,让我们来看一下如何使用Flink提供的`DataSet` API来实现WordCount程序。
java// WordCount.javaimport org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
public class WordCount {
public static void main(String[] args) throws Exception {
// 创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//读取数据源(假设是文本文件)
DataSet text = env.readTextFile("input.txt");
// 将文本转换为单词 DataSet words = text.map(new MapFunction() {
@Override public String map(String value) throws Exception {
return value.toLowerCase();
}
});
// 统计每个单词的出现次数 DataSet> wordCounts = words.groupBy(0).aggregate(
new ReduceFunction() {
@Override public Long reduce(Long aLong, Long aLong1) throws Exception {
return aLong + aLong1;
}
},
new GroupReduceFunction>() {
@Override public void reduce(Iterable values, Collector> out) throws Exception {
long sum =0L;
for (Long value : values) {
sum += value;
}
out.collect(new Tuple2<>("", sum));
}
});
// 输出结果 wordCounts.print();
}
}
在这个例子中,我们首先读取一个文本文件,然后将其转换为单词。接着,我们使用`groupBy`和`aggregate`方法来统计每个单词的出现次数。
### 第二种方式:使用DataStream API接下来,让我们看一下如何使用Flink提供的`DataStream` API来实现WordCount程序。
java// WordCount.javaimport org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class WordCount {
public static void main(String[] args) throws Exception {
// 创建流执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取数据源(假设是文本文件)
DataStream text = env.readTextFile("input.txt");
// 将文本转换为单词 DataStream words = text.map(new MapFunction() {
@Override public String map(String value) throws Exception {
return value.toLowerCase();
}
});
// 统计每个单词的出现次数 DataStream> wordCounts = words.keyBy(0).sum(1);
// 输出结果 wordCounts.print();
}
}
在这个例子中,我们首先读取一个文本文件,然后将其转换为单词。接着,我们使用`keyBy`和`sum`方法来统计每个单词的出现次数。
### 第三种方式:使用Table API最后,让我们看一下如何使用Flink提供的`Table` API来实现WordCount程序。
java// WordCount.javaimport org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
public class WordCount {
public static void main(String[] args) throws Exception {
// 创建表执行环境 TableEnvironment env = TableEnvironment.create();
//读取数据源(假设是文本文件)
Table text = env.readTable("input.txt");
// 将文本转换为单词 Table words = text.map(new MapFunction() {
@Override public String map(String value) throws Exception {
return value.toLowerCase();
}
});
// 统计每个单词的出现次数 Table wordCounts = words.groupBy(0).aggregate(
new ReduceFunction() {
@Override public Long reduce(Long aLong, Long aLong1) throws Exception {
return aLong + aLong1;
}
},
new GroupReduceFunction() {
@Override public void reduce(Iterable values, Collector out) throws Exception {
long sum =0L;
for (Long value : values) {
sum += value;
}
out.collect(new Table() {
@Override public String toString() {
return "word=" + "" + ", count=" + sum;
}
});
}
});
// 输出结果 wordCounts.print();
}
}
在这个例子中,我们首先读取一个文本文件,然后将其转换为单词。接着,我们使用`groupBy`和`aggregate`方法来统计每个单词的出现次数。
通过以上三个例子,我们可以看到Flink提供了多种方式来实现WordCount程序,每种方式都有其特点和优势。
其他信息
其他资源
最新文章
热门标签
欢迎提供和下载各类你熟悉的实例,感谢您对"实例吧"的支持,诚心接受各类问题反馈。
Top

