|
|
" X; n/ H0 C4 H7 z<h4 id="flink系列文章">Flink系列文章</h4>
( D6 D. I4 a; m A4 {# Z! i$ l; P$ }5 n<ol>4 {$ V+ e9 p, V
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>, I9 }' u( e7 Y; q% j4 a
<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>
5 a7 M$ o' b0 [0 Q3 V, M<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
( P, B) I; V2 O" u; c3 i<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
7 @, Y, Y& c8 R3 e; ~" v<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
8 r: S6 j0 B% i<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>
1 ?* [# \9 n- M% {; c. L+ M<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>+ L2 Y* N6 i- x/ |- M: @
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
4 c& l) I4 N/ G& d: r; X' r<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>% h5 I* G0 e* i; V
</ol>
* L+ B& O/ }' N! u" }& O6 Y<blockquote>
* y) W0 Q" \3 Y+ @+ X<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
4 |: e6 A c! T3 Z5 \/ `/ Q</blockquote>
5 g3 \* y# v1 x) C% }<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
7 ^2 e, ^- A, H: `0 J% {<h3 id="分流场景">分流场景</h3>
1 J8 W$ i2 t0 G) g! Q<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
) G' s( f3 I, [% ?7 |<h3 id="分流的方法">分流的方法</h3>
4 R* ~0 @, Y g- E2 m6 P# j$ P<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
% V# y1 u' L$ a$ Q4 Q0 D$ [* d<h4 id="filter-分流">Filter 分流</h4>
9 q6 k1 h2 j* o/ C7 W7 ~1 b<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>
6 @ X. w" x* X: a( \<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>3 i3 C8 G, ~% d7 ^$ w
<p>来看下面的例子:</p>6 l: b/ z7 Q9 }& t) Z5 O; Z, s/ ?
<p>复制代码</p>
4 Z h/ a5 b6 A) \<pre><code class="language-java">public static void main(String[] args) throws Exception {
2 Y: a/ b3 Q; X. F4 _ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
% x/ N1 i# h- s //获取数据源
1 @6 D1 |! P7 f; \- X4 f) J: c+ I+ r List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
3 Q8 n/ _3 @: Y% t) I data.add(new Tuple3<>(0,1,0));
' G7 w: I8 ~' ~: F8 D- k# s* u data.add(new Tuple3<>(0,1,1));
+ L, P; n- U* w% y data.add(new Tuple3<>(0,2,2));6 P8 \. ]0 x/ @1 N) _
data.add(new Tuple3<>(0,1,3));
# ~% d: d4 A1 D; q data.add(new Tuple3<>(1,2,5));/ S! @& F3 Y4 K: _/ o" q
data.add(new Tuple3<>(1,2,9));
5 j7 |8 O% ^, o& a. c9 m" ]% N data.add(new Tuple3<>(1,2,11));. N2 H0 O$ z, e6 {
data.add(new Tuple3<>(1,2,13));
: z1 l( z* N) _$ D
* W& A# r4 X5 ]$ ?' ] Y ? DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);$ E: _# x! _+ U4 E1 ?# S
' s0 Q; D1 ?0 k U) w' b, O
1 X$ O3 I' L3 `0 B/ o
# z" |2 t- O7 e SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
1 p- m0 U' Y; S" |: v% r. r) S) \$ P W: }1 l
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
' q3 Q' J$ D O8 n$ D/ ?9 x& a5 y: _7 B% b- U& f5 O8 S
. U1 d$ S7 O1 }$ ?1 I2 X8 ?9 Z% D0 z7 @2 M# H
zeroStream.print();+ y" P* c- Z* R
+ g7 {. l _2 e) j1 x N$ d0 Q9 B
oneStream.printToErr();
) B2 G' c$ S9 K o, ?7 ^) q: X' t
: t' f- D% v. \( i+ U$ n% E5 l1 a
' l/ c% p# E8 Z% z W" [
& F. p: O( y; ^5 v! w c/ t1 i1 }% Z3 ?+ K3 W
//打印结果
. H: r d( [( ?# |" y' @4 p+ \1 w! C ~ A# D0 S
String jobName = "user defined streaming source";
0 r6 T& v- N" O6 O9 b+ i7 c8 v$ M' [4 w% x$ [1 B& @5 F0 }
env.execute(jobName);
9 h. ^' ?$ d" E% X: `) ]1 `. t) P( y2 f& m# H0 ]+ z1 @
}4 a4 Q" h" U4 C& q
</code></pre>, ]) p% g+ ~# f8 h' q
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>
2 N/ `( O) e$ x+ ]<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>
0 g, Q) h3 c* h4 O<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
, Y- D) }- E4 e7 @2 x4 t4 L<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>
$ n+ g* F5 Z9 H% C" b$ \<h4 id="split-分流">Split 分流</h4>
5 r B- R$ K2 T) l<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>5 @" C. u8 {% t' h
<p>我们来看下面的例子:</p># _0 m. i( M" V! S
<p>复制代码</p>2 v3 s5 S! L. u1 o. j& e4 _
<pre><code class="language-java">public static void main(String[] args) throws Exception {
* J; d& R3 a9 W7 f
1 c5 z+ W1 G- o! u$ e* w& r
! L& @9 |7 c b& m: \" l [2 H! J1 W7 D
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();2 \0 k; g9 @+ @( b
8 ^" i9 L; y% S. N, E8 }7 P //获取数据源
8 [' x$ ^9 O- n: g" z3 I- z
; f$ b* S# c+ V- a List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();- W7 E# v2 C+ d/ Z, c4 S- X
% p% e' Z. k, Q& M data.add(new Tuple3<>(0,1,0));# ?; p8 y8 i3 j
' K% ~$ ?, a' X$ K& w ~+ r1 } data.add(new Tuple3<>(0,1,1));9 j" J Q* `$ x! ?3 [
9 b* F/ k8 |! ?$ r
data.add(new Tuple3<>(0,2,2));
' D5 K8 x, ~$ U3 i1 \
: l! x! z* p* r$ i3 X% ?- V& q data.add(new Tuple3<>(0,1,3));
: |0 i# Y6 j0 E; P, N" [3 p/ k! u; o3 g) V
data.add(new Tuple3<>(1,2,5));' o; q* |& b+ A7 q
. Y. |6 N1 ~/ J# L, G* U
data.add(new Tuple3<>(1,2,9));& a* w7 S, d' S
Y2 X9 \8 q. a3 v1 G4 W/ | data.add(new Tuple3<>(1,2,11));5 C% i/ r- W7 n
n$ b9 r! j* v! x/ Z: F0 k data.add(new Tuple3<>(1,2,13));
) a- }. _- K+ F. w, U5 z3 Y3 l/ V% f6 n' i% |
( g) M, c( I# r/ s3 l
5 K# ]2 \7 ^, }' r4 ~0 j# S4 m
3 {7 Z2 A6 K/ L6 ]+ e
! T0 Q9 k8 H1 R* h2 K- d1 { DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);' c# n- E/ U' V
3 e( E2 t5 l0 V4 e2 C
9 F5 I2 A: G# C* _& U5 N$ ?% t
% E- v% U! M1 K+ f$ A2 Y0 [! Z5 H( [8 @. n
Y, x1 v# A: {" Z3 |
SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() { z( A8 x; n8 y5 B/ ^
7 M8 W+ b+ |( T6 @
@Override3 V% o5 Q3 r# f
0 O! |2 m" `8 q$ | public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {6 V7 C7 \0 d0 y p8 W
) F: x/ v; n/ x! c# O+ I4 ?1 w Z List<String> tags = new ArrayList<>();
4 o* T/ N) o& X: N8 }& p7 c, ~! O! T# E6 o
if (value.f0 == 0) {
& w- V, |5 h( z0 m
5 ~- G& ]: K# E! j2 b2 _! Z8 Q tags.add("zeroStream");
' k- K* ^$ f5 ^/ P8 P0 U$ `; u3 r$ R) t) f7 r
} else if (value.f0 == 1) {- q; s: i6 _9 ]1 r3 q
5 m% t2 O( t* y2 e8 { tags.add("oneStream");7 o# |) @! J7 C6 C+ r, B
$ G/ g/ c( |3 z+ z) E4 O( p! t
}" `* H3 \3 ?9 b6 @
$ m) P0 {1 x d% _% v0 I5 @$ o3 K- [
return tags;
+ o" ?# d+ `5 ]! ~4 P
, Q& B8 O% v3 S! _ s+ t' r }
2 Q2 c/ _; N8 p8 P# r0 D# s. L8 T8 ~* i1 ?: n2 g _6 m
});
2 V& Z! ?9 J$ G0 m/ p
5 H* _/ x1 Z7 H4 U8 j* V* W4 E X
; ?$ Y/ M1 V; V. Q2 e/ b0 j3 R2 }) m) I4 y r- ?2 K$ \
splitStream.select("zeroStream").print();+ B+ n2 C: F/ W3 d
/ [2 k+ W7 K5 Y! s5 ]; R$ `) A
splitStream.select("oneStream").printToErr();
! k% O; p' Q) o& u' Z+ A5 H( o7 e) m$ C, b
. e2 [& |; C2 B& g* m$ |
: ]5 `4 G# F+ x //打印结果& w/ y( G8 f; W- a4 F/ y5 r
& z& l( O0 N/ U! b
String jobName = "user defined streaming source";
6 b0 O5 L4 v: k* I, z. ^! O# H* K/ N$ m, Q
env.execute(jobName);
% u5 ^* k' Y) V; r( i$ ^# a4 |6 p) Y! H6 q7 t" T
}2 m; m5 b3 ~% T' o% e: M: Z& Z5 P- [2 o5 b
</code></pre> _. V, b o: C, {1 h, {; }/ z
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>4 a! T! `! G+ y# L
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>) @; n3 j p& H. y7 a
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>2 s: }" z% b8 o6 @ p1 X% k
<p>复制代码</p>
& u. ]( T' `' ~7 e* F<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs./ H# P2 F% @3 r" p' k; _5 X
</code></pre>0 y8 r- I2 Q* U' l q
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
0 S G9 `# i; X% H' w<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>+ i# {0 {& T! ^7 j9 Q
<h4 id="sideoutput-分流">SideOutPut 分流</h4>! f) m7 G5 ]; W4 Y* z
<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>; Q( I7 W! l# b/ s( [
<ul>
9 M. o" `/ E0 ^) @# W<li>定义 OutputTag</li>4 d! p. T2 Q& e- F% c; `" ]2 G
<li>调用特定函数进行数据拆分
Q4 H6 b- e: p$ ^5 u<ul>0 D8 j$ P1 Y" V' P3 N
<li>ProcessFunction</li>
, R# K' m5 C! v; F; D) w<li>KeyedProcessFunction</li>
) m- m0 e+ E9 [; {# g$ N<li>CoProcessFunction</li>
/ p g" X# o/ ]4 [<li>KeyedCoProcessFunction</li> x" e+ U. E5 @) N
<li>ProcessWindowFunction</li>
# S& I9 E" j1 h/ w1 X- K<li>ProcessAllWindowFunction</li>
5 V+ w# l" J9 a3 a; k w/ I</ul>
8 O) |4 L9 O# E: P</li>
9 a# o/ L. a- Q4 v' q</ul>
" r5 A8 f: E8 K) |<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>' H1 Q5 H0 j# Y1 C' N
<p>复制代码</p>
9 G# C: y+ L: e; z<pre><code class="language-java">public static void main(String[] args) throws Exception {
* Q4 S4 V' y3 F6 P$ \
1 r) L- w; x4 r: @" H
1 m$ E9 |9 h( R) H; u6 a0 C
G! O- H& [0 w- v, x6 _ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
( m9 }; p) L. i! S, D0 d9 t, }
3 T1 C+ z# B! W //获取数据源
) v; g- Q- G* o1 k% e F2 K# _# W/ t! ~( W: A# S
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
1 }, g2 O4 _+ H
( z( i9 T3 A3 M$ @1 A data.add(new Tuple3<>(0,1,0));4 s! l+ e& F I" [& n
% c4 ?; G* a7 k! D3 R data.add(new Tuple3<>(0,1,1));
7 e! p$ p8 g; m/ p
* }0 Q, Q( @5 Y( F% x data.add(new Tuple3<>(0,2,2));, }+ O, z2 X a
4 @. d& J1 g) l4 r data.add(new Tuple3<>(0,1,3));9 V6 x A+ P5 N2 ]& v
) _$ P! a! F2 z8 E( z2 H" I4 @ data.add(new Tuple3<>(1,2,5));' w q1 i+ \% Q- g) d9 u4 E
) M% ?8 ^9 z8 K! Q2 U9 a' V data.add(new Tuple3<>(1,2,9));' V6 \$ F" q, x" O
4 i' r4 X9 ]" \. M j3 D) E6 q data.add(new Tuple3<>(1,2,11)); r$ u/ p7 Y! o& m8 [) a
4 t5 l: h0 m1 _6 z
data.add(new Tuple3<>(1,2,13));- A" S n% A. Q: ~
- @6 z0 E9 |- W' u
! L) g; y" B$ Z8 X# \
# B' ?4 U! x9 }8 [/ x+ F; ]
. ]2 g9 k+ `" A2 k( n2 w8 S6 E# L3 \# E" u9 p& {. D3 t
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
0 \) I3 o+ [- L' Z5 u" b& q2 T0 f1 R" b5 l. F& t
[' s& x& b7 x
2 y8 }1 g/ d" O/ h) H o
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
t5 F* t7 X- E! F: s# ^, p
i8 X1 k( g' I7 N OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};9 D3 l) E& S4 @+ ]& Z7 y& Y" s$ ~+ Z% @
- v8 t. p% q! y+ p
: o7 E/ N; n! E, x. p- U9 E4 R# o( A& d: [% i
- D' h/ V' h! L6 p- e0 p6 `, Z, K) I2 C1 C
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {4 ~0 O# ~9 h( c- o1 _$ q. |4 U
: _1 A. C& ?3 U @Override
: a& S' w* L8 ]8 K, f# t9 n6 W, v$ U' N; b: g3 c1 |* m1 C! X
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {# R2 z; O. E: ^" r2 a# W
7 C2 O& J* E( {# n% g. a' n/ J
8 W# O! U: y* ~
$ `. N0 n. c$ o, `* f& z6 t' Q if (value.f0 == 0) {# e; d' k5 v& R" n6 U
) J( C0 ~5 Q: l" N9 L4 y ctx.output(zeroStream, value);8 T/ o }% L5 e3 ^/ n+ K! e
" b( q( j, w! p( B: Y } else if (value.f0 == 1) {( o6 L7 m1 g# L2 h- \
" b4 c1 {! h% ^+ U
ctx.output(oneStream, value);
* H7 Z% C2 F/ h/ L$ O
, l1 E' T) p( l H+ ^ }
/ Q" E# O9 t" e) G# h% V N/ i5 x ?
}" I9 c: ?9 d1 i- h8 B* n3 q6 q- w3 R) f
% w+ D% N4 Y# I, h$ x- s3 G
});
6 |, r- H( [+ {) j0 l$ l2 L# o1 D
9 B' i& V& i) O4 q/ l7 [
' f+ W4 \ m- m) g- u DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);
& \- f$ \: E5 x0 w! [6 T8 J( M' {0 v& \6 U; \; k. ?
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
( M/ o4 p, F% F, o, D- A$ f5 }! t, y$ c
0 p: U1 Q k+ W2 m! O' D$ T1 P+ Z5 W
zeroSideOutput.print();7 f! t G. g, R9 h3 D4 Y" s9 b
! d1 z9 c3 W9 _; b9 ]* o! ~" A oneSideOutput.printToErr();: S4 ^8 X8 G' i* {. I( r2 ]* c
# F$ R8 T; l' i* j' M
. g7 r" j% o+ C! M( h+ `/ T
) t6 f, x' N& G( M( p3 d7 ]1 m2 y4 o# _
" J) s8 r7 s7 D$ R8 N
//打印结果
9 X6 S: a! Q: l
6 R, Y4 w5 ^: h8 U4 \% ]9 \ String jobName = "user defined streaming source";
2 G% b- M+ l4 V/ Y) H0 F& E8 R( T
env.execute(jobName);* s6 l5 m5 a$ ~
1 _* _" D/ @* `3 a7 C' t}
# j! H# k6 M, D0 Z) h9 V- k</code></pre>
$ Z; E+ ?( O6 y% o<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
D: I, t9 D1 G<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>
+ x4 h7 ?0 B. Y$ d+ Q4 J<h3 id="总结">总结</h3>3 e8 Z) l$ v/ [
<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>& s. h+ T, i' y' u0 r$ f
<blockquote>
* Y- R" k5 j3 g& |: t<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
& r7 d; w! j( @. Z* ]2 [) Q/ [</blockquote>, U1 t. `7 L1 H. Q9 `* }
; w/ E% M% ~" R- A |
|