|
|
: ^! D+ ]5 @" Z* e, C6 y( y* ~
<h4 id="flink系列文章">Flink系列文章</h4>) q) E+ B% P+ m) @7 B3 G
<ol>
, [$ |% Q/ \' C; K<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>- @* s5 v7 x/ }% W
<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>( F/ k4 o$ s0 G+ i/ x1 Q3 f
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
8 Z6 v( u* e; [5 S4 j1 w9 R<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>( X& Z! u+ y+ f7 ^$ ?, B
<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
6 }6 X6 s- b. S4 L$ u3 ]+ V<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>: Z2 ?$ r) O: z# ~" w3 v
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>
' x0 e5 o$ l, J3 Z0 y7 x2 M<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>& L+ e4 {1 O& i- I' A% v! m8 ^& P
<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
2 q) Y2 b$ j# J3 _6 W- m</ol>
% m) F- [2 e+ _7 W<blockquote>
4 O: Z0 Q0 n B; ~: i<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
' c$ a, z# q0 t* f</blockquote>
8 u* T0 A8 t( S2 f# }1 Y$ R' _: `<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
! h( t- M% q4 W6 y! f9 Z<h3 id="分流场景">分流场景</h3>) v1 y- o3 S5 m7 V0 H. f
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>4 d6 x6 }: f- ]+ \" Z J: n
<h3 id="分流的方法">分流的方法</h3>
! U; m9 F7 B5 X& m- r s<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>2 I" r: v5 Y" b3 y2 K2 ~ k4 E
<h4 id="filter-分流">Filter 分流</h4>
A' D" h/ B. p<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>
$ s0 E+ j! m0 p<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
- [- E5 C6 z% K/ Q% i<p>来看下面的例子:</p>9 r" W: s5 U7 ~! {
<p>复制代码</p>" Q, t# {2 z& l4 I& p" j
<pre><code class="language-java">public static void main(String[] args) throws Exception {' i1 O4 [8 }' h( y' i" z3 @; ]* a- r9 u: Y
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
( V3 [' d( ]1 S( | v% Z1 B //获取数据源
" v& K7 P' \8 M ? List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
- q+ ^' U3 @1 X* _ data.add(new Tuple3<>(0,1,0));- `' ~5 H1 s/ |3 b( ?1 O, |
data.add(new Tuple3<>(0,1,1));
/ M# l( E$ b% u0 g* O* P data.add(new Tuple3<>(0,2,2));
8 l0 S7 N/ H) T, u' D; T data.add(new Tuple3<>(0,1,3));
1 u/ g$ g" m; P& x6 S3 K1 F3 Q( o) d" ^ data.add(new Tuple3<>(1,2,5));
) ? W9 j4 M- h, t6 P* M% z data.add(new Tuple3<>(1,2,9));
" B: o4 o7 A; Y! Z/ \% n% B data.add(new Tuple3<>(1,2,11));) Z) \: \; A! b3 B0 y
data.add(new Tuple3<>(1,2,13));
2 U' E, S' u% _" y9 V4 B f+ I% T9 x. F
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
4 D* T! g- m% d* B# X/ S
9 w& Q3 I) Y) M' a |- q. G4 I( p* _: C
- N$ w" d; }3 r
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
; S" s. z' f$ n1 x7 D, O- ]. n8 q4 y: B& |) S O: {( W z
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
0 ]1 n- a: e. |! m0 R+ Y( G5 ~ z! o' S8 ~
5 w1 I0 j* @4 i6 R4 ]7 O8 ^( u1 A5 B4 @
zeroStream.print();" s1 _0 I, u2 c4 S: i3 {% m
) L" r2 C6 u* w4 j
oneStream.printToErr();
+ V/ o% i( }6 m- {- ^0 l$ K3 Y
; S4 L. D |; Y8 \4 N4 _" ~8 Z; }( S2 j8 f. s! `7 M
0 `6 i* L7 n# c U8 G& |
. _' {$ c( y7 d! z. y+ }( N. R h* |4 C+ F0 K3 W1 E
//打印结果
" \2 _- {3 h0 H8 K& K4 p/ n; g% K! q/ [( n& @. x
String jobName = "user defined streaming source";
% t) h3 z8 o1 }& x7 }6 Q, o! v N/ d9 h
env.execute(jobName);
+ M. A+ X2 ~% n5 m; k# V, f' c- h5 c1 {0 T# r
}
& b N+ E( d$ B( M</code></pre>
! F, Q U6 ]" `" F& P- ]<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>" }0 B# h7 P7 w0 \1 k
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>
" W. y/ _$ ^. Z1 P0 h5 c# }+ V<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>, b% r7 ?+ Z' T: K$ Y% o/ Q
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>
4 p0 U. j' X6 C# [! \+ I<h4 id="split-分流">Split 分流</h4>
, \8 \/ I/ }/ ~4 _<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
) P+ Z; a, c3 e) o2 S<p>我们来看下面的例子:</p>
: B$ u4 @$ ~3 W& J; m<p>复制代码</p>
: w7 }) @6 i) i G6 k$ N<pre><code class="language-java">public static void main(String[] args) throws Exception {# U- U0 b9 d5 @9 m+ C
+ O% n' d; g4 ^
, `4 X& ~" F$ v& t$ O2 d
! @, R" D9 t5 R3 O, B StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();: T8 z* {$ o* _9 M# { t
) \) ~) u" f2 q [+ [% k K6 ?% s
//获取数据源1 e, l% d& \0 `$ r. E
I- e$ `" X% M
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();- m s+ g8 S. e! H5 l: w1 k1 d
6 |# |/ F+ F3 w9 D m# m' b1 N$ |8 c data.add(new Tuple3<>(0,1,0));
, W- P4 y& q2 ?- f- }# N! K1 l! l# {% U1 {% R x
data.add(new Tuple3<>(0,1,1));& b" U' G) M" S4 @ `& G# x
, J$ l* o: @; y data.add(new Tuple3<>(0,2,2));6 n. _9 z$ t2 U( Q3 E4 G) V
" t9 b a) C& s h; m3 W9 [ data.add(new Tuple3<>(0,1,3));& I* v7 Q' P; s
& a, T/ Y9 e: | data.add(new Tuple3<>(1,2,5));
( ^* Y2 j. a x8 d+ X) Z: g# j' p9 b, \) k) `
data.add(new Tuple3<>(1,2,9));8 s; _7 p( s; S$ G) I
( \4 N+ B( K1 R& G! L data.add(new Tuple3<>(1,2,11));" [, o6 Y; j! K4 y$ E$ P, ^! ^
$ y& F1 y+ b s! j; N' P data.add(new Tuple3<>(1,2,13));
, ^, O! c! X( u# O x- L- ^: D4 G2 d9 [. |7 X7 Q W6 c" ~
4 T9 A) j! w3 @: ^' m
/ m* ~. t0 f( r3 b5 G8 U* p
J3 N. U4 l t5 O0 [6 l$ u
- n, H. l8 A' t DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);% c/ [* B0 y+ S! D; e
$ \8 s) e* N8 W3 ?6 @/ \
& N9 M B/ c: ^" E; Q, k
+ x2 g, Z& |" z a, H, m2 b. s+ }) k+ {
4 P1 m$ {" Y9 D9 m) f
SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {8 h. V. H! W3 ^/ j7 X1 k
: O7 l. W. N+ g( w/ O% s& j( l @Override
: |* f9 x; e2 F F$ U1 f2 p0 z
' b3 B6 B( j# X. i% ?# W public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {8 \& E8 X+ W& h0 j- s
+ n! Z# O k. g5 `) {, R5 R List<String> tags = new ArrayList<>();
7 b+ E( i( a; P" ^) a
* Y. r3 d4 X$ K3 U6 t; S! ` if (value.f0 == 0) {! x% e3 \. c+ p2 \' d
3 Z d9 ?* `" a% D0 ^1 B
tags.add("zeroStream");$ {# }/ H5 @; |5 U
! w+ o9 Y4 _8 Z' Q1 B ] Z
} else if (value.f0 == 1) {3 _1 O: }0 a" _6 n% c+ z
' E! w1 n( P) f1 P& H' K tags.add("oneStream");' U& |. p- k. K# I# u5 d6 a) |
5 v" P8 g# _$ X# H7 m
}
* H' V$ L6 }: f3 E3 n3 U/ S
0 O9 x* M+ b, s h, A \ return tags;0 g5 h1 }" P* O( K& D" N( {
7 B- a( a x+ b4 n2 E
}$ h3 B2 v' _8 z9 F: x4 A
8 L! l8 c5 E+ d! a% [/ w; R( q N" c, `
});- O7 m V. F1 \# l" a- [
e8 Q3 U1 p! K5 T9 Z; H8 x5 Y
8 A7 _# z7 M6 f0 Y- W
2 b0 q) A- s! `" F
splitStream.select("zeroStream").print();
. v L$ `8 w* u" w8 M* H
4 N0 g6 `2 v3 `! K0 Q+ s splitStream.select("oneStream").printToErr();
$ I$ {+ g1 s$ z3 u1 N* u7 K
% A( Q0 X }$ p
) `* `9 b( ^5 e* \. {- H4 ]+ r6 F/ L# [& h! |: t5 U0 u
//打印结果& h+ d+ p# t" A5 P# \! X1 T+ r
9 v4 D5 I8 c9 H/ @, g6 E% j String jobName = "user defined streaming source";
$ [4 r ?" t& x- U+ v) f: x2 ]. l9 t3 \" I+ n' w8 W
env.execute(jobName);; h0 @2 w; j$ K0 M* A, ]7 m% B
6 }! l9 y( A6 ] O) Q5 a
}
* X3 V# G2 m0 q {7 w9 }7 i/ w: A: ?0 |</code></pre>
6 Z! t& ^8 h+ r% e<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>; \& }0 v/ ^% L0 d- j+ p
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>
- Z5 v ]' {8 }" h) [) J. _<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>) r4 W. @& q7 ^' E$ l( z" R: a
<p>复制代码</p>
" {4 I( ?+ e3 C+ }' [<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.) u* J/ n# r" I! m Y6 @' V
</code></pre>" J! k: C5 P, S: }, x2 [" i+ E1 \
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>7 Y% ^ g% C1 Q$ N" a8 R
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>9 O* X `) L% q3 j
<h4 id="sideoutput-分流">SideOutPut 分流</h4>
9 C. ^ Z' ?6 ?' k<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>5 G! f( }, ?, t0 {
<ul>
% l2 T" _: L" V' M( l; L<li>定义 OutputTag</li>8 L1 {5 J' A5 {" o1 b* H
<li>调用特定函数进行数据拆分2 c& B1 D9 T2 [8 {: d
<ul>
- a3 c! P0 _5 `3 Y, l( y7 Q<li>ProcessFunction</li>
J. C# A1 V5 k2 ~7 w1 `<li>KeyedProcessFunction</li>! s: J" G9 |- K( ?! J, l
<li>CoProcessFunction</li>
& }4 K! ~1 a/ A+ i1 O- D<li>KeyedCoProcessFunction</li># \8 c+ |/ U" l9 p/ {: W
<li>ProcessWindowFunction</li>
6 o/ F8 u, b# ]3 _$ Q: w<li>ProcessAllWindowFunction</li>
3 X+ W" X# P* W, ]# [</ul>
0 F+ p; m/ q0 w# r' j/ d0 d</li>
! Q4 B' Q+ K+ N, m( c% J4 M7 E</ul>0 `8 S- C+ H3 R
<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>2 `. l0 \: Z, l$ H2 c
<p>复制代码</p>
& |" }7 {* t2 Q<pre><code class="language-java">public static void main(String[] args) throws Exception {
- l% o" A" n7 d. Q; c
- O; q' k' G1 V9 A6 a( f* Q1 D/ g9 j$ S
. A0 k# |- [7 k7 z StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
$ }. C( |3 ~/ r/ I/ v# Z' f1 x7 ~5 K. U C0 ]
//获取数据源% i" u5 ?7 T! M5 P: n* t* ]
% ?" g) r. }3 f List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
6 I# [# K/ ^ C" q q `, H' g3 P, ?* y {: \' n
data.add(new Tuple3<>(0,1,0));; s1 ]0 a- e* ~4 U" r2 g0 g8 ^! [
. H2 ]! W8 R8 S+ J data.add(new Tuple3<>(0,1,1));
4 U' O P/ b- m5 ~& v- m2 O& [- x2 D. U5 a+ ~! z
data.add(new Tuple3<>(0,2,2));
* D( _! k; C7 b$ \' D% P1 I# ]( b! @( r# T
data.add(new Tuple3<>(0,1,3));
6 O; p) f; z6 K# f% a1 Z5 `( ^1 U+ B" W# N+ }; g
data.add(new Tuple3<>(1,2,5));
$ _: `6 m3 U B: w6 J' I5 r7 R
* a0 w: r T' |1 X/ C data.add(new Tuple3<>(1,2,9)); Q2 E* B2 y$ p( x
8 y$ x7 _. G. k
data.add(new Tuple3<>(1,2,11));3 q2 G; Y0 `9 V& \" o
/ ~; j0 O. G- ^* W
data.add(new Tuple3<>(1,2,13));
% _" c* x) R* A6 l2 h) m4 ^# i/ {6 m% ?# h; `, t& U8 u
: H. S5 ^1 _; g6 B
2 H+ \" a4 x; B! W6 Y5 z4 ?
5 _: E) I8 @7 ]' }3 _/ c4 g" n* A. F9 L) J z D
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
4 ~$ f% ]# {( P) B* k, {+ H" n7 f& z
- U. v- S ^" _+ Y {' H
3 [1 d8 O& ]( t. ]1 v7 ]
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};2 j) h0 c- N. i* z
* {. l! X2 E0 G2 J
OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};
- `2 L3 l0 I- u( e2 M+ a8 R- W" ^& t9 b; [6 a, a, m' }
{+ o. P3 ]! x& A7 K, v' x6 N& s/ W
- M8 K* R9 K, b$ z- C/ ^: i- B) v9 w* l7 R6 p
9 Y. k# T) Q' O Y- h. n4 B* n SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {3 K8 n. O4 `* s- w5 k/ ]( f' @
" u ]( d. J! }9 |
@Override7 |; r' t. a5 ?1 }( y- a
7 R7 N1 k" x- }5 O5 F+ V/ M
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
0 ~$ r; t/ `9 k J. }2 s1 k/ S* j8 i7 A; n5 ?5 y& ~6 f
! \# u) W8 j3 Y# g0 M: Q
5 K$ E1 P% D2 n- P: F2 W if (value.f0 == 0) {
$ ?2 I9 Y& b2 J% p# k( m8 y! k( h9 |0 L" g
ctx.output(zeroStream, value);
3 v/ C4 ~ w* q. A. h% d( `( G/ J. C: ^: v1 A! i1 Z6 d
} else if (value.f0 == 1) {3 y8 r! H8 b/ m4 p t X# @3 C
8 L# u# d4 _3 u3 T" r$ P G
ctx.output(oneStream, value);
! X5 _: Z; x6 `. ~- R. F9 l: l8 y/ h u+ @/ y" R4 @$ }/ e) g
}
. Q. i8 h' ]# A3 p$ E7 n; M: K$ F& P2 I0 J! u! T
}
$ U' M* a* V! S4 q* q. V
" Y; m) ?) a$ d8 J });
; B( }$ P# p/ g) w0 Z& }: v! m* q: Q' r k
7 x! E# k; w: X6 |! |9 ^+ }+ x3 j1 x4 Q2 ^& @/ o" x
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);/ K) T' J& P3 a0 `9 Z( m# r' K6 v
+ W, d) t$ |, h, i9 n DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
0 ^8 [& k0 ?# e" o1 g
p |, g9 J6 S8 n- ]/ O& S$ u; y( L
1 |3 M, \* l9 N) G+ W
zeroSideOutput.print();3 u3 k8 }0 M/ f
% e& F- h3 F% \2 B oneSideOutput.printToErr();+ D' U5 b+ C% L6 j6 C
' l2 F# N+ W9 F8 W" U. V+ s4 M* Z
. Y8 ?: e; L6 x( Q3 `0 ]2 T5 n4 p0 u. @
' N) }/ \) r2 S7 s
! X" W$ s* g& _5 ] //打印结果" y) U$ @$ B5 v
! y7 P' ^8 h; G5 V @7 X4 p' |
String jobName = "user defined streaming source";( J, I- l% I% O% `1 ?7 ^* B+ ~
' E# @( `# }% @: s6 M8 C" k
env.execute(jobName);+ k# d: d6 {0 x3 f8 F0 b5 f
; K7 y- E' p5 Q8 k G
}
$ p4 l" \3 X, M( }</code></pre>5 ` G3 f! C: A! a; J7 F
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
; O; U) w/ X; x: v t<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>
5 p6 M" G6 T! q6 `5 ?" ~<h3 id="总结">总结</h3>
0 T9 p& H% ~# P; I<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
5 N% F, Z$ Y5 s$ [2 d- b<blockquote>
8 @; C) L0 l4 U) s# f2 x p5 Y<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
5 T" ?' `) T, @% I" }8 k8 `$ E</blockquote>
7 M @/ v' k. c. q" f4 M3 x3 N) u/ }
|
|