当前位置:实例文章 » C#开发实例» [文章]我的第一个Flink程序 - 三种WordCount方式

我的第一个Flink程序 - 三种WordCount方式

发布人:shili8 发布时间:2025-03-09 06:18 阅读次数:0

**我的第一个Flink程序 - 三种WordCount方式**

作为一名新手,刚接触到Apache Flink这个流式处理框架,我自然会先尝试着写一个简单的WordCount程序。这种程序是流式处理的一个经典例子,它可以帮助我们理解如何使用Flink来处理数据流。

在本文中,我们将介绍三种不同的WordCount方式:第一种是使用Flink提供的`DataSet` API,第二种是使用`DataStream` API,第三种是使用`Table` API。每种方式都有其特点和优势。

### 第一种方式:使用DataSet API首先,让我们来看一下如何使用Flink提供的`DataSet` API来实现WordCount程序。

java// WordCount.javaimport org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class WordCount {
 public static void main(String[] args) throws Exception {
 // 创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

 //读取数据源(假设是文本文件)
 DataSet text = env.readTextFile("input.txt");

 // 将文本转换为单词 DataSet words = text.map(new MapFunction() {
 @Override public String map(String value) throws Exception {
 return value.toLowerCase();
 }
 });

 // 统计每个单词的出现次数 DataSet> wordCounts = words.groupBy(0).aggregate(
 new ReduceFunction() {
 @Override public Long reduce(Long aLong, Long aLong1) throws Exception {
 return aLong + aLong1;
 }
 },
 new GroupReduceFunction>() {
 @Override public void reduce(Iterable values, Collector> out) throws Exception {
 long sum =0L;
 for (Long value : values) {
 sum += value;
 }
 out.collect(new Tuple2<>("", sum));
 }
 });

 // 输出结果 wordCounts.print();
 }
}

在这个例子中,我们首先读取一个文本文件,然后将其转换为单词。接着,我们使用`groupBy`和`aggregate`方法来统计每个单词的出现次数。

### 第二种方式:使用DataStream API接下来,让我们看一下如何使用Flink提供的`DataStream` API来实现WordCount程序。
java// WordCount.javaimport org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCount {
 public static void main(String[] args) throws Exception {
 // 创建流执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

 //读取数据源(假设是文本文件)
 DataStream text = env.readTextFile("input.txt");

 // 将文本转换为单词 DataStream words = text.map(new MapFunction() {
 @Override public String map(String value) throws Exception {
 return value.toLowerCase();
 }
 });

 // 统计每个单词的出现次数 DataStream> wordCounts = words.keyBy(0).sum(1);

 // 输出结果 wordCounts.print();
 }
}

在这个例子中,我们首先读取一个文本文件,然后将其转换为单词。接着,我们使用`keyBy`和`sum`方法来统计每个单词的出现次数。

### 第三种方式:使用Table API最后,让我们看一下如何使用Flink提供的`Table` API来实现WordCount程序。
java// WordCount.javaimport org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class WordCount {
 public static void main(String[] args) throws Exception {
 // 创建表执行环境 TableEnvironment env = TableEnvironment.create();

 //读取数据源(假设是文本文件)
 Table text = env.readTable("input.txt");

 // 将文本转换为单词 Table words = text.map(new MapFunction() {
 @Override public String map(String value) throws Exception {
 return value.toLowerCase();
 }
 });

 // 统计每个单词的出现次数 Table wordCounts = words.groupBy(0).aggregate(
 new ReduceFunction() {
 @Override public Long reduce(Long aLong, Long aLong1) throws Exception {
 return aLong + aLong1;
 }
 },
 new GroupReduceFunction() {
 @Override public void reduce(Iterable values, Collector out) throws Exception {
 long sum =0L;
 for (Long value : values) {
 sum += value;
 }
 out.collect(new Table() {
 @Override public String toString() {
 return "word=" + "" + ", count=" + sum;
 }
 });
 }
 });

 // 输出结果 wordCounts.print();
 }
}

在这个例子中,我们首先读取一个文本文件,然后将其转换为单词。接着,我们使用`groupBy`和`aggregate`方法来统计每个单词的出现次数。

通过以上三个例子,我们可以看到Flink提供了多种方式来实现WordCount程序,每种方式都有其特点和优势。

其他信息

其他资源

Top