|
|
8 Y# f6 P: e9 b% R<h4 id="flink系列文章">Flink系列文章</h4>
3 L, s/ x% `( r" y. ]<ol>( R. C, Y9 [6 x5 v5 |- F9 o
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>& y# w+ D! G- j8 @% H9 U' V& k! W
<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>
0 w9 u* F+ K0 b% q# `# z<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
0 R9 {% F. Y5 I. ^8 @! u/ A<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li># J8 D# U9 n8 |' k
<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>; M& m: L1 Q; d
<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>
* @! k8 H8 y6 q<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>
( r9 Z+ g$ t' i# z$ P" h* W<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
) D0 v( _& e( V2 B( C<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
7 Y! a. w8 I, r, ~6 c</ol>
* }+ l6 I' ?7 `. j; e6 ^<blockquote>8 S2 G; N. | ?' ?7 b
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p># @ J3 Y( ?% e V
</blockquote>
f! G( z8 O s9 j. M% k<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>- }, \% k0 Q$ E; F; e6 u* N( Q4 y
<h3 id="分流场景">分流场景</h3>! r& e, z3 B; T# R0 i
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
+ p* z% U" p1 X% J, S/ t* O* I<h3 id="分流的方法">分流的方法</h3>
+ p, g. Y# W3 E<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
3 u) a$ Z1 ]8 z' v* E* |) {<h4 id="filter-分流">Filter 分流</h4>
# y7 `5 t4 w8 [% g, `<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>* U' G9 D9 f$ F& U q$ L
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
3 I& b4 a7 i9 _& X$ t" w<p>来看下面的例子:</p>
5 m$ Q% t- Z3 O- ]! o<p>复制代码</p>
$ l5 Q) z3 u# y9 ~<pre><code class="language-java">public static void main(String[] args) throws Exception { W( F: O( [+ ~( v" Z
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();+ X V2 \1 Y& [9 F- u3 a. l
//获取数据源
) G0 O6 N; [( V- c, ]' h List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();% ?8 S! E# M8 L* _ H+ C
data.add(new Tuple3<>(0,1,0));7 j! |* p5 ^3 F$ Y2 p: _0 f& T
data.add(new Tuple3<>(0,1,1));* W0 f9 e7 N; F. r8 d4 X+ E7 `
data.add(new Tuple3<>(0,2,2));6 @, V" N* a8 d" b
data.add(new Tuple3<>(0,1,3));. M& y# _1 A4 E8 Q, X9 l
data.add(new Tuple3<>(1,2,5));
9 K' T' G( w% r& t% I7 [ data.add(new Tuple3<>(1,2,9));; n3 U" e4 _" p! `; {4 y
data.add(new Tuple3<>(1,2,11));
8 g$ b9 U) b, J% H data.add(new Tuple3<>(1,2,13));
) Y) v) x2 O3 x# K: [; B( F5 _: L" d C5 K4 ~- J0 d- l
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
' _$ s0 b: r/ L3 ^ m T( p, _7 m0 {
@# o: K6 [' H; Y$ h: v/ v
* x1 i) R3 K7 f& G! y
) a9 V! s; ^2 T( m! f5 { SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
+ b1 h/ J- E' t" F. s) i4 w
* z4 S; T$ s6 n4 h4 W$ S SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
$ c% I0 z* h. @. G" @2 x& L8 o6 y2 w) V
0 e6 K+ `' L6 O# X+ C2 p
1 s5 G+ j! T" i& {! h2 a5 [ zeroStream.print();& _3 w4 `4 K% S( U
* B& I9 K& U1 |/ v( Q" N8 ?
oneStream.printToErr();6 I9 y0 f8 [" l0 z
6 A+ s( B* x( o( F8 M/ |, U& g f
! ~& }1 t" u$ ]7 i* s4 c! K
4 q2 A# u. U3 t0 j. @8 r
8 c& M* H1 ~. u
1 j! \5 D9 G5 F* |, F" y: n1 v //打印结果' a* ]' P! u# g3 i, C; y
8 [/ R( y: T; O+ `9 J String jobName = "user defined streaming source";
" U, H/ Q" e' i) r' G Z& o1 V/ N) |; ~1 }2 [# ?. h
env.execute(jobName);
& k, g) b0 p7 W s
: Z. K' P& P6 [* S}
2 r4 {) x/ z! }! ?2 ^) T5 k2 e</code></pre>% U( W6 K$ `6 M$ F: L
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>8 j1 K# L& R. D! P h7 Z/ L
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>
- ]) y$ V5 y% ]' l5 V, J<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
- N# H) ~# t4 g<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>. E) h2 s; \5 y8 M
<h4 id="split-分流">Split 分流</h4>8 Q$ O* |! K9 P. [# N
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
: ~7 H$ c1 ]. ~<p>我们来看下面的例子:</p>
" x ]0 y" J4 L7 ~! n+ | V<p>复制代码</p>
6 a5 T) i! D" }: O: D7 m<pre><code class="language-java">public static void main(String[] args) throws Exception {
) ]+ ~2 G o+ x
1 k" u4 ~& Q3 i& T% I7 t( e: e; m& [( o C2 N" n5 _; g
7 J1 z U. O s$ X StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();: p7 Q% J. ~7 P- P, J
! y3 }6 \" t' `8 H/ ^
//获取数据源
; F" C: T% ?1 F/ I5 p/ P+ k
4 K( e( @/ _- E List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();6 X8 s( Y, @) v- @
. f' t* `8 Q4 t/ y1 Q data.add(new Tuple3<>(0,1,0));9 e& a4 j) N$ Q6 b
! o3 n( V4 a. e ~) y9 B5 k data.add(new Tuple3<>(0,1,1));
+ a4 F K$ n$ F
! a7 e$ {% S* v6 W- G data.add(new Tuple3<>(0,2,2));
0 Q& }: l9 f- D2 M0 S
! s* _/ m! L" |' W/ b* P' K& g& ` data.add(new Tuple3<>(0,1,3));7 {- h+ W7 y3 p9 R' X
6 h6 r; W! A) G( }7 h. M( ]1 h: L
data.add(new Tuple3<>(1,2,5));
) N$ m9 ] o i% T' F& g8 W, J$ _% }6 A9 y" i
data.add(new Tuple3<>(1,2,9));6 R& h/ Q/ M: N% [
) _, R2 x) \; A0 S l! B2 f C data.add(new Tuple3<>(1,2,11));
6 v, J0 t9 `8 G# a: x5 m3 p/ ~ { y/ i& Z- d) v* ~, C+ K) [) Q
data.add(new Tuple3<>(1,2,13));* m3 U, u7 s u. N# u; M
) y( d* @- o# h; R" E
: ?- d0 s& V9 D- `
' ]7 S. {2 n* [$ N. p; i
7 }; Y. e* \0 F, H& C
- n7 W' P9 J# M! g* V DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);) d. c' c! ]4 E4 _# `4 o
- i }; V) g) B: @, Y) u6 a. }7 Z4 i1 @) p
0 ~6 A4 X6 x3 h7 B: \
% y. ?: l* d- W7 n% k: I
, f W# H1 x) t' y9 b, o: A SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {% v; [+ R" A2 k, t$ ?) z, n
9 j' a: ^4 q! J' f* d2 b6 f- T
@Override
; v2 b# @8 z/ `" D" r% S
- `7 a; \' s O5 W+ J! W; T' [ public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {. N% ~/ T) w o3 C/ y
, m- e: i2 T% o- I+ W List<String> tags = new ArrayList<>();
8 t- S& t0 l3 J) W& T0 X8 w4 g2 y& X
if (value.f0 == 0) {8 t- ?+ `- C( I; V- u
3 w6 }+ P" [; q) ^4 q- U, @ m
tags.add("zeroStream");
9 V; a: Y/ K; A2 j5 K5 S
8 i) W3 O. a9 e o- H. g } else if (value.f0 == 1) {' w0 W. p/ X) E! x1 ]
+ K1 z4 Y. z# N5 n; T
tags.add("oneStream");" {! w" ?* ]3 T3 {; E" a! _8 Y
# Y( u: S) ]0 M* W/ h* X$ o }
& n3 ^' b7 D. K* l6 q" _9 \- O3 b. o' W6 l. _2 f e( _
return tags;
4 P% f6 g- r, H1 K$ c6 H
5 _: f0 a# L9 ^6 P: D' X }, Q9 E, i% h+ U
* n( p6 M8 V2 d0 _& h9 n6 S
});
* m0 M. D! D- M% |3 z8 ^% q$ L
9 M- M6 T4 q- x/ g
$ G8 w; I5 o, u0 r
) @8 X& g, `9 ~/ p4 W: C; Z splitStream.select("zeroStream").print();& f, J- a' R3 M) Y1 m
* e& k$ x# l, Z
splitStream.select("oneStream").printToErr();
! m8 Y8 B9 X; ~) k9 o$ [
% W8 U4 u7 P9 `9 l% G
) z! e, A2 ]7 S2 y
4 C4 C" n9 P, n //打印结果0 ?2 [/ A) b, _6 n5 X
8 G n( ~& f7 N' D; i, A7 S1 }
String jobName = "user defined streaming source";: F% B3 Y0 }% [) \6 t/ P0 h, J8 D
# @" Q. n# ]& V4 J8 w* d
env.execute(jobName);* Y, e7 n! o6 C# U9 B$ T* M7 T
4 h. i/ Z# t) L# M" _# p. P
}
+ F9 k+ Z+ h. a7 w# S</code></pre>4 x7 p' r f) R
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
, B U g, I8 y$ M<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>6 q, n# e2 O/ e! f, l* [1 c O
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>9 p2 b) h- ]0 L) p
<p>复制代码</p>
" u) ]: a6 p8 \/ P9 K; h s<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.
' M! @+ W$ ]5 b0 P0 M* ^; d& K* J</code></pre>
6 q2 A( | r5 q<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>' E/ v2 e/ n/ f
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
# p5 F: a# P5 H3 I( ~( G3 W1 C<h4 id="sideoutput-分流">SideOutPut 分流</h4>, B' q" D# h$ H. b& {0 _
<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
& A" k) j5 o9 W8 k$ Z' V<ul>
2 a0 o1 L% a* h, I1 |2 W<li>定义 OutputTag</li>
3 ^3 g' O8 p1 e+ O" t4 R, e<li>调用特定函数进行数据拆分
# D9 \6 L |& w( Q& I4 l<ul>4 M1 m2 @$ G* [/ x* H
<li>ProcessFunction</li>* P* I3 E) u- }5 `- o) D
<li>KeyedProcessFunction</li>, l: T4 A5 b+ e/ ^/ c5 Y, z
<li>CoProcessFunction</li>
E' }2 _" _, L; e# Q& P<li>KeyedCoProcessFunction</li>
* }# w% q6 i) I. f* [; { p<li>ProcessWindowFunction</li>& x8 V' b* w: k& l+ }3 x9 t6 G5 Y
<li>ProcessAllWindowFunction</li>
- c8 p$ ` ]. G) o; V. O8 ]8 l! f</ul>" _# @& U, \6 L3 Z
</li>
% l! W- M$ \4 j e( T</ul>: V1 `$ g6 p* W: K# g" j7 h+ S
<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>, n1 }! C( b3 _+ g$ A* u
<p>复制代码</p>( V z, m3 d2 s
<pre><code class="language-java">public static void main(String[] args) throws Exception {
6 t. Q3 g! K3 A9 G! R- R9 C$ T1 d6 f: D$ q# ^- P
- K" y6 ^ V( @% f9 l( m
* I6 B! @* w, ` StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();* i5 J* }0 q4 v, r( |
8 J5 h# h* N: L+ f4 ` //获取数据源
( B9 W. |! ~1 R G$ C
- r4 s: }. \. Y7 H6 x* |& D- u List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();1 A7 b, d7 }# U
a, h* j2 Q! e' D3 B- J data.add(new Tuple3<>(0,1,0));
% k- X, `+ U/ R c0 o+ t5 _+ |4 ~: X+ D7 r: E& J5 c
data.add(new Tuple3<>(0,1,1));9 I3 `- j, [, v" G
0 N$ W- m# g' w! }; [- ~/ e
data.add(new Tuple3<>(0,2,2));
. s: b" y3 ~# C& t! g" G4 D* \" {' ^' o1 V, d S
data.add(new Tuple3<>(0,1,3));
2 G, y8 F3 l5 H
8 y0 C) U" y9 w% w" Q data.add(new Tuple3<>(1,2,5));
& O0 J# Q, L9 a4 w+ H" b! `8 r Y+ V' w* i4 {& e7 s
data.add(new Tuple3<>(1,2,9));& X5 R/ P% w* J3 y; h
2 R3 q4 I- e8 p& K. P data.add(new Tuple3<>(1,2,11));* ~) n4 M1 M, l4 c
$ M- o3 [% I: ^: d
data.add(new Tuple3<>(1,2,13));) V( _ p2 W O( j
8 I5 K# D! V8 S. p' ?
; x! N, L3 C0 e( j/ S h' Z. V$ b! t" t/ j" @) C! O! F: e3 v
/ ^( L% M2 C1 t! j* z5 c, M2 E; m7 `# ]: {/ w# {# t8 v6 @
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
7 g+ B) B* N8 z6 n+ q% g
* p2 j1 t$ {* _8 v% x7 n/ ^+ P+ U1 M
2 M0 N6 b) x9 G T; r
( g1 N! H n+ G5 A OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};3 Q% t' R) U! a7 P, y. r
* O, O4 B* ~: R
OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};( U9 Q8 [5 r4 d# @5 k
* q/ p- C; p9 j* o# n
1 T9 H" ~/ e. U$ ]# M3 `
0 X5 z6 d: k- ~8 x2 G
' E$ M+ q$ h/ ]# _" @+ D& I9 n8 @& f2 d
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {1 e# F* B. @, v" {% ^) `9 S
7 q1 D9 ~0 F+ N s5 v
@Override9 Y# }- G; r+ v7 [- x
9 @! Y+ i% V2 h4 w, ?* s
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {, l4 B" I* u4 {2 n+ f
" s v3 _. y6 T: z/ a0 K
O m# w/ X0 W% Z7 l# l+ B- w
1 F; b7 g5 `/ @: q# D if (value.f0 == 0) {
, N6 D8 n O; y
& h c. ?/ L: c6 r6 u2 I% G% z ctx.output(zeroStream, value);$ i; O' S8 r8 y8 ^- _
; c6 U5 O2 \9 [0 }) I! e m
} else if (value.f0 == 1) {( U& H, E- O% v" ]
- g9 _- B2 j/ y% \ ctx.output(oneStream, value);, Y2 T y' t. E. @ d8 L. e9 f% b
0 d4 c1 n/ s1 g. J
}
' ?) I8 P5 X9 O5 M& C: |
. }/ S: g0 }2 ?& v4 W; p8 G }
' s' m. C3 ?- g% E2 M$ h
+ G: S4 `. o. ` L });
* \, Z1 M) F& H/ d/ W1 g* B& W1 r
9 E: N7 b& ]" a$ ~
3 Z# Z9 v) p/ K2 |2 u
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);
! i0 ?) ]8 i1 l5 E
1 \: S1 Z1 Y$ P DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
9 L, t; C- d4 P
, u) d9 w/ g( A
' a0 ~+ I- z6 y- L5 r6 r$ C) b
) k- x5 b9 k) U6 M& g+ U+ B- Y5 k% l) T% J zeroSideOutput.print();
, H+ C1 M% C- [
% J* o5 }2 y$ X7 N5 ~ oneSideOutput.printToErr();
8 Q( ?: h3 ^% r- x9 x; d( S4 J5 o
! ]5 E! }3 D8 O/ {* |- ~
2 W Z8 P2 } h; N# \$ F% n) b. M, E) j- i0 F8 d8 W* F4 @
* Q8 M5 X$ j' x `/ e V5 l0 @
0 b+ m7 P4 p# Y1 b
//打印结果
+ c1 L+ W" Q6 c4 p( w) c! u, J) s, p' X4 B% ], P. r
String jobName = "user defined streaming source";, E* d$ z4 \; V
. n9 v) `; B- O6 Z% P' Z& S$ Q
env.execute(jobName);+ g- }' Q' V8 k! ~4 [
3 Z0 D( d7 E3 t( n! l- W! O}
. P- c" N8 r: `8 _4 x1 O</code></pre>
+ W5 m" ]( r4 T<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
0 i& C7 q6 b. F. M) l: @1 w<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p> T; a+ p; u6 y
<h3 id="总结">总结</h3>
6 `8 x/ b ^0 d& V; H6 T1 z<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>. m- ]% d; c) A- A. V% Z# Q
<blockquote>* {9 j: {9 h% F3 F
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
3 I2 W" N8 A% b+ E0 I</blockquote> g3 G' X4 y9 {; Y9 K8 d0 L0 E
1 P' B$ O% q/ x! ?+ S |
|