|
|
, d( e* B$ r5 u$ p2 c' [' I! ~
<h4 id="flink系列文章">Flink系列文章</h4>" Q8 X7 s* h K# P
<ol>
' x7 G" x: ~. i<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
, _7 o2 Z$ s. ~! P) `- u! s) k; h: r<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>, D$ L* M c5 W6 ?' V" @
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
+ z; `1 Z) j! ~+ C. x3 ?<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>& G" {+ F5 W t8 y* w/ T+ T6 p
<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
/ m1 k5 }: D( w" o! q$ F<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li> z/ x( ~: N" V$ u
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>0 k& f/ y( L5 J, Z* X: a7 l
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
( ]3 N6 S5 s, I {5 D8 r% Q& H4 _<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>4 o/ f( S$ [+ S' a- F
</ol>
2 T+ W/ b% }3 R1 K+ F<blockquote>
" I& X0 i8 a2 ?5 ~9 u<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
7 g8 _) x' L( q+ A& u</blockquote>% e8 |1 @. G: w a, g, b; d
<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
8 t4 `3 K$ o i<h3 id="分流场景">分流场景</h3>& F- @0 V& w# m* Q
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>( v/ s' y6 r; n" Z( J ?
<h3 id="分流的方法">分流的方法</h3>
" A% i0 \. U3 x! l<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
; l, C8 U) {' |5 h$ G. m' W<h4 id="filter-分流">Filter 分流</h4>: n6 b1 g" A) d3 [3 C+ I& [# R8 k
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>! \$ x7 m2 ]7 t r/ U& y- ]9 j
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
6 B, c+ X! ~$ ~* F2 g<p>来看下面的例子:</p>% q5 R2 l) ]% S" V2 U
<p>复制代码</p>; f' F+ J. l6 S; A, P: E/ n
<pre><code class="language-java">public static void main(String[] args) throws Exception {
' o8 F& b* T7 N& m StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();$ R" B/ L9 n' ]' T5 ^/ ?- X' W% `
//获取数据源0 ]' p8 K6 }6 w' J- ?. D9 A6 N
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
% W5 k' K; l) t% B data.add(new Tuple3<>(0,1,0));
2 O9 ^- d' e# O/ r$ m data.add(new Tuple3<>(0,1,1));- ^3 ^& S: N* ?& H, P
data.add(new Tuple3<>(0,2,2));3 t1 A- d# ^6 [5 x
data.add(new Tuple3<>(0,1,3));; q, I2 ]- {3 ]0 Y2 l
data.add(new Tuple3<>(1,2,5));4 \( x3 g) j( {" y9 |: B
data.add(new Tuple3<>(1,2,9));* s( S9 l" w* B3 n
data.add(new Tuple3<>(1,2,11));8 o) L( ~$ H6 ]# f, U* \: _
data.add(new Tuple3<>(1,2,13));9 R7 E9 }. q' z/ k1 _( N
4 W; S- u0 k+ g0 A# n$ |% O8 X4 V DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
5 Y t+ u8 f! K3 I$ C% c/ f+ q
+ N8 F" A2 Y6 K' C8 W
s P) e2 y, }6 i5 v" o+ J; w
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);: `/ f) Z; |" L$ {! b& L; R
! {8 ?: f% W: v: {6 }
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);9 q& P* S: O% ~+ T% Z
2 h9 O/ T' ^/ H
2 @: _+ V1 v+ {5 |% b1 X! _ |' P1 Y* r9 X$ V' v
zeroStream.print();
, C8 g* W8 g( r+ g5 K5 z1 k! U* D& Y" g: f8 G9 @) o: N- S
oneStream.printToErr();
/ Y% L$ ~/ n2 ^) W1 }- |1 _1 \, j& i2 ^6 N& [" B C( k/ t
# M' f* d( c4 b. ^" R5 j& X" N6 r' N6 K4 q V$ v6 H% Y
, m) o5 y3 q' m3 S9 `
7 a: `3 m/ J/ `& z4 E$ S* s //打印结果
4 r3 J9 N- D* t2 I# B
* i$ O ^" W) a; [) p9 d( K String jobName = "user defined streaming source";1 i3 {4 Q4 y5 U. g" |9 D
3 @- Y- d2 l; i* v {0 G env.execute(jobName);8 P5 S# m$ |, z7 Y7 N
8 X* N9 Q2 ]6 F4 j$ f4 I$ ` w
}
& F( q0 \9 O0 O7 g; q</code></pre>) o' d( }$ Y/ c+ ]
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>. k- y" L6 ?& \- y. H
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>- G) I! D; \1 J
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>6 K3 J/ G! f/ A7 n" i- s
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>7 x6 O. r, s) T& _! S
<h4 id="split-分流">Split 分流</h4>7 m7 D% R( u/ b) C9 G8 i& \4 P/ V
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
/ Q% w5 v' N/ [$ w6 m7 ]! q<p>我们来看下面的例子:</p>
! {* u8 f7 v7 K<p>复制代码</p>
2 X. s: c( M/ P a. C& h# m3 A<pre><code class="language-java">public static void main(String[] args) throws Exception {
" d% {: u& a P- |0 @7 p
# ?% |6 |1 |8 q, G* P
3 u* g- _: |. E& \$ w0 }7 @
1 _6 R" t( j* F- r2 e3 j# R StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
5 o+ i( H w% s0 g9 d# ^2 \5 F& ]! N8 ]( T/ C6 \* u" M! h4 c
//获取数据源$ Z T5 h: `; W F
, R0 [; w4 S" b0 V5 t
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
" O6 D/ F' H1 B8 |9 s i# P$ |" U' I& {: s
data.add(new Tuple3<>(0,1,0));! W L( G- u h4 {4 w
, a1 Q3 @7 |; r( M6 m4 t: v( Z data.add(new Tuple3<>(0,1,1));% j8 g. U. Q6 s! w9 T
1 s) D- { {$ J7 s( _) `1 } data.add(new Tuple3<>(0,2,2));( f' ]6 b) X& H$ M
& Q$ Q5 f; V% B data.add(new Tuple3<>(0,1,3));3 B( @6 [% D0 J8 m8 @) R R
) ~7 f2 b/ n W& b* Q1 Z
data.add(new Tuple3<>(1,2,5));
: ]6 }5 j2 |/ S2 g8 y$ }6 d1 ?. g. H$ S
data.add(new Tuple3<>(1,2,9));
' u! s& H: I- f( F1 ^* a! X) ]1 h2 s
data.add(new Tuple3<>(1,2,11));
% M- N, {' x! T; e" m3 ?+ z; Z
: O# d9 A/ { N2 ]/ r+ N data.add(new Tuple3<>(1,2,13));
! D# n5 b, N6 D! u. S0 @4 h- X1 p$ [ _+ i8 Y( s5 M, E6 E% Q
7 s1 p1 n$ h( H r: Y( h/ {3 {2 n9 x5 h' C
7 L+ f, _) Q0 J3 T+ t2 r
S8 s7 w( G* L _' G DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);* P: U' x" I* G( q/ Z1 x
O$ u$ \ ~% H) R
3 Y" ~4 z& s5 V; b0 C" Z# U+ h- d2 G
8 ^" Q/ ^# @$ J2 {
$ I5 x( w$ `2 W2 n/ R
4 q; U5 m3 o1 x# X SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
; p2 F% k8 ~6 g7 w9 |
. D) t4 s+ n+ p7 w @Override
& I9 e) c8 D$ d; u+ c1 n; K5 E- Q1 l3 W) w
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {
6 l7 F9 r# j; I. l6 o R5 E4 N1 a( R8 x5 l9 x
List<String> tags = new ArrayList<>();
5 U, j+ a- n; }7 w
$ V' Z$ {& s; [' p0 j2 w, F. { if (value.f0 == 0) {
8 ^/ X8 Z p5 ], R6 E: ?6 t
' e6 H( H3 Z P/ d/ j7 P tags.add("zeroStream");
' O1 q1 Z: n+ B) v& Y
: `6 X1 I) m) p- o } else if (value.f0 == 1) {
8 y& ?7 v1 f2 P5 N( S! l5 C- k/ }
tags.add("oneStream");; D4 m. f+ B2 G, f- f5 h' g7 h
% a- l; {2 q+ k& G3 d! z
}/ T+ t# O+ N/ Q+ t9 v
# k7 D6 v6 Y+ `. v, t, }4 C
return tags;* w6 R; D3 V+ z, u( z" E
. m' B" K+ h) Z% N/ V l* Z }) r& R1 G% V4 j- p8 x' B5 F
8 W F2 Y& \7 s });1 D ?+ E c" ^- j1 t) F7 q K0 b% I
6 E7 ~3 F) p) U9 ~' x
8 Y. f% ?: N5 ]7 j9 R/ J3 S- o% n* [. w
splitStream.select("zeroStream").print();
/ ~7 p( f/ q% M2 L* n9 [% W: }
" O$ q# ~6 J3 Q; `( m3 b splitStream.select("oneStream").printToErr();0 D( J- @: O6 Z. e5 \8 A
* S! J) }; F: i& G) T. m
) @8 y0 H, A% k$ Z0 L; ~9 i5 o( G; ?4 V) z4 O q$ x8 ]9 {) u7 O1 g
//打印结果
4 v% H! {0 i7 ~/ w
! |; ]1 j' e& ?: J String jobName = "user defined streaming source";
, V" ^' @4 E/ Y. Y: E: `6 q3 Q$ N+ U. L+ F5 N
env.execute(jobName);
( x6 Q- H) Q5 l% x7 U, S' p4 W
: B' Z: n1 `, K}0 a- K) j# U, E
</code></pre>1 e, @& Z3 R- W$ N
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
- C% j; o$ M0 i4 W# u<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>
7 M3 Z7 A( x3 K* x<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>
9 @" e. J# U( o! w5 i: o; {3 U3 q7 d<p>复制代码</p>
2 K( Z4 \' V: L' {4 h<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.( g4 Z) i) I; K( ^
</code></pre>
# K3 q* g2 c3 x7 ]<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
' U$ P y+ X# s3 s/ R: e& b<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
3 Y8 R- P# I {" a2 {<h4 id="sideoutput-分流">SideOutPut 分流</h4>
3 z# n- E1 j3 x7 x: e<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>* `/ M6 H; m; ]3 A* f
<ul>& U6 O0 s7 \% o! E$ s* H
<li>定义 OutputTag</li>' X8 N( \) `2 Z. v% f) |
<li>调用特定函数进行数据拆分+ t* a2 _2 L- C f9 J$ N/ H
<ul> P1 ]7 N! k# k' L- p5 k" @- k% u
<li>ProcessFunction</li>
# r+ g% i. N& m) O9 W+ U<li>KeyedProcessFunction</li>4 {, S+ C( D) I" I# I# _, Q
<li>CoProcessFunction</li>
/ U6 \8 N7 T$ ]1 Y/ B<li>KeyedCoProcessFunction</li>
* H# R& ?! O$ }% p2 p* T# V<li>ProcessWindowFunction</li>+ [' _4 B+ ^" E# J
<li>ProcessAllWindowFunction</li>
0 Y0 n7 L6 F8 R* F</ul>
) j G8 Q, O! i# r! h! r</li>1 r9 U% C: U/ K9 l6 i% f
</ul>
- t( z( N" i2 Y) O4 P* Q$ i<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
% `% g, w4 D3 `! Y- h9 ]* d, a<p>复制代码</p>
' M! F$ W. N: Z2 t2 ~5 }. }<pre><code class="language-java">public static void main(String[] args) throws Exception {
1 w) U6 U h |$ k; d8 r& |2 g; @) @; ~. ]; R; b5 K" d" j1 _
9 A: R* s* t9 G$ p+ q
K. Z1 A# S2 M; T StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();" h ?+ ]+ j6 ]# f' M
3 g8 r& A; l5 j
//获取数据源5 [; f% R* C2 g$ ]+ U; t
" [; G# X0 \' r! [8 b0 }
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
3 f1 G% t$ A- p& T+ D
( Q9 `9 c; _' ^/ l( d data.add(new Tuple3<>(0,1,0));
/ ~. M {1 p9 j: i. [7 `, n8 n, x: x1 e' Z! Y V( v3 Z( b1 r$ J
data.add(new Tuple3<>(0,1,1));: q/ q; v0 W0 K: M# R+ q
. w- T% a7 ]7 I) S data.add(new Tuple3<>(0,2,2));" |! g' T b9 I: S
& {/ ?' D+ r: ?0 r; s
data.add(new Tuple3<>(0,1,3));
! E) ?8 ^ Q- Y5 Y3 @3 P) [2 d' A
3 i4 d. a7 ~2 b7 n- H data.add(new Tuple3<>(1,2,5));2 ]0 P' c' X P* u
& ~9 d' x; z( L
data.add(new Tuple3<>(1,2,9));
. t9 D2 O( K& A5 Q7 j& W1 R/ c0 N' J1 D/ A; i; B4 p) N# F2 q5 l
data.add(new Tuple3<>(1,2,11));& n3 K o2 i' P
: T; H3 A* t6 y) P data.add(new Tuple3<>(1,2,13));- q. u6 H, C: K9 ]
" T3 o+ e' A# r+ g$ o
! T6 k4 N5 v* F; z& Z# C! f+ c
" k3 g: Z) ^5 L0 h8 U. o7 a5 q
: {; Y8 c+ m3 _: L
: Q& J* {: G' a* V( O1 L DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
: q, y9 f1 `6 u; i5 l5 P: c8 j3 {+ ?7 L3 L7 C
7 e- O$ L# ~. \ ^* }
) R& W7 ?) U1 p+ g/ t: t5 J
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
1 C2 y- u! D3 p% s% F5 }: | `$ d* j4 Z7 q1 O
OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};: r, S7 i C% A* b
8 J3 }; }! O" o! \* p/ _ |/ \/ m0 i j
- t; p: C# w* `- S
+ N2 m( K+ J0 c- f7 ] s V+ R i; }& e% Y2 K }. P
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
( H$ ^8 F1 T) L- u2 b( L: A8 v
@Override
" g$ g( _. f! |9 I, w& f* C
|0 a* @! }- _0 H$ z public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
8 J2 \$ c0 F% D5 B& v7 P1 x% W$ g+ X$ g1 K% X& Z6 F
# v6 _5 ?% Z3 Y
3 R; M) \8 t& X4 H- q% u; x( e* b if (value.f0 == 0) {3 F9 v/ `) V, z
; w! r6 y6 j6 L. V/ I5 [( J, l) C
ctx.output(zeroStream, value);+ j, [4 d+ \7 D4 B
# b: |/ M! {5 e4 P
} else if (value.f0 == 1) {0 F, ~1 x8 L% }% |/ x
S+ M) S! E# J K
ctx.output(oneStream, value);1 C, V* U) x, H/ }/ j9 ^% P
6 E, w( V9 S5 m) E2 p' f9 [ }
; s# g( K2 e, G! F1 C9 K8 |- h1 x4 |
}
4 T6 @6 g6 s0 f
2 W+ W4 |1 B3 }4 g0 _; ]" C$ f });; F- r# d6 B. i% |, x: G
; x9 K7 l& T: d+ ^+ E5 r9 j( Q
# e) z" s: V% h7 z3 D- N" S
& _4 m, b4 b( ?# \- ~9 W K# n DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);
9 F' `& K0 t" b6 |7 o2 p' v3 n# r8 |( F4 c' u! h
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);& [0 e- H) v" [/ d3 ~/ H
# K; n5 J' A- |& E4 e4 l d' t& w
* d' P9 n) n4 ]7 I. U
( `3 T# d W$ V zeroSideOutput.print();
0 u3 ~: T% H: w, b. k. |, S4 J
oneSideOutput.printToErr();
/ R1 D1 e! X7 g" H# \' @
% J# h2 o1 X/ Z- Z# D% I
2 u2 V; x! ^! Z+ Y- y: s' o& ?9 b. ^. v
. T! i* q {% |8 M5 M
9 u" m9 Z2 |- W. ^4 c2 ^5 \& r3 ^3 J% O; O+ @- V6 D1 h
//打印结果+ t5 f; R( S+ @% e* w. {; }4 p' ?; J
# {7 |3 z) M6 c, R* S
String jobName = "user defined streaming source";
% W# o9 H+ w! D7 [& D! ` d5 f$ e B( T" f
env.execute(jobName);
. @. ^4 C# ]* H4 f# M( e/ V4 Q) r" X5 g4 L% N0 ^ p9 H5 Y
}7 F& b4 z4 x. g: G
</code></pre>
2 C. R" f" B: e- A' x$ z<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
, L2 I; t. x1 _. t<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>' Q T Z: L7 r* C: Y8 L" s* O8 {
<h3 id="总结">总结</h3>
' B* ?5 I: m# c/ A+ m+ a<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p># l( K2 T$ ^0 ^% m0 A5 Q& v
<blockquote>, I B# t8 J1 R+ A4 r
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
2 [5 C! v ]" M0 I</blockquote>
. x4 z3 c2 U( H- P* u7 O+ `, V8 v0 j% _5 ~
|
|