|
|
0 z5 T) F# N) X: c/ j" h0 {
<h4 id="flink系列文章">Flink系列文章</h4>
) F% y# M$ q' x' e& [<ol>
2 e6 u, B6 u: h8 X% `<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
$ P4 B) \' S9 d& \, F8 ]8 n& }<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>
. l4 U% }% U3 K<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
' n: [+ ?& ]$ p( P5 H+ E; j<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>. |( K* N- O5 ~, c9 \ x
<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
9 [9 e% v3 t3 d, L$ L& k, b<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>; l4 f+ F2 r0 B2 a2 G+ Q0 v
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>
5 Q1 X. I1 Y* a. w' t3 I" H3 z<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
' r- S4 U+ \2 U0 O; p<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>$ p* a+ k! F" F. x( n
</ol>
+ b# x9 J' P0 a& p# |<blockquote>. D: b+ t; N) ?$ d
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
8 G( t \& A7 m4 }" Y</blockquote>( Y4 E7 E5 K8 k$ d$ P' ]: a6 ~
<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>1 m& y) V ^2 c6 L0 y
<h3 id="分流场景">分流场景</h3>, E+ X" ]- u3 Z, c, H, s& A2 F
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>* V# Y1 A& \7 g
<h3 id="分流的方法">分流的方法</h3> Z+ S5 ~( X) [; @% ^
<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
' K! W+ N3 e* m<h4 id="filter-分流">Filter 分流</h4>$ D- y! v( s: s: c) V1 v7 O2 |1 _9 g
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>
: v& U) q; W6 e; T0 F2 D<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
3 @! g2 W% W7 r6 S& t# |<p>来看下面的例子:</p>) ~( [7 ]2 A# O, g4 z
<p>复制代码</p>3 S% r. e1 W4 ~8 ?
<pre><code class="language-java">public static void main(String[] args) throws Exception {
" g- x; I8 g( d3 Z& u StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();- o0 s; H% v: Q: E5 T$ J
//获取数据源" V9 g; @4 _' b( |
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();- e Z9 W! M! [; Y2 }" O
data.add(new Tuple3<>(0,1,0));& }8 X( V" J: Y+ x' k) _
data.add(new Tuple3<>(0,1,1));4 p* n; O% n4 s# \
data.add(new Tuple3<>(0,2,2));
) S2 |1 Y( B8 n data.add(new Tuple3<>(0,1,3));
2 w3 Z' Z; q! V4 G' o) A; c% h% T data.add(new Tuple3<>(1,2,5));! W5 `0 w6 p/ f9 s6 ]7 e
data.add(new Tuple3<>(1,2,9));
7 x/ \; h$ M7 f! S- C data.add(new Tuple3<>(1,2,11));. f! c0 R- q& m V
data.add(new Tuple3<>(1,2,13));6 V- k9 r, x" E1 P* `
) E! v0 g. o! a
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
/ F w. Q P R) I/ @2 Z- V' \/ b' u* j3 f) S
4 M' S0 H0 c }9 T9 c7 ?7 l; c9 g
; M! w, t4 G+ _9 m" x; W# R
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
/ q0 t# x* V/ j) P, W3 k5 x% c/ y# z) [5 s: ?
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
( d! R, L- L& y' Z5 x9 r& s, Q) G1 _# K
3 _! D) s& k8 m4 g; ]' M, H% O1 X
1 s8 I# V" |9 n8 V: E
zeroStream.print();% `8 ~9 p8 } {
% B0 z' C$ l+ G6 `
oneStream.printToErr();' N8 @! ^& X; f0 f" O& i
* L: J' {' T- E4 T/ G+ W8 a9 V% `: U& z5 _7 L9 Y4 e
+ U8 F: }; k! @" F9 }+ C6 a5 A' W
2 p# A7 G9 s( a3 y9 L4 l1 s8 f3 f. e5 |5 m7 b" D% w* c7 E
//打印结果
8 T) [1 U. U3 a; y0 Z
4 b m5 j) H# l! W" n) }$ Z2 w String jobName = "user defined streaming source";
1 Y4 s! t3 K C, g: M8 B/ N2 Y% W+ q6 b! l
env.execute(jobName);" u" \1 j) b% R( h; u$ |- z
/ G i" q# @9 X}
. f' `# Z7 `* z2 |; T9 q# K</code></pre>* R8 K3 ], {4 Z2 h1 y
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>
I# b. ]' b* s! J! F<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>
: M& e. T9 }7 ]" `6 Y- l* j<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
' P0 K( g5 K* n- v4 s: O- a, {, B5 `5 u<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>7 O+ `. s# o' ?0 [
<h4 id="split-分流">Split 分流</h4>2 r J9 t% Z. n4 i- ]3 V
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
# w' @$ F' N- \3 E% G5 J<p>我们来看下面的例子:</p>
' g5 z$ C) M4 O6 F. g- A<p>复制代码</p>
, w7 b& H- s& _, G& g5 r" e' s* ^<pre><code class="language-java">public static void main(String[] args) throws Exception {
' }4 J# H2 K+ K' M+ R# Q3 }9 M0 ]# W
: O! h/ y3 I' q3 b& }4 h" `
+ E' h% \5 M& O0 V/ L2 X& ]( A StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();$ v2 u0 q& B' `
{ _: D% i5 S: E: j8 a2 _ //获取数据源
' F( } @& f \, D4 w, T' b# P5 Q0 ]. [9 l
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
, {) t9 C- z9 [6 N K* `$ h( W9 t
5 ]( h C$ S. A data.add(new Tuple3<>(0,1,0));1 g6 _: V9 [* |/ `: Y
8 `; W2 a1 G, E3 f$ P( k& b data.add(new Tuple3<>(0,1,1));
$ l: [: U/ X# `' b1 N3 Z' V) O
& F' @# B0 p, Q w* | data.add(new Tuple3<>(0,2,2));
$ O5 [, J8 M* v7 G9 S1 @9 j
% p3 \1 @7 d* R$ v' Q2 r data.add(new Tuple3<>(0,1,3));
8 d2 j3 G7 V# w& }) [
- A- [- ?5 u. ^) D data.add(new Tuple3<>(1,2,5));
2 l Z, P' W" |+ [- z
9 R* m4 e1 ]3 j8 {7 U. R. t data.add(new Tuple3<>(1,2,9));0 u* j1 s$ V6 I& u7 _3 z
. e2 [2 k# w; |2 |
data.add(new Tuple3<>(1,2,11));
+ X- n% A& x; V- l$ P3 ~- ~/ B& B" A' v7 J9 p: N1 f& n: m
data.add(new Tuple3<>(1,2,13));" l! |. T! F; u% x7 E) i$ B9 W% O
" O, ?8 j/ N# _& c$ a; w
: c3 s, ]! ~; w1 b* K
8 o& F4 K% f% n! B5 ?
4 W; n9 `: r+ S% f& E' W* m( j6 A3 [7 O( y+ N" z) w7 j1 E
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);$ n, ^) S4 u6 j2 Z
~1 t& l9 L4 U
* f! s+ q7 n3 D4 s6 W& Q9 }
0 @3 B, b5 A+ V, c8 \% k
0 f$ C! L+ v2 F3 G
3 p0 f+ ~5 d" O. C SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {8 I4 K- N7 j! M4 h' Y* ^' [7 x, k
7 }0 i0 B1 @, b, |# x& ?$ O
@Override q/ p4 Z/ N6 x8 Z2 Q7 w
! W9 U7 S& y1 s/ H8 I$ m
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {
; H9 M; F$ Q% B0 H
* q- j' o: d: J7 Q. E List<String> tags = new ArrayList<>();
8 u! ^0 o' J b2 J/ `* L
* Q' I# `# K- O2 h4 O& W if (value.f0 == 0) {
+ }* `2 d) f" [6 Y1 [% P' Y! G( z# ?
tags.add("zeroStream");
# _1 e. k* S2 q) v
6 f5 E8 t% H3 E7 m# Y } else if (value.f0 == 1) {
! k4 F2 g) p- u% X1 R0 a0 c! K1 G
" c. Y1 j. @9 u) Z tags.add("oneStream");2 M' f' X0 S, E1 |# s. L S+ i
9 _5 g9 a: ~* {3 y/ A- l
}
. ], ` x4 c. g9 u) h& x. n8 q) W7 U3 r% J
return tags;
# B) S3 U" X3 s' @- T. D
& m! a. d" E4 D8 e) c4 c" z }
2 M. e! q; S, \
* ]% s; T) @' l3 v });
! t8 r d8 p3 \1 i G4 e2 l1 g
- p3 j0 z$ ^ |* T
0 P& K T* e) S- ^1 [4 P( K) d8 ^; P' C# D- C* z
splitStream.select("zeroStream").print();( X: i# ~0 `: v' d! j/ n" M4 I2 ?
/ o6 M% X- r1 w$ X; G+ x$ `1 i splitStream.select("oneStream").printToErr();
, a$ T6 d$ }2 ?" h5 ~. y7 N5 L1 b* F' T3 |6 r4 Q
' `; r2 c/ N/ W) T. t1 r7 q7 u
0 W6 t* ~+ c c6 [7 @# s! | //打印结果
+ l3 ?# _7 m! r9 k$ F
8 ]* D5 c! z) d% g+ k String jobName = "user defined streaming source";
, \) r" O% K; d8 @; A: }& \1 ]& w# i C- V8 I
env.execute(jobName);' f+ K+ M" M5 K
2 _2 s3 C w% V& C}
. U% @. K8 O4 m</code></pre>" ]2 R* W6 |! h; G1 M
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
1 j) p, S7 b9 ~0 [4 f) ]9 a<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>- u5 Y$ E8 G. _1 w- m. M
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>
* }! i, B# E& R7 p<p>复制代码</p>
; V8 [ ?- [* L. d<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs., y& A' a& D1 X& |
</code></pre>7 i* e; I- @" }! g) [, h
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
1 i7 P& O( Y/ U<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
* m6 S5 W Q3 q( i1 X) l- n<h4 id="sideoutput-分流">SideOutPut 分流</h4>
* P4 L8 K" S, t8 l6 p<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
1 n9 C" v/ E6 g& M: g! j) f<ul>
8 e, v/ d! r9 u<li>定义 OutputTag</li>
. E, v* a. [; z1 s2 Q# S<li>调用特定函数进行数据拆分
; o0 w+ [) Z. }# A# d7 ?<ul>4 L2 I: z1 K( f8 A$ r
<li>ProcessFunction</li>
0 y6 O! W5 H& B1 U8 m" W; u<li>KeyedProcessFunction</li>5 p" G2 I+ F, |5 { ?
<li>CoProcessFunction</li>
: s- W% i+ c' U% c, Q& p! X<li>KeyedCoProcessFunction</li>
/ G( O0 p, }' c, O! d3 B- c9 d<li>ProcessWindowFunction</li>
+ W0 {- r$ n/ E& O L<li>ProcessAllWindowFunction</li>
) V' a6 E! y- \2 Y, Y8 @</ul>4 B, J: A/ b3 _- w( c+ F: J
</li>
( ~" U4 ?" C0 N( z6 {</ul>6 h0 o2 q8 N2 r4 Q# w2 d, r R- b
<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>$ i6 ?4 X4 H; P V/ y5 ~
<p>复制代码</p>
% u) c) j7 t0 Z<pre><code class="language-java">public static void main(String[] args) throws Exception {
: V5 {8 ` [, r: `( _. n4 P7 P0 M- n4 T6 Q. i
5 C9 ^% Y5 B: h# D0 i* t
' p2 v4 \9 i O" I R7 A StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
6 Y9 F' p9 H- s# m q$ z3 E6 _# E1 l
//获取数据源/ t) I! c/ s: Y, ?9 p! F7 B
3 t5 H; E6 M4 V3 ?* V List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();6 u( _; S; N$ S' V' P1 L# K
* L- t1 J; S/ |0 O$ z$ i r9 o data.add(new Tuple3<>(0,1,0));
6 h0 Y" D. J+ L0 ~( ^4 I+ d% f7 i$ m& i; s& y
data.add(new Tuple3<>(0,1,1));
" \/ N$ Z% p; R7 e% i$ {. d( T# H3 _; \5 a% O
data.add(new Tuple3<>(0,2,2));7 N) J+ A% _+ c. c b
/ x# F' k6 G! t
data.add(new Tuple3<>(0,1,3));6 ?% H0 E5 W3 p7 n- m
- W. t/ L, K/ j( g& _5 |" Z" c data.add(new Tuple3<>(1,2,5));
% U7 r- c- M7 d" G8 {1 J
2 z* X3 q! {0 O t data.add(new Tuple3<>(1,2,9));5 K8 O1 J( v- ~% d5 `" o3 }/ e x1 L
- g, `( M4 W' i( Q+ q4 s( L1 W
data.add(new Tuple3<>(1,2,11));. }5 i0 [. ^* A# s/ z( {& ]5 C
" W |6 c7 R" _ ~
data.add(new Tuple3<>(1,2,13));# Q: k$ r" J9 p( n3 u
! M8 _5 u$ G; X5 o
, {/ Q4 l# O# c7 @, A8 Y) c; s0 [
1 d% N( V$ r$ ]# M* t- w
9 c! G% R' }% n) f8 m& T; y1 ]0 w2 [; |7 C" g* S8 F* o; o
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);0 _: H- H. q* i p7 ?9 x
" _4 N9 C. S. L1 X0 q
2 G U; w, g( Y9 c: B! e$ L" _0 q# e* n8 X+ i
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
9 ]6 S" G5 w, L% m1 x% b# j/ m$ n' i6 G; O. @) P) L% i0 T5 o, E
OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};* {; n% C( c. a2 H* \$ ]
/ z1 G5 E/ @& f$ |7 `# z; a) m& J
* u5 W* K8 V: \% v' x' ~5 s# M5 Z; V0 G/ u2 n" T
: [7 Q9 o3 L* X& S* n
^0 }7 L( d9 k SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
$ t) d1 @! a" I- @4 S+ N- S
7 ?/ L5 E- D& U4 `$ ?& h5 r/ X3 z @Override
) A% Y: z F# |6 L& D7 i8 E: P- j
- ^2 d8 k6 a, M U public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {4 v! A+ A0 h' m! b4 L2 ^
9 Q$ M W0 o" \9 W' ?- r5 o% f/ z/ k* B+ C: w
' L8 Q, H0 J2 @" Q6 L1 K1 h& u
if (value.f0 == 0) {
/ P7 O @1 d+ I0 P3 |! ]
* {* j0 ~) Y: b5 ?8 V) @% B/ Q" a ctx.output(zeroStream, value);
+ L h3 b4 U( I8 g$ N1 r' ~
; q; d( ]: ? ?: B+ i } else if (value.f0 == 1) {8 o* u# H/ M+ O, u% Z; ~
5 ^# Q& C$ ?' E5 N" w1 g/ ~
ctx.output(oneStream, value);0 }+ F: j2 h" @8 e, C5 m
, ?9 L& v5 \" Y$ y" {4 h! T
}% {* M( A' p: ]% D
3 e ]$ z# j, ^* j8 S s+ C; ^1 O }
. T4 }. q1 S" I7 `& ^% G8 z+ a8 K! g, K3 o d
});
+ H2 g, x3 D M0 T) g/ k
3 d$ i G. a7 V0 r0 |* k3 Q* R4 Y/ Y8 Z# t7 E9 P- `
6 b# `" N: R1 o, K% ^; U( V
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);
& \$ E: d( x4 M" n6 |, X0 o( Q: M7 C' Q1 c ^" ~, b6 T
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);: g6 s$ K4 J6 }
2 L1 W4 u7 k4 ^
4 B' Y7 j) X$ C" @/ t4 `6 Q6 h7 r! v$ w) G# k0 ^. o9 Z
zeroSideOutput.print();% R- I- o& C" d- b1 t8 X
Y# A( q W4 e. r. R& l8 Y# B
oneSideOutput.printToErr();& n# {% [* q- ^
" L C0 u l2 B* l* I1 w/ s5 \( F& Q% X' s e; r
. T1 j/ O4 L3 V
* H1 {! {: a5 h- _- U N
4 I' a2 G, ^ x //打印结果, M& ?/ d" V. |/ k, x
' a) i" x* N/ a/ c String jobName = "user defined streaming source";
8 p2 q% F/ u/ ?6 H' Q) h Z4 t' {( H; \6 d3 n# @) D8 W
env.execute(jobName);
6 F5 i5 ~) G8 A. V7 W% E
0 [9 w# v1 b7 {/ O7 T7 l}: m& ^/ S6 h2 i# b) R
</code></pre>$ C# b' ?% Y) Z4 ~( E) r
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>* b% ]! c s5 X) |6 z! g! w4 S& H0 q
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>/ R! ~ y, e: ^/ K8 r
<h3 id="总结">总结</h3>4 z' d4 x" \; G5 m$ o6 A( ?- g
<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
" Y" T l' ]) K) I# e2 G9 {- t<blockquote>! T! C/ F* z( O
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
( z' v7 V% f& C</blockquote>
, B' V9 G9 r, V4 T; a* I$ o* g" G+ e
- l, v% t( f* ?) S |
|