|
|
/ e9 K9 p5 G& Y
<h4 id="flink系列文章">Flink系列文章</h4>* v6 G' |6 _0 @! T: o
<ol>/ O) y; l, T* W# g2 k
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
% T+ ]" k, a, E# _. g4 d<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>' j( X: {/ L. J. J1 I( r
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>1 z9 z3 U! u f6 `3 d
<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>0 i% f# V1 P9 o2 @1 I2 j
<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
6 J3 u! L7 v3 Q( J) M$ q<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>
3 T5 s9 `0 @7 F$ |9 {" I<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>" O7 p O8 Q6 y8 g: W2 A3 w5 Q
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>, M6 i9 s% Z1 F: n4 w' s3 k
<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>: f8 e4 f% ]: \0 _2 V# a- ^
</ol>
' t. h0 q% ]6 i: m5 o$ P! d<blockquote>
+ W- g, D) m! e9 e' E; j<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>% L9 d4 Z9 n* t2 R
</blockquote>1 w/ n7 V2 d" L+ Z
<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
/ G* b( e5 Y0 j<h3 id="分流场景">分流场景</h3>' g$ S7 }% A9 N- }
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
$ L7 I# f% E1 u) `: w<h3 id="分流的方法">分流的方法</h3>! y+ F, p9 P N7 P5 ~- L
<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>' x4 T' @( S5 d' V N) A+ m8 J$ {- ?& ]
<h4 id="filter-分流">Filter 分流</h4>
) _* u2 A, D8 H( H<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>' I0 L# \" T+ |% Z# a* k
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>% G; m3 a. e, M; c# W: V
<p>来看下面的例子:</p>( M! f' w( B* ?2 e
<p>复制代码</p>. z3 t6 a# R- N9 p+ @ l. w
<pre><code class="language-java">public static void main(String[] args) throws Exception {# R0 G) l6 ^) d: R
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
, ]* U' P6 ^% J- g //获取数据源
/ ^; [4 a$ s. p6 R! E- W, i% Z List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
& }& _" X" L {" ^" t! L- f9 {* s data.add(new Tuple3<>(0,1,0));
( _8 g2 f) W8 n- [. T data.add(new Tuple3<>(0,1,1));3 ~4 r+ `) q% O8 x+ }- w
data.add(new Tuple3<>(0,2,2));$ u. m& ^/ H/ h
data.add(new Tuple3<>(0,1,3));/ a" y1 ]: @; I& e/ t
data.add(new Tuple3<>(1,2,5));
, r6 l, z5 n) U1 L data.add(new Tuple3<>(1,2,9));
& c* V( @$ P+ Q! C9 ?2 ^- a, M data.add(new Tuple3<>(1,2,11));
% d7 O0 P/ Q* _ p( @ data.add(new Tuple3<>(1,2,13));% P/ V9 P l" F5 L6 n
! _. e4 j" ^6 [ B
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
! c9 o. e" y6 F3 d9 o
6 J/ N) ^8 p4 R# z2 p: _7 {4 N/ d; g5 F, K& w. g; F
! K2 V% A) ?+ G' v# U8 K
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);8 ]: `2 g/ o% {& C
8 ]1 S7 R8 E- e4 H0 q; a
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);6 g8 j- S6 W6 d5 w( L2 u! h4 J' S7 H
' @# P( ?1 i, b+ |; _" Y* h- t$ f% P( O1 ?- E/ `& n: O
5 z9 A i3 V" }5 ?0 q( y zeroStream.print();
( f7 Z, @0 d" _2 g; E
: N5 S0 M& w' z oneStream.printToErr();
# V, C0 n1 |/ F1 u
. T( o; q! M1 z. X0 f6 Z y1 t, P' H
3 v, k" H! U( N/ o3 _' p
" ?3 C( o5 E0 a9 A0 V
3 m* {8 B; [; R8 u( w
//打印结果3 U# w. k9 k9 {! I
$ h0 h5 R5 k5 }1 X String jobName = "user defined streaming source";& s, y6 i# T, E1 d& w) y* i X% |
, t; ]. x6 ]# x% U env.execute(jobName);9 x. \9 z! y }: j, c( D0 [2 Y& c! K
1 m+ s C1 y% M7 l: g; ^! s
}
! R1 M6 w( Y# ], @; t</code></pre>4 E, A3 N4 p2 @! V4 ]. c) u& J- D
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>4 c# r6 W8 G1 H, i1 E
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>
@& Q0 o6 }5 M! g' m<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p># R$ b0 I2 c# P I
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>& {9 t/ \" M7 {! U+ G# `
<h4 id="split-分流">Split 分流</h4>5 D1 [. o# ]" m( @3 H8 a. n
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
! ] l% `' [" d2 c* X2 R<p>我们来看下面的例子:</p>; T' ?# n& y2 Z+ e
<p>复制代码</p>
5 b. v7 {6 f6 M! v<pre><code class="language-java">public static void main(String[] args) throws Exception {" k3 y( D3 c! x/ W! b: o6 s2 l
8 P% q8 n/ j& T8 [
6 @* A" c0 @" A" O7 s7 q: I7 ^3 y
* d e4 ~, F* v StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();: x9 `- f# V' ]
/ T* A3 k) Y3 D, R
//获取数据源2 }8 @6 f7 R3 y& ?) n& N e* X
3 M+ Z! k3 w! V6 t, S/ i0 U. ?' | List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();6 K( W# p" u7 n: z$ u. W- |% _
$ S c. z) p2 Y2 e; u
data.add(new Tuple3<>(0,1,0));
& N* k$ Z9 `) u
" `* D% U4 a. U6 V, m6 D4 s data.add(new Tuple3<>(0,1,1));
1 Q* W) @- ?" q# s* c5 T* c; \. Z5 y. p# j! t5 l3 T7 m
data.add(new Tuple3<>(0,2,2));5 I- j9 W$ `: l. o4 t
# Q/ H1 y! m3 ~" J3 R r data.add(new Tuple3<>(0,1,3));
# p8 v" U6 |7 J8 Y/ t1 r% S! G
2 k: J$ @: I6 n; {0 ? data.add(new Tuple3<>(1,2,5));
0 H* e3 p# @7 ]8 Q& l2 {! F2 `; ?$ Z5 g5 q* Y
data.add(new Tuple3<>(1,2,9));. O& ~; M2 ]1 f: n8 P; v
# n) x/ n) C; v% `* R
data.add(new Tuple3<>(1,2,11));
* D( s4 z9 c4 k: E' k8 Y6 F( h, ~# t
data.add(new Tuple3<>(1,2,13));
. `7 H7 a5 g% k+ f8 k
$ V8 b) B0 ~* J$ I! G0 h8 }5 H" F- I/ p9 Z5 S/ E
' X8 P9 k( f9 S }; H5 x
, b+ c5 q' K) W. E1 a, }+ v# j4 \: z1 [4 X$ M( ^
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);/ @4 w. X' G9 J
5 E* M3 O+ ]; a) {8 g, g; C. I" _
y" S% j/ W7 h+ t8 |4 f8 b& u" j8 l# ~8 W
6 ^, I& r3 I- P2 r9 l# W8 m. c3 ]3 q
SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {$ }8 E6 p+ T8 z
& E9 w6 O+ K& G$ E4 [" D% l5 p @Override
+ C6 ~7 S* t2 Z; |
4 J7 q1 \! y' l) ^6 Z& N2 e# t public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {0 H0 s; k6 o' N. m; n3 L
; A, u) {" E- {
List<String> tags = new ArrayList<>();
0 J8 |7 W1 i) n. j3 u& ^4 R3 T' E+ ^9 \- `
if (value.f0 == 0) {
6 A$ x6 q5 T2 K+ }
, ~5 Y" }( y* H+ b' p8 p tags.add("zeroStream");
$ O+ n4 ?4 z1 s2 ^/ \' W9 h2 o3 Z$ P. u5 W: b4 R% s# Y
} else if (value.f0 == 1) {
- l7 B o" R$ E# `, {; W( L+ u4 m" O7 B5 ?5 A& `% J+ k
tags.add("oneStream");4 h' C( d4 M6 }
J. c/ X) N, s. D }
. F9 H3 E3 b6 s3 X0 g, T5 j7 o" M P9 v+ Z* f& z
return tags;
9 N6 B1 G" h/ U1 @
* H) d$ E1 d* r y% K) Z }' ^) u5 E- p* o
8 `5 j, z8 A* D5 \# M; V
});
- |4 L8 ^9 W0 L! \# H9 C
# M5 W& ]8 L9 j/ e" N, @
, h9 b4 T4 {. a) z) ^% m( |
3 C, C5 D) r: c2 _# t splitStream.select("zeroStream").print();- X: b0 T) r" P. j1 b7 C* G; \# t5 e# F
5 ]8 H5 G4 W. l( s3 n splitStream.select("oneStream").printToErr();
. ~/ r! T$ Q$ K" v5 I9 K5 L! r9 h% a$ N5 o3 ]( L$ E
9 \2 R% q' M. f5 E+ N6 ]0 i b. B
( ]. v' z1 P8 ~ //打印结果
: J% D/ j5 n7 u, }
% ~& M# z$ b+ `" \# c q" Y String jobName = "user defined streaming source";% M3 D! j' x5 H2 u/ W8 e/ c5 s$ g$ W3 o* K
1 s9 b: O/ |9 m- C9 P. c% a( q/ a
env.execute(jobName);+ Z$ H5 L3 R9 e& |( @+ ~
: a) d. I2 t& p# V7 V( a$ M
}
2 c6 b) W: e1 V; F7 K</code></pre>% o. e3 D1 [3 n8 r; Q
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>1 p( D! `' q8 u9 G6 e
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>
& x6 `8 r( d; i9 z& `/ [<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>
. t5 j+ k% g" p/ i1 ]% L<p>复制代码</p>
7 z A$ a |6 l4 c, g<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.
1 t4 m) Y; A* { {8 k# Z</code></pre>1 `$ O) o9 x5 h3 C0 D* y
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>6 y/ o+ r' ]' C8 D9 J S9 q
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>' X: X- s# j9 g+ J: N& L" P
<h4 id="sideoutput-分流">SideOutPut 分流</h4>
# g( F! q; ]0 R* x! e7 x% C; w<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>" N4 I3 H% [0 V) }3 W! T- F, e; W
<ul>: V5 }8 W) w- d% r
<li>定义 OutputTag</li>; }' |6 Q9 c+ J( G7 r/ b1 T
<li>调用特定函数进行数据拆分0 w5 f+ y4 Y' @; v ]9 V/ `; z
<ul># B. \. C% D* f
<li>ProcessFunction</li>$ g n3 e" v+ v0 E& q) H# l8 c8 |
<li>KeyedProcessFunction</li>
! m" m1 b4 S" H" y<li>CoProcessFunction</li>
]' n) T H8 ~9 p% S<li>KeyedCoProcessFunction</li>/ y$ C: S- J# C" c/ @2 \
<li>ProcessWindowFunction</li>6 ?2 s5 ? g: o- h: _0 ]# T4 x; x
<li>ProcessAllWindowFunction</li>
4 N3 R! U! K6 O% }9 G+ w5 s</ul>
. [( Q% C" Z, W</li># M3 l# a0 C, V# w0 {
</ul>; h4 [8 e# |2 @! ^2 J
<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
9 E# j' y: N* U% ~4 R2 e8 `4 T/ W<p>复制代码</p>
8 W7 Y6 x9 \/ G5 ?<pre><code class="language-java">public static void main(String[] args) throws Exception {, V' _" J9 M! x7 y
' a9 b/ N& E0 k/ e' j) ^1 @4 m$ P: U. f8 n' a
3 U0 l2 g% ~+ g: h1 p) s4 _
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
( c# n$ C& ~& x; ?4 H- k6 I' O# b/ i2 m
//获取数据源
& b) @; S9 {+ Z+ d/ {+ _
" P+ O; G; Z! l6 W/ F- L' V List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();2 P0 o+ S. r# h* t4 b, l
9 Y1 t4 n F( s3 G; J" Z& X, m+ f; h data.add(new Tuple3<>(0,1,0));. H0 h- l- @9 F& _ Y) j' R p" S
) Y' Q# v$ K. I5 m/ \, t data.add(new Tuple3<>(0,1,1));5 m$ m7 S: m& \- j$ ~( G0 d0 }
: t; f9 F& @! O& B7 {: \3 C
data.add(new Tuple3<>(0,2,2));
q1 L9 `$ \6 W6 l- D1 `% I
$ p3 ?5 N" P) F! Z% Q3 v data.add(new Tuple3<>(0,1,3));
5 v! L8 p& i) T/ ?5 {* o* L7 ]- m. z1 t% k7 h" E' k% ~5 g
data.add(new Tuple3<>(1,2,5));" Z" c4 A* ?" }$ k5 w% X& k
: e8 S3 ]) F2 s% j/ V( \! [ data.add(new Tuple3<>(1,2,9));1 U! b& q; a6 l9 N) v
6 X' F) ]8 ~3 a+ {3 s
data.add(new Tuple3<>(1,2,11));1 q' [+ Y r! G3 w7 C" j1 p
* O7 W1 W6 [$ g, d1 }9 N8 y8 s data.add(new Tuple3<>(1,2,13));- H. o7 s- M- X# g. N
7 N% C" P/ K+ v( R' W9 S" W! v
7 R% G6 h0 i2 v, h, }
) U! H/ X& J5 \( L
0 T7 Y4 g9 i- { B: V7 T3 Y) L# V- L
# h0 C8 w. G6 i! t/ z DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);2 G m! R+ |# p1 G8 {3 i; N% N4 u. r7 n
- G# K( m& K4 m* b) U+ a# V
" ^' U6 l: p# D! G$ [
* u4 w* V5 O7 H' K! E. t OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};9 H0 j' N0 E: L M
" q3 ~# }( g' |* e, G9 Z4 E OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};+ k7 P# C5 F, y' M9 F/ F' I
8 M3 Q- V4 l# P! H: \. d+ D1 t7 _1 ?: e4 N) Q- z) q- D
6 f3 R# c3 \7 L. Q; H D
$ P$ I/ [# p5 o
9 x, K& R% t; v7 q$ y/ r SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {* V. s" c# I( ^' D6 L/ G
, L0 w9 t/ k) q3 d3 s @Override
$ R* G( O, S+ J* G/ j& j
! j2 F! Y# J" W" w public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {# q/ {$ l0 r" P
" X+ A: Q) g8 t4 z" D
* x! V. U9 L) e9 m+ ^3 f3 _% w+ M0 O9 I. e
if (value.f0 == 0) {# Z' K. E+ q5 o# x0 A
2 v/ f2 H+ y9 W: L1 w% Z ctx.output(zeroStream, value);
6 S% ~% \7 @1 v# s9 c! C
' K1 ^2 y3 b) x" j/ s } else if (value.f0 == 1) {
6 V& c% V& X5 P1 E" h2 o7 v# i4 Z' ~' j9 C
ctx.output(oneStream, value);& q" e z5 V4 l' s- H+ Z
5 n3 ]6 b" j4 a5 l5 B }5 i+ r# @8 u L/ I b! }
! |& c/ a D$ h6 ]- [$ f }/ u c2 D( C: F& x5 b# B0 c* q R
% S Q: U5 `) k r! X0 f });
; r: q/ j; h, H, {# ]+ p- _
# ~) {; B, O. D$ H: Y
( s# f3 s" f4 D/ y& u2 ~6 E/ Q p, _* `. k
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);
. A) i( i" c* S- D; d3 l
. P2 }' t- k* A% J DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
. G. K# B1 p- G8 L) Q% l
7 W- R) c/ \. ~0 Y. F( N" ^/ T* @. [( s; d& k0 x% E
! d! s9 Z& }, Z) {. o: T; ] T
zeroSideOutput.print();) k( f9 ?$ g: ?1 R
! s p" t" ?" \, v6 P5 C, d oneSideOutput.printToErr();
4 r; f% g; S8 I' Q4 q" W) R" D) a% G; f8 R0 ~9 |
/ j8 |; t' [$ \/ p4 G1 {' \- l- b: E( R5 S; x1 a
: N9 w+ D5 b3 t5 y. E
6 ?2 w1 m# c- M+ w
//打印结果0 _( q. G8 p+ \$ i0 l+ h
/ I9 M# ^" y e, B) N String jobName = "user defined streaming source";7 ?$ D3 U' g! n4 z+ P
8 f9 E" B2 i) }( p9 ^! ?3 o
env.execute(jobName);
( l+ S% W; t: `" s0 v( Z1 b
" a6 b- S, c4 W* }- T1 K6 ~7 F}$ w: a9 T6 F% t0 j. Z
</code></pre>( y! F, ?) Q2 S4 ]0 H" I
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>! E% A7 F2 b9 Q9 K7 s
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p> O5 @ e) c ?# c
<h3 id="总结">总结</h3>
) }5 P/ O( k3 G8 _<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>/ O2 S; I' W$ L) T) [! V" `
<blockquote>
9 B9 S1 [% Q- _. g3 h<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>2 h8 B) m) Z$ Q( u% T
</blockquote>
0 q( X3 U5 u& U% b
. b$ Q# \4 T1 j, D1 A9 Z |
|