|
|
; f0 v6 N4 {( w' G' K: N<h4 id="flink系列文章">Flink系列文章</h4>" a* j) |- S3 Q" H
<ol>
! s6 `7 b! ~4 @! J. N8 B) M<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
/ i1 R2 y4 ]7 y# x. w& e$ N<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li> d. U! Q M7 q0 w$ l
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
- m9 a. ]9 X* H( v9 l<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>1 ]8 U: m6 W& D' r- m' l8 K3 X
<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
+ N2 _+ h. y: N8 M, X1 H$ J% u<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>
7 S2 ?9 r: I* U% A( y4 `<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>9 }* M( V# j" M+ C |# E/ x" E# i# X) M
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>' c* f/ e% l/ `$ ~' E
<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
5 t& b4 f l0 t* @2 {% I2 a* [' S</ol>
9 l0 S# U5 Q# D( Z<blockquote>
: l" ?. H; i* W- v8 P<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
" S5 W. E" g4 p: c</blockquote>
; W6 h' f2 B. K7 ?! R! ]<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
& ?" F0 l# a1 h) g; }/ W8 h<h3 id="分流场景">分流场景</h3>
+ e5 K+ B2 q( R- W) F0 A C" V<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
6 L. q3 J3 k, k+ b% c7 ^<h3 id="分流的方法">分流的方法</h3>" L; A8 N( f$ I9 Y( P
<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>8 G$ r6 Z0 |& H2 M
<h4 id="filter-分流">Filter 分流</h4>0 S! x* K9 y/ [1 K
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>
: `8 K* Q3 b- C9 s' l& y<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
* v( E+ q2 z5 \' X<p>来看下面的例子:</p>
9 H5 S& r% t' }2 N: M. g0 D<p>复制代码</p>, j+ e# p( f1 z' @
<pre><code class="language-java">public static void main(String[] args) throws Exception {
- t5 o1 E& D p* E$ I2 \ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();' i) V6 o" F! E2 F
//获取数据源9 l, j. i1 ~: @+ C
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
8 a, U( p8 |% q0 I4 j8 H. S' ` data.add(new Tuple3<>(0,1,0));
' `; T2 d/ E7 ]5 w' e! D data.add(new Tuple3<>(0,1,1));- F& R3 @6 N3 `) ?# v! T b/ u
data.add(new Tuple3<>(0,2,2));
2 Q( s. ]. L* z5 J7 B data.add(new Tuple3<>(0,1,3));3 D, @, d7 B9 o' L% {
data.add(new Tuple3<>(1,2,5));) n! C0 P0 u. }4 ?; K& c' z
data.add(new Tuple3<>(1,2,9));$ j/ N( I& y' T8 r u
data.add(new Tuple3<>(1,2,11));. `* ^( r- x5 f$ R
data.add(new Tuple3<>(1,2,13));
: x- ^6 Q4 F# m' i9 M" U
+ T, E# W( L% n! q, J2 V& L DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);1 t/ N7 l$ H% I. y& f) `- ?
; `$ G7 l( p! s) v$ v: m9 V
G" \. G3 {$ d( k6 k4 U' _7 E7 D4 P2 A( w! c( I) u0 X0 A
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);( I3 p$ M- A9 r& ~$ e
- F8 n0 j" ~1 C SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
8 T5 t0 k8 T% P' e' [
7 l! y6 Z0 D9 y9 B( l+ c# t6 Y/ E8 ]; r# [9 X
7 h8 s7 _6 F5 {( A# Q, V
zeroStream.print();
* x3 B- r* X9 B: w; B7 I! Z
+ _: W* G8 I* ]8 M6 D oneStream.printToErr();
' o- O$ h' ^' ?( Q7 l" h& X5 u; [
+ v! `* z: z1 T' Q( a9 \: z) p! D/ w( Y) _* |! F3 c4 t
8 P/ Q6 a$ i4 `- [2 {
' E& F$ Y* O5 @9 t J; Y; V* n `
2 B* O H. E( o4 t, }1 Y //打印结果
3 F% w5 F" p f4 W( H9 x. _4 i3 b5 |: Y n8 M4 c
String jobName = "user defined streaming source";0 n3 Q5 M2 W1 z. j$ e4 M) e8 P
% @0 | D; m5 B# o/ A) ]1 H t+ ` env.execute(jobName);
: Y: R' R) l) `" ]) L3 u% g( ]! _) ^) L2 F% d% l
}
6 f( F: t$ U% o9 D</code></pre>4 `/ o* \0 a3 |9 ~
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>' l# P1 ~9 `3 T! Z o
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>
w1 N2 I$ Y# m7 m<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>: I; o: G0 s; t: Z% U ^9 Y- `
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>3 E5 q- m" A$ k9 G; L* V0 i
<h4 id="split-分流">Split 分流</h4># a, d1 V( ?1 v7 C
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
+ ~* ?& s" U e- |* ?* s9 x<p>我们来看下面的例子:</p>
$ C+ e+ x* j v/ W<p>复制代码</p>5 y- Q& h8 a) s. d% f, k& Y
<pre><code class="language-java">public static void main(String[] args) throws Exception {( Q5 ], |) G" E" P) |& G
3 r$ Y J+ l1 H
# O" t! [- \) Z8 U! u7 l
6 I0 \( T6 X6 i) Z) e StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();) Z0 [; {0 ^# ^( ]* S' d3 c; S5 F
1 h* K7 }: x1 W //获取数据源7 H( u/ B+ A2 A A* r0 w/ R$ {2 d
; c/ J- U6 E5 P" z; r& m
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
% Q9 t' B6 s3 k) s- D% w* F- u* F4 q
data.add(new Tuple3<>(0,1,0));
2 o) J; u& _; z6 ^0 Q, k% b2 h" ~7 n& o1 E, A0 r0 L/ r
data.add(new Tuple3<>(0,1,1));+ m0 h: a$ J+ R* ?
3 }) m; X( V3 W/ f" a5 p data.add(new Tuple3<>(0,2,2));* b6 I' [1 |2 o
8 \, w# g! t% e; {+ q8 E
data.add(new Tuple3<>(0,1,3));
0 X; X( e* \# x" Q1 B) g8 R1 C/ s6 n5 d9 [; A5 ~, E: N
data.add(new Tuple3<>(1,2,5));
2 [' y& z2 x8 C
J( _/ o3 ~0 C2 r; P data.add(new Tuple3<>(1,2,9));$ _0 \. E) t# ` t( V6 a
- j G5 o5 N2 k; E
data.add(new Tuple3<>(1,2,11));4 j u" m6 L4 o/ |9 u' Z9 _ t+ { e
1 `2 c$ y8 V0 }+ f9 _ data.add(new Tuple3<>(1,2,13));
+ r4 |( G# K. B: ~8 n
- n& p# j' E, _& w: V" z( _! r; N4 e8 ~# u# W4 K1 o R( i H, S( f
- }9 e7 X3 ~) }) |+ B* t; x, f4 b0 X4 R9 x! t. K; P3 Y
5 q( T: |/ ^. j& S" @' U5 E
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
: g! ]0 D5 m6 R* m0 v( d) P& x' a, L6 _& \3 Y7 P( T3 P
& }8 k. Q- L8 U) r, ?, _/ e1 O+ _- P$ k
) k( C/ r" \/ X2 t2 w
w9 O l. m3 _! I3 _ SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
' n3 J# C# S! J) f L1 C4 _3 h7 n( a- z. n7 `+ z" c9 C) |4 D
@Override
& `' N- o3 ?, e, J: K, f0 R# R: n& m& l! n u3 k( l, J7 R
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {( N% V: Y+ r! \$ X- `( i
$ O$ `/ p: |# s3 o5 w& Y" v+ W
List<String> tags = new ArrayList<>();4 P6 k7 _% s9 e
, c# m2 ] A- r4 Y if (value.f0 == 0) {
2 Z( F# E/ R4 l; w {" Y7 r
; P% i4 Y8 r/ F3 _2 V0 K; @ tags.add("zeroStream");" O" H& }* A9 Z! q% W; z# p+ @
/ B9 J# `% k+ m5 h. Z& \
} else if (value.f0 == 1) {. p4 E9 K- N5 ?3 w- W6 G3 O# h: B
+ F3 G6 t' E' ~ tags.add("oneStream");# I4 f; w/ a1 L9 N9 ^
* M2 r( K3 }" W( [& b5 [
}, f. t7 V$ t, Y) k) |3 J- ]
% w1 q6 T- u$ M* n6 C! X' r
return tags;' L: r+ D* P% t- q/ x2 ?) g2 Q9 z! t
" \0 o4 p0 j& t/ Q" V }
f: D+ v8 S, T1 e
/ Q* _9 D) a q$ R });
' {1 ^- {4 X% r) h7 H, ~& ^' o' r3 `+ b' L Z, o; o* `8 a+ d1 I
4 C2 ? m- g* V; ?
& d' H7 X' h7 | splitStream.select("zeroStream").print();
* v& Q& {/ M6 M% S- f& x+ x( i% I$ f o# q
splitStream.select("oneStream").printToErr();/ [8 U- d5 q( z! I8 v7 h* O
$ { P( C8 x2 B" d
d, h, x7 \9 I3 k$ d; U; X6 G! N- G: H1 t0 ]$ L" c) e
//打印结果
; @: u4 \3 h! x1 x$ J* X" I2 ^2 I7 a+ w
String jobName = "user defined streaming source";: N9 K( A0 O7 i7 n& ]
9 H" J+ D3 Y3 S7 S( m1 ~ env.execute(jobName);
. d1 x" {; A0 P! `' o& [. U: N/ B. o0 u8 a- B
}
6 K. D3 A2 b* {" a* J* o9 f</code></pre>
! ~5 ~+ W. Z8 d. Y<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
* Y+ l- y8 n" F# j* j& s$ a T<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>$ z! r8 E% h/ ~& G
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>; w! w( c$ y0 S0 Q
<p>复制代码</p>
, N! f1 j) }! q/ K& L<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.
! z( u4 ]. X" g% F% E( a</code></pre>4 ~' _9 a% D5 n/ q5 M8 p
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>: S% S; X1 N8 r) m2 N# L r
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>0 m1 b+ b- x+ Q* D7 ?4 U
<h4 id="sideoutput-分流">SideOutPut 分流</h4>* J9 n& @/ a, L0 \+ }$ ]
<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>7 i [5 [2 L* ?3 c! ^; X
<ul>
" ?) q8 W3 M: J6 S8 `2 Z# [<li>定义 OutputTag</li>
# V7 g' u C$ S& W; L) L+ X<li>调用特定函数进行数据拆分% E: V# d+ }9 L- c4 V* f! d2 b$ P
<ul>
1 N3 b& [* f9 P; Q9 v9 Q/ Z<li>ProcessFunction</li>; t! n" I% b- z8 d
<li>KeyedProcessFunction</li>
! w1 o2 _1 @# c1 Q0 J, R<li>CoProcessFunction</li>
1 ~& q2 d; W7 q+ k8 B, j<li>KeyedCoProcessFunction</li>
. E8 C$ b' Q) l3 Y+ I" C<li>ProcessWindowFunction</li>, H) @3 B& Q$ P; m8 ~/ T" N! E, K
<li>ProcessAllWindowFunction</li>
2 r5 F% h& M7 f9 E! }1 ^</ul>
: c e) V2 @6 q3 }/ F* s+ ~' G</li>
* t8 k. r, E; t# M6 ~</ul>- j1 w+ I7 N2 j# p; Z0 |
<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
0 J" s9 w. @/ E3 Y<p>复制代码</p>" F3 ?8 }! {' l1 K
<pre><code class="language-java">public static void main(String[] args) throws Exception {
2 R# T* s' p6 j5 u) L
8 S3 p# ^& m6 l X W2 p E& s" ^& r8 E( u! r- a# y
% s8 E, _" ]0 a+ G- N, A StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();+ E5 Z4 w8 G# C S
& u% J, U y9 e9 c //获取数据源4 s1 i! N5 e: U* Z6 ]# O$ U0 j
' I5 a9 G! M0 T) Z4 @; @6 K$ o
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();* `( X; q$ S3 l2 r$ w! Y* w9 v
" P9 x: ^- c; t8 k7 j3 ^/ C6 @
data.add(new Tuple3<>(0,1,0));
) `- s: H" D7 z+ p$ X) ?
0 P" J8 D* R: O* D; @" D: |8 v data.add(new Tuple3<>(0,1,1));
5 c! _' @8 c5 k: q3 V$ A3 R% I) q# J" }
data.add(new Tuple3<>(0,2,2));
, m- f+ g J/ N2 e( n3 _" t0 t3 @9 ~+ ~$ u! {" o- @! W' f
data.add(new Tuple3<>(0,1,3));8 A* O2 z$ N: h, b& X( p1 R1 D
: P3 H* e0 s( Z* l data.add(new Tuple3<>(1,2,5));' m9 |, [! t! @" [6 b. d: K
6 r, o% `1 O9 H. Z; A7 n5 h data.add(new Tuple3<>(1,2,9));
! ^) |1 b2 N' X7 k3 ~/ E% W3 n+ Y
. @1 G' K- U/ \+ I3 B, M Y5 ^" F data.add(new Tuple3<>(1,2,11));
/ n8 F+ r& v- F% [
* s* ^+ f- Z, e: E2 ]% `. J data.add(new Tuple3<>(1,2,13));) g: g) ^; s/ L, J
5 b- w) _- }- ^7 Z( a' h2 }7 a
; k' M$ P8 S* X' u. P) D4 h2 P* K' r& Y
9 e& ]3 X2 k% I) N7 {$ t# f: c. x
\' f5 L- O3 q h DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);% x! p% t* U) J" G0 S4 m
2 Y1 _, _" [ g. i( i2 T6 @# q6 f. F$ B3 J. c, _+ z
6 b5 g7 L# |# t% F$ P OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
4 i3 w- ~: ^; L# |: W/ ?$ b% t- z7 d
- s+ h5 `8 a) X. [( s5 u OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};
3 p F+ g9 _. ? r: w9 M9 p; e3 D6 X& V% u4 {3 x) A
1 ^( `: e8 f: j3 I4 I
$ v) m0 M) g0 h' O; I
8 S* r* ^# n/ _" ]% a( J! F" g! B; T3 e
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {. R3 @0 ~* D# \2 A( o8 [8 R: F
* y' v/ ~# P, Z1 v
@Override
' S' ^8 H7 x3 g( z) _& t
& F7 |; S U( V! Y7 Q% F+ b public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {8 A$ |1 @% C9 ]+ y5 O, E
! Y1 R: u7 K0 a, ^( h% j g# }( v
- p8 a% a+ w, [+ D
o6 | r/ {( m& o/ ]+ d9 C
if (value.f0 == 0) {
9 u7 I3 \; D9 l* @
7 D9 E4 N3 z4 G F7 l$ d ctx.output(zeroStream, value);
1 s! o, _! i0 G! l5 X5 o) ? h, _6 C* s. H( i
} else if (value.f0 == 1) {
9 c p: B# N1 N/ v0 ]& m0 X' e' O i Q- T) E( D r/ S
ctx.output(oneStream, value);
3 O, U; O( D$ Y) ^ j& D q' K; }# f( ~2 b
}$ P, i1 c; l) C- q. V; |4 D) @" `
7 Z J8 B' S. _4 i8 _% w
}+ Q5 _# o- V! u" I) u1 @/ x3 Y
* N2 a5 Q# v3 P8 x% [# a+ s# Y });
/ T z3 N2 J8 P% J8 }% |- }6 T, [, d0 i/ @! Q2 u" @
: W3 D! G! u0 { \: t8 W
$ j% \+ A) V7 e, F6 |
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);
8 j' P! c7 c5 a t h9 D- |9 m" u( Y# F( K7 l6 C7 j
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);2 e& v9 K% H1 p
- w7 P5 k2 }+ l& \; E% T
6 w4 l4 n6 j+ M6 g, g2 p
* R1 m7 R* c. ^0 L, q2 V$ f3 t. X zeroSideOutput.print();% b! @* L* |4 y! z0 @0 [# Z
5 d4 I3 M$ {) O0 |6 `
oneSideOutput.printToErr();
[ P b |1 ?' \# I3 z6 \1 w: v
- R; y6 V6 v) M& I [
2 |6 x' a. c6 r! `9 S8 E8 U: b( ^4 T+ ~! h& d3 g4 ?! u" s7 Z
- y& Z. S) P; o& {+ x. x
5 t/ A0 }2 a" y* j, n //打印结果
& f& H' z9 F* V) E
- |& U/ b1 V# y1 Q% _4 x* q5 q String jobName = "user defined streaming source";
2 z- s. `( {# W0 s' A! i4 V; I" m7 ]9 j; |& R+ o6 R a: ^
env.execute(jobName);4 t, T9 M, {+ x D$ W
6 N- P9 p4 ]9 S}( B" P; ?0 w6 X# R! G" _9 Z
</code></pre>
) t$ n+ g& T* i9 v<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
- [2 k. ^% R' \' q1 W+ [' U<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>7 {2 |& ?5 O3 |9 `6 y$ f
<h3 id="总结">总结</h3>
: e( P8 A5 n/ B7 h( m5 z<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
4 V$ C) W$ |$ p<blockquote>* W' z6 g! ~+ M) S' l$ q; X
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
& r$ g. }; f' p& g# Z- f5 H1 G</blockquote>8 k" T3 Z, ^ B. b% E X
( S0 W- L! R F2 k; c |
|