|
|
$ N' X$ f( @6 S0 j, N<h4 id="flink系列文章">Flink系列文章</h4>
3 o) r( s; I4 h/ F3 l) p: w5 q2 Z<ol>) Y& n8 i6 t7 W8 N
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
( C! E! K3 Z/ N) _<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>& E4 M" P" o0 d7 {* `4 J- Z
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>1 N+ S! u N l8 e
<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>6 D; z* v. W. V% @" m
<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
& Y- K. e4 b j) f$ M4 s0 V<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>
/ y( R: n1 c, G1 f<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>
7 h2 c# t& Y8 E/ C1 C+ W( \<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>3 W+ n4 G7 C# a8 g! |
<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>% }; x' w- b/ j, w2 Z: L8 \: C2 H6 b
</ol>
* u; F/ n0 M6 d8 A<blockquote>
4 r' E+ R( V7 U" @, \/ Y& V<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>3 ^) _9 }. f, l2 q. F
</blockquote># H9 Y2 q4 b7 `4 w/ k, R" O% W; `1 g
<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
, M- \2 h6 _7 B! Z; |/ ^- g$ }+ ]<h3 id="分流场景">分流场景</h3>
2 F* R9 U* A2 Y4 |2 m<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
, L5 N! R5 w( ?6 }6 Q<h3 id="分流的方法">分流的方法</h3>* d' C$ l, }' _: o
<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>8 v1 V/ R7 T% {. A
<h4 id="filter-分流">Filter 分流</h4>
8 I; _7 I# o/ H, G& {<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>5 v. B. ?( K1 h8 U* s# @
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
; v( n. v. O3 z0 x<p>来看下面的例子:</p>
* s9 p' k* [ y( |<p>复制代码</p>; I1 K9 i( S. e( _
<pre><code class="language-java">public static void main(String[] args) throws Exception {
, l4 c% M8 W" j% }: s: E; T StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
. f3 x( G3 Z. j. i" t //获取数据源, B4 \. Y4 i3 J5 v' e
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();" K- ~6 h+ d* X6 A5 y
data.add(new Tuple3<>(0,1,0));( n) V3 o' |- b0 R: d) x- k4 A
data.add(new Tuple3<>(0,1,1));
8 I. B8 {4 m, T! g data.add(new Tuple3<>(0,2,2));* s7 T5 q" I$ T" z
data.add(new Tuple3<>(0,1,3));
0 d" u0 j- b. p8 P9 d# T data.add(new Tuple3<>(1,2,5));
& w) @8 `/ p6 I& p2 j- L" b3 t' C data.add(new Tuple3<>(1,2,9));
5 r d; k4 j8 [# [2 I* \ data.add(new Tuple3<>(1,2,11));
, r' e- H- m" [. A data.add(new Tuple3<>(1,2,13));
) }2 Y, H3 [! [3 T" [
! _1 s+ N9 S1 @- h g/ ` DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
8 W6 {% p; l& u/ J2 k. T* G. ]0 t! H2 J$ p; L: b" \
. S6 C0 [# a" B+ @ @! e9 ?6 e, l% i( g, E+ y7 G
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);6 u* T, ?& c' P5 h3 \
& ?1 D+ y. R6 Y' c( g" @/ r
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
. \6 [0 }3 N8 z2 T6 W$ l7 s* q" G" |! w% n' j8 q* _3 M: h
V: v, P- c( z
$ y' b% O; x: {5 }$ I7 J1 B* C zeroStream.print();
" V, q3 O4 I; W9 X9 r4 o& C+ Z6 u) P- t& z9 r5 S- H
oneStream.printToErr();& g1 D/ k4 s/ E1 O- o$ F
3 F& h5 P: s9 R0 |
3 V1 E. q1 @6 i$ r+ ~' u
" A+ {, j6 [; o* r2 u( a, r$ E2 V
" F9 ]6 r5 n. A. n. E3 A8 b1 z0 o8 y+ ]
//打印结果
' ]+ f, F. ^4 f; v2 o
1 G% |. y- B( N9 H8 S String jobName = "user defined streaming source";0 X& d c: {; u
* X" O6 k- ~" E0 Z env.execute(jobName);
3 r, t7 ^" g- }1 u8 e% H+ B* K5 R- j: d. g& Q
}
! K8 Z* D+ |, ~' S5 E" d3 d</code></pre>. e0 J; B! |7 j( }" P! X& J! \
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p># r2 M. A* p S$ P! ^' X
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>$ N1 K7 Y7 v2 X* e( P% Q
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p> a9 ?7 u0 U/ l+ x6 _: l
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>9 {5 }4 p& E2 U1 }. x' G) X
<h4 id="split-分流">Split 分流</h4>; f8 Y! ? `# \
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
3 l- r& Y4 `! v0 J, E9 J<p>我们来看下面的例子:</p>. [8 U, h, w0 Q( f1 @/ N
<p>复制代码</p>% r5 f+ v- y: S* u% h7 I& _
<pre><code class="language-java">public static void main(String[] args) throws Exception {* E5 r' |( g% H3 k' K+ K4 V5 G* j
1 W ^; Y& E- J9 B2 M+ S) h9 v* | [( l2 P" M @. y
^8 C5 g3 k3 W: H$ P+ J StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();7 B/ h! _3 Y+ d5 Z5 U" k. c. I
7 [: f/ P! i; B. V
//获取数据源
3 D7 R6 R- p% [4 z6 q9 G- s L6 ~( v5 ^
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
* S8 p) S, r9 t0 o# v) I
) r7 ^, U* X; P) P' K& j+ v data.add(new Tuple3<>(0,1,0));
. n6 d9 [( Q% q. ]$ u' a: P8 q: K6 X. t& c9 T! e. p2 {
data.add(new Tuple3<>(0,1,1));
$ D/ | @" T1 d# {5 H( Q% Q" L `$ [/ {
data.add(new Tuple3<>(0,2,2));; W E, ^4 i9 K
* {5 I3 x4 h+ X5 J6 t7 f7 T data.add(new Tuple3<>(0,1,3));) a4 s$ d* U, B/ E" n
- ~ \& G4 ]6 j: Y" E/ M data.add(new Tuple3<>(1,2,5));/ H, H, _0 @& u! C7 J' Z
/ D+ V+ }! Q) @$ W; j( X$ [: P
data.add(new Tuple3<>(1,2,9));
. \4 z6 G, N0 m+ ]$ Y3 _+ @1 N7 d! n }7 C
data.add(new Tuple3<>(1,2,11));- U+ i9 j) q* E5 q% \7 d) q
. D$ L* L: A1 N- r% S data.add(new Tuple3<>(1,2,13));7 }& p9 O/ k6 f2 i4 i! B
# f; B2 L0 p- f
& D6 Y% _! o. c8 X% Y+ V( W6 }/ s" X2 `8 y& n1 r6 a9 Z% C
) k1 k8 ~# w/ P7 a
! J( s. C+ u: E DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);$ y: S- |2 p2 `: [! g) r" b4 f
$ o( r: K" [; i5 ?+ d# A/ H( b0 L; |1 p( o
! _) K5 G9 Z, M/ f& G1 a/ Z
: h% |; t: g; k9 s& B7 d
* B9 ^0 J. w8 S( {" {* z
SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {* N; b) X) e; x
0 U% c! K( ?' ^3 V5 J1 b" N# q
@Override# a% d2 h4 R; b- a; x M- c
( N. Q c9 D6 v2 M1 ?
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {% X7 I: U6 U& R* ^1 p: h* {2 n
( N2 p! t; D9 c# [2 a List<String> tags = new ArrayList<>();- L9 ]7 w1 {2 V* f; R$ f* \
: l3 Y; w2 R5 \8 {/ ` i+ A9 a if (value.f0 == 0) {& R5 Z9 O$ y" w4 J4 H9 D9 d7 \
2 M+ E# k6 G, s0 Z _3 t tags.add("zeroStream");
J+ b+ G2 P5 e. f0 O- E
/ v- @/ d1 o2 P# U7 @( l6 B } else if (value.f0 == 1) {
& h b" ?+ D. [
3 i- y+ L' o" D" k/ @% A tags.add("oneStream");# R5 J2 K2 o- F
: F a" u" n" p }
7 e# Z+ Z, {$ Z6 u6 h, v; S
: n& l6 Y9 P; ~4 D: ?6 h* q return tags;( {5 v) o5 d: |8 n. g7 j
3 K* g* N1 ~/ I8 {7 m
}9 x ^; F1 P Q2 X) C+ y: g
; Y/ h% ?! ?3 j
});; A4 |, ?; a- M i( G
" h1 _ W% A: r% C! l. r0 w
6 M! u6 @* y/ @/ ?5 h7 u, b; t
; I/ `/ t1 i; ] splitStream.select("zeroStream").print();
7 r1 d. Y; i* x! Q6 C, o5 F
0 a! t0 D; N2 f6 y splitStream.select("oneStream").printToErr();1 p e' v5 k( {/ l3 l# } y
. m: L. W& N6 T `% R( s( ^4 Q: F" a& v- I$ A4 g1 D6 s+ u/ `
# i, m" r$ n% _+ y' C7 O E
//打印结果0 w& p* Y+ C7 g% @ i4 t- F
) H, V) K [* x: ]' ~' b
String jobName = "user defined streaming source";
6 R, i/ P9 u" [! e6 D! \% `% v; P7 C! x! H5 o/ D
env.execute(jobName);+ }: q B& Q4 W: s; X' n
; t! y: m. R! x. D; s( D& \0 u! p
}
' L+ ?& O! }' b</code></pre>/ X F4 b/ Y w6 ?2 |* s
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>: F; o2 J! G' @) f2 N, G
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>2 z+ E# C9 e; b' N" @2 T/ ~
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p> i" r- ]# I$ p. {6 [, y4 D
<p>复制代码</p>. K5 H4 Y5 s M0 x8 O
<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.& m$ Q- N# f1 h( v
</code></pre>% u3 u& K/ ]5 H: O! j+ z
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
$ X' q' ^% Q: U% z<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
! z) Z v5 M) c S<h4 id="sideoutput-分流">SideOutPut 分流</h4>
/ k+ S h. i. L$ X7 }<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
+ p/ R" z, ~5 ? s# j<ul>" ~" r1 a" _4 O7 N6 |
<li>定义 OutputTag</li> H8 [* a: Z: ?# {: `4 `: w' F, V
<li>调用特定函数进行数据拆分; u6 g, C5 Y9 O, _5 I6 `
<ul>
4 b* s! P1 w4 Q" m2 S<li>ProcessFunction</li>- Z6 U" }) q' U0 O
<li>KeyedProcessFunction</li>) @' G& T5 y- K! {* L9 X
<li>CoProcessFunction</li>
) Q5 u" n: m. V8 J% c<li>KeyedCoProcessFunction</li>" t8 l9 t7 Q0 H3 \/ H9 R. G. P
<li>ProcessWindowFunction</li>; C! r% d# ? t! [
<li>ProcessAllWindowFunction</li>+ L: \; q/ l8 s" N
</ul> w% G& F( }( T" }% I3 U
</li>! _0 c8 i9 e0 `% _" V% U
</ul>
0 Y6 a- O$ X% G4 c8 @<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>+ f- d! e( e8 P/ H
<p>复制代码</p>
E! l, w) h' e' |$ W* Y8 B# K<pre><code class="language-java">public static void main(String[] args) throws Exception {/ g' B$ s/ k! m! u. _& s6 [
6 ?! `4 y) c ]% ^
* D e6 H: E& m1 w2 _4 n
, e3 I0 R' J' ?3 U! Y! Y1 n3 j StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();2 U8 M$ l+ ~" y! ]" \
. X' b: h" v# X- h
//获取数据源
2 a. e# ^$ d* }3 x6 ~6 y) x( X" J" n+ V- w6 e& h4 w* N$ I
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
! R, v# [( I* ?: W: x
6 ^. ?7 w$ ]; B9 b b6 d! r0 N1 k. N data.add(new Tuple3<>(0,1,0));
% K B0 r6 O! v& _6 |" m; l$ `
* @& \0 m+ G' N/ [5 H# x data.add(new Tuple3<>(0,1,1));2 f0 J* y: ?/ S3 S
( T, L" D, M+ M data.add(new Tuple3<>(0,2,2));
3 ]) ~! q) Z8 H9 m* s. {" P p/ w
data.add(new Tuple3<>(0,1,3));
8 B, p7 `+ E9 T4 p2 \- n* ^
4 A7 a2 q# Y; f4 _. z3 O data.add(new Tuple3<>(1,2,5));
& A1 n) z6 u" t9 C- l) D1 [
9 y* y/ I. d d& Z" {2 e3 m data.add(new Tuple3<>(1,2,9));1 a' b% ?7 Z/ r. I& d- I/ o# \4 G
4 V- s+ K" @ E- Q0 L5 w7 }: C
data.add(new Tuple3<>(1,2,11));! C# w- T N- ~% s7 Z0 L( T
1 U+ P! {3 c6 T! g
data.add(new Tuple3<>(1,2,13));
( x+ p1 i0 }) v" }' s2 ^+ z8 m4 q3 j& b1 V6 v( x( f
4 I [* _, K) K7 X9 C2 a
7 a/ _6 Y8 `; u' C' ~- V5 _3 d3 Q9 h1 R; V; s9 M* a2 ~1 H5 _
4 a" M8 w# f8 n DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);$ k& V7 c2 Z7 e- @- o7 c+ O7 l' V
8 x( `, [- F6 ?. Q: B: M
5 t0 @2 r# h9 K3 v. q1 b- `% S1 n9 z
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
' J, E& z) F# I1 e" }! N5 k, d3 Y" f8 Y) U
OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};4 p/ [5 D8 T6 J8 K5 c
. |7 D3 C0 K5 v( N9 K( M- x. I* F, X8 d1 F0 {4 _6 p0 \. Y
" [1 h' |3 f3 ?$ Y
4 \ K; b$ {3 ]5 E$ ^0 i1 H$ T" v4 k, W5 v0 ]2 {2 o" l
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {+ V! i. K5 W9 S" O3 E
$ C& g! Z% A4 m {6 l @Override7 D. S0 P5 V: m( T4 H
! {% `6 B6 `* P% h% \( h public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
9 x3 ~# L: k9 I) L$ s, f# ?) q& P) O3 B# X# V
" U2 c1 `/ @! p6 A" ~; e" x8 s( R8 G$ C. W
if (value.f0 == 0) {8 h5 B7 n1 @" [$ u+ }
: M0 d) v: A2 T# ^) F ctx.output(zeroStream, value);
3 H H8 C# t9 y( g2 X
~4 Y- M; P E4 ? } else if (value.f0 == 1) {
' H) t" s. [1 N: S: x( o& x/ C1 r: V" m* V, L" o' y Q( u
ctx.output(oneStream, value);
' {8 {" ~3 r% }5 U% W' A4 R
0 N7 y( E, e. O% a' u, f9 w }6 ]3 |# N' ~" ]& x% Y/ O0 U
$ ~: h4 ~5 [9 {7 K7 `+ x0 c$ R
}" V1 ^! l2 b1 R& ?
, a8 }# |% ~& g8 v });! m, {" {$ a! v
; L H3 C' N% i E" N. ]7 V/ o0 s
% V/ }8 W6 b2 A& Q
8 j; v" ^: W+ o6 T+ ^/ f+ ~ DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);9 H$ N, T$ G, I% R, y: ^$ E( W9 w
: _( o; ]1 y1 B8 k7 A DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
! I1 K0 J. i: W0 F& h8 `; g* e; J- F5 Q( m1 C
" |0 Y6 C8 o/ _' r. y: t* w
% v7 X3 J$ \* G9 n" f4 E zeroSideOutput.print();. q6 m$ X+ t7 @3 t$ {
$ q# R; t" U0 d+ p5 Y! n: k& I3 |
oneSideOutput.printToErr();
; ~! R2 D, r( o$ E& z. c g+ c3 O+ L+ W' m( C
/ g$ c6 X+ e3 Y; ?1 r. t
2 T, L; [$ G1 j- m3 x' O. p
7 L2 X C8 p3 l! c( j+ G
% c0 U5 [( B2 m //打印结果
' Z6 k. r( E0 d/ G4 \0 D/ ]3 h4 C: G5 |6 j
String jobName = "user defined streaming source";
2 l! k% n5 L* j, h+ a% h
, P; [& D& r1 G, r l env.execute(jobName);! Y- c y: L! ]! E; B
* C; a U" T* [: Q
}
' o8 [1 d8 k4 _5 p</code></pre>4 g! z; [) M9 M& G0 Z
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>5 C: u2 P# s/ R* U, r7 l* e
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>( J% z( X4 H5 u- W
<h3 id="总结">总结</h3>
3 V/ V B) A8 _! L4 S( G7 Q L<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>' G" s* F* G. `7 C# H1 [$ T+ \
<blockquote>* k8 b; Z1 U. U' Z3 q
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
+ l$ g0 i! v. W, i1 ~! [</blockquote>5 u8 e! o5 {, M; w; d
2 M, y- O" M5 [; L* j |
|