|
|
9 h: ?( H! p [& v<h4 id="flink系列文章">Flink系列文章</h4>
D' M, [2 {: ?9 J7 v3 }7 m3 R<ol>0 H: |5 e% y2 b- a: ^) o
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>: U" g' F* w7 t3 Z3 O4 Y" S, i
<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>
( h3 X8 _7 U; y<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>% R% X5 p# `5 ?
<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
9 x" r/ n' J G0 f<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
& e: N. X5 M5 Z& R1 S<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>
2 W( K. ~7 H5 D<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>
- I* H! R. `2 ~# A' S2 T<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>$ J4 s% M9 M- g
<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>: g/ p# _1 j+ m0 l+ V+ D5 ]
</ol>
x0 [/ J6 H) m, n' k$ M! b<blockquote> ?: N$ x* k8 j5 q
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p># k9 H2 ~) S( k; i, K, @
</blockquote>
# A% g# `5 Q, V4 v9 E/ \+ N7 x<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>$ y8 b$ q" r* K, P; W2 p6 o
<h3 id="分流场景">分流场景</h3>% b) ?5 {3 F0 w
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
; B, j0 ], V* \& O2 u7 r<h3 id="分流的方法">分流的方法</h3>
) [6 z" w+ x! ?8 a: C0 h<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>1 d" b X! l) L0 d* n' J
<h4 id="filter-分流">Filter 分流</h4>: t. u7 [* l! G$ H( C
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>- O* l5 n# i# }/ I: H- N8 R9 z8 p: @
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
8 J! f0 d' t! X# K, U3 I* w+ s<p>来看下面的例子:</p>
- Q' d" v5 a3 u E: {1 a( i<p>复制代码</p>
H9 N6 ?) X$ `$ W7 D<pre><code class="language-java">public static void main(String[] args) throws Exception {
' M. |0 J! Z% R: T StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
$ w& G$ b4 G( N# S) _9 l, N0 K //获取数据源4 p9 G/ V2 E# Y# U
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();$ w E1 i+ L- G2 F, J1 Y8 i
data.add(new Tuple3<>(0,1,0));* d9 g2 s } o- r0 d! M' V( J
data.add(new Tuple3<>(0,1,1));6 K8 q. T$ _) f, {4 J0 H' w
data.add(new Tuple3<>(0,2,2));
+ x, s' s; n# k! A- `5 e$ f1 O data.add(new Tuple3<>(0,1,3));; c6 p, K5 i# W9 W
data.add(new Tuple3<>(1,2,5));
7 Q- Z& Z5 J2 \. i! l. F$ w data.add(new Tuple3<>(1,2,9));
- Y7 _! b+ g% |5 J$ |& b& Q data.add(new Tuple3<>(1,2,11));$ i8 Y5 I4 I& u
data.add(new Tuple3<>(1,2,13));
5 t+ ~# y4 q( e2 J& E: g" O
" C! q0 t* c" _& f1 M DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
8 B. ?9 Z X) N9 g: J' o
# E# y0 w/ n, J& G+ ^9 Z3 H( E9 }4 Q7 O
' _3 l/ I# F2 a& ~1 g
( T* a$ A) N$ s3 \8 g t SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);: W# B. \2 {9 ^6 K0 O. \) `
+ O) a6 F( d: R3 v8 M7 y* Z
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);, ~ d) |* H2 ^+ h/ [# u! P
3 {! B) y! T8 U! q6 [
6 [ p' M6 P8 E* f F! \! E5 _
) |! w6 \# G" d: C9 T zeroStream.print();6 p2 K. D7 y2 J m6 w& y; z+ [
4 l3 z' E9 s( [7 N Z
oneStream.printToErr();
. ^3 z2 ?- s8 t) i# x' V3 v0 i8 e' F5 e9 A# h. M
$ F2 B% m- e2 G3 \- w" y& }
) c' i* s/ h% ^8 c. x0 j: f
. D+ G" h: E# n1 |0 U4 t" D1 X' m
( h" g0 X* V" P& Z+ b //打印结果6 X+ t4 `3 [. P2 z1 j( B. A- [! ?
$ g1 q* W! M# N& C4 q$ I1 ^" Z) Y String jobName = "user defined streaming source";& i3 f. N) \) ]* _( w7 @
' B( o) c+ G2 g% k env.execute(jobName);( N. S7 |) P5 P8 V4 q9 {1 K4 v7 v# T# a
/ q6 ^/ V) J E: N}
1 W% `- s8 `) H5 k' w</code></pre>
# F" t1 M( ]8 k3 f<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>
0 l3 N$ w( @* v) i, j<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>1 m0 q! U- b! I0 a1 R
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
1 K$ w d! N0 s! I& p1 [- P. S<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>
4 G+ j+ J9 i& X" f8 i+ E" J<h4 id="split-分流">Split 分流</h4>
) M" Y/ K# ^8 b" F2 G6 E3 O9 x, I/ H9 E<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
1 P0 B# e4 L' X$ |( G% k<p>我们来看下面的例子:</p>
: b h) e6 @# X( b<p>复制代码</p> e9 n8 b" E) t9 Q/ s- X/ o
<pre><code class="language-java">public static void main(String[] args) throws Exception {. k! h. ?* h; ?
1 f8 I0 F- U& u% U* l0 R. Y5 d& d0 {
3 q. i+ J ^( f, b- e3 [! W% s! B8 o9 V& f5 l/ y# K. J
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
$ u- Q: }8 z- b8 c. |( X! j9 Z2 B0 a9 b. b
//获取数据源6 h0 k! v6 x: n8 W
: o/ D! a: L. K5 \, Y. s' { List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
L* \2 q6 K7 M0 ~- I. B: F) M& r* _! W' T. ?
data.add(new Tuple3<>(0,1,0));5 F" I( C9 T7 a
- d9 O( T, \4 Q, x data.add(new Tuple3<>(0,1,1));
- y+ E9 N6 {" B4 {. U0 N" s+ I/ s8 w& J
data.add(new Tuple3<>(0,2,2));
# z, L- Y X( t: t: h4 L- R* O1 K7 I9 m* d- ~6 o- X; M; O: [: n
data.add(new Tuple3<>(0,1,3));! q" _+ X) q/ j- o. Z) }8 Q$ S
. Q% t& X( W( D* O
data.add(new Tuple3<>(1,2,5));
& T- Z3 i. \' C7 W1 \4 C
. r: i" Z7 m% s- E: l6 g data.add(new Tuple3<>(1,2,9));2 X# ^+ ]; N7 l$ n- d
4 ]/ c! t- Y0 \! [' ? data.add(new Tuple3<>(1,2,11));1 a- @. `: A( W. [
$ w5 s8 l [' @9 @: I+ F
data.add(new Tuple3<>(1,2,13));
3 {6 @* H5 r3 r- X+ i
8 z/ }8 \1 F) a) G9 |( N
/ k( f" J/ y: V+ D4 M( e
( c* p& ^! B$ u1 s- U( v2 O0 {
9 [5 T' Z. K% h% u: r
$ ?% o7 ~* Q; g2 @/ B I o' V DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
' h8 S8 W6 m' K& u' J5 p" ~, H( e/ ^$ |
/ ^/ r2 ?- x; |" b. S" S& s- P
8 Q3 K- S; z) y m/ c& ^0 v5 m
6 G r* H6 c" s9 m' a) K w3 \
2 c3 p; S9 \ L! j7 {: } SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
* E% S K3 j% X
8 i( |9 m8 N4 A% E: Z @Override) J2 Z- @5 F$ G
! n( n' W( n, Z# y: g
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {' J' r g) ?3 c# t! ?
; [- W& c9 D$ F List<String> tags = new ArrayList<>();, J) Q, j, ?0 A. h0 \6 L# k
, v8 ?. w6 |) U5 N! z$ d( h
if (value.f0 == 0) {
# d L+ z1 M7 ]% P4 i7 c) `
% W/ p( J' E1 U/ S tags.add("zeroStream");' k8 P: K, J% `% `9 i
5 g) R: [: W' N* S5 i7 z! G } else if (value.f0 == 1) {8 {8 K. J% L0 e. k4 X5 ]" k
. s' f$ z7 `% y" ]: g5 W3 L
tags.add("oneStream");2 u$ m5 ^" p- b8 ]" V* S
# t7 e4 z: y( L4 K }
5 H( `+ W7 e# M) Y# Z* ?- {
6 e3 O7 u: j1 @, } | return tags;: s3 w9 l$ b5 W( _, } S% Z# z
' g, @) h2 T- Z8 H A
}
- p! s4 @0 B' ?" ?* z3 E* }9 ]/ n' q
});) s ^ q, z. F7 W
& j5 m# K+ y% x; {) l! l' Q9 [$ d+ \0 L; \8 s2 H0 k% z' v
' f+ z1 b0 P; r7 e' e splitStream.select("zeroStream").print();' T; k2 b: C( c* x; p
. J2 [& e( o8 p3 r splitStream.select("oneStream").printToErr();
! h e' f# V& q- w9 P, X D; e
0 b, N8 e! N X S# ~; k
, N: c/ k6 j! _$ B$ n
4 o% h2 ?* M' R# J7 h/ `! z //打印结果
: K( z' J2 F2 L9 f2 I9 `3 x( |1 n* q3 f P9 R
String jobName = "user defined streaming source";
2 q& K) D0 Q6 M
& C3 g( k r y* ^7 L, r env.execute(jobName);5 C- @) f; p4 ~) b
0 z( L6 b- z; n9 @* q c& K}, X H2 C+ t# ?+ s U4 b
</code></pre>; p( w n$ u% c0 r+ J) T
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
" ], q6 O8 T4 T( L! x<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>! C) p+ J2 ~) m5 G# r, o
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>2 d6 v7 Q7 A+ x: z2 `5 t4 p. d
<p>复制代码</p>
& \$ D: v9 D4 k$ ?( H<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.3 e' w; }3 L6 G; E
</code></pre>
: H5 O. T# Y: u4 H4 L+ _5 l<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>% _( S" @ v( C- a2 ^: D
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>* K, E. C% A* v5 `1 D
<h4 id="sideoutput-分流">SideOutPut 分流</h4>7 Z+ q$ _, }& b$ ~, T' `. p/ [
<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
* [( M# P, x9 I/ w1 U<ul>
8 b) H# C, d6 M' q6 ?1 D<li>定义 OutputTag</li>
( X# x! [7 v/ [7 D<li>调用特定函数进行数据拆分
, C/ w5 I" e- @( @ _<ul>
+ `1 m; V" s. D, V( K<li>ProcessFunction</li>( A6 Z3 J1 A I8 \2 o/ ^# q+ S: B
<li>KeyedProcessFunction</li>
6 z+ I, d& a9 _$ _; |6 a<li>CoProcessFunction</li>
' T" b# `+ m$ `# X6 _# S- @, J<li>KeyedCoProcessFunction</li>$ E/ ?5 U; ?( A/ W
<li>ProcessWindowFunction</li>% |7 A( k9 u: A6 d+ ^
<li>ProcessAllWindowFunction</li>7 t% q F' Q% k D- U* u
</ul>& \4 e# q# P2 f( B+ a8 T
</li>
, K P5 k0 L: T7 t8 s. j</ul>4 q! Y7 s! R* O! x: M
<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
4 D: I* S/ V. g4 I. d" ~) }<p>复制代码</p>
& t4 B* r; }# n9 W5 g7 B1 k<pre><code class="language-java">public static void main(String[] args) throws Exception {
: v- C6 n' v: F$ H$ q0 Z% b5 @6 k- p. x
+ P3 L0 A' ^8 Z/ I& i. I( E
2 p5 T, J* \$ J( L6 O c StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
: v. X: e% p6 _; l4 b
" R+ H0 [& q+ _; ]5 M //获取数据源
: b3 Y* x9 d: |! D$ ]2 ~+ D. E5 g( L4 L! N
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();5 _9 N9 m$ f% g' G+ R# n
9 J4 Y4 ^! h& W* @$ l data.add(new Tuple3<>(0,1,0));+ V+ B+ l8 T' i+ _% J
. x; E+ f% R) n) h) [
data.add(new Tuple3<>(0,1,1));/ A$ D! {. ]2 ?: ^2 |! ^; F/ F( Z' |
3 f+ L, }# k, f- x- Z$ N6 _ data.add(new Tuple3<>(0,2,2));
+ `- e1 ]" U* A3 K. u* q: s
/ p3 h( k/ r K/ k data.add(new Tuple3<>(0,1,3));4 M/ A, n8 \3 J( O: T
8 ^+ F0 ]0 |6 ^& `% U data.add(new Tuple3<>(1,2,5));
: ]3 e/ d) s9 S8 h
/ k( r$ Z9 g5 o9 A7 d/ e) k data.add(new Tuple3<>(1,2,9));& a9 o. O7 ~1 }) f- m3 `
4 f% V0 \7 Z& |# s data.add(new Tuple3<>(1,2,11));
; @; l- T; Z6 S$ H. _& S3 H! K' c% R& w* a. N9 @) `
data.add(new Tuple3<>(1,2,13));5 g1 ]* K' ^) }: x' W
2 ~7 Y9 u. T3 S( |# k6 N4 E a( Z
2 _8 ]/ }4 S/ x. K k7 C: x
W8 F! ^) g8 @1 m. q! J, B
6 g4 { I4 T( N8 Q
/ \+ F+ u1 ^/ k$ x) V6 s' Z DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
5 W2 m1 t7 _$ x. F1 P0 [ s" z" t* v k1 Y4 a$ b- j
1 d% H0 I7 L, A# D; z
( l5 _+ M' m' Y. X/ p, s OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};4 r, P0 \- X3 Q% W& r
% q3 i( b r' u- d D, o5 l OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};! q& K, O5 Q2 l4 A
D; Y/ s( W6 c' e) w9 q* Q2 ]2 S* `
% u) j1 I) u& L. d2 M! G/ c
6 e0 L0 X3 r# ~ a9 F" u1 b' x% K# X
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
: p( _/ D; `+ ?) U1 E3 z' I
! M# ~ j3 z" |- {& C% | @Override
% \- r. \0 ~6 s& Q* T9 e
. Z2 a4 A' ~+ |/ q- `+ e) h public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {1 @& b# Q$ f/ y! i
! a" y4 G" d. s
# k1 Y- \ c- N1 W- |
' [4 m' O# A' Q# U6 b. z# G2 D
if (value.f0 == 0) {. e" ~. h& M, u" }$ d* a
( I3 E. g5 `& n1 n/ ~# D. W
ctx.output(zeroStream, value);; g% V" D; \7 @ _
: @3 Y6 H# l& s
} else if (value.f0 == 1) {
) N! b9 a( ^0 i( b! y9 d8 Z( [" q& N+ n8 q; y
ctx.output(oneStream, value);
' G% \1 z: V! d, S/ N- e4 f7 y4 o) ^ q7 z- C! p7 Y
}* y/ w8 h8 w5 w* E( u
# D6 ~, _( P6 k! r3 M }
1 W# ~/ Y2 z* W& E; Y; |! W" x
! c, ?( [; E- I* f });2 _8 M1 J; S9 ~* F
9 t" H. y* {& s9 R' U( W! m
- \. }; ~- k) U) k
8 \) N8 @7 H: r+ R! K { DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);
$ j( [- d6 c" E+ Z7 D4 a: U- E, c
* a9 h1 \2 r* l0 V; N DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);3 J, H# _* R1 t( |- P' W7 `) u
. E! j9 Z6 T7 j) I7 t/ i5 r- K6 N( N. n1 S7 l6 J
& x; F9 x! Y: Q: z/ H, Y; ^ s- o zeroSideOutput.print();7 s" l0 m& K# K5 A" P& c) }4 k
$ n2 ^" Y" H- M( M) M) T& g/ ~
oneSideOutput.printToErr();
9 k# P4 j) s! i4 H5 Q, t- h: s
+ h& K G Y9 u9 c, I5 B
# F! H) J- |" Z3 b- ^$ F7 s" |0 a
& V* R# E0 o8 K
+ F8 U9 f4 C" s% d
//打印结果
- w( ?- I+ @+ A& _9 ], L
& o, S" Q0 s+ n( T) [4 `+ O; L% { String jobName = "user defined streaming source";5 t2 C. K. v/ |: s9 |% F
7 L; J- S1 g* u7 g+ y* E
env.execute(jobName);/ |; m& g( A j+ k& }
0 S5 A9 K3 k* g}4 D. B' ]1 q' I | _1 L
</code></pre>7 ]) w' M, Z% J8 e, E! ~
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
: ^6 b/ y( i: @5 K, ?% j" l<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>
3 ^# P0 J& L+ z! ]& d+ C9 A<h3 id="总结">总结</h3>
- W. e" x& E: J; P) d; h<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>, I* T- S; t% m8 l
<blockquote>7 j' U2 K1 ?: {7 S$ d
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>6 G3 o9 ^% Y/ g, N
</blockquote>" L; v" s& H' ?7 T5 I! i+ L) ]
; G3 K: ~( v& Z# n. S& ?! t |
|