updateStateByKey&mapStateWithKey


spark中如何实现全局count

说明

两种方式都可以实现对同一key的累计统计

区别
updateStateByKey会返回无增量数据的状态,所以会相对较大的数据资源开销
mapStateWithKey 相当于增量统计

使用

updateStateByKey :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

public static Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunctionByUpdate() {
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction = (values, s1) -> {
Integer newSum = 0;
if (s1.isPresent()) {
newSum = s1.get();
}

Iterator<Integer> i = values.iterator();
while (i.hasNext()) {
newSum += i.next();
}
return Optional.of(newSum);
};
return updateFunction;
}

mapStateWithKey :

1
2
3
4
5
6
7
8
9
10
11
public static Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> updateFunctionByMap() {
Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> updateFunction2 = (word, one,
state) -> {
int sum = one.or(0) + (state.exists() ? state.get() : 0);
Tuple2<String, Integer> output = new Tuple2<String, Integer>(word, sum);
state.update(sum);
return output;
};
return updateFunction2;
}


Author: Kuiq Wang
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint policy. If reproduced, please indicate source Kuiq Wang !
  TOC