|
|
. n9 |- j0 L9 i2 }/ S) `<h4 id="flink系列文章">Flink系列文章</h4>, F$ t. o% f: m1 q' y
<ol>
2 d5 Q. g. ]6 z/ C<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>7 j) h' |& D& @( g) ?0 z8 g5 m
<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>4 a. D* m; l6 f- M
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
7 T" @+ O+ O4 l" g<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
5 i4 @ }$ h- h6 E$ V7 N; e<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>) R& H$ S: ~6 s& b# P
<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>* L: M- g* c3 w* z3 b: ], O
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>
: v; v% }, B: A7 _<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>0 }; o; d# U }3 N) s+ ~
<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li> q7 O( i, J* d3 m3 r" `
</ol>
: R4 x7 O7 v; C% P/ ^, H4 _' T<blockquote>8 e2 \! U4 Z: m9 d2 d0 o" D
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>, @' t( t$ v9 G& |1 o' j9 x0 {2 N* q s
</blockquote>
: P$ ^$ H* n% Z/ X' O4 b4 J<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>6 T) l& e2 X& l, E3 X, K
<h3 id="分流场景">分流场景</h3>9 i0 X! b( D3 T% E1 S! l ~+ y. `
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p># q: s: v& C1 g: e4 j8 r: ^
<h3 id="分流的方法">分流的方法</h3>) J2 Q3 l, L) |% Y' G2 x4 O9 ~* @
<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>0 T$ {" z( h( t
<h4 id="filter-分流">Filter 分流</h4>0 z# }6 v ?5 g
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>' p) M# E1 j, M$ @- p
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
5 \1 V( T M' n* A" ~& S<p>来看下面的例子:</p>
7 ]4 O5 O4 i6 \& D5 z<p>复制代码</p>
- y2 K$ K& i# s: C# d1 K<pre><code class="language-java">public static void main(String[] args) throws Exception {+ q" j6 l6 p2 W6 [0 l" S
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
: n* d! ~& M' v, a7 l6 r( i) q //获取数据源4 [" ~) h9 M, n _$ u) v
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();$ D& Z/ `1 c Z3 w7 k9 A! o
data.add(new Tuple3<>(0,1,0));! [, g& d* L5 P+ R) G4 J
data.add(new Tuple3<>(0,1,1));6 ~6 ]0 g" M. \- A7 S
data.add(new Tuple3<>(0,2,2));8 U3 k% R; {) t
data.add(new Tuple3<>(0,1,3));$ a9 E. [7 U6 n4 F0 y
data.add(new Tuple3<>(1,2,5));
) k1 ?' Y2 F' @ data.add(new Tuple3<>(1,2,9));
6 }$ M5 e3 M# |+ w4 @ data.add(new Tuple3<>(1,2,11));
W9 M) ?$ S x' }; \ data.add(new Tuple3<>(1,2,13));0 B! n- G D6 T
7 e- ?, p- ~3 v& c+ T DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
9 z4 k4 R% l) K. V. s( Q- W7 K4 K5 S
# ^8 r. Z7 @1 w9 _% u L0 F
3 G3 k9 M1 n- Z( H; ^/ x$ g2 G SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);% |6 {5 D, u; s) D! k; Z
+ t; n y# L& }" f2 O SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);" J. g# {. k0 ?+ o7 W4 u, @" @
' e- S+ W' ]% I: ^$ [5 e+ J; x/ g U- d, }( u% i/ j8 x, @( _
8 a. B$ E3 a+ ]0 q+ E6 o
zeroStream.print();
% v' v7 n- f! m- L+ W
1 D! O. y% @( l; }# n oneStream.printToErr();
- A& e8 n( T1 j: _; ^% O
/ S4 }! F2 B& Z% n# d* m
, t6 [. i4 d" r K7 t# e4 G2 P7 A1 h
5 t \$ P. ?/ M% q7 X
1 Y/ h) p" e) E8 e9 d/ a3 f
% u5 c w" e: A2 l( E r //打印结果$ X" C: a& l0 G9 M( O
- V, X6 w! a, Q& F# W3 W; f }
String jobName = "user defined streaming source";' k$ g3 p/ {, X% Z
; F0 Y+ \* y$ _: z9 f env.execute(jobName);
4 q) @- S' u& \4 r4 l4 Y/ N, X6 A- t/ w
}
8 D+ {+ M/ i2 h$ j5 D4 O</code></pre>
9 H6 l+ U+ D; ^" i9 {<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p># ^# ?: ^/ o$ H. I9 m& z
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>5 ?5 F1 M+ @9 p* z* ?4 E( L
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>; ^9 s1 q0 {: u& |9 U) l( s+ ]+ o
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>9 F( A6 W2 b* B
<h4 id="split-分流">Split 分流</h4>
/ a/ k. ~+ O1 t5 y- c6 F<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>& I& l+ B* g8 m* M
<p>我们来看下面的例子:</p>
9 \/ |/ F' J, _( T( a% `<p>复制代码</p>
7 W: o2 j( b1 k M" x: y+ H<pre><code class="language-java">public static void main(String[] args) throws Exception {& B- x! S5 |% [1 q1 Q8 W* e; A
9 A# N7 U8 \5 |6 Q
; {7 X% |2 F/ s X) E9 Q4 `! R/ B' r+ X; T7 i2 ^
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();7 U% E; R3 H5 W
j& J. \" H1 p
//获取数据源
) y4 M2 K9 x" ?& ^
/ M5 v# z. R* B% E. w' a List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
9 N; G4 l: a9 H0 N1 [2 P
7 j5 A& ?" B' c O data.add(new Tuple3<>(0,1,0));% v( |: A- c/ N* ] o7 B, j
# G6 y5 K' g) [7 I
data.add(new Tuple3<>(0,1,1));; u: Q, X$ f4 w/ `# x
7 m. k( o" u+ z$ V0 K. Q1 c data.add(new Tuple3<>(0,2,2));, t8 a( t/ m2 Y$ T: `. x
( G0 x; @6 J; \% }* u( B: B
data.add(new Tuple3<>(0,1,3));7 z( l' @1 B/ ^3 B4 l: l
7 l* \& n( c5 U$ Q: v7 q& p data.add(new Tuple3<>(1,2,5));" t; n3 V" n8 l" C0 C* G
$ p6 {. U1 L# v* X+ G* z% x) E2 X
data.add(new Tuple3<>(1,2,9));4 h& t E. n- o
3 e [$ N1 h+ _' W
data.add(new Tuple3<>(1,2,11));
6 s, {1 N0 X9 ]
3 y( h. S$ M0 c) R+ V8 Y( J data.add(new Tuple3<>(1,2,13));
* H$ e9 u, F4 l/ s9 |& h$ S+ p/ ?8 r4 b; G. d+ f
9 o7 u( j9 q/ X n1 T$ o
2 A$ P( N ?# R7 |: F7 Z5 x F
$ B; A* T9 a; W& G2 j
3 Z' Y, I9 ~5 _1 ^) v" ~; G
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
# f& ?7 H8 D( T( N; D( H3 J" }1 z3 _# T' E' y
/ H- _9 m( e# t# O' k/ ], ?& Q2 \2 n, k; O' ?% h
/ ~# ]: A% |; Q5 {0 ?/ t* O+ F* a3 n2 H+ w
SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
7 [# F! R. v! K8 l
! @$ K$ o9 W5 `7 d5 q @Override
y, w- @1 e# @: ]* m- h: B0 I' N
; B `6 s; p+ _0 v% B0 m: ^# ~/ t# I public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {0 z% j6 v# Z( A! w% ?3 J
. j& j' E/ C& |# m* m List<String> tags = new ArrayList<>();3 b. A# q9 f# j ^6 r
7 ^+ a. {, C- Z7 ?9 X9 B/ H# a if (value.f0 == 0) {
8 |5 R2 ^4 F$ S* h; k% t- ]' x* m) N& x+ z1 A
tags.add("zeroStream");
# j$ e9 E/ b; q6 o- f
3 x4 l0 a4 |+ ~ i0 S2 O: z A: k" f } else if (value.f0 == 1) {
3 G- E- }) P1 _: W2 I/ V
5 C3 j) J3 d' M& ^ tags.add("oneStream");
: ] z) X5 c1 X; _/ m/ Y) ~( v# ?, J: O" f# W
}
& P8 K: U7 k/ C( a
3 ?4 k* d: \" b0 S" g6 m return tags;
+ Q, \! [3 J* U! D7 i) E7 }
: m' G1 u; ~) l" ]6 p1 W }$ p1 H v/ h$ c4 U$ u- V8 Z- i
1 F& ?% @! G `7 n$ J4 e });
( E4 Z& N( V9 A; _5 Q) G' v
% U* k6 ?7 @4 x% A! `) l
1 ?1 d4 L- n( c& B, M0 s* G7 e1 g7 n p3 Q6 ^6 y
splitStream.select("zeroStream").print();
! N: Q: b% [, Y9 c3 A/ e
) f8 Y: ^7 m p splitStream.select("oneStream").printToErr();
% b4 a8 c+ `# A) [3 l! T" V% n
' M, r* B4 z+ I% S% Y) w
: C, ?: e1 V: N& H
! x1 s& E% I: c/ n1 ], n //打印结果; G5 c; L; _/ _% A
8 i& C8 S' C8 t( c" {
String jobName = "user defined streaming source";
0 ]; L" o+ `# R! R8 z* u# |9 m+ A4 ^" ^; t9 F0 U
env.execute(jobName);
; v3 N$ M* u: `+ _- ^2 M' F0 J' l& x4 r' f+ C! O
}
e0 L8 V" X- e3 h</code></pre>
4 J! R6 Z/ U% J# J<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
; Q2 L6 n6 ?4 Y<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>
( I( V) w; {6 f( u c' D3 K. J5 p4 p<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>
+ R" \2 a1 o9 a. H<p>复制代码</p>1 ~% @6 A! ?3 V, n6 M& E
<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.
+ P1 y; }" m( w/ _( a</code></pre>
7 [2 F5 [# K5 k; |6 _+ r( {, u5 }# P$ F. s<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>8 g1 |* P) T! P: m$ |
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>' @2 K- a. r0 h0 t, m
<h4 id="sideoutput-分流">SideOutPut 分流</h4> t; {5 s) O* X/ y" n
<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
m8 \2 M% }) g; Q<ul>
x( Q j$ T% ~1 Y- k<li>定义 OutputTag</li>; X1 }* i! J% F \
<li>调用特定函数进行数据拆分& V5 n9 A3 \& Q; q; k2 q
<ul>/ D9 R) o4 o" }, K' a) X
<li>ProcessFunction</li>4 n0 K9 A1 S& U- k! q' V
<li>KeyedProcessFunction</li>
" g. t! w* v+ |( o$ g! V<li>CoProcessFunction</li>0 o. L+ c; _; C% s/ ?( N% F
<li>KeyedCoProcessFunction</li>
0 V( Y |* v5 {: r& O: U<li>ProcessWindowFunction</li>6 j M. ~5 e8 r( u% \
<li>ProcessAllWindowFunction</li>- @1 g/ A- n* c+ Y6 _
</ul>
% R- `* N8 j+ J! W8 b</li>
. F- r3 q( n0 _& i2 y7 g1 Y, F) {, e</ul> b6 w$ W& f G* X" ?) ~: [
<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
: y; i; a! U% v<p>复制代码</p>9 C# n, \# B- ^# Y- {. q2 c
<pre><code class="language-java">public static void main(String[] args) throws Exception {
0 b( f$ D9 I( b0 E
& f2 ^1 V: T& G3 ?. a% ]4 E; L
: t/ Y- T9 ^1 A8 O
+ e/ u( C4 Q2 x StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
% E) n- P8 v, w* a4 O z
) A( e( R& \& U( y q9 I5 P8 q1 Z //获取数据源
$ f) I# k% U3 g! w
& m2 t E. d8 K7 X List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
: Q4 b; D! P8 [) F1 K, @
3 j$ B8 j* X! M: D data.add(new Tuple3<>(0,1,0));; J8 l! a; p3 d5 P/ ~5 q1 j
3 b, E9 t2 x) N0 e' k! T data.add(new Tuple3<>(0,1,1));. _2 j7 ]5 T5 e# c6 A/ b' S& p
! w1 S5 m1 P% P: u! W/ i
data.add(new Tuple3<>(0,2,2));
0 v: k) O- p8 I4 C9 j1 R4 B2 B9 a% K' E7 w0 o M9 n8 Q
data.add(new Tuple3<>(0,1,3));% [, {4 W" e. x6 Y
5 [* y2 s: l, ~- }2 ?
data.add(new Tuple3<>(1,2,5));' {8 V& v: d. G' K2 ^
' K3 R" e9 J7 h0 G; A: ^5 v data.add(new Tuple3<>(1,2,9));, B$ c; T/ a. c2 t
5 n( N% P) ]; Y" Z
data.add(new Tuple3<>(1,2,11));
: f* N( H! y0 T; j
5 g# _; v2 u2 B8 T data.add(new Tuple3<>(1,2,13));+ p3 B6 @! s X
" z! F! Z/ e: |' B2 I+ f5 e
, ^5 _. P$ s' z% S
+ o: @+ X4 q+ ?7 W' k# q* |$ l" ?& j3 Y. `" d) s& h3 k
3 t/ w# r+ v# R9 v5 w% i3 e } S DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
1 e, C- F' U- P" |& t! x2 p& u. |! H# X, K: x d3 ^4 L) U
, K" s j* i# `' @" u
: Z% q6 M3 K. ` z
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
# f4 C* P$ M, N; c! Y4 y# O+ S# |- V6 F d0 J
OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};1 K# |; |5 I! B$ v! @
U! A5 F0 h( R2 ?( J
: W1 x6 M3 K/ E: k' I6 w6 t5 e' h9 I2 K8 m; ~
9 G# Q6 K! ?2 V9 ^, y% t
5 s. a5 E2 G, k) ?- w, Y SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {* t: a* {5 Q. e- L" E+ o* r
# r' t1 n1 G$ b1 f0 N2 p @Override. I8 W$ T: L7 `' U" U: s1 o, |5 W7 c( z
7 z- w0 \& F B# y
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
- S' M+ F& Q1 B5 G$ c" Z' [3 A) y* T+ u* z
' R# O. C8 M5 m3 z5 t1 m
% ]0 R0 X, S5 N if (value.f0 == 0) {
8 M3 i# b5 `# C4 D. i% f1 n5 O$ M8 }+ p4 S/ x" b/ K$ D' P1 I: X
ctx.output(zeroStream, value);; N N4 h# o' p$ C
3 I9 ~: Z F4 @, c/ I } else if (value.f0 == 1) {
]. a& R9 o4 K' \6 H: r
/ D2 R8 w6 O# u/ J, h) g ctx.output(oneStream, value);
+ }8 f9 q( k" ^+ p
' p- N$ B; X( ~% r }4 y7 v" Z& ~. w z4 ^2 K1 P" f
! o' K6 x/ \( I* |# j! o0 b }, F- c5 r# s U( T* a; ]4 r5 C+ A& ~
\# _3 e3 H0 U( \- ^0 L6 a });4 B; g9 B' k, @
1 ]" d; `' G- ~: j4 ]
2 u# `6 h( u0 Q3 e2 z
3 o4 Y: F1 E5 F DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);- E! l7 V3 w" A6 |+ r
, M( L$ m, x) d4 X% I$ }1 R ? DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);; Y' d9 T% a; F9 v3 T
" \6 x& ^! q0 ], O
7 y! G y, E: ?. b/ O$ g$ {" ?$ {; D4 m+ ?% I* b& L& j& Z9 E
zeroSideOutput.print();
7 I9 E0 f, ~/ q. ]9 R- y/ _; Y! N, m( Z! V+ i4 D' D
oneSideOutput.printToErr();2 `: i3 y9 A R% v. Q1 h
9 _$ z9 k$ ~! N, t
- W( [& }" Q5 }( q! r3 t' i0 H% E3 x0 r( @% K. N! D. U
1 t8 g5 }' `6 ~1 ~: R. ^$ Y0 _
1 x' y( N, [( T: L2 R9 @/ j3 ~ //打印结果2 `2 A: S. ]5 E/ t; z, x ^
! }7 c* P, ?/ F String jobName = "user defined streaming source";$ a+ r' ~% c k( g: Z8 P% I. r$ z: l9 ?, e
% D$ P( G+ [, ]! ]7 K" z' w
env.execute(jobName);
9 c7 G6 E7 c+ N8 v
4 Q4 I7 r! [; g8 a/ ^}
. g- Z- S P k- E2 y</code></pre># O* I) V/ L0 Q' `8 |; G
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
% w; E- l- W5 v, k- x# J<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>
9 J( h& z; n- G/ f( S5 e; ~7 ?<h3 id="总结">总结</h3>
3 z% k i$ X3 d: S) U! I9 r6 R( M0 q; t<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
9 w% Q" P: G1 Z" v4 n& w; ]<blockquote>2 L D. s0 k @
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>+ B$ ?$ Z- {6 K L p$ m. ?
</blockquote>
4 t; \. l( N# {0 {/ y) N/ \! H5 @$ f; H4 U: M
|
|