|
|
3 R& n( K: v9 Q2 \* M<h4 id="flink系列文章">Flink系列文章</h4>) y1 K/ G8 I# X$ A6 N* }
<ol>) x V# A7 q& f0 ?, x
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
1 a( P/ B: b: t) J9 H5 ~- i0 b<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>
2 q N+ W; c f/ H! X2 J% F! C: T<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>, c+ e2 t4 E- G$ Y1 T5 N
<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
" l( Z; b0 _( l' n<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>/ ?) o+ n# L1 J h) G
<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>! E! K% d7 S" k2 m. M
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>
+ |% O5 |- X1 u- H% r% s<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li># ` p, {6 N# q+ q0 t% [ ]9 x
<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>" {6 H$ u/ D/ O* R" _2 C3 W
</ol>% z9 G5 u0 L, L% [# G- Q: b
<blockquote>
+ V! G6 T7 K* X* n2 h+ u( K* k& p<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
; i2 H& i& u- O) F1 [3 ?</blockquote>; I+ X/ Z _' ^6 c0 w5 y
<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
2 t+ p6 C" m$ o+ ]+ }# }5 ?. ?<h3 id="分流场景">分流场景</h3>
5 m( Y% r" O8 v/ k1 c7 ~4 E3 q! [, Z<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>2 a! m" R) a8 J; B) T+ h
<h3 id="分流的方法">分流的方法</h3>
/ L7 t* z$ ^( i9 `! x" N<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>/ f* Q/ v- ?4 s$ o* G
<h4 id="filter-分流">Filter 分流</h4>9 D A- @; ]. E3 s
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>8 V" e0 S0 w8 H# f0 ^' D& c3 ~& ]
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>2 h2 S' R$ t5 e3 u- _
<p>来看下面的例子:</p>
2 F T9 d0 \0 ]; X' k1 d<p>复制代码</p>7 A i# m! O1 R" K, i
<pre><code class="language-java">public static void main(String[] args) throws Exception {9 W* P1 y' l0 p
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();2 r: Z6 \8 I3 X; n
//获取数据源
) y- A) }+ ?, z/ q% j List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();% G: e- s8 d% f h g/ g5 v
data.add(new Tuple3<>(0,1,0));
: a) y/ C, ?/ k. ?1 L data.add(new Tuple3<>(0,1,1));" g9 w) ?( Q' N6 h% d% m y
data.add(new Tuple3<>(0,2,2));
" `! i& V/ }, ~ data.add(new Tuple3<>(0,1,3));8 x8 c5 f: a C; R2 c2 ]
data.add(new Tuple3<>(1,2,5));) S; a4 X* ]- y
data.add(new Tuple3<>(1,2,9));
, P' G# Y' M0 o2 h% v" n* V data.add(new Tuple3<>(1,2,11)); ?. u* C% ~6 ^+ A# @' f
data.add(new Tuple3<>(1,2,13));
4 Q, L3 s6 l7 a, u' S3 h: s" J
+ o+ u* R! D# k4 {6 a6 k2 ~ DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
- Y5 `! k# r0 J4 l! S( _1 O6 O( _2 A' r
2 K( n6 B. i+ q+ K7 H& l" \' N
9 n1 ~7 |; ?) F% l+ m* L- v SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);: w. ~& X# `& r* N, V. W$ W
) |' o* f9 J; r! q SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);1 s5 ]5 \+ U) \0 P$ e3 W" h3 {: \) [
& d* Q% M1 T- c. ^+ v4 X# @
$ K* S. r3 F% b; Z* _7 L
" ?) g I, @# {5 a. ~ zeroStream.print();; T- S$ d) b( i9 k7 ^
& p/ t1 z, ~( a8 x7 k# W oneStream.printToErr();
( C0 G2 n3 b1 Y" l1 Y
" N# E1 b5 M1 @' i0 X& B3 F9 V/ z( p
( \4 R& ]/ \! f+ X1 ~+ |+ z: p' R1 W; J' `3 x, p
1 i ]( f2 a# q: }- Q) y1 J! F7 w0 J
//打印结果
4 o+ u6 a+ S3 }' V0 Y8 `. ?4 G6 ? U2 L$ Y) e# Y8 P4 Z3 s/ s
String jobName = "user defined streaming source";' H1 m/ x% D% y0 o1 |0 k
4 X0 m( D1 K. f1 o env.execute(jobName);/ u' N0 p5 D* s1 R
2 h" t4 f- O# n5 n
}2 C* N& u0 d, d4 {0 E) y% X0 i
</code></pre>
! Z7 [( q8 ]+ F5 L9 P<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>, h7 l, W; i' w( f" y
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>
/ x/ Q1 y" ~' n1 {6 ?) g<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
6 b# |: y5 I# N: Z<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>4 y* m# f& ~- P6 D% f- M4 c8 s% {4 M
<h4 id="split-分流">Split 分流</h4>
0 x" q1 Z5 w3 i" f3 O+ ~<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>4 f! z0 { V0 ?
<p>我们来看下面的例子:</p>
$ w; z( L# ^. \/ b# s3 z<p>复制代码</p>+ f! `8 D$ g; X* W
<pre><code class="language-java">public static void main(String[] args) throws Exception {4 Y% Y+ o# S+ L! q1 H0 {0 O
, m$ Z" e; V/ g% o/ t: w- d4 j1 b, N! q4 N$ o9 h9 X7 U3 [% k6 ?$ P7 }
# S, ]+ B# F0 b B# Z. O
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();0 m* h& J8 m( F
% b7 g" s/ a7 a7 `% W //获取数据源
U0 ~6 Y$ j( v5 O9 _
- j- x4 M. ^& c5 }1 }! ? List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();( M4 ^2 q2 g, X9 F# q+ ~
0 O2 v% e3 o. I( u! T" d) K% J1 N data.add(new Tuple3<>(0,1,0));; @. K) G* T) W8 k3 t
8 }# m! m- }. i+ L k) }) R
data.add(new Tuple3<>(0,1,1));
+ Q+ f( ~* b7 n" x; Z. C
9 v+ o- R* V* m0 c" |# D- g1 d data.add(new Tuple3<>(0,2,2));
) N8 s) x! f6 Z: h1 D$ T; i3 F! `- _# b4 Z+ \( b
data.add(new Tuple3<>(0,1,3));
w, t% N- n+ E5 ~4 S4 R) t; w" s
data.add(new Tuple3<>(1,2,5));
* X7 k6 z0 P5 B% @
q+ @8 q( ~6 |) W h( Y7 ]9 _8 { data.add(new Tuple3<>(1,2,9));7 q$ D- I- a: ` Q* r" L% S
2 T' l+ I+ X5 _. H; H1 G/ A. P data.add(new Tuple3<>(1,2,11));: H7 c A2 m3 ?% _
- z, {7 C2 s- J: s* M/ e. ^3 Q data.add(new Tuple3<>(1,2,13)); M2 {& o( J4 Y$ z7 u4 h& p. w( K
: h' ]: l$ z |: F# p3 ]0 l8 C5 F$ \+ m4 C+ \* O
$ J8 J& P9 k& Q# ?5 m: P
0 m5 l! u+ B# y' q- ~" ?/ c
. e$ o2 D9 _: ^. g# a DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);3 J% q* h$ w! u: z$ `3 m: W
4 P/ d/ q Q; F/ h) W
- D- S A: h3 |8 ~5 @" T% f: \* a4 k; q U0 l# z0 P
% K+ [* [5 [* e, f
4 y4 R; j" @9 ?, v$ x SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {2 d1 \0 B! d( ^( d( o
% s- s8 d2 u9 K, Y @Override
; \. f* _. P5 G2 z
/ ?6 q, l0 X: } public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {0 k" I- F9 L+ P7 [* `
- Z4 p* v! h( ~8 ]- b) a( w List<String> tags = new ArrayList<>();
: K. l1 o' J4 x' E# A( L9 B6 s" i. F2 T1 R: [
if (value.f0 == 0) {! H5 B% ~5 h3 S" P
2 v$ \, P9 P2 v5 Z9 W
tags.add("zeroStream");
6 k5 _, _ Q, a u9 B8 X6 s o r* |- e
} else if (value.f0 == 1) {
4 p4 c# P& }* b/ x) h- e
1 w' H6 U& z! s* \; Y9 A# B tags.add("oneStream");* }* w3 n; ]2 h
( A1 _ b/ z( Q }. n4 I8 r4 g$ {* w
9 A- Q! y; T2 v: s* s
return tags;/ c7 ?' U$ F6 R0 z& \" {( C. X
" i) R3 Y' y5 {+ Z- C
} M) J& f& E+ ^0 D
0 B) f1 }# L$ F( P" p });! [* C2 X! ~5 c! f( A! s, \' v" _
, N/ G0 K& c- @0 W R/ W. e+ M1 D2 K3 N* d+ R
3 E) Y# d, H: |4 `
splitStream.select("zeroStream").print();
& Y: k5 k- W' U
+ F1 U% T- x& \* q. X5 U4 k splitStream.select("oneStream").printToErr();4 f8 b0 m0 M& g/ u2 F0 G, L
: d6 [9 K: A" |3 C, |$ p
5 Z" A$ Y- Y! x) N4 Z
) \; g0 b& _5 h0 h //打印结果7 v7 T j) @/ `2 c9 ?& E
) F2 {7 d- G. v4 g String jobName = "user defined streaming source";7 O( R* U6 ^0 J5 k! ?; ^2 r
5 \) _9 w4 n$ I& k7 k3 j env.execute(jobName);
" b, n1 W: c: o: X4 e& O
6 Y3 T% F5 A, e6 V |; u}
9 X9 f6 H* C9 h: W( B</code></pre>& q5 u j- S' u- }9 b" y9 J
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>2 E% Z* v/ D" I9 W
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>
" M' o4 R7 I/ X, X/ F<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>& e5 r$ Z, G! ?+ w. N$ _, ~
<p>复制代码</p>
R* H& H4 K" f4 p<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.. d& K- B2 ?) B2 w, _0 Y
</code></pre>
5 q" K: U6 d* G1 C: o3 x<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
! A6 ^' r- w6 T% G/ B<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
% W+ X" J6 N% ?2 L<h4 id="sideoutput-分流">SideOutPut 分流</h4>
* ]6 A9 |% E) m# \( h! ?) [<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
+ o- \6 [) i. c' E8 D+ n<ul>
0 m B' e! I) F<li>定义 OutputTag</li>
% ]0 e' L% R, b. d9 Q$ `. x<li>调用特定函数进行数据拆分
. e G1 b2 s: q% o<ul>, [ v" ?6 U7 K2 J" }
<li>ProcessFunction</li>
9 |. X' e$ u# K M d4 h& v<li>KeyedProcessFunction</li>% b; O4 U l" G2 M) M8 H+ z; b
<li>CoProcessFunction</li>6 g/ x! }, ~; e4 I' m. t
<li>KeyedCoProcessFunction</li>0 c D7 J' |0 H. C n
<li>ProcessWindowFunction</li>7 J5 a8 p1 u) k$ h7 {
<li>ProcessAllWindowFunction</li>
z/ F7 [! q% W) L</ul>) x5 f! Z7 T3 E2 C0 Y1 ?, z
</li>
, p V, y3 D: n</ul>$ \0 l+ J, @1 I& L: O
<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
; C2 ]6 _' Z4 T+ T<p>复制代码</p>2 i+ ~6 m5 }" ?
<pre><code class="language-java">public static void main(String[] args) throws Exception {9 M0 i. e6 R5 M4 r8 y) k
" w& z: V e$ x0 F: X! u0 B: d- L- X0 e# ]+ q& X. D
8 l1 y- k1 k9 B) s
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();0 G3 u- e' I0 I7 H
+ F" g0 x$ k" ^: ?2 v, A //获取数据源
& c N+ J& ]: T* O# |, U, r4 A4 g$ C5 I# M+ n# x) y
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
# d9 E7 I5 L+ ]6 R4 n, g0 t, E( v' {( F, |' c7 a1 Y! `
data.add(new Tuple3<>(0,1,0));6 A6 L$ {/ i) T: B5 g$ ~
4 c' g6 z5 I/ ~' \. s6 R O8 y
data.add(new Tuple3<>(0,1,1));0 ?* j4 V; \# s* y( a
* W- b+ F7 _1 y/ Z6 q0 S [5 b
data.add(new Tuple3<>(0,2,2));
) u$ ^( W5 c, s7 f9 q+ H; k, k% z7 Y& ^; N
data.add(new Tuple3<>(0,1,3));
* S% Y. L6 k) Q
! ~+ M( c. _2 _% b data.add(new Tuple3<>(1,2,5));! w( S1 P! k5 n" L" F
" d8 m3 I2 e; G% \" V$ V6 L data.add(new Tuple3<>(1,2,9));+ T3 f3 o$ O1 i
: H9 r/ h I, p, b3 R; Z' V. k
data.add(new Tuple3<>(1,2,11));! P, G* M4 c; W
" ?9 |8 R6 ?( G+ j2 w7 I! t% E
data.add(new Tuple3<>(1,2,13));
9 p5 R0 v4 M% O7 v
! G4 U( ^8 F4 j: T* h
& @8 c; @8 Y- b9 }9 Q, u
& Q. A3 g& @9 _& ~, f
% ~ \% [( H0 p4 L5 [! V
- ]7 m/ H% \) `6 A1 D% Q- N. F DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);& F6 i. I4 Z! T* v/ J8 _* `
) g9 c! Z; `% [, f' g
/ }% m. F, k! K9 Y" H/ l
$ T; l. g( Q# s/ G- a) S OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
/ a+ S3 l" g( h5 M& Z/ W8 e. c
% O) s8 V: Q' e; n; ~ OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};4 A4 e+ E6 r e& Q; L6 k- d8 D
! C! i; s+ Z# r* L8 L8 g6 c
; R6 x3 `/ t+ v, Z) A7 q5 [$ v; c' b' O" P z
{5 V6 P2 y# F3 a4 s. }/ ~4 v" |0 I
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
- t* Q6 T% m7 s2 i" l8 b1 W
; `: }* K* N2 K @Override
" | q2 X3 i+ k3 G; t' T5 s1 r' y& Y
: N! w7 @; r; U) k public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
3 R- }3 p B( X, E$ r9 A
: y( P; }5 Q6 I! U g8 H# e4 b" Q3 K8 X3 M
6 z J* G: Z* V9 A. Z
if (value.f0 == 0) {
' ^. u( S' D! a" O/ r( o5 @5 j1 g* |* B
ctx.output(zeroStream, value);" E! G$ e2 D/ e; e* h! t
$ I0 y. s) ^+ w1 A
} else if (value.f0 == 1) {
0 w: S; j/ t( P6 d- ^
0 I& x+ X! o1 g6 B0 w ctx.output(oneStream, value);
, y9 D. l# L9 c9 c6 u' @) j p2 Q7 W! \
}
" h5 ` u7 ?" h' K2 J" d5 E; Z6 D( B5 a3 o0 A1 b1 C
}
! f$ w, k9 s. b T' B. I: N6 ^5 {5 ~7 l
});
" H! u" p3 p0 H L* Y+ y; U% G, O
9 _/ N$ q( \ a: r" p4 _5 T6 }& l
- B, W q" i/ {, @' U0 I
- ]8 V2 Y6 o# N/ a( [ DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);8 Q0 |, _) @ a
; e0 s* y1 H |, W DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
A n* u* @" T# o B
, V! ?! P, M# g- B7 ~7 Q3 ?) J# y9 q- v+ ~3 H
r# C, f1 R. Q- ~- q zeroSideOutput.print();) q2 `, J2 i# `* k& P: n
6 P9 v- f/ A% A oneSideOutput.printToErr();% o0 F3 b. I4 ?/ S" ~7 ~
3 n# M1 @' V2 T' X2 ]5 ]: |
! j4 J" a0 H9 X4 g7 _1 ~5 y7 u
- g. K: t, s' q1 L% z, b
. K% ] S; I) y' }& a2 A
2 s% K# p- @8 B1 }* ]/ Q3 d( j6 r1 j7 B //打印结果
" E A0 _* F5 W
" z! i+ N7 S/ f. K: } String jobName = "user defined streaming source";
! e6 S ^/ p& W8 v4 _8 u' ?
8 m# n1 P1 l( d8 t env.execute(jobName);
3 A3 i) M1 f9 }9 [. ~1 w$ e7 k) M' w* h$ H
}+ W5 X; _' V# h7 X4 Z" W
</code></pre>
Q* A& u' L0 Q8 X+ Q1 D! p<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>; S- |9 O& w8 P5 X" s" Y6 b
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>
% N6 a+ [* E! ~+ z# m- W E* H<h3 id="总结">总结</h3>
, O5 q9 S* b; Y<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>; x) i2 z0 I+ R' a5 U" p
<blockquote>. s5 n' ?7 ^" y2 w5 I
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>* Z+ T% z0 z% E: [. E$ n
</blockquote>
4 e' C" a/ a( F. b' T, N$ v& x# r
4 D! G! E1 r. u3 E! L |
|