|
7 o5 R) J* L% V! H( L
<h4 id="flink系列文章">Flink系列文章</h4>
/ q7 F7 N1 Z8 C# q# b" ?. y% }* B<ol>' _, Y: F( p4 j2 r! l# }5 i
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
# V q; q7 d" O<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>/ f' z6 l* G! k. H7 [
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>6 F' N6 Z \. I3 K8 _3 [
<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
0 d$ O. _7 V& l" Y<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>* k! H0 k+ F: _7 ]! D1 S4 O% m5 z' o
<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>. W m# Y) v2 S' P
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>$ d: ?. R2 ~+ N: q4 K: C
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
, p% R1 Z3 ^2 b3 B1 ]. Z0 L<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>; o3 ]4 n1 H! ]
</ol>
2 A/ g! |' E$ M; v i4 d<blockquote>6 N; M# p- }5 W$ {5 t
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>; T x; u" O J5 V
</blockquote>
+ k/ |7 W" o6 m- }% d<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>' \5 m/ p2 ]) @( z A
<h3 id="分流场景">分流场景</h3>
|) k2 i7 M' r<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>7 y2 Q" F$ H# A- g: O
<h3 id="分流的方法">分流的方法</h3>
$ s, u: B: J* c' Y d! X<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
" d* ?5 A% D! u# Z- S* O<h4 id="filter-分流">Filter 分流</h4>2 V) }( s2 O2 f. Z0 j' g2 g
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>% `) o v" l" P
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>* l W1 z$ _) F3 o
<p>来看下面的例子:</p>
+ Y0 ^) H* I' B/ ]<p>复制代码</p>$ |- G& @$ I: e9 e: C" D8 b# v" I
<pre><code class="language-java">public static void main(String[] args) throws Exception {7 H0 I6 J, X2 R
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();8 U1 d- G6 K) Y# B+ i; i
//获取数据源
# Z2 @9 x# J3 u( ^$ u5 O6 v4 F2 H List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();- h$ U, a9 i4 v# P
data.add(new Tuple3<>(0,1,0));, F4 i3 l# L5 r; u" I
data.add(new Tuple3<>(0,1,1));' m9 q$ p% H( H4 U# \
data.add(new Tuple3<>(0,2,2));
3 K" y E ?. }3 ]- e3 p data.add(new Tuple3<>(0,1,3));
. W3 H4 q; c) f: c" f+ z data.add(new Tuple3<>(1,2,5));
! p9 O2 M1 t5 [6 v+ K8 E* D data.add(new Tuple3<>(1,2,9));; m# r( F4 T% I7 X! k
data.add(new Tuple3<>(1,2,11));8 s, g4 W2 J4 ]( ^; [/ T
data.add(new Tuple3<>(1,2,13));5 K, X. E; w u6 {
R. j5 F+ R9 U( A H) F DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);$ l% k+ v9 [, K5 R! G* a( b
5 Y) @* D" h c
* S, ^/ ~1 c! S, h+ i s/ Y; G! Q, |3 J. y
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
7 q' H0 P; S+ f/ ? }- ?
3 R0 ` {1 \( _8 P SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
" ~3 a; V+ {; x/ m/ r; c6 N+ a$ ]7 u( ` r) @6 X: C
0 e' G& G" g2 a0 u8 Q. I
1 G" k' h0 w% r+ H zeroStream.print();
0 v, `" A4 Z n+ Y! X9 J: t% j: f6 _+ H) J* k7 Y
oneStream.printToErr();6 {. Y, \8 P, e$ P# b& g' T
0 ?7 F9 h0 a% ^! s# W' G3 m' L" i* y7 L& W3 [
3 j& c: z2 }7 N8 i3 a, J% t" G
g5 _6 e7 S6 p6 `& z: e" C
$ H: ?, V1 h" O, E5 P
//打印结果
6 r1 t9 x$ D+ S' I+ n( k# f ^/ p2 ~+ v4 z+ i% d
String jobName = "user defined streaming source";; U- \6 T8 ]0 x3 W
' O: @# o2 Y8 ~0 G& \8 K3 Q1 O8 e0 f
env.execute(jobName);( e, D3 g- m+ z9 z& X( c
' P) m9 z% y- P2 ~. A
}& U+ P: D _& S0 n1 j w
</code></pre>5 Y7 N9 c9 g3 J! R. `9 Z) v
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>$ n4 W" k: V5 k) z: o
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>/ d: T: `; j. T+ h( r0 V! }5 f
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>: S1 c' Q, ~1 N+ f
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>
) r6 j1 H- @2 h5 {<h4 id="split-分流">Split 分流</h4>- P( r/ A; {( S. W& a q5 g
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>! O& F8 r; ^$ ~3 O6 `$ `! S9 {
<p>我们来看下面的例子:</p>; L6 F9 p# J( x$ o5 X& x
<p>复制代码</p>
7 z( _ a' G- A0 V7 C' l% i<pre><code class="language-java">public static void main(String[] args) throws Exception {
0 l) @! i% @( q" r% N0 G" ?' g: C- _
# A8 \8 i, E6 P8 M6 V
0 x4 l M/ Z: N' a- P6 B StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
$ E/ |% g0 w: D$ |- {! }" b. g+ a/ S, [
//获取数据源
( g- G1 M8 @; ]7 e3 F1 ]
4 }# h5 d% m& u7 j0 G List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
: m" h- ^3 q9 |: ^
$ H2 b/ u: G1 P8 ?% E' o data.add(new Tuple3<>(0,1,0));' j2 C8 m4 W5 q5 {
" {) J4 r& f0 Q" T# f" s data.add(new Tuple3<>(0,1,1));
* b0 y( }; T4 Y2 B$ `( I6 l3 H5 ~6 p
data.add(new Tuple3<>(0,2,2));9 a7 ^, h4 S+ W+ {( J- k
' x2 @7 i% O7 e" u" Y, X- H data.add(new Tuple3<>(0,1,3));; |7 L' N/ |4 w3 Y
5 t% h& {4 \) @ C4 l: U data.add(new Tuple3<>(1,2,5));; e! v- O- P" k; z4 a
9 M8 S$ i; D6 v9 ?, h: I
data.add(new Tuple3<>(1,2,9));% b- t j0 P# ^) k& R, n
! Y3 d7 S: j/ d5 N0 f data.add(new Tuple3<>(1,2,11)); U; r% `6 k' a$ o! l
2 W) k. d; `7 B/ q' ]2 m
data.add(new Tuple3<>(1,2,13));
, O. Y2 e2 J7 }4 ^2 o
# M+ {; l0 Z Z, V s4 x# A/ M% k0 s
) q8 K2 a1 H X `( M6 w3 K& q
- E6 s6 n& C+ B7 ]+ E( i
/ E3 Q. V: ^$ \ DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
8 i. C( }2 s0 J( N- B* O% K* k. ]6 Z. \
; O0 m/ @' s# H- [5 M/ q" `5 j
# W$ p* M: c5 Y) I3 D0 ^
" ]5 g, G5 ^' x
1 G( Y& G o8 P; z
SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
8 p' w. L% k! ?
; c$ t3 E9 g9 \& ~7 G% | @Override4 b/ H0 I8 e* h0 y A" L$ b' @
3 l3 E( ^; O. i) e3 r% [0 @: D+ R* o
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {
7 x/ K5 z) x; D' f
7 I' R( ]7 h- M( z List<String> tags = new ArrayList<>();
5 e3 p9 ^3 i4 B' H# ?9 p' i
! ]( d' Y" A. s$ P+ A if (value.f0 == 0) {& q( N9 O1 L: ] U' z
# Q& J/ V5 Y, Q6 s1 T5 J- @" s2 q tags.add("zeroStream");2 N9 J, X4 \5 B$ B( d# w
* l/ O- g1 @+ _+ @9 y } else if (value.f0 == 1) {
( u+ n; N( @3 U7 {! T
8 |. Y' U( J# h5 q7 C" h Y tags.add("oneStream");9 s S V l! B4 R" s. X V
. _2 I2 D; t& d* c- T
}4 c" T0 z/ X+ X7 o1 X$ j. e
" T5 Y4 E6 d8 E
return tags;
. ^" h/ I! E" d4 `
. P+ {5 J1 M, ^8 ] }: K6 ?0 Z5 T( }& F) g( w0 C
. A( o. V1 \: t7 n0 B# |- s" {. F
});" p4 ]+ X9 s/ S. `2 O& C
# ?. x7 S, P n6 ^9 ^0 M
9 ?* p) A& a, D g& b
3 G e' |. W3 y7 O/ s splitStream.select("zeroStream").print();
4 ?$ J; p' C# S% q D1 F
0 u& Z, ]% _. A! W7 ?4 A splitStream.select("oneStream").printToErr();
1 E7 I; V4 L$ N" r8 L$ Y6 n- J& `- \7 c6 l6 Y3 \; k9 p) j: j! J
, m2 `' Q# g. C4 _
3 P8 c2 \1 n$ L/ X
//打印结果
/ V0 h4 a4 A# B/ W& E5 \$ M$ C& b$ ^6 R8 W% i
String jobName = "user defined streaming source";
H. B" Y9 }( h+ p" l7 d* F/ _" j/ D8 g! ^2 v
env.execute(jobName);5 y5 Y, p( ~5 a, v3 v
9 F' n7 y4 S, o, p
}
2 V/ [. O' M% Q* K, K) X</code></pre>
; {; l6 T9 P! r% w& w9 P9 D t<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
* x- n! J% v5 j! W' W<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>5 O) ^8 |+ o( Y8 }
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>) I: ?5 Z1 {4 l9 P+ `. [. x) i
<p>复制代码</p>+ V7 b/ H0 s# ?2 m% X
<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.
$ ?6 r H/ |8 L9 a" e! ~5 a</code></pre>/ e4 B6 t2 Y8 b( b0 {! v
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
3 D7 r, ^4 X" y- `' b6 j5 s" o<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
4 p, \1 P8 @# r$ ~<h4 id="sideoutput-分流">SideOutPut 分流</h4>* r& M1 K: j) C) m! L' N
<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p># k1 ~8 P; }' @
<ul>
1 U- c5 x$ Z7 ]/ f1 {' M<li>定义 OutputTag</li>5 F9 Q' i! r& |/ S
<li>调用特定函数进行数据拆分
4 f8 m- } R7 P5 o8 L4 K<ul>4 N! B4 t* f2 U( w- I8 ]
<li>ProcessFunction</li>
. s! D* `, [0 e2 j$ N: U; r. ?1 g<li>KeyedProcessFunction</li>3 f; e# k. C) N9 _ i
<li>CoProcessFunction</li>
3 q3 u. k6 P* Z4 a<li>KeyedCoProcessFunction</li>! e L% ~# a3 t" r$ n5 o
<li>ProcessWindowFunction</li>
" A k# t* M& T& O/ q<li>ProcessAllWindowFunction</li>
8 P8 X" H! |" U& y( Y+ v2 F$ d</ul>% } x' Y0 z7 \& D7 A# G
</li>
- w E# u/ x# Y6 G</ul>7 K1 \2 G: H# `% M& P8 F# c& j1 W
<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>0 U5 K& R+ j1 `' Q- Y
<p>复制代码</p>7 g8 K6 y- h0 z8 h5 ]5 H
<pre><code class="language-java">public static void main(String[] args) throws Exception {
- ?( v6 @. s0 _. t
4 ?2 l9 d9 X7 H- O
" f( J' x- p" u7 g" I, c' f$ l, }) N8 v) B
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/ X" [$ u: l* l8 `
W* E5 C6 w) B. Z //获取数据源( |5 l) l) u* K( j' J4 G# F
$ f+ v8 h. q! s8 f) j4 ?8 M List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();/ M& T7 l6 }+ f
3 N& e) e( ^0 _( K1 i5 t5 G data.add(new Tuple3<>(0,1,0));
8 L4 w. w# T% ~; s S
$ l2 \$ K: K3 f A/ m. Q data.add(new Tuple3<>(0,1,1));% g9 t" F! K1 L! {: \1 y- O6 c
) L5 C2 ]4 V; C: d4 @" I0 g$ ~
data.add(new Tuple3<>(0,2,2));& X/ h3 N5 q( m* O0 @, \0 {' p
2 z: `" m: Y4 ?+ F: X2 h. ^+ i) j$ B9 x data.add(new Tuple3<>(0,1,3));9 O1 J, F1 S& }2 l/ U# e4 o
; Q7 ^7 i* V) O( w
data.add(new Tuple3<>(1,2,5));
+ o4 y8 G# N0 s9 Q2 k# U
0 X- P/ J: a2 V9 {% F0 f$ d data.add(new Tuple3<>(1,2,9));
7 d$ o5 |/ }2 p7 L5 U O: L; F0 B9 N) I; k
data.add(new Tuple3<>(1,2,11)); U) ]) F0 v# |! G" }; C
0 M5 h5 @2 M* o2 h b! g
data.add(new Tuple3<>(1,2,13));
/ [( Q/ u* @) P( \0 ^# H# `
" ~& O* V% K5 J8 E# m: t6 X; H
+ {7 r) M1 ~1 B; h5 \+ f, o1 {) u+ l$ P, R) Q8 S/ c
3 \: q# S5 L2 k$ e. L
M8 X/ R& D7 w8 D5 S" {- \ DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
. I* F1 G* l+ o
" p: M+ o1 E- i: ?3 C5 e5 x- g9 A2 I D1 ^0 m
3 z: \' q8 X+ u- w
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};4 E5 K$ J4 z/ u4 F( A. w2 Y) G
3 `# }5 X" x* \ OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};) J- Y, z7 ^, O9 q
* f O! e% g" U9 \( h" b
& O! ^5 G6 S( W8 T0 ^
. N% W2 z/ S4 _" a3 q Q3 m. |9 R" e' \ W- e7 ^' M, [% o
$ U7 \/ ?& M3 r5 n3 l) G" |% n" h
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
7 }5 @5 @. X$ `( R5 f- J, P3 h& O
0 d! M0 C0 L/ S @Override# l' v! f+ s8 N8 y, T
, C: P1 I( S: V$ \
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {# a7 J5 P2 P+ u, p! l8 \
3 `) ~# ]* l* d" V$ `/ d% s3 I) C. s2 v( e1 N
6 u; a) Q$ g' A% k/ N0 A' H& `% o; z
if (value.f0 == 0) {
& {" X, L. v3 {5 B
: u$ ?, ^1 L6 ^+ _7 H ctx.output(zeroStream, value);
9 p0 D1 R8 \( V7 t! j! ]
/ W/ V2 y( c- P/ n6 l( O4 ]2 X } else if (value.f0 == 1) {, s8 r7 S5 k o9 J) Z
$ @& ?8 ^6 r0 t2 a9 T& t ctx.output(oneStream, value);
& [3 U$ e) X# H2 y/ f- b3 m0 e( L* u9 S. H- K5 @- R* u9 B& o
}' R$ U {; E4 Q9 p
8 V9 b4 ^& V- F: v6 D2 f }- D7 A7 M( l% S; ^1 U2 w& i
3 @. K2 L6 I. l! R7 r });/ Q7 r0 v6 ?) ~1 ^" o t
0 S/ ?$ A( |' o+ m, }8 z5 m1 }6 ~6 b
1 w7 i0 H; s- ~, O" Q$ J% O4 R" D9 y4 h* s" l7 X9 R( a, ?
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);5 t2 ?4 S2 ~; u' d7 P' L
( v1 x( m8 [/ P6 @& S. @ DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
' U! Q# R5 W4 X. ]9 ]5 k) J' D2 }, Y9 t1 e, ^
) t: E8 ]. H$ x& y5 E
4 ]& \- B( s" D% Y* ]4 | zeroSideOutput.print();
4 [5 K% t" D2 |. V8 j9 Z
m7 I8 P5 W1 c oneSideOutput.printToErr();
: E! f! s, c$ z, b8 C, F& Q3 K5 I
* \( J' ~7 m( ^3 y, X. P
8 S ]; ?9 s6 U& ~) n. Q+ e3 |
* V, t% T) [6 S" O! V
+ d8 r% n, c# @; K0 p //打印结果! @% h: ^( }8 l# R& P& P& b# Z0 a* x
; y: A/ w- j z
String jobName = "user defined streaming source";, }1 U- g! E, u( \
7 p. ]+ z, p, Q3 W
env.execute(jobName);# @' l4 G( ^! A+ ` Q2 J
' D# B( M S3 T2 C6 L}
2 j. I6 i6 y, `# I9 E: R7 V7 h</code></pre>& ]4 D" N! L# j: m( k
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
0 l6 X0 H; k/ ]0 e' D<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>' e( S/ q! D9 L& _
<h3 id="总结">总结</h3>
; T/ y' Y: c) `2 ?7 }/ s9 b<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
( X* I' C( O5 |<blockquote>
; u: Z% b; `' W/ P7 F<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
( n: t: W9 v2 r. M* q</blockquote>% _0 W% G# Q1 F: m2 ~' \( W, t
( D$ ^' p0 k# t0 i/ I9 u
|
|