|
) ]7 D6 U0 l2 p5 t
<h4 id="flink系列文章">Flink系列文章</h4>
0 o0 i' n. Y5 N' y$ E<ol>: d* ^8 \9 A/ X/ I
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
/ @, Z0 D8 s9 W% S<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>/ O" A: H/ _, V: X4 g
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
) Y* A/ l" A4 s<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
$ {4 `: x+ z5 Z; K3 T<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>- l# ~3 w' }* d: Z2 w
<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>; ], X) [( ]7 q6 Q$ k. g
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>5 x: ?$ X% H6 G3 S, _, Z6 _
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
- s7 z( n6 s& H% @<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>/ L: S( U: A" b& l
</ol>5 ~6 ?& }& |6 m) p
<blockquote>' u& C) b9 M! D$ m, o1 n9 x/ b, a
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>$ r( g# J1 l; Q2 i- s
</blockquote>
. ], V) |" | x0 Y8 f% M<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
4 N/ L U) _% U) E1 w4 R* W<h3 id="分流场景">分流场景</h3>
2 w/ l2 B F6 c( F1 r" ], C& ^2 {<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
# |, ?, P5 g j- C- F6 P1 `<h3 id="分流的方法">分流的方法</h3>) s4 T% o; o; \ J: g( i+ I
<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>3 l( o7 J" `9 t1 Q% j* ^# n
<h4 id="filter-分流">Filter 分流</h4>
! @8 u5 F% c, F/ W1 d2 O/ c<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>5 j! ~5 J0 T6 C* g
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
* b# k* c. ^; a, C0 |<p>来看下面的例子:</p>* l! ?5 }" K6 s7 ~
<p>复制代码</p>
2 F4 K$ _4 e4 u. C, J<pre><code class="language-java">public static void main(String[] args) throws Exception {- W" A1 l. d" f- F- |) `
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
0 Q. t. k# k0 A* x& v- g0 c //获取数据源
4 q9 M$ i& S6 v% R! g List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();6 @! d) W, [1 Q
data.add(new Tuple3<>(0,1,0));
% U& {; u, u# _( b1 R# t data.add(new Tuple3<>(0,1,1));
+ f% p' T/ I" o6 \7 ?: F data.add(new Tuple3<>(0,2,2));
! ?& q5 y- @( X0 N* j data.add(new Tuple3<>(0,1,3));" a3 z# d( X+ p; K+ z8 n
data.add(new Tuple3<>(1,2,5));
) `1 p, Z q8 w/ ?* [ data.add(new Tuple3<>(1,2,9));
& X6 T/ l; s/ ~ data.add(new Tuple3<>(1,2,11));
$ `( {8 J6 A, g5 G& a9 g5 c data.add(new Tuple3<>(1,2,13)); A6 _7 r3 z& H2 B& @
; L' g& C* k4 v$ [" e DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
0 T( v, p3 z1 Y8 g/ N
, |( m# X/ t" I) U/ t+ q$ P
3 q# j& p# ^( x
. O$ }3 c' H7 w" A$ i$ q' J SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
0 l2 E0 g0 ]. Y1 f S, l
6 z6 X+ M6 Q4 s, @: Y$ M { SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);; b% W: Q8 d' Z) f/ y
& W' {* l2 f; I
3 I9 @# p. q$ ]7 w9 x6 i! p
' U, b4 C& Y+ V3 T* V- a& f, r4 ` zeroStream.print();: B8 e( t2 V! W/ o9 c/ R
% o' E- T4 n6 z3 g9 Q% `
oneStream.printToErr();
* x. f a# C/ v0 f
W3 w7 T; }6 D# j+ y9 }. L/ J; r- I, f6 r; }3 z
" I5 c" {4 e* N9 W& w
6 g6 _/ \ M" R+ H$ i2 s
6 e) z# ^; z1 J7 P1 J/ w //打印结果3 X# a T4 @3 e% B9 `* a* }
" z) x; U% C/ f& D6 A4 Y$ j
String jobName = "user defined streaming source";0 d8 M4 V, X/ g" \: I4 \
, }+ b( `$ F( Q6 s env.execute(jobName);% _& ^. }, I. V, h; i
6 T( `1 Z- n6 d: v
}
$ d3 k" ?6 q- F& ^1 R- p; \6 p8 |' r</code></pre>
0 Y% V3 D J6 i( y0 _0 z- Z<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>
( \( M4 @" m, X( E<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>
- m+ E5 T" G' A<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
+ w8 x9 a1 M8 B" C: C<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>
4 D1 H7 d6 E0 q9 x# Y2 G8 N<h4 id="split-分流">Split 分流</h4>0 Q6 U% U/ \% v0 } `: p- n4 x; T1 z
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
% l5 Z% {4 V5 @( L<p>我们来看下面的例子:</p>
9 w9 K( g* X7 A8 P0 v1 p" W<p>复制代码</p>
' j) B; X6 h4 {<pre><code class="language-java">public static void main(String[] args) throws Exception {
( x9 b9 u, z" O- z1 W$ Q
1 r( \$ {9 J% a( e2 t) j6 d$ X5 N
% I! D) M) r. s. p4 O$ V$ g" B StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
) @4 K) `" M8 I* h( }7 j" T5 b+ t8 }. Y8 {& z
//获取数据源4 z2 P! k% @: C. _3 q( u7 I! S
- X1 S- d6 _2 L
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();* w& i) U! F* l& F9 u3 y
1 \' B/ t* G" g* d8 d& m
data.add(new Tuple3<>(0,1,0));! [$ C- O1 l# Z
6 Q* D, e; `- {- W! Q, s
data.add(new Tuple3<>(0,1,1));. v( A: H4 u" i; X" w& c. D
7 E0 E" K3 F5 h \7 u! p, { data.add(new Tuple3<>(0,2,2));
' \1 t! w: e1 K. W; Y" Z+ _( S6 C0 R( s# q
data.add(new Tuple3<>(0,1,3));/ j) G p7 |1 O: P
0 `0 J7 Q8 M7 d# L/ L9 g data.add(new Tuple3<>(1,2,5));
! P/ t( m1 g7 C8 a) |
0 y) ]! a5 {1 j$ j data.add(new Tuple3<>(1,2,9));
- s9 y& b6 d- W: F
! S' v) @- `2 h/ L! e data.add(new Tuple3<>(1,2,11));
; y& ?, r8 g( u
' `+ e; |$ e& v ~ R' N- m data.add(new Tuple3<>(1,2,13));) \( {+ t! G( S R: X* P; m
5 n0 E+ R% R, X7 |7 ?
8 @0 x. `$ H! I% R3 }, L3 X/ C. E( y- i7 U& X! { O' T
* ~0 D- C. ]! E5 Q8 D& z) Y$ J9 K
$ }6 g' i% |# v0 M4 q( n DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
; ?; c! V. z) o( ], G) d8 N4 c# T6 g Q( c; f$ o/ E
4 _$ m6 E1 i, {: N# o. j$ J
8 G' h% D0 [$ f* O9 m
4 @- z, e |4 y/ d# j, U5 i6 z3 R% q2 t- X: T4 J: ]1 T$ E
SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
2 m3 o, @' L' y: `: I! z! @8 N: j8 A; x' J
@Override
* i B% G+ l) G/ p
, A, Y& R1 ?9 m public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {
" C$ g) a p/ G& E
# Q& T" C% j% N9 X/ G List<String> tags = new ArrayList<>();, @/ D7 Y7 [) ~' z% r
: c1 l% M/ u) x if (value.f0 == 0) {! C* t1 V0 t; C" s. {( i2 |/ i0 r
; ]$ o4 X% a. k, r' X tags.add("zeroStream");3 H* ], B+ o' R# B5 p
3 e+ G* l0 ^5 m0 g3 m" x+ ]
} else if (value.f0 == 1) {: t* y7 q) U2 O" i! }9 }) S
1 a/ }' n9 i- v' j9 H, ~ tags.add("oneStream");
* e, y1 Z* I1 V$ u) r9 h% }
" V% |' U, }# t% u }
% L4 ]8 O; [* | R7 l' H' k, P
; ?& }" ?. z# [8 M return tags;
% b- `6 S8 w6 P' E% E. D1 ~; p: m' t" C( V
}$ J- T& J& k1 ?0 T; ?& ~: V
2 j+ Y* X) m' N5 T2 F' j$ Q5 A
});, f: C) i0 s& d" h) t9 P. h
4 ]3 }- x1 B2 `1 ~, t& k
( ~5 l+ ^4 J' y' h0 S4 I1 p
1 o$ ~# U T) f* h0 n! ?3 r* B
splitStream.select("zeroStream").print();
* _0 w. `% E! a# m8 l% F$ ]. O+ I9 `, c9 x4 j6 X
splitStream.select("oneStream").printToErr();
& L% o: ]7 G# f+ z) K6 ~- R; E& \3 @. {# M
' a2 b9 x! [+ C0 `; {. p# t; I
# b. M2 e6 b5 K* O8 P //打印结果" e3 e7 n Y' w, J& Q; L
7 C0 W6 Q3 N$ e$ i* B+ S; `7 C8 L String jobName = "user defined streaming source";( k K* |; a' T# f8 x. E' |
' l$ I9 [# V5 O- u
env.execute(jobName);3 a0 Z) Z- q' {7 A8 b
/ Z/ l1 m0 M7 {2 a
}+ R' H3 x2 b8 V% m) i( }, n7 z
</code></pre>
( R' V. f; Q( ]. m/ K1 e& t& u4 T<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
: P6 B3 _3 V9 g2 C<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>
3 ?) l8 N/ }( V8 W0 k<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>$ ?; J& T3 v: S, B5 b& C
<p>复制代码</p>
, P2 K* B& M) V1 p6 ?3 L<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.( _" a3 e# y) Z: D: |: _, w
</code></pre>
9 ?( Y( B0 ]0 \& v, X! `# {, R<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>: C" y- I) H$ T9 h4 ?. K# G
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>0 j7 X3 S8 h, W3 _9 C& Y% w
<h4 id="sideoutput-分流">SideOutPut 分流</h4>/ I9 R( Z( a5 L
<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>8 l# T7 @0 q5 a
<ul>
9 d# i3 N" s1 _. M+ |<li>定义 OutputTag</li>) W/ W$ ]6 g6 @' P8 }
<li>调用特定函数进行数据拆分
" d- @, i7 u) [+ S% V5 T<ul>* B2 H. j- b! g4 Q6 b
<li>ProcessFunction</li>7 Y" T3 B: d( ]; I$ F) s) o
<li>KeyedProcessFunction</li>/ k5 n. L7 a; j/ n, K9 P
<li>CoProcessFunction</li>8 I! N9 B' g6 p+ Q3 r+ l' L
<li>KeyedCoProcessFunction</li>
. ?! P$ H% Q& e<li>ProcessWindowFunction</li>+ Y1 q3 ^6 w: b' m, W) p
<li>ProcessAllWindowFunction</li>
5 @$ Z1 E" L2 S9 q</ul>
8 k( f0 E6 J2 t$ H% Q4 L* N O</li>
3 @( J6 a- Q& ]" z</ul>9 p" V; j3 G( O7 h; b* i
<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
; |! ]' A- N3 L0 m2 n<p>复制代码</p>- i1 S! K8 ?! r3 i/ I7 I; l
<pre><code class="language-java">public static void main(String[] args) throws Exception {' V" m3 w" w, I! I8 ?1 a1 x- [
2 N0 V) N3 N4 o M& _5 B( @
. S: L6 d: D! p
Q5 P4 ~6 d& G% ]4 s8 p StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
8 g( e: n* P5 v: k$ A1 _
1 E6 _7 h2 S# o( {" p8 c% | //获取数据源$ Q/ d" n* k, i6 t
# i/ y( Y6 G' k6 s% _5 ^ List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();- o" F7 U% ?$ j2 ]( R+ e
. ?6 B: z( a! ` n$ P
data.add(new Tuple3<>(0,1,0));% G. {! |; O* q( s6 G& Y
; j' A( s" ]" n- s# O data.add(new Tuple3<>(0,1,1));
* ]+ E/ [2 U4 z) P# j1 j0 w
/ G$ g n U" L) s% ~) c) L data.add(new Tuple3<>(0,2,2));- I' z$ E! |+ g; |, o
' w1 Q5 }7 g7 A. |6 l3 l/ s data.add(new Tuple3<>(0,1,3));
9 T4 _5 y" ^4 J5 K- P$ K9 z, R) m* W+ p+ _" ?8 a
data.add(new Tuple3<>(1,2,5));
" Z: ?8 Q$ _+ }! p
# Z# y& b- c$ d8 v/ G data.add(new Tuple3<>(1,2,9));
. o. ?7 {; U3 k2 i: \5 V6 c: l) [! `8 Q7 B( [* ]
data.add(new Tuple3<>(1,2,11));, I: W9 I; V9 c, R& I. O
. a) e6 `: |" Z2 S1 O
data.add(new Tuple3<>(1,2,13));
/ O( b5 }# t, }) T" z
* R# m. D7 @7 K7 v8 _" N! l# F
" l# {$ X$ I% G0 ^5 k. h' g% `* S6 J! x8 F7 Q
' J% U' O8 ~( P1 k" T) b' H
8 l" _ ~( [5 [" O2 Z! X5 w5 j
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
, V& m! ~( ~5 f/ h5 s9 b5 H) {/ C4 ]- x# t+ ]
( P: k* v1 ]5 [1 L; d# q
) J' w% ^3 W' ?- t" O# F; E" ] OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};8 I, X1 a6 Z1 j6 n4 j: \- U2 ?
! v8 n! S0 N- k) M9 J+ e
OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};$ _ B3 _- G$ F5 C1 m8 S
6 F6 g1 i, |; g( H. x/ R) \8 {* G+ e: H& B
: G! W( y& @, u& U' G$ @7 d3 }; f, E! P3 X
0 U' }8 d& m8 L& |" c6 Z2 o
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
8 a: G/ Z" I$ |' r* k7 m; z$ d5 u' P3 N8 o1 t7 Q
@Override: l- K- }8 E# z0 R+ k; k+ K" w
3 T3 u4 `/ I/ `5 ] U
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
' R: I, p3 w, P3 L, W& m+ ~5 f
; C$ C7 e; d- \' w, s' i
! o4 |1 O/ `$ D1 ]) G% C' R$ m! N) E1 N/ P$ B. v: u
if (value.f0 == 0) {- i" x$ | n4 e# g* r5 I/ n0 S# r
1 x- }# p# o( S, Z- d" D! H ctx.output(zeroStream, value);! u) K' f- w) y0 ?5 A0 A
3 e: @3 f' K( X5 ^5 T! X
} else if (value.f0 == 1) {- b, A! G" A; V) A* B
# e+ ^3 U5 M7 B; I3 A
ctx.output(oneStream, value);
; F( J' x. } {: }1 J/ g1 a! f( i+ Y; H, Y" |# f
}9 A8 k7 _; y3 E5 c% I' t! c
* R* v; y- m, U
}0 W( U g7 ~/ x
' `9 q) ~; p3 @* p/ w
});( u1 s3 ^8 i" f( O9 G! i& |
0 g5 K/ r; A! t+ z7 s& M7 r+ X- b3 Q# _% p* g
, {: c% G6 z% ]" U$ H DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);
1 p+ R# z0 V; `6 G2 r8 C9 ?6 \; X8 @- e+ M, M/ }! C {
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
. f% Q- _- o" e0 R3 z# h& i6 F: B8 F$ c
* ]% c4 B7 F* A# A0 }; E
5 b, L3 [! N# S1 a! `/ }8 {5 K
zeroSideOutput.print();
' W- t% P* A6 b4 i
: c3 h% ]$ ~* |" ? oneSideOutput.printToErr();
( B. F- X5 d* I: G, b$ g; @4 L- T! G1 e3 V1 `9 A* ~
* l. z4 Z @# q9 Z- W, C9 S$ ]8 q1 h) `+ A+ b
! S0 G2 N$ _& q6 O) r4 L
4 Z% _$ ?8 n7 Z1 X) e) ^: ~" g% w //打印结果* @( A' u( c' `4 P e4 {
+ L( ?% u3 i- T7 Z; n
String jobName = "user defined streaming source";% f, X. l+ u3 L0 h! ]4 L; m
r% S/ q- j- e! \ env.execute(jobName);; k6 T' T* y8 \* z* S
- K1 m3 T/ q) W8 m4 A, a( Z
}
& A4 D7 e& F4 C Z" M0 \9 H5 F* F</code></pre>
/ M" l+ H+ h$ p' ]+ }; x0 K<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>; l$ T. [7 k, k- |/ \" F
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>
! l9 T! \ g" J( O<h3 id="总结">总结</h3>2 t$ e) x( [0 v2 N2 V; S
<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
. C* {6 T' S; j8 a' X. t3 o1 M* s<blockquote>
G: d* X/ Q* h' m1 q2 o9 M<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>( Q6 H" N7 C0 W& g1 h9 `. X- D
</blockquote>
* e5 q. X4 w7 y- J- ^( |- H \% B3 V; a! D* {, k/ S% u+ t7 s. Z
|
|