|
|
d% t4 C8 J) d% x<h4 id="flink系列文章">Flink系列文章</h4> s8 ~+ D! \: S- O, h
<ol>0 x/ b& Y* g% q# s+ E5 U, ]" ]
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
% Q! V! S! P& `; Q7 d<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>' W8 D% o4 Q2 s5 z* [7 d2 E1 _/ X
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>5 J3 K2 o. ? \1 I: l/ \
<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
o( {9 G5 f, e$ o6 @, f: K/ {<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>: d5 x/ g Q9 T% R3 n8 n, Y
<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>
* j5 u# L* h/ P- E. m$ l! _<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>; q6 ~- k& D! K2 r
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
# ?% F& P W) w5 l<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
' }9 W" `, Y, S) X) E) C1 m4 D</ol>" T ^6 J' j" i" V0 ?* {! x
<blockquote>/ d5 R9 g$ B; R/ d4 ~9 S: w8 [
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>' `% [# {+ A: _7 n" X n# J D7 @
</blockquote>1 _7 a/ p, z; i' p3 p, W4 d
<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>* g. U' D, A5 ~. j& |
<h3 id="分流场景">分流场景</h3>6 H4 l0 X: h) F" O' w6 n* \; M1 B! ]5 w
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
8 [* R/ h& k Q% }" T$ \! n1 }<h3 id="分流的方法">分流的方法</h3>0 z& ?7 J; J o5 k, _
<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>. t v" @# V' V9 U# l: U- V
<h4 id="filter-分流">Filter 分流</h4>
6 M i% m( p+ k+ x; c0 ?/ w<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>
: y, e& [+ |, c& o6 W! u6 v<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>! B( `, i5 R. N
<p>来看下面的例子:</p>: y2 U4 [: d4 |5 t8 q: x3 C
<p>复制代码</p>
' Z* I; R- V% }% O0 m: n/ M<pre><code class="language-java">public static void main(String[] args) throws Exception {$ I! m9 Q- c- b+ F' y! @
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();; M. P8 {! {2 ^5 s# D; i; Y, q
//获取数据源
5 n. u+ @! T' B! y List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();) [ z6 ?# i% X+ x1 F0 V8 b' M; T) p
data.add(new Tuple3<>(0,1,0));
$ g. S; d9 t/ S% B. v p data.add(new Tuple3<>(0,1,1));* M8 u; f4 y; }6 [% z/ c+ s+ Y8 o
data.add(new Tuple3<>(0,2,2));7 d. Z7 T" n2 M* {
data.add(new Tuple3<>(0,1,3));4 A9 W+ d5 C0 E( ~
data.add(new Tuple3<>(1,2,5));
# ^7 @; K! T( n: B data.add(new Tuple3<>(1,2,9));7 ?! I$ {. b' s) a/ S
data.add(new Tuple3<>(1,2,11));1 i, j4 e7 e$ u7 H, [- M7 U' N
data.add(new Tuple3<>(1,2,13));# F( ]7 ^* b! O0 e7 f
. h5 [+ I& S# D8 f: \ E
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);# }# I. V$ y! x% t+ d/ L g
; n4 K4 C. L3 @
! P* T6 A9 X3 \% M3 _
5 b7 b6 G8 G$ n! j M SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);( c1 q) Z$ v+ G! F4 a, x: f2 d4 J
. U. ]8 @2 e G1 f5 S& k; p# Q
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
9 }* J; Y8 c$ s1 v; N; V; |' R2 F+ G+ o# D0 k4 z
# d s2 C$ z( X# k. z4 G/ J
5 I/ o: @& h/ n; d zeroStream.print();
% E7 C! w. w& ~5 U# T% {" m8 B3 I4 t1 J- u1 k
oneStream.printToErr();6 t/ h4 z% d9 k- p* n6 j/ C
5 G' f3 V* o: A( Z0 W9 f
8 d7 Q' Q8 h4 w( `5 `4 E
7 s5 v( k8 ^2 F0 F! q# c& \/ W; e- ?# y3 O: d
+ u" s8 @, e2 ^6 s G) R
//打印结果3 `( }! Q8 T0 q0 R' |/ ~6 d! @
^: |0 t3 m) ?' Q3 ` String jobName = "user defined streaming source";
4 ]! ]: Q6 g) ]
9 r! B! n5 B+ @% p8 A9 q7 | env.execute(jobName);8 s, I# e! R& B% L; ~
% m8 z2 G& H6 B) P. o3 s% {}! `* x% W( _+ [6 Y" a* r
</code></pre>
$ Y. c9 r- M* E5 Z' o9 S- {4 L<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>
1 E. k r. O1 W7 C: {<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>. X( H# M7 y5 r. u" s5 H
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
8 ]0 k# B+ [0 d<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>- `: n% Z/ T% p; C, I; S
<h4 id="split-分流">Split 分流</h4>3 E, s* Y4 v* S/ o& ]8 N
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
1 y% X/ t& L0 M( v<p>我们来看下面的例子:</p># e& J( C% f) J8 |- P* w
<p>复制代码</p>9 J; p* q/ t2 R
<pre><code class="language-java">public static void main(String[] args) throws Exception {! \( y; k' L7 C6 U6 X* r
+ v5 @4 ?& T& ~1 _- C! r) O8 j
% o& Q$ C- Y/ L; e: Q; [
, Y [) W% |1 I, E8 f# ^ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
6 T$ h e: F3 s5 g- g! [
! A0 O0 J2 v) Z1 F, f //获取数据源/ N2 z1 Q; H2 [
3 ]/ K/ l7 n* [4 @1 q; X
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
; g; O+ W3 a+ u& N ?: Q0 [0 E& l' Z4 ?" z- h/ k
data.add(new Tuple3<>(0,1,0));5 c; ~& Z& D- K
8 l& W/ V/ ~$ H' m' y2 W data.add(new Tuple3<>(0,1,1));
7 ^ W6 x5 j' |$ E0 z9 o
! S s$ w, U& A2 \ Y+ F data.add(new Tuple3<>(0,2,2));
+ O5 N9 C" M9 c/ `' [% U: K) [, z9 f4 y2 G. V5 l$ y
data.add(new Tuple3<>(0,1,3));
; L( I) j6 H3 O
+ k9 `# v& _) [5 z. B0 y+ s' } data.add(new Tuple3<>(1,2,5));
$ O) n8 i/ }9 z! [
" ~$ t! i: n5 L) D data.add(new Tuple3<>(1,2,9));
2 Q' c% K2 T8 O0 \ `
- }2 s D) C0 I$ ] data.add(new Tuple3<>(1,2,11));. ~% \+ e. G! t$ }$ D9 o
- L. ]4 U) L* E data.add(new Tuple3<>(1,2,13));0 q) M& `9 R, N+ g6 r9 m% k- s
' `# a. d$ G6 [+ o& B. s
0 `" K" X. L2 R6 `
, d+ M# v3 t# l7 N" N6 }7 x# C2 B
& q2 }9 \& o7 Y2 L. f# y5 g/ n; `: r4 I
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);1 d/ }# V! b; _$ @) l% P
5 v8 |+ n2 L( }7 |; \) f, a f N* i) y0 ?
6 l0 N8 R! x- S
; j0 G3 S4 R' b3 U3 \# e+ P3 u A, V' f. P" ^
SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
. Q& \3 w L5 ?8 P9 p" l) V7 k9 C' X t% t8 U
@Override2 }" }9 d. y' [/ X, F/ D/ n3 l9 ?6 z
$ y& ^& B2 O) ]0 y7 p+ f( t
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {3 c5 B, ~# U. z7 h
9 K1 ^4 \8 _" M5 z List<String> tags = new ArrayList<>();
: i8 `5 w4 w( R+ D
- P) m7 R! e* \; a. H2 c if (value.f0 == 0) {! \ @$ W2 ~% A ?
$ A* p/ Q {7 |0 l tags.add("zeroStream");5 Q- V; U* _* p2 k& e. B) P
# ~6 L O' p" C& v9 y9 m } else if (value.f0 == 1) {
/ v$ _9 r# G' r# o$ x; A, I0 ~! |* c
% R; I6 e* i( C! V tags.add("oneStream");
Y- A+ j' H/ c m8 D: J
9 H1 v9 Y# R8 b- f" B! x1 f. Q }7 F: j4 L8 n- T8 x/ B
$ h0 Y: T2 G0 n6 K
return tags;- v: G7 O1 a, ]
( r! }6 m* S5 `# {1 H }
6 f+ M% {, ]1 u1 K, q$ t6 R
- D0 C9 s9 K( e) f: k, P- w1 A });( ^4 E" F8 X0 V8 m1 o
9 O" m5 C8 t6 }% R8 Q$ {
, R: f- T9 w3 y
( _4 R+ L* d+ U: Z splitStream.select("zeroStream").print();6 _/ V* Y1 L! r/ I s4 U
) k: q1 f' Z' x" J. C splitStream.select("oneStream").printToErr();
" O: s: e! Q( V+ W4 ?" j- }# }2 e: g @: C" d# c
8 \: A/ ^( S9 ~7 X8 J7 j! E) k9 B) A" k. H
//打印结果
/ D' _, C: Y- V% X
4 @9 K; Y$ E0 _ [ String jobName = "user defined streaming source";" |; f2 t+ q) Z0 u
9 S% S( `& i% m& B$ v% e: Z; D
env.execute(jobName);0 C% M5 e* {9 d3 z, n2 M6 i
% E9 O* x4 W, w- l) r3 N0 n
}0 i9 r H6 m! p. {- k( p
</code></pre>
4 k+ d9 c- [: j7 D/ [+ Q<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p> J$ e6 x# j- D C' H/ Y
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>
6 E/ _ N k" M# s5 r<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>
: V9 o4 G4 c) K' ]<p>复制代码</p>
4 g4 p+ i7 ~% L F% T( \<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.# F: _/ v8 n4 M) p2 |
</code></pre>! @3 h4 \& Z# s& r: \: s, S8 g) q
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
% P4 |6 l. I0 l5 m<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>3 {: c7 m# e Z
<h4 id="sideoutput-分流">SideOutPut 分流</h4>+ j8 Z4 Y; W+ j0 Q7 E4 z
<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>9 q5 L2 }& k5 [4 d. Z
<ul>: u6 [9 ]9 J1 F. o3 y: L
<li>定义 OutputTag</li>' Z7 A; i" p: Y9 L+ T3 K# B# E
<li>调用特定函数进行数据拆分8 m6 y2 Y. I& H2 J, B8 S
<ul>7 C6 T% a0 W- R& K
<li>ProcessFunction</li>
' c- B2 M8 f4 u* K8 f: a( t) X( k<li>KeyedProcessFunction</li>3 }" ?4 D9 T9 {2 h, `! d1 Y
<li>CoProcessFunction</li>
+ j' _' b* E$ I5 g3 `<li>KeyedCoProcessFunction</li>
# H. M1 b4 L) J' m; Y( r, d" E<li>ProcessWindowFunction</li>+ A# B2 N* \' H5 L+ s
<li>ProcessAllWindowFunction</li>- h# ]1 E5 A8 v) P! X
</ul>% O9 t5 \9 F/ O0 ~2 }
</li># o8 W6 h$ D6 _/ C& G
</ul>
/ {) ]6 K' M% y* ~1 {1 z<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
8 J. e( \6 L/ H* s<p>复制代码</p>6 Y* k. N8 z, E1 }. w
<pre><code class="language-java">public static void main(String[] args) throws Exception {
# i1 V+ K* \3 d# s6 f, Z" U1 c; C6 }6 U3 d- l
0 b" a; l+ c3 m" f) K' L* J) T) K" S7 w- ]1 f& X! ^, q5 [
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
8 K4 h1 X9 q( x" d4 \- L! | T: k" o0 o) K8 v
//获取数据源
$ H. _, q" W3 R5 s1 x( r' p; x3 P$ [4 G
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();5 \4 r7 o: A+ @7 g- ~/ |8 [$ x
" @6 J/ r3 i* @2 g! I data.add(new Tuple3<>(0,1,0));, D8 s5 Y- p$ \0 d1 i U0 A
( `) C! H: n/ H+ a data.add(new Tuple3<>(0,1,1));
9 B3 ?4 I4 W1 i' c; ^2 [. p0 v) c: [6 f+ b+ C/ q8 \
data.add(new Tuple3<>(0,2,2));
6 t; X# c# @7 l. }
+ I0 y3 o1 E( I8 t% S. {2 W3 C3 w data.add(new Tuple3<>(0,1,3));
1 d' @2 E, l" C7 c) n0 ? s; V% R
data.add(new Tuple3<>(1,2,5));
8 H; Z' q; y) O. v% c! f! B9 a9 Q* f3 D( ?( a
data.add(new Tuple3<>(1,2,9));
6 B! W; Q/ m6 f% x9 u N1 [% F# m% _2 V
data.add(new Tuple3<>(1,2,11));+ ?. q, ^6 z" F5 g2 I
8 A8 r& Q. A) F7 ~5 J data.add(new Tuple3<>(1,2,13));
% X2 J; n; A( k7 a& ]* m7 n+ U& t
Y# t& ?+ ~, N$ p5 P& R
8 N. W, Z( D' B/ Z# D# E& C) K4 E, i( P O
1 p: ~1 }* F5 m2 M" f$ i+ Q* {5 s
, n. F% ~3 l1 H) V9 G DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
( P/ M" y9 F& Y$ L5 a, g, g
* h# }, G0 k7 |5 y; T
/ F* T" b# a" c A% F+ u1 M2 E3 H, g w* }2 P, E
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};, E! I8 K% I, y/ `
: i3 Z2 Q% w/ s. Y( b0 g( w8 y OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};
[* J% \7 b: ~0 `6 C9 k1 R5 d$ D; e. Y$ z8 U
- u7 C5 X' x! R' `( q
0 H6 v" ]+ E; p, M
0 x5 \) X" O! W F+ n
% B6 q A4 T3 ]. \% {) `/ V+ ?2 _
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {9 n; y! K5 X8 d2 y& O$ ^! J
j% o# I: {3 z- ~- S2 Z" s# C @Override! N: b U0 t8 [, [: }8 M
# j& X' l" a, T& n public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {4 D/ v# K5 A" V0 J7 T+ U, k
0 w- Y- `. X* U2 @7 D# P5 w9 i0 ~7 C3 ?* Q$ i8 H4 ^, u$ q
% M3 D, Z9 [+ D, P/ `1 X6 c
if (value.f0 == 0) {0 ]! Q3 N% k% ~6 B/ e2 S0 L* O
" i. G' I8 q9 x0 J
ctx.output(zeroStream, value);
& }6 ]4 T# ?. P
3 A2 i+ n( [2 d/ G+ J+ n } else if (value.f0 == 1) {: f7 k* e$ l! d" E; h: `& ^0 ?% S
) U# Q, D" j8 ^' L6 p ctx.output(oneStream, value);
4 Z; D; ?+ F" D" o! B' P. p$ k$ G* i/ [# O& [/ f- F8 n
}
$ z. |: n) X4 Q: ~/ E% H8 ^# d! p0 E7 K4 w. _+ a+ @! S- }! ?
}
& ]5 }! b- q% F# P5 l
" n" a. |$ y5 c4 ~8 d4 j });1 @$ m# r' C0 \# A+ e( J5 i
W1 O6 Q$ P( _! H& m. s
6 d9 \; G/ @" ^+ ]4 x* m* l( s- p
1 x( M! q7 ~, d9 ], r DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);
# B+ m# L: _% ]! A( }* X
4 K8 i, k6 q) j0 h. X' n f DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
7 W1 f0 a# L. G9 R# v# v& E
: H2 A* Z: Z+ o/ [4 Y$ ]1 x: }% G% q* Z; [' _% D9 I
d; @# X \2 Y2 l zeroSideOutput.print();
4 Q6 H& e+ y4 Z! r. j/ J! @) X& s$ |* w3 l$ i% _
oneSideOutput.printToErr();, W* a$ q- V2 ^4 V! M/ E9 ~. F0 ~
+ U" c+ q0 E( A9 \8 @
$ F7 U# d6 w% ~% ?3 [
& o7 ?; V; |, N
( D9 T& h J4 S8 E( Q/ e+ O9 w- v2 n6 {( {5 ]+ i" L
//打印结果
( g/ W; D4 `3 `9 [ O9 Z
7 {# M* ?. B& ~ e String jobName = "user defined streaming source";
, {8 e3 x! n K' ^1 R
" k( ~7 I y5 p" W4 h env.execute(jobName);
2 V3 t$ U: o( |4 X
1 q3 D: N% ^' [; ~8 V% e# f& p}
7 D. ~! H0 }7 K" o2 }$ O</code></pre>0 \) [, c0 k+ G
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>, e/ l; `( Y* d, [! Q0 V' H
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>
5 |+ |. w( l. g! G2 l4 H9 J; ?" z<h3 id="总结">总结</h3>
% k# F% `5 c u- s" b, J<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>! \4 Y, u( ~8 }( r
<blockquote>5 L* _0 x2 ]3 }! k* p
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
- }/ Q0 e* P# y/ I8 s6 r: Q</blockquote>6 Z: C% O+ m/ A0 _) L+ M
" d6 f4 k9 \% Y& I4 d
|
|