Flink API编程(Transforming Data)
Flink 官方提供了一个可以练手的入门项目,包含4个练习,每个练习都有原理、提示,以及参考解决code在github上也有实现,可以说是非常适合入门练级啦。地址:training.ververica.com。项目的数据来自纽约市交通委提供纽的约市自2009年到2015年的公开数据集,获取地址:
1
2
wget http://training.ververica.com/trainingData/nycTaxiRides.gz
wget http://training.ververica.com/trainingData/nycTaxiFares.gz
数据说明:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Schema of Taxi Ride Events(出租车行程记录schema说明):
rideId : Long // a unique id for each ride
taxiId : Long // a unique id for each taxi
driverId : Long // a unique id for each driver
isStart : Boolean // TRUE for ride start events, FALSE for ride end events
startTime : DateTime // the start time of a ride
endTime : DateTime // the end time of a ride, "1970-01-01 00:00:00" for start events
startLon : Float // the longitude of the ride start location
startLat : Float // the latitude of the ride start location
endLon : Float // the longitude of the ride end location
endLat : Float // the latitude of the ride end location
passengerCnt : Short // number of passengers on the ride
Schema of Taxi Fare Events(出租车费用记录schema说明):
rideId : Long // a unique id for each ride
taxiId : Long // a unique id for each taxi
driverId : Long // a unique id for each driver
startTime : DateTime // the start time of a ride
paymentType : String // CSH or CRD
tip : Float // tip for this ride
tolls : Float // tolls for this ride
totalFare : Float // total fare collected
1 无状态的算子(Stateless transformation)
- map(),map转换是无状态的算子,是一对一的转换,数据流的转换类型是:DataStream → DataStream。下面是一个map()例子,这个例子很简单,就是对数据流上的每条行程记录做一个map操作,对每条记录扩展属性。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//静态类:提供构造方法,将ride对象转换为EnrichedRide,增加startCell、endCell位置属性,起始位置通过每条记录(TaxiRide)的行程开始经纬度、行程结束经纬度生成。
public static class EnrichedRide extends TaxiRide {
public int startCell;
public int endCell;
public EnrichedRide() {}
public EnrichedRide(TaxiRide ride) {
this.rideId = ride.rideId;
this.isStart = ride.isStart;
...
this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
}
public String toString() {
return super.toString() + "," +
Integer.toString(this.startCell) + "," +
Integer.toString(this.endCell);
}
}
//静态类:实现了MapFunction<TaxiRide, EnrichedRide> 接口,用于将流上的每条记录(taxiRide)转换为新的记录(EnrichedRide)
public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
@Override
public EnrichedRide map(TaxiRide taxiRide) throws Exception {
return new EnrichedRide(taxiRide);
}
}
//主类:
DataStream<EnrichedRide> enrichedNYCRides = rides
.filter(new RideCleansing.NYCFilter())
.map(new Enrichment());
- flatMap(): flatMap()也是一个无状态的转换算子,map()是对每条流上的记录实现1对1的转换,如果需要实现一对多(包括1对0)的转换,就需要使用flatMap(),数据流的转换类型是:DataStream → DataStream。下面是一个flatMap()的例子,这个例子用flatMap()实现一个对1或1对0的map(),用来实现对流上每条记录的过滤。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//静态类:实现了FlatMapFunction<TaxiRide, EnrichedRide>接口,用于将流上的每条记录(TaxiRide)转换为n条记录(EnrichedRide),这里的n是0或者1,
//RideCleansing.NYCFilter 根据taxiRide里起始位置的经纬度判断该行程是否在纽约市内
public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
@Override
public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
if (valid.filter(taxiRide)) {
out.collect(new EnrichedRide(taxiRide));
}
}
}
//主类:
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
DataStream<EnrichedRide> enrichedNYCRides = rides
.flatMap(new NYCEnrichment());
enrichedNYCRides.print();
2 对流进行分组(Keyed Streams)
- KeyBy(): KeyBy(KeySelector)算子的使用场景主要是根据流上每条记录的一些属性进行分组,Flink的KeyBy()使用的是Hash partition 算法,数据流的转换类型是:DataStream → KeyedStream。下面是一个一个KeyBy()的例子,这个例子里用KeyBy()实现了对行程在同一片区的记录进行分组:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//以下三种写法均正确
//主类:这种写法,key通过字段名"startCell"指定,缺点是便器无法推断key的类型,会将key当做tuple处理
rides
.flatMap(new NYCEnrichment())
.keyBy("startCell")
//主类:这种写法,通过实现KeySelector<EnrichedRide, int>接口,明确指定了key的类型
rides
.flatMap(new NYCEnrichment())
.keyBy(
new KeySelector<EnrichedRide, int>() {
@Override
public int getKey(EnrichedRide ride) throws Exception {
return ride.startCell;
}
})
//主类:这是第二种写法的lamda写法
rides
.flatMap(new NYCEnrichment())
.keyBy(ride -> ride.startCell)
通过KeyBy()获取的分组数据流(KeyedStream),在这KeyedStream上Flink提供了丰富的聚合算子,包括sum(),max(),min(),minBy(),maxBy()。max()与maxBy()的区别是,max()返回max()算子作用field的最大的值,maxBy()则是返回包含这个最大值的整条记录。 在KeyedStream还有种常用的算子,如reduce(),fold(),window(),intervalJoin()等,这些算子后面的文章中会介绍。下面是一个在KeyedStream使用算子的简单例子,该例子实时地输出当前时刻从同一区域出发的行驶时间最长的出租车行程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//主类:
DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
.flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
@Override
public void flatMap(EnrichedRide ride,
Collector<Tuple2<Integer, Minutes>> out) throws Exception {
if (!ride.isStart) {
Interval rideInterval = new Interval(ride.startTime, ride.endTime);
Minutes duration = rideInterval.toDuration().toStandardMinutes();
out.collect(new Tuple2<>(ride.startCell, duration));
}
}
});
minutesByStartCell
//下面的(0),(1)这种写法是对记录为Tuple时,可以用field在tuple中序号来代替改field
.keyBy(0) // startCell 同一区域,可以理解为同一小区
.maxBy(1) // duration 行程的持续时间
.print(); // 直接打印
3 有状态的算子(Stateful Transformation)
3.1 Flink的状态机制(Flink state)
Flink 的状态管理是相比其他流式计算引擎来将,是一个非常大的提升。在状态管理上,FLink的提供了以下特性:
- local: Flink state可以保存在在计算节点的本地,并且可以以内存速度访问。
- durable: Flink state 采用检查点机制自动保存
- vertically scalable: Flink state也可以保存在RocksDB中,RocksDB可以通过增加磁盘做到垂直方向扩展
- horizontally scalable: Flink state 随着集群规模的增长或缩小可以水平扩展
- queryable: Flink state 可以通过REST API直接查询 这小节的例子中,我们通过Flink API来管理状态。
3.2 Rich Function接口(Rich Functions)
前面我们已经了解了Flink的一些函数接口,如FilterFunction, MapFunction,FlatMapFunction.所有的这些接口,Flink还提供了一种称为”rich”的变种接口,比如RichFlatMapFunction,RichFlatMapFunction接口有三个方法:
1
2
3
open(Configuration c)
close()
getRuntimeContext()
-
open(): open()方法只在算子初始化时调用一次,可以用来加载静态数据,打开外部服务连接等(如读取redis数据,sink数据到kafka等场景)
-
getRuntimeContext(): getRuntimeContext()提供了对Flink 运行时的整个上下文的访问途径,最主要我们可以用用来创建和访问Flink 管理的state.
3.3 一个使用keyed state的例子
例子中,输入流中每条记录是Tuple2<String, Double> 类型,表示<传感器ID,传感器参数>,需要做的是对每个传感器的记录(事件)进行实时的平滑,平滑就是根据当前值和历史值取平均。这里 读取传感器的平均值和存入新的平均值其实都是通过Flink的状态管理来实现的。 Flink支持多种类型的keyed state,例子中使用最简单的ValueState,这意味着,对对每一个key,Flink 会存储一个对象,例子中的对象是MovingAverage类型,此外,Flink还提供了ListState、MapState。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//主类:
DataStream<Tuple2<String, Double>> input = …
//keyBy(0)将所有数据按照传感器id分组,数据流变为Keyed Stream,所以对应keyed state
DataStream<Tuple2<String, Double>> smoothed = input.keyBy(0).map(new Smoother());
//静态类:
public static class Smoother extends RichMapFunction<Tuple2<String, Double>, Tuple2<String, Double>> {
private ValueState<MovingAverage> averageState;
@Override
//在open()中初始化averageState
public void open (Configuration conf) {
ValueStateDescriptor<MovingAverage> descriptor =
new ValueStateDescriptor<>("moving average", MovingAverage.class);
averageState = getRuntimeContext().getState(descriptor);
}
@Override
public Tuple2<String, Double> map (Tuple2<String, Double> item) throws Exception {
// Flink会根据key访问对应的averageState
MovingAverage average = averageState.value();
// create a new MovingAverage (with window size 2) if none exists for this key
if (average == null) average = new MovingAverage(2);
// add this event to the moving average,计算新的均值
average.add(item.f1);
//更新新的均值
averageState.update(average);
// 返回平滑后的记录
return new Tuple2(item.f0, average.getAverage());
}
}
3.4 状态清理
在上面的例子中,思考下如果传感器的个数是无限的怎么办?那状态也会有无穷多个,因此Flink需要采用一定的策略管理这些状态,清理掉一些state,这是通过clear()方法实现:
1
2
averageState.clear()
4 多条流的连接(Connected Streams)
一些场景下我们需要将两条流的数据同时处理,比如一条是实时采集的日志数据流,一条是控制流(日志规则,计算规则等,告警规则),作用两个流上的操作称为 Connected Stream。 在本例中,控制流指定了 streamofwords 流需要筛选出的单词,这个操作通过一个名为controlFunction。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//主类
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
DataStream<String> streamOfWords = env.fromElements("data", "DROP", "artisans", "IGNORE").keyBy(x -> x);
control
.connect(datastreamOfWords)
.flatMap(new ControlFunction())
.print();
env.execute();
}
//静态类
public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
//blocked是一个Keyed state,是两条流共享的。
private ValueState<Boolean> blocked;
@Override
public void open(Configuration config) {
blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
}
//每个控制流事件,会更新key对应 Keyed state(即blocked)的值
@Override
public void flatMap1(String control_value, Collector<String> out) throws Exception {
blocked.update(Boolean.TRUE);
}
//单词流中的每个事件,会根据当前key对应 Keyed state(即blocked)的值,确定是否输出当前事件
@Override
public void flatMap2(String data_value, Collector<String> out) throws Exception {
if (blocked.value() == null) {
out.collect(data_value);
}
}
}
RichCoFlatMapFunction 接口也是一种FlatMapFunction接口,这个接口可以访问rich 函数接口,这使得RichCoFlatMapFunction具有状态性。flatmap1()和flatmap2()由flink运行时调用, 记录来自两个连接的流中的每一条记录,在我们的例子中,来自控制流的元素被传递到flatmap1(),来自streamofwords流的元素被传递到flatmap2(),这个顺序是由control.connect(datastreamofwords)连接两个流的顺序决定的。 在这个例子中,我们可以发现connected 流之间的状态是共享的,这点在Flink中非常重要。
5 相关实验
5.1 实验环境配置:
官方的这个入门项目有配套的工程代码和指导,开始实验时需要先配置下环境,后续本系列的博客都是使用这个环境,环境配置可以参考两个文档: 环境配置 如何开始实验
5.2 Filtering a Stream:
题目:实时过滤在纽约市内的行程,GeoUtils
提供了一个isInNYC(float lon, float lat)
判断一个行程是否在纽约市内
输入:TaxiRide
事件流,由TaxiRideSource
产生
输出:DataStream<TaxiRide>
事件流,里面的事件的起始位置只在纽约市内
基础代码:
- Java: com.dataartisans.flinktraining.exercises.datastream_java.basics.RideCleansingExercise
- Scala: com.dataartisans.flinktraining.exercises.datastream_scala.basics.RideCleansingExercise
实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class RideCleansingExercise extends ExerciseBase {
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
final String input = params.get("input", ExerciseBase.pathToRideData);
final int maxEventDelay = 60; // events are out of order by max 60 seconds
final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(ExerciseBase.parallelism);
// start the data generator
DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));
DataStream<TaxiRide> filteredRides = rides
// filter out rides that do not start or stop in NYC
.filter(new NYCFilter());
// print the filtered stream
printOrTest(filteredRides);
System.out.println(env.getExecutionPlan());
// run the cleansing pipeline
env.execute("Taxi Ride Cleansing");
}
private static class NYCFilter implements FilterFunction<TaxiRide> {
@Override
public boolean filter(TaxiRide taxiRide) throws Exception {
if(GeoUtils.isInNYC(taxiRide.startLon,taxiRide.startLat)&&GeoUtils.isInNYC(taxiRide.endLon,taxiRide.endLat)){
return true;
}
else{
return false;
}
//throw new MissingSolutionException();
}
}
}
参考实现:
- Java: RideCleansingSolution.java
- Scala: RideCleansingSolution.scala
5.3 Stateful Enrichment:
题目:关联TaxiRide
出租车行程流 和 TaxiFare
费用流,实时输出每趟行程对应的费用
输入:两个数据流,TaxiRide
和TaxiFare
分别由TaxiRideSource
和 TaxiFareSource
生成。(注意,如果为了使的任务能够真正地具备错性,你可以使用CheckpointedTaxiRideSource
和CheckpointedTaxiFareSource
)
输出:Tuple2<TaxiRide, TaxiFare>类型的事件流,每个rideId
对应流中一个事件。需要忽略状态为END的行程,只关联状态为START的行程。
基础代码:
- Java: com.dataartisans.flinktraining.exercises.datastream_java.state.RidesAndFaresExercise
- Scala: com.dataartisans.flinktraining.exercises.datastream_scala.state.RidesAndFaresExercise
实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public class RidesAndFaresExercise extends ExerciseBase {
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
final String ridesFile = params.get("rides", pathToRideData);
final String faresFile = params.get("fares", pathToFareData);
final int delay = 60; // at most 60 seconds of delay
final int servingSpeedFactor = 1800; // 30 minutes worth of events are served every second
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(ExerciseBase.parallelism);
DataStream<TaxiRide> rides = env
.addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
.filter((TaxiRide ride) -> ride.isStart)
.keyBy("rideId");
//keyBy 指定列名 的缺点是 编译器不知道类型,Flink会转化成Tuples,使用lamda:ride ->ride.startCell则可以知道类型
//我们也可以 用tuple的索引指定
DataStream<TaxiFare> fares = env
.addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
.keyBy("rideId");
DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides
.connect(fares)
.flatMap(new EnrichmentFunction());
printOrTest(enrichedRides);
env.execute("Join Rides with Fares (java RichCoFlatMap)");
}
public static class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {
/**
* Flink 支持多种 keyedState,ValueState 代表一个key,flink 存储一个Object.flink 其他state包括 ListState,MapState
* Flink 也有non_keyedState
*/
// the state is shared by two streams
private ValueState<TaxiRide> rideValueState;
private ValueState<TaxiFare> fareValueState;
@Override
//open()只在算子初始化的时候调用一次,可以李永来加载static data,open connection to an external server
//getRuntimeContext() 提供一个全局上下文访问,大多数时候用来访问 Fling 的manage state
//
public void open(Configuration config) throws Exception {
rideValueState = getRuntimeContext().getState(new ValueStateDescriptor<TaxiRide>("saved ride",TaxiRide.class));
fareValueState = getRuntimeContext().getState(new ValueStateDescriptor<TaxiFare>("saved fare",TaxiFare.class));
}
@Override
public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
//通过 key(rideid) 访问 state
TaxiFare taxiFare = fareValueState.value();
if(taxiFare!=null){
//已经匹配过了,清除掉。
// clear()用于清理state,有些数据的state会无限地增长,就需要调用这个函数。flink 1.6之后加入了state ttl 功能,
// 可以自动清理不必要的state
fareValueState.clear();
out.collect(new Tuple2<>(ride,taxiFare));
}else {
//根据 key 更新 state
rideValueState.update(ride);
}
}
@Override
public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
TaxiRide taxiRide = rideValueState.value();
if(taxiRide!=null){
rideValueState.clear();
out.collect(new Tuple2<>(taxiRide,fare));
}
else {
fareValueState.update(fare);
}
}
}
}
参考实现:
- Java: RidesAndFaresSolution.java
- Scala: RidesAndFaresSolution.scala