|
0 C/ F/ t( S! f7 q: k
<h4 id="flink系列文章">Flink系列文章</h4>. K2 x1 c n- E$ ]' h
<ol>" N% X/ c) N$ X9 C
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>5 j7 x0 ^8 l! U# s% o
<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>7 \$ }$ j1 C7 b
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
, \/ K! \, E6 m: z<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
! S' p( Z1 U G1 Q S<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>% z6 R5 x+ Y, q4 q
<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>
5 \9 L5 E8 ^ G9 e" ?) l# h Z) z& W<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>/ A3 ]% S: m& K! |3 @
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
2 y P, K6 l, S3 l2 y: \' i U<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
% R* T, j$ c1 [</ol>
/ v- e: g( ]: K<blockquote>
* V5 D: s+ }* \7 u<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>5 @ s8 j# J6 S0 k* {
</blockquote>
: ~; ^, _1 X; m<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>4 B! g3 G! s& j
<h3 id="分流场景">分流场景</h3>( ?; c$ p& n4 o9 T3 ], ~0 Z
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
2 r, e) J( s, K4 Q. Y8 x% c' }<h3 id="分流的方法">分流的方法</h3>
/ j% r$ V2 c3 ^) K/ W7 W<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
, D4 y2 [' t% y' S9 ?& s7 g<h4 id="filter-分流">Filter 分流</h4>; V! n u; x+ u) N; I3 y' U
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>
. m, \4 n2 F- x# M, n* L$ f6 L<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
1 y5 v. N3 p3 ?: _6 K/ n' `- J. r- p<p>来看下面的例子:</p>4 Z6 E7 N* w# d b+ O. p
<p>复制代码</p>: J N% d3 O* U8 a/ @& Y) V
<pre><code class="language-java">public static void main(String[] args) throws Exception {9 C& A# G! M4 x: N6 N6 k1 L
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();& \1 Z2 r9 s* h2 Q. P" ~6 m6 j
//获取数据源
! T6 w: E& E& u3 h9 G6 _ List data = new ArrayList<Tuple3<Integer,Integer,Integer>>(); w$ h+ |! e& K: ?; ^! Y
data.add(new Tuple3<>(0,1,0));
/ K5 t% i) g$ S9 ^$ Z data.add(new Tuple3<>(0,1,1));; Q3 ]; q( o+ d2 J& o+ x
data.add(new Tuple3<>(0,2,2));
# U3 [% c6 J0 ]8 D+ J7 @ data.add(new Tuple3<>(0,1,3));
, n* x6 @& O' r; I% ?* N; t data.add(new Tuple3<>(1,2,5));
' Y" D. U% P2 S3 g6 h) V data.add(new Tuple3<>(1,2,9));
( } m F* D" D, n" h2 S data.add(new Tuple3<>(1,2,11));
( V6 {$ `9 H6 p* C' g6 K9 W/ M data.add(new Tuple3<>(1,2,13));, ^. j( i7 |4 A8 e7 J
; J/ `7 d/ q: o5 `( H DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
! |. ^$ Y: o/ E7 r% h' H: X, s+ j/ m- \3 g; c
9 H. Q, f9 r( F, y: |* ]* g* K2 S% l+ f0 r& J2 S6 y
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
4 x5 P' L* C, |( V# w7 A3 {$ z+ b! @6 N: W" r8 W: a
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
2 h! V, \9 B4 \7 R% L h
( Y( U/ |( |0 X2 L8 h3 r/ f1 ^% l7 E( k; S' c5 C& e& x M
" ~1 W6 W' p2 m% J
zeroStream.print();
9 m9 |6 ~# X3 D) ^
; N/ ~; W# x [: Z/ ]4 u oneStream.printToErr();
2 e. q3 }9 u9 H; [2 ~& e2 X$ y% P) K8 G
; k* Z3 S. h$ w8 ~& s' v, r- r2 |" _$ N, ~' ~) e8 }3 S
" ?9 A8 J6 ^- A2 v/ s) B; H; B/ [
; R; ?; V; J x" H0 W+ G //打印结果
& x) Q9 q( m+ f! @/ ?9 W# h& e
* j0 I! Q! a, S$ M+ I1 ] String jobName = "user defined streaming source";
. O0 X! M) e4 l. ^4 [, _
# P4 @" e* l! @/ a3 e. I2 K env.execute(jobName);
! J1 X. P; O7 O
& p: R+ o' r8 C6 _, h) Q. [3 ]}8 n; R7 u& ^! U2 _1 Z$ x# \
</code></pre>
: ?* J0 R: }4 E# Q/ X<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>
9 i! |4 g$ `3 X# N<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>1 @6 V! |( k c
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
% d8 a$ R1 A- Z: b2 o+ \<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>% E6 t+ d7 Y- o7 o
<h4 id="split-分流">Split 分流</h4>% q1 P7 E5 @# r) ?0 t. a! l" J
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
2 s2 K0 _ \' h7 g: k- c* m. I<p>我们来看下面的例子:</p>
0 {0 U% f- v3 [0 J# V<p>复制代码</p>
" e& F; d9 G0 [. `$ {' q7 ^<pre><code class="language-java">public static void main(String[] args) throws Exception {
: m* b# y/ o$ Y y! F- V& E: e+ x9 v, t W
1 e2 h! [+ i/ Q, I
3 z7 r0 |# z2 F StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();5 m! o/ `4 g. p2 b3 m" c! `
+ f* x" f: q6 T# w& ]% s0 y //获取数据源" J5 ?: I/ h: g) Q: X" y
, h) O' w2 C% B
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();- _$ x' g f8 E( p& @
% v- x6 R6 e4 q! D; d ^% I) P data.add(new Tuple3<>(0,1,0));
( A0 m' [ q8 _9 F
# Y2 ]1 v# o' O z4 j t* L; U4 { data.add(new Tuple3<>(0,1,1));, q* w5 r) ], F
3 l6 O: i2 [7 k# H
data.add(new Tuple3<>(0,2,2));
+ D0 R; u B; L9 D0 K, e# m
' x+ H' k, l! a" O2 J data.add(new Tuple3<>(0,1,3));6 Y$ d; |/ Z7 |) t
1 B2 h. g& P" |: c2 d data.add(new Tuple3<>(1,2,5));+ l( y$ m2 I5 G9 i2 \4 u
; w( z7 m/ ], }' N: ], E3 { data.add(new Tuple3<>(1,2,9));( ^+ i* i1 \7 n# V! e
# u( m/ L% l# _& W/ z
data.add(new Tuple3<>(1,2,11));; s7 [/ b. k2 j8 T" Q8 [
1 K& Q& |6 B, ?
data.add(new Tuple3<>(1,2,13));% g7 v5 ?! n1 ? H$ P8 B
7 O" b; s9 K5 a- K0 P; r. V5 Z( h* s6 V) O5 Z0 ^* p; s, U
& ]: f/ q0 A3 T& J/ q7 a* F
_( O& F* i0 r1 y" j: W& r
# u' ^, S! D" @- i+ b6 M" f DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);6 v! o. F% i* |) _ }! ^
# V, u# p% y$ B0 P; x) K
6 ~ u, K6 P- B) l0 T: ?! ^
$ e) N7 b" H8 d, D' \, M( @* Y, h; E8 ?( h- j
) g6 n7 d c, k1 v+ F$ B2 V8 ]9 A
SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {- z$ P+ t/ E% o* ^" O% ~
1 D% k8 {0 J' `1 f/ f) S/ M7 z
@Override
7 A8 F( w9 c% R: V( G1 V" r* w' B4 }+ I# m3 j+ m
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {$ V* e7 ?6 q) W. G
; o# N% [: |0 K4 @" M List<String> tags = new ArrayList<>();0 Q: F0 w2 [$ _. C( h+ w+ G
4 ]: ^# Z/ [/ }* B
if (value.f0 == 0) {
- @- ^4 G4 E {. u6 ^+ m6 ~7 F0 C {8 A
tags.add("zeroStream");1 g, q$ z2 P" q5 `$ s: g
# U8 T/ O% \% T# v' t( p5 A
} else if (value.f0 == 1) {
% ]$ @# O+ \1 b- d( y% L, W2 L
1 Q* H. Z4 t; I( [, S. t0 T1 ~( ? tags.add("oneStream");
" ^: i% u6 z, J3 C. E
& j4 d' w' p- M2 d. i }" U4 r8 n: ?: X; I- h) a
/ s4 F, f1 A/ T# W i; e
return tags;# s/ v6 r- @- R* N, \
& I9 D- g7 e: q/ |' a6 C Z& ~2 I }
, x) ?0 P' L7 O
2 I1 W& w. y8 @ });* Y7 T& g% ^. K* F1 v% n
! P7 A6 h& r. _( ?$ r) G. M# f
: t3 K3 V6 h) @ F( w% q! p, s& O* X3 P, u3 V* e! u
splitStream.select("zeroStream").print();' {5 O6 R% M4 p' d9 `; Q8 s
2 u* ^: ]& k6 P0 N: z
splitStream.select("oneStream").printToErr();
3 [8 ~ f2 { t4 E( \3 z( ]; v0 r/ r7 B
5 U& V) d1 ~' `4 {; z
8 n1 B$ u6 l- |
//打印结果
8 o, o, i! _$ ]% f v; X: X' f6 J& L+ d }
String jobName = "user defined streaming source";
v- ?! D7 o% i; q2 g# H5 z/ G( F/ p* o0 ?# r1 z+ ?) Y: S% ?" M0 e
env.execute(jobName);
8 V- T4 _' v/ D) {1 f! ?/ R* W( W5 P
}! Y2 j1 G0 H6 Y0 }7 [+ z
</code></pre>
, K7 e. D% n5 Q% `<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>4 G6 i1 x- H0 E" |& V1 J
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>
2 h: p9 R3 E) G<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>
- D6 u# n; y3 Q0 U2 g% @+ \<p>复制代码</p> S% E% l( l2 d) Y/ K; K
<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.5 o# p; g: j% G' R
</code></pre>
3 J, i' {! S$ }<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
) ?% X1 L0 n& H( v7 D( ~ G0 ^<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>) ?3 q0 ]/ x! t( s% P
<h4 id="sideoutput-分流">SideOutPut 分流</h4>3 {$ ]& H* y- z- @$ w3 @
<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
6 i# ]; X* _1 r- `/ c% G<ul>
: Q, |' Q% C" Y3 H6 T( ~. m<li>定义 OutputTag</li>6 E" g. J; F. p
<li>调用特定函数进行数据拆分! q" K( F' S! U3 x, W0 @
<ul>
; n, u4 ~; X' o4 s: E2 F( E2 B<li>ProcessFunction</li>% u: M" T" r9 {; L+ P
<li>KeyedProcessFunction</li>
5 \" c A% j+ H9 Q9 w<li>CoProcessFunction</li>
" [8 [7 N6 E n* M0 Z+ H<li>KeyedCoProcessFunction</li>* T/ W2 q0 G$ ?+ G4 o- |$ E
<li>ProcessWindowFunction</li>
6 F- }3 u- X/ @: b, Z; k# Q& a<li>ProcessAllWindowFunction</li># ], `7 X3 m: B, y6 U9 W+ C
</ul>' [' v; y9 z! u3 n
</li> K @; `8 [# U" p4 W8 ?; y; _
</ul>
/ i$ T7 O/ R0 M, G<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
4 A& u2 U0 W/ S<p>复制代码</p>
9 i- t$ {" _3 g: W0 t% R# A5 t<pre><code class="language-java">public static void main(String[] args) throws Exception {" b, A- t; I5 X9 G
5 q+ S. p5 e% S) O& A, f% h g: W& e. o
9 c4 k8 s/ t& f$ y" E9 o StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
8 C2 _ A- g+ V0 D; F9 K+ M' S d+ q; \/ {. x+ u3 k8 q" E! k
//获取数据源
4 _ {8 z5 \# c, m4 H( V' M3 c/ I3 y. R, d) B9 C& `/ F
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();8 T+ {7 j! U$ O& Z/ b0 A' Z) F. [
% i2 ?4 G. |/ q0 w( h
data.add(new Tuple3<>(0,1,0));
$ }" v" E3 A m1 V+ r' t! a+ y2 a0 \1 ^9 W8 p; J
data.add(new Tuple3<>(0,1,1));$ W6 b3 o' g1 d0 q- ~3 P
6 }& Y4 z! P; {3 y data.add(new Tuple3<>(0,2,2));
3 _8 l2 B# n/ M, h e/ d& {. i) F! T0 G
data.add(new Tuple3<>(0,1,3));# I3 _, h- v' s0 K9 n- v& i
. }' [( r# M8 Z; X$ B% f2 C8 A5 ?
data.add(new Tuple3<>(1,2,5));# _7 ]& w) \9 m" s, \
/ J u. ]. R1 S3 y/ T$ D( k* y; g data.add(new Tuple3<>(1,2,9));
0 ]) K% V/ D, K6 \& u3 T
" K2 g/ _5 p% b( s9 M+ e. v8 w2 q data.add(new Tuple3<>(1,2,11));
6 l* ] f. P$ Y* ]* f, ^
]+ }* z9 s2 X% {" Q( ? data.add(new Tuple3<>(1,2,13));4 h6 A' x8 P" Z# k9 y
$ I& r- M0 h, Y! c' C/ h4 y7 I |7 b0 W3 s6 c& U& M7 w& H. d
9 p& w5 |4 Z l% k. z. _7 u
' B$ ~9 r3 m" i- ]" w2 `1 S
% M6 \! f5 h1 v* ]+ r+ @6 B DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);7 b3 c$ z" A/ S6 N: V% O
1 f. X3 C- O$ A% i' T m4 k9 s+ Y6 u9 \
( v, T6 \+ S6 R OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};: O) ~' D; C2 I: R* ?) L
# J( ~0 e9 o, E' B6 U1 p OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};
4 \$ l4 |9 ?4 s" m0 s' `5 `4 Z$ I5 D" s, U
. L# o6 y9 w9 q2 F" P. |: _
/ q8 Q8 O; g! s1 |
0 V3 o' |& e7 `$ @3 u8 S1 x
2 b7 B$ H& Z+ U: b/ x SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
+ d8 U. ^2 E _; T S" _- e9 l2 _3 C' r- m( |1 o" u
@Override6 S# M6 d$ r8 m! Y' p
- \5 d/ z- d& Z5 }
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
. g6 l1 u5 I" o6 D
1 t1 [+ R, y6 N O& C8 d3 W1 m/ T6 P6 z
. r$ j" W8 a V" w% W
if (value.f0 == 0) {- S. b& a8 ]# K1 p" [" c
. v# s% ?5 l" C4 }" ^ A& u ctx.output(zeroStream, value);+ Y1 B8 r$ X& D9 ~9 R* D) M
/ {! z6 I# m- i. P, @9 G6 p& C( {8 I
} else if (value.f0 == 1) { A6 w% i5 }# I7 Y5 i9 J: l
% W! ]2 k% U, k2 y ctx.output(oneStream, value);
# |& C) Y% C2 ~" ~* o/ [
8 P$ @! F2 J! p# t1 g( J }
6 Z9 U% c G& ~+ u: d& n: }6 H* [- \; o9 x$ R& ?$ t
}
$ a3 V! t" Q" s' |- H& \; O( l) l$ F6 d/ e( I4 p
});& i% C6 B0 |8 e+ T" C7 C A+ }& B
; y j3 N- s' r% J
8 d8 |0 d7 k! `( W/ |$ U( k) A$ N8 w- Q w% [6 {6 i
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);0 G3 G, ^/ W: E, m( c, h. H
, A" q! T# E0 @; z9 e" z B& N7 _
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);0 q# y) J) R, U) o, X: I; f8 F5 ?
: z: N; t5 s# k" p# l; L
+ \, i5 ~# T" z! B' R# [0 J5 g9 [
9 ?4 r' e+ K. X6 G7 S zeroSideOutput.print();
+ W! t$ Z9 d; R! L$ k" J& ]. f* f2 G' `( o* _6 `
oneSideOutput.printToErr();
1 ^1 _1 A! y+ |* f# a; o& \
2 i. H# s* y2 J/ v# b/ G( m3 B" ~/ L
' R' o+ Q$ n% k0 V& A* G1 e7 U$ ?3 O. i+ y
4 V q- t0 |+ [$ S6 Q/ {
( T f; M5 i5 }& z! F, B //打印结果
) c% {6 u0 s% J" U+ G/ _+ c3 L5 v! N' Y7 L+ U. q9 Z
String jobName = "user defined streaming source";* F" Z$ c/ S2 i+ B9 e% h
* b& r1 J5 y* g8 F& z7 Y8 ~0 q env.execute(jobName);' R5 v; a* ?; g& m0 ~# E' Q5 n' e
/ [" r A/ h8 u
}
1 N- E e; B' E$ M# ^3 B Q( S0 Q</code></pre>
4 q6 g; j' P7 J, b- U, y<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
9 l) ]8 K0 L+ J, e+ b! z7 N<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>+ U8 t3 A' C. |$ h" ~1 N% r9 @
<h3 id="总结">总结</h3> q6 R8 }# q8 R( ^) _7 G) B
<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>9 G2 Q- h, k5 Q# \4 [. m4 n
<blockquote>
# W0 O/ N" x+ j7 T<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
) v) }/ A3 e- T) C2 g</blockquote>
$ U& n" R! x( D7 C9 D! e2 X# V. v3 q' x( a4 Q
|
|