|
|
4 H, v! `: Q6 K/ l7 |: L/ `! l/ o. k
<h4 id="flink系列文章">Flink系列文章</h4>/ k9 w4 ~ \2 o, O: ^9 `- G5 i( u
<ol>
9 o3 { O( X' p: \& W8 q8 T<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>; b6 J# L$ I! U# E) a# Q
<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>
Z5 b9 L6 @4 ~: T3 K<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>% ~: I" k, A0 \* h3 u6 s
<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
/ s1 m" n! @4 K. x& h7 Q<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
) V% u( W7 X" I4 @, H! n# O<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>$ ?! }; c+ X- S" I6 ?! b) T6 O
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>( V2 H7 c/ V, _
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
! F% o$ S6 B( u/ c( X8 |<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>/ ]$ v3 i+ _5 \5 H+ g' Y
</ol>
$ a5 L9 q4 S/ l, p- P; y! S<blockquote>
& N! J; N, z. |# x) X2 }6 B! ^<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
: E, D. }2 c* n) H: ~8 X</blockquote>
& J x& H" I, d0 \' a8 X% q<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
! @% ?" z q7 }% v5 q) @! ^9 w<h3 id="分流场景">分流场景</h3>
, ~$ w* y6 t; L( O<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
: H: D( s/ V G3 k! @8 X; y<h3 id="分流的方法">分流的方法</h3>- i w8 c7 h. h, x: y5 i6 U
<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
! n& {% U* _- w; _1 M<h4 id="filter-分流">Filter 分流</h4>- v/ _/ W6 T1 l/ V3 {, t9 |
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>
2 C, }! I5 {6 D0 D0 ?, s# _<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>( ^8 C" ~+ m6 b) e- A1 c
<p>来看下面的例子:</p>; v1 Y' Q$ \8 D! _0 R) R8 v* u% i
<p>复制代码</p>
5 p2 R/ U8 u4 A6 z3 k( e* S- Q, X<pre><code class="language-java">public static void main(String[] args) throws Exception {
8 K/ a0 S4 c" p* y StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
8 @. R# d7 V8 \- u9 ` //获取数据源2 p8 X* Y. z% L, n1 x
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();# ]& }9 M3 o" R# \7 m
data.add(new Tuple3<>(0,1,0));( b" z" {' X( Q
data.add(new Tuple3<>(0,1,1));
! B; J6 R0 I2 E4 @$ `" O data.add(new Tuple3<>(0,2,2));
2 t y1 [; Y$ S1 X$ C$ [! Q data.add(new Tuple3<>(0,1,3));; _6 |5 X" p9 y9 ?: q& r
data.add(new Tuple3<>(1,2,5));
+ S3 s* R- G1 I data.add(new Tuple3<>(1,2,9));
% L4 n/ m) N' [: Y% q$ C data.add(new Tuple3<>(1,2,11));8 W# c2 F) ]2 N8 C \) d! ^
data.add(new Tuple3<>(1,2,13));9 R: K0 w# v' b! u* K% j- e
4 @) Q0 h/ a, n) j. `+ s7 P, o DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
1 A1 O, A3 f! c7 R' H5 o1 N8 W! v- t) W. V6 i
4 {( ?9 h+ ~( h1 F
/ B* K9 b- v: l1 x4 z. v SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);2 E) K- \3 N- M7 r
8 o3 I% p8 r. J SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
( \/ W9 H2 z. r
; `: O/ ^) A: l. R3 P5 e
$ U' j' R$ s0 q- z
1 {" ?- A7 S- G: O& {& @7 f9 V9 t% Z zeroStream.print();
/ c2 C- z0 ~& b- A" h3 B: C3 r& w# a: k
oneStream.printToErr();* ? R/ `% t2 {" F. \1 G
9 H5 ~6 @: I: g
$ D: r) h- }/ `5 o" P/ W! y- A( b) Q! n y9 s
7 ]% L3 n. s# J" H7 f
& q/ S5 u+ G$ q' ~% V //打印结果! U/ z4 ]' U2 U/ P4 t6 W! n& e! |9 K3 z% X
4 E% _: ?2 O; R/ M# T2 W5 S9 w String jobName = "user defined streaming source";
' E' ~7 d' f$ M- \, p. i
: s8 H) U( v5 T# C. v/ ^* l$ F env.execute(jobName);
* C+ @9 S/ ?& C2 ~! ^- _4 F8 p
. s0 Y) x7 Z [+ J. N5 J}
! y1 _8 u7 \$ U9 L r: _; R; Q</code></pre>
& K$ s3 t( @6 G<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>
y2 ?1 K1 N: ]+ ~& ^<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>' z, @) g+ {- d: a4 h, a2 I9 }/ ~* B
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
* \: Y: M. z% o6 U8 d: h<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>: v6 i, K5 U1 K' V4 @, k1 ?
<h4 id="split-分流">Split 分流</h4>/ c& e* O/ \# ~" _+ Z# m4 C
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>6 Y9 ]' M% q1 c' E& ]5 `6 h# h
<p>我们来看下面的例子:</p>
3 S4 Y; X* A, E: Z; N- y$ m- ~<p>复制代码</p>- o A4 U2 z* u% J
<pre><code class="language-java">public static void main(String[] args) throws Exception {% q, T) r- \0 i; Z" S
: C$ f$ N9 G+ O8 u4 O6 T
5 P+ E0 I; w7 X! R
# B! C9 X& W! U( Q4 e StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
X9 U3 o2 Z: }3 j( a+ G8 z; N2 b0 K. p
//获取数据源
# N9 S3 u$ X- s) X" d& X: D% }% H. U
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
" ]7 `3 H. e2 ^$ M( @* U
$ a, l# S% d d' N& S2 J7 M+ Y- R data.add(new Tuple3<>(0,1,0));
q2 j( {2 p9 O' A" _% g" }# g" ~+ M* R0 _7 k* T) m) m
data.add(new Tuple3<>(0,1,1));
8 ]9 V2 c0 P% q9 r( \3 \4 b! g" C5 s- V* K/ @1 A: [7 G0 t% u
data.add(new Tuple3<>(0,2,2));$ T- f% m0 o1 }) B6 x$ y
5 a, W$ s% V! N6 U$ G data.add(new Tuple3<>(0,1,3));
$ K# v8 ^$ V v) X* F3 N/ y1 ?3 m+ R# P; }6 o
data.add(new Tuple3<>(1,2,5));2 v @5 P+ u5 G9 X j" h
2 o! L6 P" G, S' [" g p data.add(new Tuple3<>(1,2,9));- A& G6 R j/ _3 {
: w- X7 N) A* q9 }9 e, F, T
data.add(new Tuple3<>(1,2,11));$ Y1 R" U/ r& W1 Q4 H& W) W
. R6 ?- t6 z* {; s x
data.add(new Tuple3<>(1,2,13));- K* p) M3 j, p" c% w
& ]: `' d: t- y( L3 K0 x
4 `" f4 p+ w# K; L5 V: M6 H' Y: Z; |2 _7 M* \
3 i6 Z! J. l' B5 t- _& }4 X: m- A
$ n# r4 w, Z( V3 q! k @3 Y% \ DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
/ p& k6 \- q/ G/ ^
3 r$ ?$ h* L& G. |2 ~) C g5 H5 u- U |( H* s: n* f# L
* \* B+ H# i9 j2 W5 [, L' p) R) W6 \ \
- }' a8 _9 i/ ?0 W9 m3 u( s( V2 G* c/ s& X
SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
; N' G4 z/ ^0 e9 A
3 J5 V& a4 v- F* ~ @Override
% j8 t" I" ?0 K- ]4 m! t
. {# T! U0 u U2 e public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {6 J' w% j" q2 h) K% S! f, g
4 V9 Z D' q3 B
List<String> tags = new ArrayList<>();
% }5 s3 q* ^. }& j( l0 H4 l% Z& o6 E$ e% V' b
if (value.f0 == 0) {
# F# p6 Y) [# z* C8 _9 @( T# G$ f. `1 ?9 [) z, \
tags.add("zeroStream");& V1 x- W. ]) [+ S5 e
$ Q0 B) _% G+ N1 X* R } else if (value.f0 == 1) {) c+ s, d0 q0 P
) M& z8 V) T4 j3 _. o8 q tags.add("oneStream");" a. ?" w7 W) Q0 @2 H
0 N' m+ w' H3 P W# g# H. B }) t* W& {5 `9 G- g/ k
. r. t1 w7 Y+ j) b' Z6 [' x
return tags;
% b4 `' q9 U3 e- s5 O2 }& K2 D# C9 J# m
}
+ q6 R4 H( W: }: Z
1 n0 p/ C+ ], g, B% ?1 u });4 ]6 t6 b! A4 {
& _1 I, ~6 Z3 D/ ~0 L7 V: l, r" o3 w
x2 b9 |1 f$ K0 r7 y
, c0 M4 Y) U4 O. r# Q( g splitStream.select("zeroStream").print();# b( X* g# x, n9 }9 t" \% \
( X. z+ e( c! _% z! v splitStream.select("oneStream").printToErr();
2 M w* E" E0 f' d: G; l
3 r3 w, j0 _7 U F& d2 Q/ r4 G( w, T! V$ O c& ^" n
% [+ V( k, B4 ~8 |2 B, [
//打印结果
b! i. P3 Q# t# m% A! i& e& ]' G# L( {
String jobName = "user defined streaming source";
/ E. E7 H5 f2 H- z! I6 V8 C2 T2 J6 D9 J0 N4 ^
env.execute(jobName);* y! ?$ S- b: s2 `
0 g. o6 k/ b! R0 `}: ~: d1 |2 U4 o# d m
</code></pre>
1 `9 ]5 t# x2 H+ |" t2 g<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>) h8 i$ D/ {& Y) [ }
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>1 |9 X; ~& y f- ], L
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>! n8 ^/ H, Z+ ]" P' T3 [$ E9 \
<p>复制代码</p>
1 M0 w& u0 w6 n3 a<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.2 O3 Z0 K/ @+ T" h* C. F" C/ ]
</code></pre>; ?: q1 T. y, G# R1 T* l9 L/ G$ _
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>+ T" g6 K/ r+ ?) B4 J( R
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>, o! g w H+ O$ k& T2 @
<h4 id="sideoutput-分流">SideOutPut 分流</h4>0 p4 `. H& n9 i2 y l ?- `. I
<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>/ q: r) N, r) _9 d- Q. f+ E4 ^. V+ F* k
<ul>
2 P7 t0 u1 u9 W6 C$ f8 V<li>定义 OutputTag</li>
( }* F5 W3 X( j- C7 D<li>调用特定函数进行数据拆分8 c, a4 O! p! u B+ h6 O1 v+ u' Q
<ul>
! t( Z* M$ C3 _( _( r$ U<li>ProcessFunction</li>
6 n, s; |9 n: d: K9 v* L1 z& C<li>KeyedProcessFunction</li>
% a' ?# n+ F9 H' Z<li>CoProcessFunction</li>
: ]( F7 M& F/ ^<li>KeyedCoProcessFunction</li>" d, Y6 \ G; e: I# c4 V
<li>ProcessWindowFunction</li># t. t0 A% l3 r. s( @% e
<li>ProcessAllWindowFunction</li>- S+ }9 r+ m6 U) j7 b
</ul>$ t! ?+ ]6 B- Z8 s& n' m
</li>1 @! s6 U7 D s
</ul>
' d7 z, R- y: C+ h2 R% D" i<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
$ H |: V) M/ Y6 R<p>复制代码</p>! m( G2 n; [% {3 Q+ H6 V+ T
<pre><code class="language-java">public static void main(String[] args) throws Exception {
, ~ g) `1 ~# `0 V, F8 p) d
: E; o, j8 X' Z: A9 |8 @8 g; L4 y2 J) K: {/ G
% |& h, g9 Y- w StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();, ~4 X7 [8 [7 y
k/ m0 L: M8 b5 R: p. ]# [2 J! p
//获取数据源
9 ]( Z; t i9 h' ?! h0 N
! S( [& d0 ^" l2 G5 Y+ M+ t List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();/ j7 [; p& {1 H2 P+ B
( N; ~; F: ~& M7 ` data.add(new Tuple3<>(0,1,0));8 a3 e* S: J- ?) C2 D
9 l7 r6 B# l( x; E9 p6 c7 ~
data.add(new Tuple3<>(0,1,1));/ s0 t5 ?* a0 j8 T) j1 H/ Y- Y
2 r8 X6 A9 `1 B" X
data.add(new Tuple3<>(0,2,2));, ?5 l2 @ ?" t( d- H5 g6 x& X. d
6 `6 ?% \9 A/ @8 L1 p4 u. B data.add(new Tuple3<>(0,1,3));
& ?8 I" ?! j4 D) A6 a( e. G1 t3 R& f- @
data.add(new Tuple3<>(1,2,5));* t" q0 W/ n/ d& {8 d: [& [
9 N* i( i- y; f% {2 k
data.add(new Tuple3<>(1,2,9));: n: O8 y; Q X( y0 ^, v; c8 Y
# P' ^& |. y6 P2 f3 v# C data.add(new Tuple3<>(1,2,11));
6 i5 G) P3 d/ m/ W( j y( V* c6 @6 Z) v0 m) }; x
data.add(new Tuple3<>(1,2,13));
. ?1 e Z* D# O7 r5 _6 V# |
+ p9 J s, p1 W" V2 _: I* d
; v3 }. D5 Y# M+ R; `* @7 [! p$ C; X+ U- o/ d. m
1 U3 g9 ~" r3 J+ J: o" V3 _. O: J! ]
* L# y$ g- u( f- u
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);8 d7 p9 B1 S6 C* U' H1 Y# \
& J! g2 l1 D0 D. \5 p: @" Y! t& J' z# s; E5 s* J- L ~3 W- l/ R
% Q8 |3 L8 k* G6 K4 _6 F6 F
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};- R0 U x3 g$ Q8 R9 O3 k4 K
8 h0 Z/ z1 I* y1 |
OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};) t( O9 v# y+ G( ]
W5 H% s4 s6 x6 J8 |
1 k% p# {; ^7 n8 H. | @6 Y
: A/ B7 `. I, J* h; t! m
5 H% f5 b) {8 |
$ e3 T6 B$ I, U% Y% B) ? SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {( v+ x; `2 k/ b/ A& Z
: n* [. k; r& i7 g f, q
@Override
' Q" w) u& e$ Y5 V
" @2 {, @9 ^6 \- o public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
2 [. {, d' z$ x* [6 I, R Q" d1 W3 q' a" q' j$ d
$ w: m2 q! @( C6 D0 E
6 I/ H; o' T* H! w if (value.f0 == 0) {+ h0 u3 s; [- r
( [* }4 w, i/ J! I2 B7 S ctx.output(zeroStream, value);
& y$ C* G6 ~3 N$ t
& `$ N; Q* K) ^7 m7 y6 p3 i0 Q- U. @7 l } else if (value.f0 == 1) {( {* z- o/ }$ s
( v) r8 L5 P7 h/ H, W! \ ctx.output(oneStream, value);
& x! m5 ^9 z( a! l" \. [3 Z. R3 K) y
}
8 a8 R+ J) B) I, c: ?, x
( h+ G" ]( O$ N" m9 b6 Z }3 s! e+ o2 C0 J7 s2 F' D
& r# @- D g' [4 a) k! N. G7 S
});
$ s# i F n1 F5 L1 Z: B; @
% S9 y, l v; I Y
3 V9 r. c3 v a- |5 S% A7 l" I- a9 e( z2 D: h" D, Y! i- M
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);" i6 w* C. F3 K0 V6 C" ]5 H
+ L" I+ m8 T: y/ g. F X+ F
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);% h% `! e; o' M, B! X% T9 q
$ ~4 Z3 v+ B, `8 X; I
g' E6 d& ?0 S) y. e: [& q; J: F5 F/ D/ ~
zeroSideOutput.print();9 {# a1 [3 e- r4 N/ U
0 J2 u2 @# X7 {: v
oneSideOutput.printToErr();0 U4 c. x' ^" e9 m C* _4 [
3 n7 N3 {* V' b2 B/ E
2 ? R {8 E: D! h1 Q4 L9 o0 p
5 C8 ~& `; B0 P6 q" W" L* B+ u5 K/ N: G7 F$ I3 d' {3 t! ?, U
9 l# I4 U9 v; ^. h1 h" K4 s //打印结果; U W) B# Z# Q; |. Z
& L1 q2 S1 G$ J9 @- s) M% y
String jobName = "user defined streaming source";7 M! z$ I% e, z, z$ U5 E
* y8 e* w$ b, ^% b% U: ~ S
env.execute(jobName);+ B6 m# j3 s1 h% t, X
( J& q; |, H ?. p}
& |" k) Z, L- t</code></pre>
8 h, Z( g3 z, T- G) }<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
9 @! l: ^; a) ~1 y<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>
7 d5 C- n( }$ U! |1 s( w<h3 id="总结">总结</h3>
/ l. E& @: i! D# ^3 U# f: U/ L<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
* c7 a7 L5 h$ k7 c8 {<blockquote>
2 @0 Q- M1 k+ T/ B/ ~+ ]) |' S<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
: S2 \+ k B. o2 W) [" n</blockquote>
7 i; g3 M% m+ P+ ~( T# ?3 s1 Y9 t) I! @" U
|
|