|
6 d) O/ ]" H: d+ @7 ]) C" m8 X<h4 id="flink系列文章">Flink系列文章</h4>3 ~4 @: e: L9 A9 h1 A0 x
<ol>
) J6 F# u/ S3 K' K6 Z' i<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>0 {& \3 k- B2 V+ r# y" Q
<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>
6 N% S. ^, P( B) [6 u<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
" J6 ?* T4 l! \) s7 ?: t<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>+ ^* F0 s9 f2 R" d
<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
1 |1 r# n% l/ @6 H* Q3 O- Z<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>( O8 E, } n% x( {
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>1 n/ M3 q* l. L3 u/ F. ^
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
0 y0 R) a: j! e; `<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
- y1 l9 _& a( m4 _8 b</ol>( S) m; k2 B$ p3 R& A
<blockquote>
& i |% F- I- C+ {<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
! p8 ~1 n% R% e/ r9 @$ L( Y( R/ F</blockquote>
9 O; _# \; R% I' r; b, E d<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>" ?2 f. r$ Z" m5 x) m
<h3 id="分流场景">分流场景</h3>. k, C& I4 p, T' b* l+ S) l
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
0 _! ?- Q1 r7 m9 S/ ?6 v$ {6 Q% i<h3 id="分流的方法">分流的方法</h3>
( E% |% P4 v( k6 b# R2 }- @ c<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
/ Q) O7 m( n# g8 f( m; j0 I a$ n<h4 id="filter-分流">Filter 分流</h4>
/ G; K- ?5 m9 E% M$ w; z* P( f<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>
3 l8 E3 h. V7 ?; E2 Z<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
. Z7 e' k7 I7 x( f/ E0 E* E+ z<p>来看下面的例子:</p>
- Z* y' |$ v1 t- C) Q<p>复制代码</p>
8 K8 G7 N& f' x8 s( |<pre><code class="language-java">public static void main(String[] args) throws Exception {
3 B8 C- D: q8 g4 U4 M StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
: O0 R! t3 z& U //获取数据源) J D; a5 U5 w
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
9 F: m; g: j+ h' p+ @. M data.add(new Tuple3<>(0,1,0));! z9 O( P# G0 ?" V& s* a) D
data.add(new Tuple3<>(0,1,1));
" H9 q( }! G+ `( k2 e4 W data.add(new Tuple3<>(0,2,2));( I! a7 N) f1 p) {) q
data.add(new Tuple3<>(0,1,3));* R- c5 n) B: F( e# N
data.add(new Tuple3<>(1,2,5));
0 O* ?7 f0 b% E6 {: {! A3 ` data.add(new Tuple3<>(1,2,9));& _+ W; U2 Z! Y+ b- V' N8 z3 `1 U
data.add(new Tuple3<>(1,2,11));
7 j @) R1 w1 C( ^, b" f& b data.add(new Tuple3<>(1,2,13));, O* D6 P/ H; F1 i) Q/ k- {
1 ~* N( |9 u! B! K% @4 V
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);0 `( g# {! [) c0 N+ A O' n3 X
, r9 t* G4 z; |) [/ F
9 Q" _& {4 Y8 w g1 F+ J9 }- @
( D8 P' I* X/ F- O9 m/ F4 {5 R
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);$ m/ ~ {, ?( r3 ?; a- v
9 D; x8 f1 @& c2 J6 D: R$ n4 P
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
5 s9 t9 u( k) e B. k
, G4 q% [7 E/ P5 Q+ w8 E4 k& @- D5 j
2 ^5 J6 t. `7 g+ q1 T: N: }1 V% t8 @ A$ y0 Z9 ~* \
zeroStream.print();
7 l- v7 k8 D. R; g
) U1 `& I# B! k7 d- u R oneStream.printToErr();
8 L8 Z& v7 b' x5 G5 T/ h" C1 p9 y, w" a
5 F1 B$ }/ M; E8 v! _/ F1 Z
9 C- G( S0 y9 u; \, e" i% v
0 @+ p, Q% ^4 [2 g' f4 K, @! q5 W$ S7 J* k2 C/ t% V2 p
//打印结果
$ F; e' p2 Z+ j: l3 Q% Q1 i) D# L' U% e( I4 J9 V' m: L! L2 z& y
String jobName = "user defined streaming source";
" M0 k, e1 f1 F9 o
- n, X! Z Y/ E* t( O" [8 A" ` env.execute(jobName);
. T) k: n0 r- { o; ]( B& C
7 A. @5 u1 i$ h c) W. ^3 i: r, L}) f* n& H3 R- V% H8 F
</code></pre># g) B& d6 L/ e& [
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>6 f( K; l+ K. f
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p> h+ l4 {# v5 b& M$ f" g& L1 n
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>' }9 a2 t! I% n8 v/ E
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>, Q$ a' X5 n) m( K
<h4 id="split-分流">Split 分流</h4>
9 K" [7 i$ C' Q<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>+ h6 G. q( [' Q T1 B* d7 d
<p>我们来看下面的例子:</p>5 @2 B* d7 m4 w# Y) O
<p>复制代码</p>
$ T6 b1 t. `5 j5 X2 B& ?<pre><code class="language-java">public static void main(String[] args) throws Exception {
. p$ n; A3 W5 M8 ?9 u
- e7 U9 ?) {' T. Q
, p/ ?# N. f, R0 {; q# p9 `/ |; s+ U# n0 ]3 K' C8 ~; L+ r
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
3 x& \: t# A: d" D* E: T7 ~+ A x" v- P/ v
//获取数据源
, M' ?. u8 S" B: R. L, | z# Y5 U9 ~5 c6 V5 M8 J
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();( d3 J6 i7 ^- F0 S4 w
4 p* G1 F8 u6 \) D9 n( E: e! G
data.add(new Tuple3<>(0,1,0));
/ k& u8 z7 ]: b! D5 n/ [% \: P' M) {9 K* D) h0 ^
data.add(new Tuple3<>(0,1,1));1 C7 `6 A7 F6 f4 M
: T! @) Q7 M5 z F; \6 K data.add(new Tuple3<>(0,2,2));
) X4 e# E! D" L2 D% X- @
- R W' l- G) I, W, j data.add(new Tuple3<>(0,1,3));
" h$ J7 F6 b k* a
$ B( e! n: w/ F* d; ^! G data.add(new Tuple3<>(1,2,5));+ T, ?& I9 |+ y8 ~/ E
* I; `4 L! n6 i8 M data.add(new Tuple3<>(1,2,9));
# Y: I& p, |6 w0 o- i, V! u9 k! P) R0 h1 t$ Y6 A
data.add(new Tuple3<>(1,2,11));
+ g9 z9 d! V$ L+ y" Z' `
+ |4 j, ^, Y$ @' i" V; W, r. |, k data.add(new Tuple3<>(1,2,13));
/ S) k# i4 m4 u8 v- F, l9 h4 u: i+ y* X1 s$ j
- @3 g4 R0 @3 ~' F
; S4 G5 Z4 N- m, C% r* v s% k ?4 n8 J; e7 S: {
& ?* r. t- c- K8 {3 B# C2 O
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
( f) l( H$ j( F8 E
' X. R7 W7 D8 T. o; O& k" _( Q {/ }6 B
_) W/ O0 _3 A$ ], f t0 W; _0 m6 p& w4 s
9 G; @6 V/ ^( V SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
2 A6 m1 |/ |+ M$ {; P9 |( z" }* Z# p8 ^, W. ~4 d8 U4 b
@Override
' s @! z9 x4 y: q6 |2 c7 \- P' h! P8 _ j4 O
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {* `) a, `3 T- s4 P% o
& A* s) e) K' }2 t! z
List<String> tags = new ArrayList<>();
1 a. C/ O5 \6 {) r' ?- g! ?
- c P) y8 U" u8 d, U* P5 ] if (value.f0 == 0) {
- u2 O, \- U. @7 q& ?3 q! f A& i& \( W
tags.add("zeroStream");
: F( r* T" F' }& _" ^2 k! X; E7 ~% U/ y* B- K7 z0 U- r- g8 q
} else if (value.f0 == 1) {) x' O1 q4 y i$ {$ @2 N9 F
0 P0 P% j a. E/ j. f2 Z
tags.add("oneStream");4 ?# P O; A9 v; r
$ ~2 H) U/ m% F- v1 r }' d+ |! v! y& b; A
2 \) K) Q1 Y7 ^$ j
return tags;
3 s2 U$ F* C, @: E5 h2 K8 |6 w# R- O9 O8 T3 h1 `# n N$ w7 L8 K
}
+ W# W: q/ c# U$ I* r) b0 j8 I, ^# ~. @" s
});
8 W' L$ P( D; I4 w9 z
{) s! c% Y5 _/ Y7 G! H
* v, G! z. _; ^$ M+ ]
8 n/ j% D7 S9 K4 \4 a3 T( [ splitStream.select("zeroStream").print();
! z6 V7 ]9 F' {6 ^
$ y/ Q7 b! n5 p9 r4 i splitStream.select("oneStream").printToErr();4 l) z( G! j) l2 n0 ~3 m- a0 O
* @. Z S! k# w2 f6 a- N4 F# O0 E- O' A* D& s
7 ^, h8 _* r J" V* d8 O
//打印结果9 n/ C! c' N4 b: t: Q# a
9 k1 @. L. b3 V( x String jobName = "user defined streaming source";
4 c8 _( |; H; L' u/ g7 o; a5 }9 N
env.execute(jobName);7 V* o# w- |9 |* M6 l
! I! \( p( f S: D' p. B' \
}' \% f- a: z1 D" k. {2 k
</code></pre>) N8 x" N" u o, @# k( X+ `
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>7 }) J& r' g3 M' p4 E
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>
1 j$ }" T5 X7 N4 }5 o7 x<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>
8 ^" t0 O ~: c<p>复制代码</p>! g: I% @' S. C/ k
<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.
: Y0 T8 f" s! z4 W2 k k5 X9 b</code></pre>1 G# E$ _- l7 R& h
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
. \# K$ ^" E# Y/ x! I8 n# Q2 ^, m# _: o<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
& T+ h: D* f/ d& m9 l: E<h4 id="sideoutput-分流">SideOutPut 分流</h4>
3 L9 D, l7 E) r z/ j<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>6 L4 ^8 z3 ~, P4 O% Q7 O
<ul>5 J" ?5 [$ d; u1 E/ ]. w
<li>定义 OutputTag</li>
6 C$ ~6 w1 V! y" l) S<li>调用特定函数进行数据拆分
+ t% ]6 Z7 T8 x8 W8 [<ul>$ @8 z4 F. T) [6 J. r9 f
<li>ProcessFunction</li>7 o9 g5 H* N( ]
<li>KeyedProcessFunction</li>
V( Y3 F6 x; j<li>CoProcessFunction</li>* I( p& B! u5 q/ [' Z
<li>KeyedCoProcessFunction</li>5 }- j5 \: A4 O, w* K
<li>ProcessWindowFunction</li>, g6 S5 W" d. V- M7 Z4 e* o$ C# d
<li>ProcessAllWindowFunction</li>' i" A- l% n" w( e4 T5 N3 \
</ul>
& k1 e7 ?9 `6 B- K6 V! H</li>4 {+ ?' M0 x' B
</ul>* J* x+ x9 a' F: M. t
<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
; v1 k5 c9 {& r, z! |, D N& Y' i5 {<p>复制代码</p>4 C* v" j0 A _& M
<pre><code class="language-java">public static void main(String[] args) throws Exception {
4 K E4 Q$ J# j; }1 T: o. [- x
# @& E, n8 I$ d9 {, }/ M1 X) Z0 h0 i m# l; h e5 ?! s
7 }) v& ^+ k+ t$ W StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();3 C& Q+ ]4 f8 ^" _8 f9 W
4 M1 y4 j; M! `1 e4 |7 j
//获取数据源
) ^7 O* M8 d2 U0 I0 I4 U+ `* e* y8 R
@& `3 T/ f# y7 A9 f' V! C List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();, q. `- T# n5 ?* K" a
' C! q6 P S) i' E/ C" v7 w
data.add(new Tuple3<>(0,1,0));# ~0 z( m. I- m# i; R
; N' o6 O' E! v0 i
data.add(new Tuple3<>(0,1,1));
L9 R3 [- f. d5 @# N$ c+ i2 V; r1 v. P3 ]" ~2 F
data.add(new Tuple3<>(0,2,2));
5 c7 q9 V& h, `0 i9 _: W& B* X6 Y- w. r0 m6 y) B2 c
data.add(new Tuple3<>(0,1,3));3 V! W1 w% i8 G" }4 M M) f
$ K2 u6 }5 e* P* Q data.add(new Tuple3<>(1,2,5));) ^( c. Q" R, S5 x
# Z2 S6 Z) h9 C( k data.add(new Tuple3<>(1,2,9));
( \7 x) x% d$ z5 r& h0 r. ]: n$ B r
* x# L1 M9 b, v% s data.add(new Tuple3<>(1,2,11));; w' H" J" P* [: D, ^
b6 t/ l: `1 X% P! t4 Y6 o% Q
data.add(new Tuple3<>(1,2,13));- I/ A7 ^/ p3 n8 ~
& e" O+ c2 U0 v8 h, i# L
% q5 v/ Y& S& {1 ?% u4 h% p* S3 a* L8 |
" x% J! g3 l& m% ~$ o1 V2 n! c4 |; u& g# @, R6 P o
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);1 Q8 b1 Z# H& ^: p# [
8 [9 V/ p6 v0 T( n6 }( w9 A8 X
* F9 M" |& X5 Y, R1 I# y4 A t: z O+ j8 Y7 X% t
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};/ d% p( h: f- J+ ~2 b% W% r
0 f% m: C* U) j$ M& s' k( w7 B! H& Q0 r OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {}; E. J d6 I. K1 J% a
7 c# R; n1 C, t* V3 T: _& I3 T. v2 s$ g
& z4 N5 \5 x5 G) G; d! e* x. G- c2 i
. |$ E( j: _ \1 ~! G$ r9 G4 ^ P2 `, S$ ~! ?
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
% }* O0 ^6 o* ]% H. C
h: a: g$ C2 e* T' W1 x: M @Override2 r7 g1 ?6 H9 u' C# S ~
9 z$ Q0 p+ T, H! c% M& \
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
# x: D4 \% I% H3 F) B3 r) {$ ^5 [; r
" S6 o9 r S, y! m* B! ^
5 P) h4 ?4 V" A7 o. z9 t if (value.f0 == 0) {
. L* P% ?- l6 i: {3 h
w( ~" o0 S% n ctx.output(zeroStream, value);9 j4 W2 A) F e$ @, ~7 J
- ^) m. J% N6 F$ R: q- Q& L# G } else if (value.f0 == 1) {
/ Y3 t& q) J) d' s7 @1 t6 J- z: M5 C( E: G
ctx.output(oneStream, value);) K" J4 W$ `, g" @% s
0 T8 D: R5 L% c4 i
}
) r9 `/ g: P2 \9 x! }" e+ L7 Y5 p6 r; D7 y# X( B
}, k6 p" f. C4 a* v8 O
5 q# z+ I0 ^2 s9 r. t- C) F! K) b });2 \6 U, e! X3 ]# n2 {
% X- J- n( U7 k1 P
0 c$ p. _- P7 q3 k2 {8 I: H7 y0 V5 O K; h3 t& ^* P
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);$ Q2 M% P7 g# D# q. p, o
0 ]5 G# c% F6 J* i DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
4 x8 ?0 x& C4 u( V" t* x' U) S! R- L
4 C$ m. A/ V e+ q* k, J3 H& X9 ]' `, P/ Z5 c% k# P0 a) l& p Q8 P
. ]* _+ |5 C, J6 _; f2 v8 R zeroSideOutput.print();
( o+ M! V* z- C6 Z* A, @, m: J
5 o5 p. \9 [( f oneSideOutput.printToErr();
2 x5 N5 y; \8 Y6 K- p, _3 Y g! O# a% P
: b1 b% k/ |8 E2 `
7 @; x$ I! [2 K& q9 O# f5 d' X! }- b8 D# G* j. J0 X/ E
4 c5 @) c6 I* k, ~2 m) g f4 v/ E2 W
//打印结果
0 T5 }2 x( r/ {/ @& D2 P
2 Q/ v3 t& a' ]. Y String jobName = "user defined streaming source";5 h8 m( `1 m( K6 ]* ~9 |0 C
0 O( c% [- c% f5 |7 D
env.execute(jobName);$ N j9 F2 X9 Y5 t: C( J$ T
% T4 f6 }1 B/ F. g# i
}4 l! |4 N9 T A9 v0 j' Y
</code></pre>( R7 r n; p8 f
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>: N4 ~# x F( y. q4 F
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>
5 `6 @* v4 d4 I- Q: t<h3 id="总结">总结</h3># R4 P6 }# n4 @' ?) o8 o
<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p># M3 w* O) k, O: A: F# n
<blockquote>
! k( _7 Z! {9 m- ]& y- X0 _8 D<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>( J# Z+ M7 }7 B& b( ]
</blockquote>. h# t) D# I! z# p, t
1 L% j9 ^( `6 M% u: T
|
|