|
9 k9 ?8 ^& s7 I# @! W
<h4 id="flink系列文章">Flink系列文章</h4>
6 t* G. a5 K) R8 H' h& N<ol>$ t5 s% x5 L8 H& F) a: ^. s
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
3 M0 E1 {7 _7 k) A Q9 c2 S<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>. v/ I7 X5 i/ x" F9 f3 D
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>% [$ J, p+ K+ n% ]8 l
<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>% X& I, W b8 |! L' @0 Z
<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>, j% s4 i0 F: j7 Y2 G
<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>( D% e, T$ K$ v- F6 L! j' I% e
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>4 S+ r; _! w/ @9 J/ A- J8 O& d1 z
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>. o E$ W( j0 j, W7 h* e
<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
* ]- p$ V' K. B2 j: T+ V/ V</ol>: y9 s( P( ]5 X
<blockquote>
( T8 s3 i4 F/ E1 e( b( T<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
$ ]8 q! L; x1 y% y' l! d+ K2 X y</blockquote>
! J0 n% x* q* \6 i k! C6 j<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>4 E( M) G, O8 ~$ v m" E
<h3 id="分流场景">分流场景</h3> o9 V0 {6 M( u" } K" F+ Q" w( v
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
8 b3 F7 H; J* a. I. R<h3 id="分流的方法">分流的方法</h3>
2 r0 d. N2 O9 x3 k e( ^# k$ R<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
5 h0 f$ @2 d, l2 }& f n<h4 id="filter-分流">Filter 分流</h4>/ [2 ?% O$ b+ k; E. b ~: m
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>, b" J" F7 K4 R
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
3 ?/ W q& c0 \! z<p>来看下面的例子:</p>
- @0 [7 R s8 ?7 U C; |<p>复制代码</p> s2 L9 E' x6 `
<pre><code class="language-java">public static void main(String[] args) throws Exception {
# U3 k7 D! }6 R7 {1 [& r; Q StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); [3 N1 O& W/ K: u6 ~/ w9 T+ X
//获取数据源. Q( m9 u. Y+ ~7 ]5 K
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
% n) D6 u E; a" l* J, ~9 f9 O data.add(new Tuple3<>(0,1,0));/ g0 g e: f/ I
data.add(new Tuple3<>(0,1,1));1 [8 u) `: o; Z) B
data.add(new Tuple3<>(0,2,2));5 Y: x2 l' _! c4 Y" j/ Q
data.add(new Tuple3<>(0,1,3));
1 x2 Z/ ] g6 J% N6 K: z' c6 V data.add(new Tuple3<>(1,2,5));
' u6 h' W. i! l* Y" o$ K; ~ data.add(new Tuple3<>(1,2,9));' c! h$ J0 N' ~
data.add(new Tuple3<>(1,2,11));
" y; C, u8 x6 E% p data.add(new Tuple3<>(1,2,13));+ [5 o5 B/ O/ N
5 G* r& K2 f( B DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);4 n% ?1 ~' L+ ~
% z& y& t6 x: `& K* V; ^
6 h3 N/ _) U6 _4 J) B& o- d" f3 \" O T# {) T7 @
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);& T" A S" z6 C
) G" S# r/ ~4 ?4 m. G9 R
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
, k3 k# Z" P+ y1 D$ D8 _& Y$ [# I5 g5 m6 G2 I5 z6 \( z
% ]2 I8 \6 l& n" z* c
+ c2 a( Z( q& ^3 R( G0 T zeroStream.print();# T$ }4 l8 G$ H- t7 ` q, O. f% y
0 c( a, ?5 G: n% ^$ Y/ P& ~5 m oneStream.printToErr();8 a6 M. N, S4 f4 N- v; P$ P
* W7 X" c& c/ L% T7 [& ?
1 E6 Q C$ o" c& g4 r
- w% i @% o1 N# A# p- f# y U6 q1 R. [6 x3 P7 K, l% ?
3 s6 d o" U8 [( o, h" d7 N
//打印结果% O. L) i4 Y+ f ?, Z
+ b- k' i$ B; k. S5 b+ w* L ^ r String jobName = "user defined streaming source";) K+ S4 C6 N2 K: { g
2 B( T& N4 _$ U6 f
env.execute(jobName);
' U: c: T- Y1 ~2 {+ C K6 d! w, p2 i1 i+ D8 g
}0 o/ ^7 E! Y0 V& v6 n) A9 W
</code></pre>7 W1 f+ P7 z2 d3 M; E# T' P
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>
8 |/ X7 ^8 e: L9 ?" f. X7 L$ Z<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>- ]6 J0 t! e3 P# N# V: ]; @8 k q7 @
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
3 m4 m9 B0 c! ~% b<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>
6 y4 d9 Q# h+ C0 j1 K) C* W<h4 id="split-分流">Split 分流</h4>
9 r4 O) W f& N6 y2 R<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>7 P$ L3 q3 d8 `# K8 v, C
<p>我们来看下面的例子:</p>" g' ^( k( E2 Y. {# I2 Z% V1 _
<p>复制代码</p>
/ y0 S; g1 E4 E6 w( P2 I, N<pre><code class="language-java">public static void main(String[] args) throws Exception {5 I: k0 h3 R) i, Y4 ^
2 ~8 P, [6 h- c' x3 n6 Y
1 Q8 l" i$ ?7 z/ q! z& p! Y. k7 Q% k5 L. m: c" |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
) C$ C, }: J. H# B# \
e8 Y+ F% D2 c( Y* i //获取数据源2 O y7 z {+ W H7 b
6 U8 G! {. Z1 i K/ P; |! ? List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
2 t1 o& v/ E3 L. u# c2 g. n( `, y) i2 D% S1 W6 `9 n
data.add(new Tuple3<>(0,1,0));. G) }! `! W1 u4 u9 t
- M+ b3 N% `& K$ W, [ data.add(new Tuple3<>(0,1,1));
9 s. M. i# t0 h6 R Q, J
$ ~5 b+ C5 M+ ?+ o: H2 `' Q data.add(new Tuple3<>(0,2,2));
4 ~7 A8 I$ ]1 |
- j2 r& F n8 C6 }1 d data.add(new Tuple3<>(0,1,3));, V7 Q$ O p$ d- P# b7 J0 J
8 c2 C# Q( b R# D+ j3 Q2 F data.add(new Tuple3<>(1,2,5));
3 I7 m$ m7 Z" u! A# E* R! H) E9 E5 `6 u7 h
data.add(new Tuple3<>(1,2,9));. M o! v" {' P7 f; _- V
$ N1 K- E/ a; Y3 c0 P. S
data.add(new Tuple3<>(1,2,11));
. ?* S1 Y" u4 v& D' e& r' h$ H7 H! r1 u1 C( e
data.add(new Tuple3<>(1,2,13));
5 m! `" {" i8 c6 ^1 u: [- O) ~8 l" ^3 V
. f! k: O5 O( b! }; ]/ s) K$ S
1 _3 k C# j M9 X/ M' B: V: e) |3 n
* e0 w+ U7 r" U( L" |; Z DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
# m0 m ~5 N: l! w5 @9 p3 ]/ R5 o
2 ]8 _, k o: h+ |' W
( y! T0 N, a' ~) w- w2 F3 |
2 | F& _% S+ q: J" u
) G& A% A% u6 f! i; } D SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
: Z c2 j/ L7 o0 D2 u% C5 {, a7 p: S6 |2 x5 o5 a
@Override( q; o+ }9 H; \. c! c3 j
: N+ U: N. m: }7 K2 Y) ` public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {$ j+ h5 h/ K# C$ U) T
& F- [( x! O6 y0 @ List<String> tags = new ArrayList<>();: v8 p: d/ _* { W: y6 z: R8 \
* ~# s) y; r" g+ D if (value.f0 == 0) {
+ C* `. |8 p! F, Y3 M
$ r6 k9 H! C. N& q5 g tags.add("zeroStream");) _, `8 _. X4 U- V6 }- o5 B: i1 Q2 G- ]
+ ]* w8 }: l+ L+ q/ }4 C
} else if (value.f0 == 1) {
1 ~( U; A7 j& `$ H; I- { H! Y- w/ l" ~* w$ ?& h
tags.add("oneStream");
4 `( W3 Y; o7 z
: R7 |5 N& G; T1 T4 }! F, O0 N& s }0 |; S* P* }& e# A* g) ^
% H8 P$ R! f6 M/ g$ r, R return tags;
" j6 m) ?4 n4 R1 _: y& R3 u4 k t; {4 _3 w& q' X" A. M
}% w5 Y% h& n" s) F( {) p3 n9 C; Q* _1 O
8 k ?$ ~9 A' f2 G
});6 @6 _, ^8 R1 `1 N8 c3 s. V
1 M4 n, N6 O! n# i! r0 ~) H" E: Y# h$ p' z& u. v
: @' f: |6 W. \- N5 ^' h splitStream.select("zeroStream").print();
q7 \' z% W5 i& q( [+ L0 R! A& z. u- L8 y8 ~4 U4 Q3 m( f# P$ d
splitStream.select("oneStream").printToErr();) n6 j3 W3 V' f' I+ n L
2 i# q6 a8 B8 A; A7 Q8 A9 l
1 I. m9 m; ?3 \) Z7 Q6 q0 O' d8 }& @1 ^7 }6 H4 m/ ?
//打印结果" q5 M9 d0 G3 T' B
9 W; Z+ r8 @( c
String jobName = "user defined streaming source";$ s4 K; ]# ]" u6 u k
- ?) U6 [8 S$ p* \* u& F. s
env.execute(jobName);* n* _' J. I. q# T3 U9 [
2 z" t8 M2 E8 t
}
2 \4 X7 [8 ^ W$ R) P* `</code></pre>. b/ g/ ]/ y! ~& T6 A# @
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>9 ~, W+ W- g3 k6 V) t
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>
% Q$ o& H: t2 B' q* ?* `<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>' ^4 k, V3 G Y- b
<p>复制代码</p>
: @, p& ~# _" a4 k2 \' O<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.1 s& }/ v- D5 a
</code></pre> L" l7 f4 k7 r8 C
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>) T# N9 r1 e* Y
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>& X8 I0 d/ f5 h* O9 P
<h4 id="sideoutput-分流">SideOutPut 分流</h4>
( M) Y0 K' G: E! H, Q$ l$ u8 ^<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>3 p2 V/ S5 K y- J: t1 P! b3 k
<ul>6 _! K9 Y+ c( X1 ?& t; A7 P9 K
<li>定义 OutputTag</li>
6 t' W v9 q4 n& C, K6 B3 T& `<li>调用特定函数进行数据拆分
& O1 |2 g7 g3 C2 J3 n5 K+ P<ul>8 `. D$ U' A/ D- r$ w" `1 J6 h
<li>ProcessFunction</li>
9 D7 X- t8 a' N! s/ U: h! C<li>KeyedProcessFunction</li>
. i: h/ L) y# n, \$ j<li>CoProcessFunction</li>
! t; R+ g! [3 e6 V$ f C. A<li>KeyedCoProcessFunction</li>
" ^5 f# h& E, c" W3 N& @$ f2 \<li>ProcessWindowFunction</li>
9 {, e8 g9 K% K. {& A# M2 _3 @<li>ProcessAllWindowFunction</li>7 q5 n3 t: R0 o4 w3 e* W
</ul> u! i- D! r; \ l8 ?2 W5 E& ?9 G
</li>6 m n8 o- n$ p# o1 X Z, f
</ul>
0 T9 }! y4 k9 i7 g<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>, @5 z. ]+ d! E, @2 P+ R; c
<p>复制代码</p>
; f$ v* `- Y$ S7 D- u9 N<pre><code class="language-java">public static void main(String[] args) throws Exception {
( c0 d6 l6 n) I1 @: a1 a* Y- X1 c) B0 [% f
) P* e( _/ U2 _, F, E1 _8 }. z4 k# l2 ~# z* l0 ^2 ]
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();- { o8 H: R1 c* l; l. u. G2 x
4 J$ b8 R5 R7 d t/ ~, { //获取数据源
, n+ @$ G( u/ G
) f0 g; ~# r' @7 s/ j List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
9 U% |1 A% T' A1 t# E, ~7 M) n& f! X+ P6 A8 b* Q9 t7 F$ _
data.add(new Tuple3<>(0,1,0));4 ?" [/ A* `, l- R' } _5 [
0 j2 n1 M! G2 Z/ W+ A5 T
data.add(new Tuple3<>(0,1,1));
" T) M4 E N" k, P( m) m8 R$ O( ]3 F
data.add(new Tuple3<>(0,2,2));3 i( p! @& u3 F: Y. f; L
# w- ^5 S/ h, u
data.add(new Tuple3<>(0,1,3));
. r3 q" P% D0 }" v
1 Z( g( B( `7 y# B6 P) k( L data.add(new Tuple3<>(1,2,5));
- o% j: q% h; c9 F H2 [3 I, @: P% x( u" r. d
data.add(new Tuple3<>(1,2,9));5 e1 f, Z+ V5 W/ T( d: f
, r% B& O* \) G; V; L
data.add(new Tuple3<>(1,2,11));
( F+ l3 W; S l9 @. e: @* J: B4 P+ Y; m* q
data.add(new Tuple3<>(1,2,13));6 v7 U: h0 ~8 E, C8 i* k3 Z
- H# w% R* r! u% y" q( X t7 H, o! X. v
$ n# i& ?) n, B% ]6 c
2 q) y0 u( S3 ^6 @: Q3 [: X' |, G% A* ~3 K6 @4 J1 @
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);6 N8 K0 [# b9 c
' Y4 ?$ h$ F" p+ F0 F' i2 S( j3 ]! R5 o1 M5 i& g
" g- e/ b0 {' K2 [9 E# v
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
# W% L ?$ p8 q5 ]7 e( K* t' ?2 e
+ N |; j* s! Q) l J2 I& \ OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};
* L$ E, V- p- h" K4 G2 ~
' K) k' h8 D0 A( U
3 ~% r- X$ d9 e J2 I5 _
* I2 Z1 F) z, F/ H8 ^( p2 V/ y! a7 t" x+ z8 h
* @8 R! B! K8 s7 q
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
( ^( N4 v3 \/ ]% x t( Z8 e3 w
( C0 O; d; h3 y( G" e @Override' Z+ F" k& n3 B) v
' L b. e0 B8 r' U0 q1 P
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
, X- ?" ~1 [7 u: e8 L8 H1 y7 N3 |( b' @& @# d7 L
& T) @# P4 d9 S: e' J
2 n! ^: T# o* r7 j if (value.f0 == 0) {: Y. o4 |1 }; _# T
# E/ C5 p+ c% Q ctx.output(zeroStream, value);
* K, j$ {5 R! M9 \5 u E) R* L5 t! B# L6 G7 o& p
} else if (value.f0 == 1) {
% w3 T: d& P" ~, C; I/ @* a7 H& R# D( |7 F; |, N
ctx.output(oneStream, value);
& ?3 M0 L, u8 R+ e' _. K
8 U4 s2 [: c+ b$ U }
4 W3 k- V: @5 ~8 U% w1 L# r2 M% ~1 L7 n
}: _, f5 O! q! e
0 w( `* F( h: n0 o. a {/ ]) U });
- D* I' Z$ H/ B) @2 j2 s( V9 c) j2 V% X2 N1 t8 w+ J! ~/ m O( a) X
1 }2 k9 t* R x
/ Z' V) ^. `, I1 H. k& `1 y0 W7 ?
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);
3 N1 E" j! Z5 N( X, e& ^! [( o" U+ ]; d/ F5 O# N/ e5 O
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);0 M! a. o. i& n: q& q2 R# J2 o& A
% ]( I# _/ y! f
: n2 h0 y1 t- b
8 {. X) b' G; B" |7 c: X# e zeroSideOutput.print();
( W8 n2 E, s) R7 M* d
8 ^0 f7 y3 K* j" |: m1 a' ]( u- [ oneSideOutput.printToErr();* A9 f' e. {; f( A7 p' ? B: S/ M- u- w
/ t" D' L% q P% R. V! ?
* B# C6 [% Q$ J' K7 m
/ I3 Z+ o( ~! q' `* ^* S1 L5 p3 n4 C, [+ z' N+ t5 O* |/ V
& V. M1 @# q- R //打印结果
* K) L: o( E% g- x. n- ^) m) {. p# M) @: ?" t
String jobName = "user defined streaming source";" T, T* l: E# s0 h: C1 p5 K: f: B
! }& P# Z. T3 Y' z8 ~ env.execute(jobName);
4 S! R8 U8 q1 }2 m0 C5 L2 s( [5 m3 V/ Q
}+ x w" Q% h) o/ ~0 F8 a( Z! ^
</code></pre>9 f' @3 ^ q- ]* z4 r$ a
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p># ?$ Q) X6 S0 Z; d+ w x( o/ ~
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>2 ~0 g& v* W0 v
<h3 id="总结">总结</h3>' a# `, h3 E; F. e1 M( C* R
<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
* P6 s9 s- `+ j, M( b2 I<blockquote>
4 r5 g+ l$ C2 i. ~; P2 R/ b& Q<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
+ C. g. l- _: b" S0 {" M) N5 z% v0 x</blockquote>
: }& y9 C1 i M! Q" Z
+ y* C! N9 l5 q$ S$ e6 L$ } |
|