|
|
- B( @ ]2 i0 Z<h4 id="flink系列文章">Flink系列文章</h4>
+ v2 C- r# @9 R9 ]) T& L<ol>
7 _6 K- y% ?2 Y( u) [<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>7 e% p) R4 ^9 |0 P
<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>, w' ?# b9 v% _0 d5 N6 _. x
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
( F1 _$ Q( i8 I. U8 ^ V3 n' S<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
5 ]( n' U. A6 t4 z<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
; D7 E5 |+ S2 a4 } E<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>
4 y P5 H8 D9 H( Z+ u) }4 d8 x<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>. _% h6 ]! R* h! p/ B7 {- r
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
% \# D7 C7 E% z, w# M# _! M' s8 F<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
+ M1 i, G9 _8 N6 f5 d5 g</ol>
j( K9 {2 Y/ H; a7 b<blockquote>
" h9 B) Y2 q4 [: R& G* g- A<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
7 y2 M6 P c5 S+ I</blockquote>3 o" i2 @5 R: X. u3 c( u9 H
<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>' Z0 u; m* t, P9 o- J8 H9 I- l
<h3 id="分流场景">分流场景</h3>
# R4 _9 n* w6 D( l* M<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>4 O6 g) _1 l: C0 Y
<h3 id="分流的方法">分流的方法</h3>0 S- r! F' F @; v, `2 W
<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>. |0 ]+ _) \7 I0 W2 H' F
<h4 id="filter-分流">Filter 分流</h4>
8 }& ^9 {: u2 ^<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>
* p0 x3 P5 E7 C4 s) B& `<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>- G6 b0 f1 v! @% O* w
<p>来看下面的例子:</p>' h# w) L Z3 k
<p>复制代码</p>8 B) E( P V8 x: Q4 p& D! @
<pre><code class="language-java">public static void main(String[] args) throws Exception {
# p; p" Z3 s* X6 Z( h/ J7 E StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
4 D$ }5 P- i" G" f" D //获取数据源. z, i5 @# c: u" i; }
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();3 x/ [/ b0 V$ i: t, U. q
data.add(new Tuple3<>(0,1,0));
% A: K: T2 H- Z \ data.add(new Tuple3<>(0,1,1));
; f& C3 n7 ]: f- S# B& p data.add(new Tuple3<>(0,2,2));& t& @# V$ e; `0 _ }) P, }0 F4 V
data.add(new Tuple3<>(0,1,3));. X! X# ^" B% b; W5 y: c" f
data.add(new Tuple3<>(1,2,5));: r% l4 H) G+ S" i' f
data.add(new Tuple3<>(1,2,9));
4 O+ c. s) V4 ^: |8 ]. J1 \2 Z data.add(new Tuple3<>(1,2,11));
7 O5 S0 o' z5 n+ p! |1 Z data.add(new Tuple3<>(1,2,13));
! [6 l& X- Q. I l- o5 ]! N: z1 W* b3 w- T
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);, r$ i3 n5 S1 M; R+ ~8 _
& G/ W# f5 f) q
. o$ j8 d5 B- H: g( R' G9 C( ?
W; i( ~+ J* ] s: a5 ~% \ SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
$ F3 ?8 N: C+ T, m: E. h) h
& `, K9 h3 F/ R4 D SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
" U& L$ Z7 k6 q% \( N3 r# Y% q$ E6 J! U ]$ `- R$ Y g) [
1 h: P3 @3 _+ l; o, t/ Y& I7 U1 s7 o& ^
zeroStream.print();* `* p; D4 F: M0 `
7 h3 L" S% c; j/ L2 H oneStream.printToErr();
3 y6 g' N# ^3 h3 ]
# ?1 A# N9 R' u* h
& b7 q! O" ?1 D0 Y5 O' p- }6 l0 C0 h8 v; S
8 ^" R" t7 w" ^. A& a' e' `4 I
' M/ Q: l B( b( r$ ]8 e //打印结果
8 o Y8 M3 y4 F: K- _' k, u8 A3 @( h5 Q* i# N* \# W6 c
String jobName = "user defined streaming source";
8 e, o2 n- G! d8 E+ ]" B6 d; E% f0 }7 ^% X- A
env.execute(jobName);
& e7 m7 N) g7 s, k$ p0 O
- ?5 K( i% n% Y. f. e}
' R! R) m2 E$ Y2 j6 k) u</code></pre>
- X6 s, P: }; P4 e7 U<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>7 P, V9 ]! Y0 U1 n) N* r
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>
& r7 h, p' C5 [& i7 P4 g- v<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
5 `2 ^4 o# x( ?9 \; o& O<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>
/ H, y& X8 Z9 m+ Y o B" X<h4 id="split-分流">Split 分流</h4>
1 S- \6 D( m! }" R* W<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
9 K0 r. N# L3 V& G<p>我们来看下面的例子:</p>
- V3 t. f% Q' d2 {( E( R<p>复制代码</p>
: i$ d/ e1 y p. S<pre><code class="language-java">public static void main(String[] args) throws Exception {
+ M5 V c, y O7 a1 |# @- O! u1 d6 ]$ q
' j: L! _9 I/ \
# a4 ~) ~6 X, F- s% i; b- l9 J7 n' w7 [, L _
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
" B5 p& W4 Y: t/ r: \, o4 q+ y
/ z2 i( C; R/ i //获取数据源
2 x5 I5 c9 ?5 n3 ?6 @/ c# Q8 A6 O' o/ r
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
8 U o% ~4 {, [4 m; f# e7 W$ ?. b- ?, e- I: V) I
data.add(new Tuple3<>(0,1,0));
0 E; ?# D1 U" L" ~! k: D/ ?& T8 \6 k
data.add(new Tuple3<>(0,1,1));
# V( o0 a2 C) A7 x
. k6 Q* K3 v* B( M9 s: n6 M% q data.add(new Tuple3<>(0,2,2));( x: u7 P: j0 y; n) b! Y+ h
1 u1 T% G) z* M data.add(new Tuple3<>(0,1,3));
. J; y3 O$ Z& f
8 [2 L& v5 [' [9 Y/ X( w2 g data.add(new Tuple3<>(1,2,5));& L5 d* _9 R2 b8 j: v
8 z$ i' v: m! o |; [- R8 z7 _ data.add(new Tuple3<>(1,2,9));
9 x( p" c0 Y% G# _" m) i( n( Z& _. Z. _
data.add(new Tuple3<>(1,2,11));
( m" w; ^8 F1 k! B( s1 ^) w2 R/ W7 |, w: W; S3 A- e
data.add(new Tuple3<>(1,2,13));
% _0 n/ R7 ~4 ~6 [
7 w- Y) Q2 r" e. ^* m9 g# i
6 @% s- C$ w9 h# p
& Z7 S. `" }7 N% O3 v8 a1 p
; k q# ~! \! T$ `" l. W4 z* u5 i) Y2 F
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
! N$ n q4 E3 ^/ R6 H4 T& S+ I' F# a- t( _( t
+ P! @4 p& G. B( S; i
* \* x$ @; e& _5 @7 ~( w6 I
* h- J o; Q+ D4 ? S
W# U$ W7 R( U- ` SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
2 l; k. L, v% E& j& B+ E8 u, f+ y1 O. u, G% S- Q# E
@Override
7 o% D/ i2 k9 i2 s/ {* W* r# T* u% a- d; S) g& {1 u% b0 w
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {8 G* c ^8 m( Z z t* B
- u G) r# y9 h9 e) w# z
List<String> tags = new ArrayList<>();! F. G4 B$ z& B* c- a
! }$ P6 ]. _4 l6 D4 e0 Y3 D9 S
if (value.f0 == 0) {6 p7 f9 ]' L: ^( y! R& n+ k& j
0 S0 b& |5 v% s3 B' W D
tags.add("zeroStream");! U8 \) c/ h2 N+ W) `9 T; K
, n5 d5 f e2 d# g: A& A+ y$ G% |
} else if (value.f0 == 1) {, K9 Q, k, ~# G( G1 J
% b* \2 D3 N/ } tags.add("oneStream");
* B8 z0 i, ^& X+ ]* M2 L8 X3 o% h- X5 ]' e( u" v
}
2 T" B* D d; W% q5 X Z
: H6 Y1 W/ I0 w/ H return tags;
" K* _7 M! q! [) s9 ~
1 {/ \$ B- [8 i, [! p }: c9 W- _" ?+ Y& G$ B$ }' G0 p, m
! {$ x4 }- j0 v7 H });1 V4 K# S7 s" `% I- \
7 U& R, h. z: j
) x' M& R/ \4 O" [4 ~" {$ I# y! D. G9 T6 t* a) b& d
splitStream.select("zeroStream").print();
/ c9 m, A! N1 u
" t# F# L4 _6 X3 ?: ` splitStream.select("oneStream").printToErr();
. x+ ~1 {* R$ c, O
8 i9 x9 G- Z! ]& |
) z; Y" [0 w, } q6 V" G
4 i7 S/ Y7 f, x, g //打印结果
/ a3 B. t' @- R2 L' b1 v, P1 A" K+ c9 ]) T
String jobName = "user defined streaming source";
8 y3 Y8 c% l+ ]! Y9 J0 h) L) E
, r) T2 n3 m1 r4 |) [9 R env.execute(jobName);- y; b! Y3 h8 T8 @2 V8 c5 C: `, n
" M) A1 T$ j) E6 W! Z9 w3 `
}
1 c+ e0 O7 X* g: X7 x$ s* A5 U8 I/ ]</code></pre>
# ^+ V0 L! x8 X9 X5 B<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
2 w$ I5 \" H( q i5 M2 f, Q/ z<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p># p1 k8 Z% }3 Z
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>% u9 A) |4 B' I
<p>复制代码</p>, `& N3 r5 F( l- v
<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.+ P) x! x9 [$ g( m1 Y) V9 F
</code></pre>4 _( G- F( e! R! D: [( I( w
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
2 x& j2 o0 W! Q$ p<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
0 k3 q3 n0 Y0 O( }8 H8 _<h4 id="sideoutput-分流">SideOutPut 分流</h4>
* |4 v% J5 `* Y8 V<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
4 D8 ^, B. N2 d<ul>
. |% N2 a" f) w7 h+ ^# Z5 i<li>定义 OutputTag</li>7 ~ m- c1 D+ Y/ U9 }
<li>调用特定函数进行数据拆分& V+ d0 H; j* K2 j! e* w: S d/ d
<ul>
$ a, H8 c1 c( m+ D9 T a6 W<li>ProcessFunction</li>- q" D& o R% B$ T
<li>KeyedProcessFunction</li>0 W5 P6 I3 Y* B0 L
<li>CoProcessFunction</li>
' d0 B* {: i* ^<li>KeyedCoProcessFunction</li>4 Y& S4 @ m/ H. p. g! F9 n
<li>ProcessWindowFunction</li>
* m% {, p. z( N1 o; l<li>ProcessAllWindowFunction</li>& e3 w, I/ k* Q: c2 X) o* \# P
</ul>
# r' w6 @% w, D; b/ J</li>
. D9 x0 c2 K0 C* u, t7 B</ul>
& s& ]& X. E% I/ W2 y<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>7 }- ?* c, k* f* r
<p>复制代码</p>
# l# c" r3 w+ K0 l6 Y" O/ e<pre><code class="language-java">public static void main(String[] args) throws Exception {. h4 w; ^, ]& W$ P+ d
# w, h. ]% z$ u5 w* Q( @, ~3 T' H9 T' X( b$ C
+ ?1 S R& i3 c
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
6 V8 l6 h# m1 f. `5 u, I; a
2 j0 N( s* _0 H8 K% y& R8 m; D# p //获取数据源
5 W) J b- `+ C1 R8 K. J0 @$ s6 k4 L
6 Q5 u! L9 J( P5 x3 { List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();! g1 B9 {$ p+ W4 \6 V) p2 O
9 t$ _% ?- O6 R+ w+ b6 n6 f
data.add(new Tuple3<>(0,1,0));
( v3 V) p7 {3 t4 t
# @% E) D! Z/ D( Q5 D- X f data.add(new Tuple3<>(0,1,1));
' R# m/ h% s! O( h' T: c `
, k% H6 N$ E+ u5 C& |8 f data.add(new Tuple3<>(0,2,2));1 `% C% ~ W7 L/ e
1 g- O8 X/ @6 A6 U7 b data.add(new Tuple3<>(0,1,3));
: n* l! B R: B+ \6 x9 ~9 |6 P( }5 G& M- q
data.add(new Tuple3<>(1,2,5));/ X( ]# e; y# {
4 `2 D& a, H7 W! R
data.add(new Tuple3<>(1,2,9));" r! h/ @5 u8 I, u4 _2 h, t1 W
: V/ ^0 p( C/ B& ~9 K: S
data.add(new Tuple3<>(1,2,11));$ P4 v. ? x# j! h [/ N, N# O
! x$ G& Q" E+ H% }0 \ data.add(new Tuple3<>(1,2,13));
2 ]& T3 F9 B1 N: n# `6 i$ L3 b6 x1 M- n/ u4 A
+ {' R& Q2 `* G K) d0 O! ]6 G& q) B$ Q4 B; q7 n8 Z7 L
% \ ~+ C( T3 P- s% I# ~
, f8 Z( H! T% Z
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
- O, n9 I- s% x6 U3 `! e5 C
! V; H2 \& [8 t/ p( r" S. u9 ]: M: P9 y+ B6 K s+ l# B3 I, S
8 r! C% O/ q4 r: k7 p, F OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};* h9 ]- N! g0 |# J2 E: k
- w# r3 V+ {. {; T* K) o. Z2 `
OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};
* @' m- @' Q/ ^1 t# p0 m
& n1 p9 r) y/ B( ^. @; }. p
# A* U# [: h+ G; r. x& H
' U7 u k4 q9 L( I! s* t* M& o; e, K# \ G) k' i1 l, H3 ^7 L
4 ~" X) \2 h. _* K1 T; j8 l
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {8 ]: S# M; R# [# z0 l6 j
) K( }; F, k3 V" x @Override
, s/ C. {! k( l5 _7 u8 E& m d- e- X
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {, |' G% \3 k# ~ ^/ k
4 o2 G$ s; G: P
( Q: t* U7 Y" F4 _2 w5 k% B+ y7 X# w, _6 J, l6 R/ E
if (value.f0 == 0) {
5 t9 }9 Q- M# d" n' [
% O% x* a' ~6 t9 K1 S ctx.output(zeroStream, value);
" s0 y5 B- s" }% Z/ T, y) f% y) E
0 j h3 n4 N! V0 C' g, g } else if (value.f0 == 1) {8 c- u! J2 P5 x: h( Y9 v
4 d' `; [4 j0 D/ w0 t& ] ctx.output(oneStream, value);; l' v X& E3 U! l4 a, W
" \( c; q* ~2 G* [; I4 H; e" c4 U
}
( U. G+ L* G- ]2 A% Q( B
7 h: p# a- n, G0 d" j }& C* }; l+ @2 V
- e" u0 y; F* d3 U( E5 `
});9 b. _3 r" ?4 I
$ \7 K q" C* C j$ [( }. o' x
2 s& {9 B6 I' k
0 c) [2 X) f- C. u* a4 M2 n, e! q
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);/ p" u6 Q- F9 P
# n: L( X* E5 m: y" t8 [ DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);9 y& X5 ]8 n z) K) Y
- v2 _- y# B. l$ `7 P& G
8 V* r) |: @. \) ^
+ [6 ^- N4 i! D* t
zeroSideOutput.print();( z( x; h) R# ~9 v( G) |
( r1 p! j. P P: t: r
oneSideOutput.printToErr();4 ^+ Z- R" w% E8 j
8 q% O$ C& P" R V" U
5 V" g0 U5 T6 v; @" e7 _+ b( |
2 j6 h) a+ ~' E0 Y
: f+ v8 g4 X6 n9 N8 N1 a4 Q
& G: b1 ]2 F! e$ G% q0 K6 t2 Q //打印结果6 U% i8 N- _3 v/ \
. Z1 Z0 ~2 ]8 W
String jobName = "user defined streaming source";. d+ q; x( ^& {- J6 d: D) R U
% A2 H1 Y& A- _7 l# D( z# x; S6 U
env.execute(jobName);, f! d! j3 E6 C
/ ?7 D! [' N/ P+ i0 n4 q
}( x. r% }1 A. C0 R
</code></pre>% s+ [! r: V$ w
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
" l/ [3 o/ P7 X0 n6 \1 ]<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>
$ [- v0 x/ T- r$ G) S2 E/ z<h3 id="总结">总结</h3>& j. w9 Z$ D2 ~, \& k9 C( f
<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
7 x$ y1 e8 q5 J- Q! I6 T<blockquote>2 q# ~. @, L' H- @& A2 \: A. ?* H* t
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
$ } h+ V7 ?2 O- f</blockquote>1 Y, u* F) @. S: I H
: y$ r- ?5 ?" u5 _
|
|