|
|
7 j3 h% V4 W8 t
<h4 id="flink系列文章">Flink系列文章</h4>
+ k z: T9 G2 {5 ]/ r<ol>/ |( r$ [: M, C, }0 }: c
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
; j; v' {" E/ V% P<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>
) {% c }. I3 _ }<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>0 B4 v/ B6 B {3 ^/ u& {5 U
<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
) t+ q A$ W8 z9 h<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>0 e* D* I, a" M" Y$ ^. n& y
<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>8 S' l5 x3 f5 y5 r z; R7 [0 [
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>8 P( `! m. i/ @
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
8 s' T& z* F/ l3 T" ~- Z<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
% g8 \: Y; T H4 v6 ?' U</ol>5 ?) \$ A; U2 T$ d4 J, s3 v
<blockquote>
F% \( S2 p" U# Y<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>0 M8 [! q3 k0 d6 C$ R# ]* C! D# i
</blockquote>
" J! C) V [9 u5 B6 z2 I<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
" \& B. d, m K2 {. P<h3 id="分流场景">分流场景</h3>
0 j% C$ s( I8 Y3 u v<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
. m$ W7 k% u# x6 j+ @% b<h3 id="分流的方法">分流的方法</h3>
/ N' v7 q. [& |0 V<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
- H- B$ k7 l+ E d<h4 id="filter-分流">Filter 分流</h4>
& V+ p. g7 i( a9 \7 V. B! \2 m<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>9 H9 }3 D4 Q2 ?3 W/ \
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
8 l* U, Q* y \: s& f<p>来看下面的例子:</p>' W) ?- _2 ~, S
<p>复制代码</p> @ V7 ]1 s2 X6 q+ l% z# L
<pre><code class="language-java">public static void main(String[] args) throws Exception {5 H) X6 c( P1 g, X$ Z1 C: ~
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
: A' e1 ^$ L5 ~3 D //获取数据源6 L$ X5 E& C' |3 W/ ]; a( X- w
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
( e5 f) k% ~% J* i/ p, t/ { data.add(new Tuple3<>(0,1,0));
: l' q& C9 N' L4 w0 \; ` data.add(new Tuple3<>(0,1,1));8 ?! S+ e9 x2 y0 V& Z2 Y
data.add(new Tuple3<>(0,2,2));
6 D6 a( O* [; c+ n data.add(new Tuple3<>(0,1,3));
8 n( ^5 T, F. Y$ p. |7 y) { data.add(new Tuple3<>(1,2,5));; z% z0 W+ S: p6 A9 c
data.add(new Tuple3<>(1,2,9));' k4 Q' T- W8 p6 j
data.add(new Tuple3<>(1,2,11));' O+ Q& u/ f9 M1 z& H$ y1 D' f7 X
data.add(new Tuple3<>(1,2,13));" T7 p6 R F9 E+ M6 a
1 |- r o r! b! t" m, C! F& @/ J4 ?
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
) ?& T8 n$ b: M7 u5 z' q. Y7 Q6 k [& j* k i% `: q4 F
4 ^& E( e% P5 ^) r, L- P) p
, Z! \' Y" _8 t5 R4 h3 E, l SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
: i- w- k1 O5 B: \8 g {5 c2 k0 a, M! ^( l/ @! Y! r8 d1 E
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);. {1 N6 o, C5 S3 T4 Y$ C# T
$ @7 h: Y; F [8 k# l' `0 U
i! I" x" k' h% Y) h7 l0 r" S8 B+ w
zeroStream.print();
# J. w! C& e" E/ I4 C* z2 F _7 t* Q4 [' U
5 ^0 R/ K( I. v oneStream.printToErr();
& F; y6 p% e) ]( x: @9 d0 I S" q; _) l' i! _& C- Y
' w+ m9 E/ S# J- a" q4 I, o H$ b8 T# H: V, i5 ^: v4 z N
' I/ _; t+ F8 ^) i0 L S5 e" p
" t1 ^' v) `4 h. ]& l I2 ` //打印结果% M, u( p- \! {) v8 p$ V) p4 X/ k V! R
; v' R( x7 m- L' ]. y
String jobName = "user defined streaming source";) _( O* h; \% [* ?4 G7 Y6 g
3 Y; Q! }. C& g4 W: k! ]
env.execute(jobName);0 K9 r2 w$ r# G6 O
# ^- M! T% ~/ T4 O% G; h
}
& o) f5 @% ^4 r0 s$ |</code></pre>
- [0 r% u0 @8 Y; B y<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>4 k. [0 }: f y7 m9 N( L' {# @7 Y8 t
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>
9 G+ n1 X" j! L1 U3 L5 m<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>' P6 m6 i4 x2 v$ a9 h3 y
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>- ]0 h, x/ Z; C. Y! \
<h4 id="split-分流">Split 分流</h4>* A, R' s' Z" {7 C/ ]6 V$ N
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>8 j% _& Q/ n5 O" l5 y0 F1 L, ~
<p>我们来看下面的例子:</p>
- X+ J$ p- i3 Z: s% X' `1 L* U<p>复制代码</p>. p2 K9 L& K9 J* E& x
<pre><code class="language-java">public static void main(String[] args) throws Exception {1 L2 P& o% _$ q$ X) L
W; F3 n) l$ Z3 @9 Z |" Z( e( T$ i4 k
) c4 n: Y. p0 A3 ? j
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
3 R: ?9 s$ F* C" [- A q; W5 F+ k
//获取数据源 d/ ^6 r3 L g& [
3 R0 Z$ y$ T9 U/ C
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
E k+ g* a8 [0 R" P/ M/ }- i2 W" a5 F' z, z) M
data.add(new Tuple3<>(0,1,0));& D7 n. ?5 l: ]7 E3 t1 ^& I% C( [
* c% x9 h' U- S: r5 M/ {" G data.add(new Tuple3<>(0,1,1)); S9 n k% T% y) _0 k z
: h0 Q+ G- @# h! g3 h
data.add(new Tuple3<>(0,2,2));$ w/ A- b# G" P/ l5 D. m6 D; _
& w* G5 Y, j& [1 U7 g
data.add(new Tuple3<>(0,1,3));0 A; _: z' L: {7 J: m X, D8 \
2 F- k b: ~/ I8 m3 Q data.add(new Tuple3<>(1,2,5));+ \, _. ?! h3 z& J! m7 S- m
1 k; t( A, Q1 _0 \ data.add(new Tuple3<>(1,2,9));
7 Q1 d8 r9 e6 l. O! I/ u$ w% o( w# C& \. B) R" @. k3 y; _# D
data.add(new Tuple3<>(1,2,11));2 k' c% i0 B5 v& m% w! G
0 {# y: y0 R% i W! G$ v
data.add(new Tuple3<>(1,2,13));$ O; P1 m0 G! v7 p
+ _6 I, n) j& K7 V1 G
, t. F8 r$ O0 s/ C' r2 C
) l# {+ X( B( @. d
% q* T/ H: |) C
5 l- L1 j- j U3 |, t1 _' J DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
( J: b4 c$ p9 l# X) ^9 ~- M
& A" g: y F0 C$ Z3 a5 L2 M4 [# e, Q2 K: i/ v2 a* B; X3 v6 I
* S( U* d! B7 Z, ^: |* d; O) F7 O* s, b5 v, n
8 a* Y( |# k B' t" k3 x SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
7 F+ T( t5 |) o& I9 ~2 o( V2 Y/ r+ e/ `! l' Q" s# F; P
@Override7 f+ E: t+ y% ?5 p
- {8 z2 y4 f+ Z) l8 E public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {6 z3 ?; W" A. D0 l
: e, _/ {5 c2 q1 D" J/ ^( i. @ List<String> tags = new ArrayList<>();! _1 I4 u. b- v5 o% k
4 \1 }5 Q9 t* _4 y% o if (value.f0 == 0) {5 F2 R: {7 F v5 E1 Y
2 E% j2 ^, C: Z4 H, V7 \9 f* K
tags.add("zeroStream");
2 E' F) r$ Z+ }3 g: A
# y( L$ V6 p4 r7 m# k } else if (value.f0 == 1) {
. A" w) \6 h2 S. i! j) _$ X+ c5 s0 |7 M( S
tags.add("oneStream");& r. K: p4 d( f- O. ]
8 T- C3 G. `7 O% H! K# J" r }1 x2 C5 Q) A8 l; O+ m3 ]6 C4 t& `
: p2 D" t. |# ~: T# U% @
return tags;+ t% x) S7 V3 P& y5 Z7 r+ o
, h' g! V5 h9 l' c% x }& n8 A S, ?! ` S9 [' R" W$ ^
, U& b( M, R+ @9 C- d
});7 H/ {8 |/ K* p. K0 a) q
% P2 z) I8 P* _& P9 v6 H! F" K' X; S
4 n7 L, P% p) \1 h n5 ^: b& M4 {
4 I0 p2 @ k% I3 n2 p5 u, Q splitStream.select("zeroStream").print();
; L0 b2 j# P% i6 F4 }8 [0 j' I. h( L% k* @/ F
splitStream.select("oneStream").printToErr();
- x, Z- ~' X3 Q1 r2 \" [, E
. R7 j& F( z |$ P: `& q! k! O% f8 V, u, n) ?" g+ N0 C
3 k& N3 M2 L5 R: O( f# I. m
//打印结果
5 h) f" `$ Z* I' E& G
, ]' U! ~2 h- L2 V String jobName = "user defined streaming source";
4 d7 Z' z+ R1 I3 H! Z) o" n
0 i" b9 b- j* c X8 W- c env.execute(jobName);6 V' W" J( W3 ~9 p X9 P# L9 W @
$ l! Y5 |1 d# B) P* m u. G}
. }$ ]" F2 K( r5 w# F</code></pre>
& b# Q- q1 G/ m" W<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
; z3 Y5 i0 k3 t; Y) q, o; ?<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>
: x. v: b; m1 L, a2 j- `<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>: ~% Z! }" M/ N7 s# g, [$ c f
<p>复制代码</p>6 M7 v! `) \6 p" B
<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.
Y* T# Y: z9 Z9 R* ~) {</code></pre>4 }; n. B/ } y" w" X
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
" G1 t# a1 N* h1 e$ Z- i8 \9 m<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
% A2 W; p7 O9 o5 q0 u<h4 id="sideoutput-分流">SideOutPut 分流</h4>
! f1 Q6 d* p v, o8 g% e' s<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
2 i4 d# @6 A, D8 x2 l+ ~ v# C) W- E- }<ul>; v6 O/ m$ ]6 \$ M
<li>定义 OutputTag</li>9 S3 L- c1 v1 S3 H. q
<li>调用特定函数进行数据拆分
& \& D) A; Y, y3 B<ul>/ }5 ^0 m0 R% k) a! C
<li>ProcessFunction</li>* B$ F4 w+ r/ H2 j
<li>KeyedProcessFunction</li>+ m+ i1 ?# ` p
<li>CoProcessFunction</li>
3 M N( @6 ?1 ]9 P" W7 [) ?0 ]3 d<li>KeyedCoProcessFunction</li>0 p6 N4 f3 k" G; Q$ k* ?0 v% q! }
<li>ProcessWindowFunction</li>
9 y; S/ k8 X: v& J {% y<li>ProcessAllWindowFunction</li>
4 D3 K' N$ d7 e8 q' j; t8 B# |</ul>$ ?. a# j, R1 \- p' V) x* P7 w1 m
</li>; n3 ]. o4 x$ t' t
</ul>
9 ~, w0 y7 s! p+ C: g8 h<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>& T& {2 t& W" x7 R# Q
<p>复制代码</p>7 D0 N( l R K0 o/ Q$ }+ f/ I+ r
<pre><code class="language-java">public static void main(String[] args) throws Exception {
% o7 e7 a/ J& z* H* E2 K0 h- @& W% h2 l( k6 y% T
`- b4 U! _, I& L) \1 M+ A0 c7 g1 @0 r) i' }( K$ l
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();+ W# t5 W5 r9 D9 h8 ^: D
+ c6 [! X; B; s* b: A
//获取数据源, l. q, F+ h/ r1 ?7 ^
f6 @# B0 o, J' v( s( ^
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
: W8 h0 o3 W( `3 Q ?& v
9 \5 \) a. x/ ]; ? data.add(new Tuple3<>(0,1,0));
: z* A/ {2 d2 f& J. V8 y/ |8 M/ \$ i8 U: z# e; |
data.add(new Tuple3<>(0,1,1));1 I/ b4 v! a6 b4 F5 N) Q
/ t( }4 i' c- O# `- l8 v data.add(new Tuple3<>(0,2,2));
& j% e6 S! h, h' s* D, v8 C* W$ P, l6 N! X
data.add(new Tuple3<>(0,1,3));
# A! J: }" Q }9 F* b9 Y; X& C1 d) ~' \: Z" V- V/ `
data.add(new Tuple3<>(1,2,5));
! B& P# ]) }4 z. C3 K$ p0 L- ^( r2 e- a5 a; S, x1 o
data.add(new Tuple3<>(1,2,9));
" q* x6 z' l9 O9 ~
- n" V- x9 d( b$ O( O( d5 y3 J5 ` data.add(new Tuple3<>(1,2,11));6 K6 d d0 O+ O! n) f
% O" K! \' p$ ? P# q
data.add(new Tuple3<>(1,2,13));4 A5 C+ M5 L! a! \4 X% B" @' H
1 i# M/ J; p/ o# a4 Z0 l$ ^7 _
$ C1 m) W4 L5 g" b2 Z( \, q: z' A! B1 N) ]' }2 J
9 U1 [. m# T0 ~( d! h9 O7 y5 O; d
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
" }" ]! `' S# K- @
6 v; N! H. ]3 k: R+ `$ k" C7 D3 b4 g# n8 W! S
# t( g# ~2 K3 f* Z- G0 r+ q- V
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
K4 L) h# _& d7 R" C
& o9 S% \( R" T, w OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};0 F, I# W3 C2 n6 u% o
! k4 N5 p# q9 @" J) J) ~* O8 b* B) S
) k, n+ r2 ^; F7 M+ E# D1 g7 z9 c4 j0 H. y; K1 U1 b
; D2 y$ c1 Z9 Z& K1 u
1 Q: B( x8 n2 i; k: m0 _8 y SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
- `- z$ q- |' J4 v6 u* ^6 e: c! r1 A; `7 _1 f' C
@Override
9 b% g9 @% U7 _+ G$ }
$ D) u: k% G# f1 ?% h4 K; M/ z public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {5 t, M; u" I& j
9 c2 O' R }+ q' N
" u( I! R2 T, {7 C. | |% B
" i3 y* W' E; l0 N7 Y
if (value.f0 == 0) {
6 a u) X1 Q1 p: p) M4 A1 x, l% T. s( j' w1 r) U
ctx.output(zeroStream, value);; @1 ~# z9 a; k, v* k
# y, n% c9 F' h" {5 x6 X
} else if (value.f0 == 1) {
' d5 y" [+ Z c% `0 o: M5 _$ U% o0 }+ ?. Z) L" r0 l
ctx.output(oneStream, value); X) B( u) ~8 i2 a, U
+ V5 d1 M9 D' ]$ q. ?0 _
}/ a5 Z5 A$ E S5 ^9 D
% L: D1 a. a7 e7 j8 |4 | t( \3 r
}
- y" k* }, x! ~$ Z: b" o3 I2 r ], D5 j. x2 m
});) @% s% w/ U4 g7 v6 d+ r U+ y
, [& d/ l$ E) _9 \* X" y5 `. w" G) l7 X2 O
# u5 v* K7 I: G& K& w3 r
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);
/ T+ |1 e* M' N* O3 S1 S
7 g( H( C ^6 H A, t- A DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
+ _2 g+ y$ O8 i. @
. J. A# Z% f1 b. Y3 V
% V$ n$ Q. d' `* z, j% o) m7 \/ j1 p
zeroSideOutput.print();2 U( B0 v7 q+ q
4 v3 a4 f6 U" i y6 _
oneSideOutput.printToErr();
+ @6 P" V ?* e9 S {
* F3 y' t5 b3 y2 M
: e; `* a) Z3 v7 S7 q# C& H8 `8 Y: c* C" P+ x0 x9 a6 m( x8 `3 _
?' d X# j6 J- l
1 v6 n# ^+ W9 a" ^/ @3 O
//打印结果, k8 E0 E' U: j9 y9 q% W" v8 J
; ]! p6 v% s* J' t: ?$ U7 Y* ` String jobName = "user defined streaming source";. e2 B/ J3 x! o/ P9 a; k* v( q
' x9 A. S" W6 b* f5 c% V
env.execute(jobName); m4 p3 S" J# }' l- p+ @+ M
2 y0 ^* P \! H8 M* f}! |- C, d" C; y: r1 u
</code></pre>
3 Q0 Q9 q( j; s+ b8 F. f+ A* _" u<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>& p3 q* X+ }8 b/ [1 z
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>; _* [0 B9 D7 }
<h3 id="总结">总结</h3>
8 x0 c* ^$ [, P9 S5 c* R<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
+ h3 m6 ]4 ^& f9 q6 P<blockquote>8 @0 `* X& r6 Y' C; p3 W
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
b: B- M/ U1 l1 Q</blockquote>
8 }* X/ [7 y$ s
6 m4 i& R' n# O/ V% ]' l |
|