|
/ n( o* F) D& b<h4 id="flink系列文章">Flink系列文章</h4>' F6 ?# n" k v* J4 v
<ol># V+ V( E5 _7 Y( B' [ F" }
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>3 j- }" d% N5 I; S& u l
<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>" P' a8 e8 w( E! z9 j( s
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
# R) O2 W( i2 s6 }<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
7 p7 ?4 s4 J& b0 N: k' I! x<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
' h7 y2 p; ?0 K8 U- S {, i<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>
/ ~/ x8 T; F- J# j. ?, l<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>
2 i0 `+ }( b! k7 R8 a7 }0 P. n: Q8 ~<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>0 ^# t$ N2 v5 E' {
<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
6 h) |: `2 ?% s! j) ~' @) H/ H1 h</ol> c0 {1 g1 q9 V
<blockquote>
2 v( }: G( H5 r+ [<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>9 |' X7 v* j8 X9 h; [0 C$ ]) z# L
</blockquote>
$ r& V3 Y, R8 i6 A: j) W: s9 \+ D<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
# G& W+ p* a( L- }+ F& P<h3 id="分流场景">分流场景</h3>
* y* ?+ @# V7 P$ t. w<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
0 B9 N P! A; _& J4 N" _<h3 id="分流的方法">分流的方法</h3>
0 @: D9 [" m* e) [/ ^<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>2 f1 K# E+ f; h! S7 P
<h4 id="filter-分流">Filter 分流</h4>7 p- H) T$ T4 T0 I; \
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>
% ]" Z% B4 Q& w0 V. X<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>4 {& O0 I9 J9 I7 O
<p>来看下面的例子:</p>
: m+ t8 u, j$ @. C<p>复制代码</p>
/ L! D ~0 C s# ?<pre><code class="language-java">public static void main(String[] args) throws Exception {- q% e2 G _9 f! }) o
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
' L8 I5 R- l) J, G9 [3 j //获取数据源
4 I h3 F3 h8 u: D7 O List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();" ]* ]# |4 a7 v# F6 ]- P
data.add(new Tuple3<>(0,1,0));+ T X j; Z. w3 `- Y
data.add(new Tuple3<>(0,1,1));0 e8 z& g4 b6 W+ o4 |+ Q. V
data.add(new Tuple3<>(0,2,2));
, n/ w: _2 }/ J6 u& E% R: G data.add(new Tuple3<>(0,1,3));
9 a3 `9 M% G* {0 G. P data.add(new Tuple3<>(1,2,5)); J/ d; D5 ~7 g, l8 C3 k
data.add(new Tuple3<>(1,2,9));7 o, ~. x4 c+ t- N: K& r
data.add(new Tuple3<>(1,2,11));
/ w- d% U% |5 Y1 a' C% P3 r data.add(new Tuple3<>(1,2,13));
4 R! P) N# d4 `& Z3 Y0 u: \$ o' B' j# Z
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
$ L* a# ~/ d% z6 o" w. q8 M
% B0 @5 U7 N& M, U0 q# N; d/ q; U
( t, m+ o; O Q; a" M
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);( j2 d S, c# S; `# G
6 U/ l2 g9 \! @) v3 b X$ K$ ?* g
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1); a I- [: [ \: Y, O; R
! C$ |8 a; Z/ c# h+ I) u
5 W* W9 E) E& x5 h9 |. Q2 T$ L
# Q/ L: @8 R v$ {8 _3 `* q zeroStream.print();
6 e% c1 ^2 u4 r
9 v7 x- F% v6 }1 A) l, ?, a oneStream.printToErr();
7 J7 X4 M; Z/ N
$ S4 s3 M& K0 L
* u' ^2 q7 A. J: d/ ~
6 z' A* ~+ }$ q0 b S' z6 d& b3 D; g3 ]2 O& t6 [# G6 X. j8 H0 T; x
2 s L! ~) t1 K+ ]7 M5 @7 S) s
//打印结果$ Z1 Q" c( u$ c) |* o8 e
$ P; L' S7 ~: |, s0 \9 m- Z
String jobName = "user defined streaming source";' f! O# [& J7 C# b0 V0 ?
2 b1 D o1 `% t5 W [+ T) K# s
env.execute(jobName);
! O/ a$ U& @ U( A/ |3 T3 a7 z+ [) A/ W( x$ D1 I
}" b5 M/ s5 I& d( }" v! z
</code></pre>
: @: T7 Z- E. Q6 s0 N; y( f<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>) A. L7 J( S; f/ g
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>+ [# ~2 E) ~; \. T# s5 v0 [0 g
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p># z9 M6 O$ e! s
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>8 n4 U$ P, P+ u* X
<h4 id="split-分流">Split 分流</h4>
5 v4 X+ n/ \8 u& H1 K6 p<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
6 l5 O! N3 h) B# r. K<p>我们来看下面的例子:</p>7 C, m8 u7 x$ U" x
<p>复制代码</p>
) p2 e: s' b* |1 S. S, ^<pre><code class="language-java">public static void main(String[] args) throws Exception {; _$ p4 q% G8 x- o. x9 b
. U6 ~+ M7 U ]
! x3 }& X9 v; f2 O( V6 \+ T
9 S& M Q( j% U+ t( ], o StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ k4 w2 H. }" ~$ B, H, O
1 e5 O3 v* j9 W6 V( A //获取数据源! u/ ^0 Q' d# e" Z' K7 u; V* k2 @2 {( m
1 n" R$ ^. j2 h1 Z! _& \ k List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
# s! C" x% @( W3 |- x$ L6 O+ D! `! ?. S9 o
data.add(new Tuple3<>(0,1,0));
7 l/ _9 S' I7 E8 R |+ L4 ~, Z( X2 X
data.add(new Tuple3<>(0,1,1));& w0 j& j3 H* l' _+ a6 k9 q9 A5 p
4 b3 n. ?' P; P9 b# w$ D/ ] data.add(new Tuple3<>(0,2,2));/ a) }6 D+ `+ D& ~
; }! L" M8 B$ u% O& h$ ^
data.add(new Tuple3<>(0,1,3));) U! o# Y1 C! c
! g& b8 ^) I" `; H! T data.add(new Tuple3<>(1,2,5));
- F9 F2 V3 t* Y- J# g0 a/ t% H' v0 A, j# C
data.add(new Tuple3<>(1,2,9));
, w. J8 F8 A1 M3 t
2 u6 u ~# U/ k+ | ` data.add(new Tuple3<>(1,2,11));
' B; z, X& G* m- l2 e) |! N$ |& U. ?- i1 t
data.add(new Tuple3<>(1,2,13));: P% y5 D; q% w( j" L
' o1 `* `% G H: D& e' R
- S( T& @4 ]! F) E% ^& `7 @: J& {4 ?$ d2 b
: F1 K) R, \# `* c% ^. q8 ?9 }( K0 x5 K! S. m
$ B" N0 ~: [, K DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);0 o( g' n3 U. C& D
1 K. W b* B+ Y" k% h% `
+ r% H# i7 I# T, U, O0 m
; m0 p l; W6 G5 f7 o' k9 ^ G( @% f) Q% f9 F% j+ S7 X+ k
) Z1 N( U! X* [. r5 J" K7 w SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
$ h1 k5 | z, C+ g2 m y
h4 f3 s u" o! k( h @Override! O6 o+ r) [" _2 s, E9 T
2 p0 M; J* Z p$ n3 x$ d2 x
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {+ ?9 W- o' O8 b: M7 l# I+ @
( I& g" ~; o; {/ k% z$ t
List<String> tags = new ArrayList<>();
/ E9 V0 N5 M5 Q, h5 ^4 `! v X, h4 W5 g# c4 S, R0 k7 z1 D, B
if (value.f0 == 0) {
+ X2 F1 T% P# l: Y/ _9 n. V7 {" |2 F4 I7 w1 _
tags.add("zeroStream");
( q8 x, z, k+ n: S
8 M0 n/ H& H5 k2 F, M5 ~ } else if (value.f0 == 1) {9 S7 k4 N& p8 @
: |+ m$ R, b% h5 T
tags.add("oneStream");
0 q/ X8 b. y e/ o6 z0 o7 u' Y) Z( H( ~6 A% w, i7 |. w2 x5 S
}2 M* ]# z, ?% B
) a b3 M0 o4 j return tags;
2 \: P9 J' l/ {6 B
: x% G0 O2 D8 A, C8 w( g }
( C4 j: J. h6 S7 k$ O. q3 {
( v$ o+ }0 C) ]5 C$ h9 j/ L });, b9 F( V( h& v+ D9 J8 K+ s
+ a3 U, Q" |: t; m# r- W* @. s1 f! O' i/ L" T9 U
. X- z( q; J! {# i/ f
splitStream.select("zeroStream").print();' g9 v9 A* H4 _1 [& K$ o
4 z8 Q1 Z6 e0 ?1 P E; a: H
splitStream.select("oneStream").printToErr();2 D( W9 V: A+ W% f" r ~; c
, M; J! a) T9 V, ]% v
1 R7 f9 b# ?# Z" X- `3 [+ \/ p$ V+ c( F) B
//打印结果
. q5 y. [- J1 P8 a1 W; X' e* h3 \$ t9 F/ j; R9 y9 ]. `: n
String jobName = "user defined streaming source";) i( Y+ o+ j7 b" N( a- Q8 n
. B F4 C- X) }" e3 y' ` env.execute(jobName);
' s: G6 A5 P2 V
, w. z# @/ y6 q! m}
) U# e- W0 E2 F: j7 e7 p) ]</code></pre>
% P O9 B3 R% w- P<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
' F2 K' y$ m5 O1 h<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>1 A; F& b: {3 p, i; `/ w1 D
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>7 F1 }5 {9 I9 F6 k4 v
<p>复制代码</p>+ x1 r$ |# F0 S5 q
<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.
/ s, v* g3 m1 J) G6 b</code></pre>, a' y2 r- p `0 l) ]- T
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
5 Y# w/ Q( V2 P8 r4 o<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
9 ]8 H9 H( M$ e9 N0 O& W% s<h4 id="sideoutput-分流">SideOutPut 分流</h4> v2 Q: f2 m1 R0 T
<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>. v# C: Z, U6 D0 Y
<ul># n- [8 Z: p4 j' Y
<li>定义 OutputTag</li>
# H) Q. d; t9 b& Y7 q<li>调用特定函数进行数据拆分
$ ]4 Q1 p! t! |% F4 Z2 X<ul>
! }& n: w6 M$ w, F<li>ProcessFunction</li>
2 D. P' |% _! K' z0 E<li>KeyedProcessFunction</li>
5 F. W0 P- C+ C2 I& i<li>CoProcessFunction</li>& U4 Y2 O3 b) r. F
<li>KeyedCoProcessFunction</li>; T' t e* T$ j: z& m
<li>ProcessWindowFunction</li>. Z, g; o) r0 Y! [; T* |
<li>ProcessAllWindowFunction</li>
6 d- R. j* O) D2 P8 r1 v</ul> |4 @" b( C# _7 r( J0 `# G2 b
</li>1 Q: r& w8 F1 H) L* r2 Q) g* Y6 R
</ul>
% K* i+ t& E5 |+ w0 |<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p> W9 F5 k) r: D. z; H9 a3 ]' r
<p>复制代码</p>+ m' v+ K# F5 J6 w2 h
<pre><code class="language-java">public static void main(String[] args) throws Exception {8 H) d6 p7 Q6 N' T+ w, t4 |
& {/ X/ n1 ]/ a# o3 A8 [# P& K
1 x: Z' Z: |6 G9 j: l; Y' O- p9 k0 _( i
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();, R) ^4 T3 n4 \$ y J) ]7 w0 n
; M2 r9 E x3 x! ?( w0 c% Q
//获取数据源
3 T I$ i' [( u P: n. T3 o+ `$ t
% A: t ^: I0 K6 S List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
. X$ m9 f' w! ~; e: w% p* |: R/ I3 k% Z8 t6 \4 m
data.add(new Tuple3<>(0,1,0));
, ]$ [7 s, J' L; w, g. \8 j+ m+ Z- k
data.add(new Tuple3<>(0,1,1));% i+ h1 u K0 {6 a7 {
4 }. E( Q; f& O8 M' l& i, e9 r
data.add(new Tuple3<>(0,2,2));- d5 Y6 K9 n; P }+ ?" D/ d" {
3 m) t8 w/ _: u, N p
data.add(new Tuple3<>(0,1,3));5 T5 F2 _) W/ i1 @$ n3 v
. Z6 c0 `2 U- ~! e8 ~0 _
data.add(new Tuple3<>(1,2,5));7 [& w- R* j$ G% J) w, R# R
. O( o0 Q- @( ^# _. T5 ] \ data.add(new Tuple3<>(1,2,9));
2 [4 N0 p; j5 M- y: h
6 q A5 n8 o1 p/ U4 ], r data.add(new Tuple3<>(1,2,11));" E* q) @, O! @- q
$ O8 l' d" C. d3 W& f( H data.add(new Tuple3<>(1,2,13));, X B- g. C& V" n' j
0 v9 G/ c$ x7 B, y: t
p, X! e. {- _( P# F% j
1 D) _9 l/ [" c7 s4 h+ W
. ]3 E7 M2 F9 Q% O3 Y2 V7 r' V2 o- X* F# E3 w1 m4 C- e- l
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);$ E! o- U6 \9 C
9 b9 M9 c/ j- I+ b7 G3 J
5 |# i! R" P/ A3 @. m
4 U$ d9 `2 O* d; l
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
9 M1 |- N- H: I* c& s7 W; G
+ g$ O, z" `$ I/ i2 `5 J OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};
% ^1 T- z( `' m% O0 A9 H2 i: N. J9 H, G
8 E' }4 ~4 k( Y2 D* F0 W; d. }4 F+ L2 i/ L
( a' Y# }6 h) X6 @0 X* E
5 \+ A6 V) e( h8 R. o- w+ w; t8 a7 n
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
! V3 x+ e% v8 y* T, H2 ~2 V' q# _! g% D
@Override: e+ S0 p" D' b1 n& D& X+ H- u
; y' v, r1 U$ A; A) f) L public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {* P( S, g4 T$ S
3 S, `7 n8 I0 e; n- @
D+ M5 Q- h/ F1 U: S
' d- x3 R' M" Y# Z if (value.f0 == 0) {
; y! N" F$ l2 M# o9 T5 p4 d8 p: u" n6 S9 N% _+ `4 \5 t! f
ctx.output(zeroStream, value);0 N$ \' r% ~7 y
) L# o" [( P/ O4 k+ M* ` } else if (value.f0 == 1) {' h8 n. R X( z2 J* }
" Z* V3 z$ V- y( z/ T- d# k3 T' i, T
ctx.output(oneStream, value);9 J' V/ z) o/ y8 L
" i- [! T& y9 u4 W; t; } ? }* E) n' ?6 d) o1 C3 U+ \+ x
3 W- P1 j: N8 h { }0 C0 l7 a/ L4 A7 f( ^8 c/ P! l1 g
$ \( r! C/ L, O/ U
});$ \* r+ ]) ?$ x" @0 k/ p
% w2 K( b& |( i5 u: u" Q2 W6 F+ {9 f! n! {6 a' n
+ q y3 b( {0 I2 s% a DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);, g0 L) j) o8 l O
9 f! d. A0 P- n6 K% G5 Y. Z DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
! x$ ~9 C. F4 m/ L) v
" p8 h8 ^% j6 e# z! e$ C* C) |! |6 V- j, @+ d& i
' a( q) f* y5 ] f
zeroSideOutput.print();
- x) S. j/ M1 D$ h* W
: o! R( c3 c3 V oneSideOutput.printToErr();. R$ i$ G+ m% a
8 d7 D" ?7 u$ d3 P4 ?9 a; e6 d! Z. ?' N. I& t
6 Z' J9 K7 d* V5 Y& j$ ~7 O/ {. S
# R5 j7 T+ M, S( J: h
/ \, S$ x2 X. S2 t) `" D6 B
//打印结果
' m# k! s8 M" q& j3 Z
( C: B; H5 n! i1 z2 ` String jobName = "user defined streaming source";2 L' f3 a% P- X
# Z1 W; S9 K6 F, N
env.execute(jobName);6 G' m6 T* k, P
, b. z9 Z. n2 _5 t+ k
}; x* i' F9 L3 |- K* o P, D) d: V. H
</code></pre>8 Y* R, P) X# S+ w
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>4 Z% d2 P( J4 L) q& Z
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>0 {2 t2 v' K6 V( x0 a
<h3 id="总结">总结</h3>
" t W! E0 i/ V2 z1 X6 u9 J<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
# S) `9 g" x( x; I' C- S7 {! A6 b* @<blockquote>
. `" a8 M# A6 _ N; i. b8 Y<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
2 y% k/ t6 }4 h+ s& R0 B( N</blockquote>
2 j; t# M. M5 G5 x1 W
" |$ j4 P( ?; q2 h Y) q0 Y6 k6 C |
|