|
|
5 x* X- @4 |" f<h4 id="flink系列文章">Flink系列文章</h4>5 m0 N7 b0 w' X8 n6 U7 s1 W% n1 g1 z5 {
<ol>
8 v7 A% f T5 p- a5 j/ N; X<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>: B4 {% Z; z/ P1 A
<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>4 J. p% m: Z9 J1 q
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
" Q; q( ~" F _2 U! V6 T+ B7 a<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>" E0 C, x" R5 K' R1 o
<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
0 T$ t0 k: s# M$ x. S3 ?1 m<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>) z: h$ I$ Z! O; k, N2 ^
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>
/ V# `, K9 B: n/ k<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
1 r8 }: R; z4 m9 D$ ~. ~ @<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>6 I; H! M0 S: C) t
</ol>/ l8 e% E' A# i$ `
<blockquote>" A$ T7 a: F& K! z- Q
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>; _7 s+ T" I: v/ W6 N3 I
</blockquote>
, l9 N5 q1 [+ j( U! r<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
2 j2 e- T) h$ b- }0 q7 n<h3 id="分流场景">分流场景</h3>
) _3 E$ V9 ]4 L- \ b. G<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>: U" ]# r/ X5 |/ t4 N5 O
<h3 id="分流的方法">分流的方法</h3>
1 d( s0 N! d+ ?0 p# }<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>4 b* X2 ?3 \7 `$ K
<h4 id="filter-分流">Filter 分流</h4>
# P g+ }" `( S<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>" ]8 Y9 ~" N6 M6 y: A0 c) @' g; N
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
. {3 a- }; Y. r# v5 P [1 Y<p>来看下面的例子:</p>' X% g: O/ a1 b b8 h- j* N4 y
<p>复制代码</p>- s* H/ q2 g: d- R
<pre><code class="language-java">public static void main(String[] args) throws Exception {
4 t( G' ]% S9 q+ |3 t StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();# _6 o( \: e+ q
//获取数据源5 C; u. Q) @& j" k3 ?
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
9 V/ [" \' \: q data.add(new Tuple3<>(0,1,0));
0 _/ [# [" ~# q' R, } data.add(new Tuple3<>(0,1,1));
# k9 ^- L2 [$ g) ^# ] data.add(new Tuple3<>(0,2,2));
% [6 H6 x* U" f* Z" n, F5 D data.add(new Tuple3<>(0,1,3));
1 v/ ?' x4 v0 R6 r& a data.add(new Tuple3<>(1,2,5));% B6 l' W: v- _1 f. w
data.add(new Tuple3<>(1,2,9));( R1 O) R2 Q, L5 u
data.add(new Tuple3<>(1,2,11));
3 s* a# ^" @' E" ? data.add(new Tuple3<>(1,2,13));9 b! E! ^/ j% _3 V% _# t
9 h6 ~8 f2 X' j4 v4 T' A8 O
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);0 m* O1 M; q( [) l% J5 ~
* ^, x @! r# Y& y# f5 U h
) A q Z' `# J% H
# O5 W0 c) S' {0 Q4 [3 O SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0); ?- V' T( X) c. Y/ e( H$ l
6 U" a- \# K9 X+ O" @' w
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);. p* z1 x ~+ a. G3 i
: R- d# X! z$ v1 o
0 i% ?& o8 r2 v- L* ]
; T# D( L R3 e# J8 q2 b zeroStream.print();
8 a: p4 d* o: N+ v6 K, L. W1 c! q P- |3 u
oneStream.printToErr();
, F1 W7 Q c: X8 c5 f# n
c. h' |. W% [$ `6 e8 T+ I, G: h" W8 c6 E5 o" R7 p; s
. e9 {, Q9 F" Q# i( Y! a" ~: E6 ^& y$ a
7 Z* C, {( @# [1 t) p
//打印结果
" H2 _7 v w4 v7 F# v# ]$ c1 R8 l6 T6 C( c' r
String jobName = "user defined streaming source";6 ]; {( j2 y3 T- q& S9 i
1 @! C9 x0 [5 I; c5 Q: @2 t$ _
env.execute(jobName);
, N+ @8 S; Y5 m9 G" H
4 l8 y2 m$ B6 k}2 Q- h& H H1 e3 r9 ?2 q* R+ g- B
</code></pre>
3 q8 I( P3 t# g& I2 l6 @( s<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>
- X2 j6 R) C. r1 ^8 Z# `% J& G<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>
# v+ d: s, Q+ g$ W+ z1 N<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>4 ]6 A( s! a/ L4 _
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>
0 U. z4 G6 ]6 M( f<h4 id="split-分流">Split 分流</h4>% O# B& H( K7 h8 q8 Y7 `# A0 h: |
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
2 L* j: w6 o8 G& m( l9 m8 A* @<p>我们来看下面的例子:</p>
- p) Z, S$ a( n# V<p>复制代码</p>
7 W4 U, R7 R) z<pre><code class="language-java">public static void main(String[] args) throws Exception {2 d+ c2 L) W' O% Z0 }. J4 z
, I" ~7 j1 i" z) S6 J. E1 [# c3 Q# f4 B) W" [4 N/ s. N1 R
9 |. j0 O2 U9 V( t
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
6 \$ n+ T$ n, o
7 Y Z9 A4 x, ^9 p; r! | //获取数据源
8 C `* a6 ^% w* [5 L% Q* z/ ?' b' y( T K. k3 k
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
8 X+ A' j0 i* e ^: W0 h- }! q4 p0 s/ l. v* G
data.add(new Tuple3<>(0,1,0));8 J, q2 m1 _; v( s, P) u
' Y: f& ^/ S2 H) b data.add(new Tuple3<>(0,1,1));
% O( Z3 a) d* m+ ~/ s6 I9 K# ]: w
% p- R& Z2 I/ N( b data.add(new Tuple3<>(0,2,2));
# y' r+ X, R$ e1 c3 h0 m, E
! u8 |4 r. F* N) j/ X data.add(new Tuple3<>(0,1,3));
& D" p3 o) Q% j! H0 e, F' w* q$ N" Y) M6 {/ r- k# S: L
data.add(new Tuple3<>(1,2,5));
8 Z7 x! o. u8 O; V I3 p6 A/ A# m( y: s6 ~8 U" d6 A
data.add(new Tuple3<>(1,2,9));( x- q- @0 n6 Q2 _; ]2 H
8 N3 g9 B' }9 Z# {
data.add(new Tuple3<>(1,2,11));
2 k i t5 K9 L6 O
, ~ N2 y! W5 u: j data.add(new Tuple3<>(1,2,13));
/ l2 h( ]* _5 `$ y3 ~3 | b
7 f3 f+ j! ?2 o7 s/ x
( U! M' c' E9 ?4 o1 b0 S Q
+ s" z- O7 ^5 e9 }, g; @0 V
i W, f5 A5 P
. Q$ i S& j/ @# t" S: T# T- p DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);& _; j* ?$ `: G- I$ e1 ~
! Q0 c/ e0 e ?- A/ i! c8 ?6 e
: C5 l2 b# j$ n; e' t* F2 Y0 Q
( z$ v& c# _( G
9 f4 A9 w' @, s9 i+ F
# M- v! g- ~9 }8 @ a7 N5 J SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
w8 }9 X& a( X+ C& _" L# U% a
& _, u$ q" E7 Q4 m @Override
, s- @ e* P9 s/ T0 [$ J" S7 e/ t m) w6 D3 W
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {1 Y( ^) x/ P g; f2 y- ^
2 p" p4 x/ `7 T5 Z5 h% @, a* ~
List<String> tags = new ArrayList<>();( x6 O7 a8 K' N" G
& y; g& U1 R$ h _ if (value.f0 == 0) {, s$ w) Z* p, w
/ R3 S7 _1 l3 n; Q6 v) }8 i
tags.add("zeroStream");/ i! U! s* k$ b2 Z/ o
2 r/ k8 }' k' K j } else if (value.f0 == 1) {& i+ @1 }, }- E. x
# m) Y5 @3 L# k! f. P9 z4 }. Q: J tags.add("oneStream");
- h6 d2 W2 S8 ^, r
% s( V1 F: }. C4 k4 U! Q8 I }
" h- q. [+ n2 u3 {8 G: Y6 I: O% e" n
return tags;6 j3 I6 e9 ]! }
+ b, X* a% _ H }
- G& J1 s, Y: ^) Z k8 m+ x) d% c9 H
});
8 K3 r. H- |/ w% u6 ?+ m& O! ^6 I: Q' i2 D; d
; n6 C, f; [% B5 I o- d
& z2 B6 v" v2 O4 x
splitStream.select("zeroStream").print();
$ D9 s q# y3 M% H" O
9 v9 U* F" Z8 b' r, P- R splitStream.select("oneStream").printToErr();
8 W& e; w5 l+ a( Q. d6 d0 @
" F5 W N) [6 V/ u7 u$ M" L" l
) Q; D: z K% O+ N
+ S" K8 e7 M( b; C7 i/ Y5 u: ~" Y4 q. u //打印结果" F# k: n4 H, E5 n
8 k) y- S5 _: a$ ?0 K! p. n
String jobName = "user defined streaming source";
7 f" J9 \' m- P0 G7 M6 l: \5 C6 ~4 W2 z2 V9 ?
env.execute(jobName); v- j. c4 T! x0 H# G) R
1 v0 Q2 P l+ W# a M}) V1 M& e3 l1 A
</code></pre>9 D6 j. ^5 U. j1 ^; S3 F
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
% T4 i- Z3 p7 H7 H<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>3 D; R2 p( C" f5 z& i& {7 m+ k, W
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>
4 x$ z, q: A+ Y( T! D2 H<p>复制代码</p>
l% _7 [/ I: T q; `# B$ D" q7 y<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.& U2 n3 i( e) ?) C
</code></pre>8 S$ S, k" x j$ n2 N; O
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>) V1 R2 s" e* z, j' m0 p6 M- S
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>* _" A9 y0 ~% S% i
<h4 id="sideoutput-分流">SideOutPut 分流</h4>- k& [- {7 I9 A$ g/ V# _+ v
<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
6 N( n; J7 P! P+ Z/ c& k+ d<ul>3 }0 \: e( h1 _7 k m6 W
<li>定义 OutputTag</li>
3 h5 c6 q6 @ T0 m9 e8 E1 }<li>调用特定函数进行数据拆分
5 M& Q1 y+ J7 N, G; \, o. k$ [- Y<ul>2 i* g- b% @* {" e: _8 `7 ]
<li>ProcessFunction</li>$ z( \9 o/ e/ b
<li>KeyedProcessFunction</li>; S* D# D, T3 p7 a T4 d2 I; E
<li>CoProcessFunction</li>6 b6 r( g2 q6 Y* Z$ `6 J; @4 c
<li>KeyedCoProcessFunction</li>
% ~: Y9 f" y ]& m2 f8 p" ^1 f<li>ProcessWindowFunction</li>7 q4 K# E; s H. X
<li>ProcessAllWindowFunction</li>
( q! H% U, \) S! y+ h) q+ d; x</ul>8 `8 h: O O! X+ J( i6 _
</li>
8 c ~/ Z- X+ D; q, K# [</ul>
( t; ~) R; |: ?5 {1 n<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>( j2 x0 m/ z5 L
<p>复制代码</p>+ J- H k) z$ o: [
<pre><code class="language-java">public static void main(String[] args) throws Exception {' m. ]4 Q/ E* ~+ Y, [
5 t* v- Q/ l. m. n) t+ P# X4 V& {8 ^" ~
* v. L* x9 m. {& \) {: Y4 E4 }# u4 K
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
) Q4 b3 I6 r- a. Y( b' A, F! V, K( x+ ]! W3 p
//获取数据源+ L4 s- t0 T8 c% Z& N& I
5 D# e) W: g" B* j, b2 B: ~8 z
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
( I. s8 V! R% J- u T, i! Q: b- a! {( G: O/ ]9 S3 c" V. f9 z
data.add(new Tuple3<>(0,1,0));
/ ?# K. |& D' c+ W! a* t* I& m3 A6 F! d+ K* x/ |4 z
data.add(new Tuple3<>(0,1,1));1 E+ D1 X" G5 U: B
: V$ N& a4 h9 n5 P
data.add(new Tuple3<>(0,2,2));& B. s; u( |7 Y6 t% n5 t
) L" A7 v0 B, c m data.add(new Tuple3<>(0,1,3));9 q8 Y2 t! p9 y4 d j
( H, v* i5 u+ M" J
data.add(new Tuple3<>(1,2,5));
7 B, e& n8 I% U4 a, @- i
8 L! F3 W1 F8 J5 u! I/ y5 L1 { N' } data.add(new Tuple3<>(1,2,9));
8 d- B/ I- Q( U1 N8 w7 ~, [. J$ M9 Y- s/ c$ k4 Q( ]
data.add(new Tuple3<>(1,2,11));
% W( _! `9 \; P: ]0 G. [
5 X- ]% m" L' n) L+ Y) ?4 ~1 m* g g& ?/ } data.add(new Tuple3<>(1,2,13));
2 s% k$ L5 e$ r, O* l6 _ A8 R% |$ ^* L4 l; J6 p1 L
" K: Y& N) e. U* T ~" c. l$ i; g2 x: C$ l( k5 E, j
7 |9 {* B& [6 E Z3 E
* L3 d% ~! f | DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);1 ^1 R; _9 ?! a5 c5 {8 L+ q' z
, t* T* ]& Y# b) T8 s: p' w* t. c) t% `2 R( ~2 X
* k0 M m1 e2 b OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};! _# f3 e% ?- X5 c, `5 M
' b1 b) j) P1 E& Z8 J
OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};, `6 h B' l, f0 G) w
+ F6 O, i! Z% R6 F
6 W) N: r% p2 E& A$ E8 e. o2 Y
1 r( \! S$ X4 U9 E
+ i: [6 [. t0 h: l( e6 B4 s- V2 M9 T& n& M$ s
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {: X9 h! {- p6 {! q0 u$ P% u* B; a' P
: w+ e) `% x& F8 o/ @: R1 D
@Override
+ v- e: |) i6 r' _! Y; g9 M' M: E: q7 }; B& A& ~
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {$ G. Q# k2 C) ~' \
) W3 }, ]" Y+ _% i' u: m
6 A5 p! j- G7 Q* t" q2 G5 E i. U" |0 |5 `: g8 A) Z
if (value.f0 == 0) {, r/ F8 u. q0 \) t, W
$ O W6 F2 \7 M) ~7 L0 Q/ L
ctx.output(zeroStream, value);- n$ T! {. m' \' U0 P; F7 ~ i
5 }* k; F+ {8 [
} else if (value.f0 == 1) {# T0 Y4 u$ O2 \( J! n d( H
! V' ~! b+ x/ w' W% o ctx.output(oneStream, value);+ h, l* f9 ?& {0 a3 c" p
1 n8 g3 ~/ L5 q+ N0 U' Q9 I }# l2 B" Z |( S( v
" D7 E J5 G2 M
}
4 H8 E6 F4 s6 O8 r6 Y2 W% n" @* ]+ \. i$ z; u
});
+ X% j7 i# { Z, q0 h* x3 u$ ^/ K5 X& ]2 O7 p8 R' s4 w9 X( e- U
& r! Q h" c0 S" H: C! Z
! t* x5 b; B/ _. a' l DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);# o5 `% v+ U2 A
4 H% G' o) f0 K e DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);6 ?. k# B$ b1 f: n8 T7 L2 ?8 K% u
1 Q1 j1 j0 e: R* n% M- s
$ B" M1 P% \2 }" G% @
9 }- _* U) s3 K: a X
zeroSideOutput.print();4 P6 ^! q' ~2 C1 `6 u F( c& F; P
0 {% [6 G; r' M4 r4 ]5 G oneSideOutput.printToErr();! `* W) _- x5 h0 d$ ?% l+ k& L- l
5 R' q6 F+ }: h' `5 F" l, o) j+ o
4 _" @" A: `- z9 O ]" u& @* C4 c
/ `0 N$ l. h% o4 `
5 j. y! I3 r( N/ F( S8 X. E) ~% @2 k3 k, d% D+ b' X0 X* j* h
//打印结果# s4 r; ^/ ], Z) f/ U/ m
- O" l5 l, n! n String jobName = "user defined streaming source";- W$ P) p# s; n+ q9 B
3 p: o# r( X: k( M+ s( W
env.execute(jobName);0 y, b1 f* Z3 x$ i
4 }5 o8 [" n1 L" q% G}
- d1 n5 Y w' D- C2 w, D</code></pre>2 Z7 p$ |/ ~2 @; _ @' A2 V
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>. V$ M7 s- A( X0 x
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>
( T/ g& k; H, r. Y' X<h3 id="总结">总结</h3>) E5 d' l& o0 L$ ? p. j. B* p
<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>! x S5 m1 p( R$ V/ p- R9 O
<blockquote>. Q: e* A u% @8 A
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>5 [1 I B/ s3 I; z( u d4 o+ Y2 f
</blockquote>
4 o, o2 {+ U) T0 F$ @9 W! Q. ^! I& b" ~( Q% |
|
|