|
|
* n) H1 V) b9 z6 p* a; r7 Y<h4 id="flink系列文章">Flink系列文章</h4>
* d: t/ c6 U) E* r E' U5 g<ol>
9 k4 `& W) y( a2 M<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
, s% v+ d. C7 m: b% B! @& E# f<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>
2 z8 ~4 e; S6 U6 k7 ?<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>$ R' a1 Q% g$ p, p$ P
<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>- U' o6 N/ Y0 U( T6 B3 c
<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
: V& t5 d6 @$ }4 e8 p<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>$ o' Q; K, W: p
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>
. a, W& l( q+ F<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
9 ~) j# c, ?! A5 f0 u<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>: ]* \7 V' Y7 Q5 ~2 V3 ?' D
</ol>
0 j) r E& A% J3 r& H9 s/ Y9 K& C4 [<blockquote>
/ Y8 L3 e1 l! z; a- i<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
' _9 K! u: t7 \6 w- |. r& Y</blockquote>& m1 V1 T9 c9 G( x- E
<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
1 l8 s6 w! A9 g+ R1 N# \! Z9 M<h3 id="分流场景">分流场景</h3>9 l* b! S0 |7 o2 }. D7 r
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
* l& C$ A$ H4 j5 K( O3 X' f<h3 id="分流的方法">分流的方法</h3>
( O- b2 a1 f5 ]# l0 q1 D0 G9 ~<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
6 `0 d' b5 s0 \ J, ~) g/ f<h4 id="filter-分流">Filter 分流</h4>8 F# m# m2 H7 @% s
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>
# N$ g7 ]: D9 Y/ c6 x+ ?<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
7 B8 i( I: \! S @+ U3 A<p>来看下面的例子:</p>
! k5 z' q0 I5 P- @" L- C+ ]<p>复制代码</p>
4 r! E, D1 W* F+ m* f; {1 B<pre><code class="language-java">public static void main(String[] args) throws Exception {$ C: v1 o. j: c
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();' V. Y* W$ j' k7 E
//获取数据源# y" t3 Q5 r g/ A" e- \$ v( x( I
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
. k8 K! t f, B% `: J9 k data.add(new Tuple3<>(0,1,0));
* H$ \0 H0 W' ~/ [4 I4 A; s4 w data.add(new Tuple3<>(0,1,1));
4 `- [. ^& [0 h% } data.add(new Tuple3<>(0,2,2));
! s- i# Y! E# {8 L u data.add(new Tuple3<>(0,1,3));
- d$ c* E/ b; y4 b4 j, ] data.add(new Tuple3<>(1,2,5));
! ^/ \$ G* I1 g. T3 z3 O data.add(new Tuple3<>(1,2,9));+ Z4 }# m! _) o- t
data.add(new Tuple3<>(1,2,11));
; i6 F% z, D) z8 ~: ^! K* L data.add(new Tuple3<>(1,2,13));
0 ^$ e* x* L6 [& s5 z2 U5 n* ^: A! Z9 B7 S% N1 z4 ]
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
! Z* p7 l6 D8 F
6 C' k& b! I6 I# x5 G: r
' o# n: p% v% M' ^% V! W$ N: m" x! y* @3 {; b; F
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
& Q( R6 S% \- Z$ g( v
' R$ S8 S, i6 ?; p SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);: u [5 L5 |/ Q! {- ?9 o
7 s0 V4 j9 d. _' u4 D8 h; K+ {4 u7 n0 l
) Q# H A' P' \2 C2 J5 P zeroStream.print();
8 Q8 q0 r9 |. O
& u- q& S; E* \) K- [ oneStream.printToErr();
8 D6 Y8 m" d C5 z6 R6 @- F3 N1 k$ |1 w1 G6 I% \
2 _& `) _& [0 K5 `% T) Y i! L/ z* ^5 N9 G
% P* J6 s/ @" V" V+ e7 T6 q5 o$ w/ i$ P- t" D$ j( i, e2 R
//打印结果
- X% i0 o# E. C V4 H+ \% o( v6 ^! e
+ a! a, h0 i# b; B) } String jobName = "user defined streaming source";
) L' E% g7 P, @+ O, q% P [; C7 y; _5 T) V, K: y# z+ {
env.execute(jobName);
7 j! V% h1 i" i8 m
+ A! ^/ l `: o3 s8 H8 I9 E; J}# E. [% O7 D' N! ?; P
</code></pre>/ E& l0 o( u& _ Y" \0 Y* ?8 I
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>/ ]: j. \4 ?6 T. c/ w- M/ T
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>, |$ x0 Y t; J& g/ h; k, E
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
& i/ ^7 z, Z9 d" X# ~. w<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>
& t$ F4 R Q0 p7 I- _, c<h4 id="split-分流">Split 分流</h4>
1 E0 L" I1 n# m. r) r/ J$ B<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>( R4 c; C: x3 t; T' m* C1 ^7 q
<p>我们来看下面的例子:</p>4 O+ ~1 P7 T6 ^' c+ C
<p>复制代码</p>3 R1 E! y1 U8 H$ q. B5 N0 `
<pre><code class="language-java">public static void main(String[] args) throws Exception {! \- x. s" T7 Q* T ^
, ~9 R2 w' n# D) T4 ^
' M! Q; S: N9 e9 u, N
1 A( g& L9 k5 i$ m" Q' z StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();" e- `- R4 u- H2 m
, `. h( y. Y" ]6 q; O
//获取数据源
! Z" F; r& Z( d$ Y: Y! ~$ h# f+ T' g0 W9 i: ~$ u
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();; H6 ]& s0 v; a2 [4 {% G- x# X
/ y, f6 r. _7 U! G, [
data.add(new Tuple3<>(0,1,0));5 _- a1 w( ~* V% U4 P- P4 q: z
' b. |* [" O* F$ ~) n
data.add(new Tuple3<>(0,1,1));( j5 A: [2 R+ [; \
5 {$ J: ]; v4 M5 S$ M3 N$ C data.add(new Tuple3<>(0,2,2)); R2 K" S6 H( g9 H0 O9 O' i" V
* ^. i! d/ r4 L. `: S% I% `
data.add(new Tuple3<>(0,1,3));& W: M* A, w4 [/ e6 ]4 M
" z8 F/ `" E" ]9 K6 L
data.add(new Tuple3<>(1,2,5));. L8 A9 L" g3 x1 L
$ B' O" K7 E! N* W
data.add(new Tuple3<>(1,2,9));- v1 l1 F* \) e V0 V7 ^3 H
5 [$ Q; M# N0 @- m# y$ b/ u6 X2 m data.add(new Tuple3<>(1,2,11));
2 d% y% R( e9 U$ M5 H2 K3 s4 U) A5 m1 [ c6 h
data.add(new Tuple3<>(1,2,13));
1 { X+ U" } w6 q( r# t% K* A% D0 h }+ R/ K, H5 K9 k
" J- } r6 c% i& b: d) c# j
k1 B5 Z5 ]1 _# \
& j6 y* T+ `' f$ v- K z8 u( @6 d1 r6 Z% F p
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);' j4 n& r: c! U A" i% K
$ B ] f7 ~0 E/ \9 f- c( }
; ]9 U f i% |7 ]
& H& Z( W$ w: t4 X% Y; y
; d8 Z1 c& ~$ `
; _% X2 @" T. |+ a% a+ n SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {) B1 ~1 u9 x% _7 t, z
, ~- W+ j2 A2 i) @" _; E3 ~# O6 O% ?
@Override
, M: R j8 M+ L9 a( M- F) u5 C
7 P9 O) w9 w% V$ c" F' W public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {
; V4 F; o# e; ~% \/ L9 f
0 Z9 @9 U4 K3 e3 v7 M List<String> tags = new ArrayList<>();- k* o8 @% L! ~
/ l! v8 c4 n" E3 j4 t if (value.f0 == 0) {
; X7 n" m& L5 }- K
4 V% `) b" m1 f* _ tags.add("zeroStream");
; @! D l' E4 _# ]1 F& {* r- G8 O& E/ P7 q3 I( ^- E
} else if (value.f0 == 1) {
- i3 |( n+ t' ~! m
, l' u0 P* b) O, [7 i3 u tags.add("oneStream");) @8 s0 l2 s% R* Y5 Y
2 [6 [) h3 ]( U8 L% o1 y2 W; ~; i }
' G( F. `9 q% b/ f) ?) e9 o
$ U+ P' N$ t5 q- C' g( @ return tags;2 A `9 s9 R$ e$ E
4 ?- X& ?0 h+ q, n5 \* r& ?, j
}0 H% i; D/ n8 i! I3 P" _# T. f Q
2 q, l. ^. r: k; l m& Q/ Z
});
. c: I7 u7 A: ?, b+ ?" i$ w* t% H" C& n% |) @
+ a! \( a$ z4 X
- W: N+ y [6 ]; b5 R0 P2 m! } splitStream.select("zeroStream").print();5 ~# `2 x& h/ m5 l: t
5 {0 t1 a8 V6 H- M# D( U4 { splitStream.select("oneStream").printToErr();
, I. \1 v3 B! [+ s/ B2 x
' @6 \! C1 J7 ~! U
5 Q' l0 D- D! e; ~* C" x, A3 p, k: C
, F* o1 R4 P, X {8 q //打印结果( U9 |0 l1 C! ~3 T- f7 h, `
; I9 }3 {4 s: l9 } String jobName = "user defined streaming source";8 u! C% d2 `; W1 Z
5 i- v* w! u& y2 `# E; i* U6 r% V env.execute(jobName);
c8 n- U- a0 A6 \. p
8 j Q7 N3 f0 A# @( W( X}1 O% \ w. O' z: f/ o: b
</code></pre>
" c+ j! E! _" k3 f! D+ [# q8 W; a<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
! V9 K8 x9 u# B5 k1 v2 j<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>+ s3 n: K: S& m4 L
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>/ _, c" K9 v$ u1 q3 f
<p>复制代码</p>4 B5 v* N5 b5 W$ Z3 M* [
<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.
5 c; [' C/ B5 H: U</code></pre>6 w% b5 d2 C4 W, ]9 F) I/ \
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>% F5 K2 K D) e" z1 ~2 x' _
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>1 I% @6 N2 A. [# I
<h4 id="sideoutput-分流">SideOutPut 分流</h4>5 {2 R+ G1 D+ a; Z I1 n
<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
- U+ s1 R9 I( W7 w. s<ul>
2 U* x% o% N& t% r/ o) u<li>定义 OutputTag</li>
" j o$ `9 v$ ^: F b9 Z' U- u<li>调用特定函数进行数据拆分( k' u9 e$ j8 Z
<ul>) f ]/ I' f& V3 T2 n: z" i
<li>ProcessFunction</li>8 g: @. y8 I! U5 s
<li>KeyedProcessFunction</li>
" A6 o6 S$ a9 k3 j9 [+ A) N<li>CoProcessFunction</li>* w7 \( E4 W, E
<li>KeyedCoProcessFunction</li>$ p/ K0 Q9 Q0 S, b5 @2 g+ Z* e9 p
<li>ProcessWindowFunction</li>
9 f# q8 B$ R: `& \6 ?$ u" I<li>ProcessAllWindowFunction</li>5 h/ s0 i' Z0 u# B0 y' \
</ul>7 J7 B6 ?4 f/ c$ I6 G
</li>
4 ]4 E1 n& M. d# C$ G' l% E, Q2 b</ul>
: @& D' O; }% j( H% o- z0 J- M# V<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
& _! K' o3 R- D2 u2 X6 I/ {2 t<p>复制代码</p>
" j0 A- }5 O* I/ C<pre><code class="language-java">public static void main(String[] args) throws Exception {; Z4 A! c& d+ R# B( j
1 O: R. s3 ]5 R3 q8 y# r: y5 O0 z, w' j7 x, C
7 ~; t. `* V6 g% z- c! X, L& A) S
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();- ]! J, `3 L4 D* V( p+ }
" I; ^, {5 Q6 c, O; h! ~! x //获取数据源! V6 u% H8 N+ F! R0 R4 R" l6 ~
+ c. x/ P. e" ]6 B. g List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
: x9 M) w+ g1 H. G" u" A' H; |6 p, I" }6 {* u
data.add(new Tuple3<>(0,1,0));
" G; h4 a! W+ }2 q! D9 ]) D' c! @, H7 H( [
data.add(new Tuple3<>(0,1,1));
! |0 n$ d }, ^# c" v% U" E
% f. l9 T# W6 }5 h data.add(new Tuple3<>(0,2,2));- S- x! l. i) T% |/ M
: x$ E% n$ c) ?0 ^; F5 Y8 Z data.add(new Tuple3<>(0,1,3));) k9 F" {8 J$ e6 p3 a& l5 u" z5 ~0 Q
6 |2 m+ b( w W/ I& @/ {% C data.add(new Tuple3<>(1,2,5));
# I2 k$ |) e5 i- f0 X
( C5 l; I; H5 c6 k: | data.add(new Tuple3<>(1,2,9));
8 ~/ s7 o4 [8 }, @- C
6 u1 P. h! `4 b3 ~4 s/ Q* i data.add(new Tuple3<>(1,2,11));
' K. J5 r3 ?# E n3 Y1 M) S0 c- a2 b5 C9 P4 H
data.add(new Tuple3<>(1,2,13));
, X0 m9 S& E9 C* l7 _3 k3 M
- J% Q2 K' `% R( I# P4 w+ H4 I7 l
~: W4 m" _- F: I" Q/ s1 X4 S. z4 u" F5 A5 t- X& Q9 U8 b/ i" v
$ y5 J5 x+ U6 {8 w+ z) v# W3 y) G* _7 M- |. z
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);& F3 Z* A/ V: o# l0 |
! a% n( F5 s; X' V7 j- ?
8 I+ {* P: T @6 ?
$ e$ ~, R. i/ \: w, U OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};* w! ^, T3 h- r2 |% G
- U. H+ g: K/ O% h3 J OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};
/ C1 q- C4 } R1 m3 r# g
# f, o0 Q' m: T9 t: {- |
& J R9 ?" W i! Y9 R( [
N, i7 f) c# P0 t4 P7 d. g6 }; z: O9 u0 j0 [( r) S2 l, [* @
, X9 k* d: R$ y& o
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {: W" S$ S# s( n4 B {
* j% j1 d8 {+ S% q7 Q @Override
: G7 b R2 W6 |) h9 }/ ?4 A$ @ I5 ]! y" E1 G! Z% i
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
% L( y7 S6 w' c& d% R1 m! M2 @/ I6 d) Q- n4 d t
% v9 z9 ?1 ]6 S, I- _4 C) X6 k- h+ A( s c# i' W% l/ l' \
if (value.f0 == 0) {
& h% l8 ?8 F% K
1 v+ F/ @9 J# A$ [, V/ A ctx.output(zeroStream, value);
2 {; c, l1 f1 _- |6 a; V; A7 d( |2 T: K3 t6 J8 |5 {6 i
} else if (value.f0 == 1) {& t2 E- W' S) ^7 J9 a9 B" W
6 Y7 E$ b; z0 @7 s# o: W% f1 w/ j
ctx.output(oneStream, value);
- |8 }5 W+ Y8 z% H8 W' v# x! Y
, |1 O" g# s# D0 o# ]& ? }
$ ]' h& P' V) P. w1 x% g1 @% O6 L' h" m5 r Z" i% {
}
e0 {, K( B( j8 j
0 G- B9 L+ B8 y });
6 a8 g* w& O: a
9 \& t$ n1 j& M. @2 s8 y N# f0 P) ?# h6 G1 W
8 r- v6 Y B- }! _4 y
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);
% e( ]: j" S; f% w) L% P7 E5 d7 b$ l: W4 R
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);& B1 D Q% \' s* e: v3 r
* R; ^0 {& {4 [+ f. t/ n4 {' H# X0 r
7 N2 d1 U$ u a1 u$ c0 t
zeroSideOutput.print();
: S* |6 E b3 C$ O9 W( T3 U( j# d; C7 ?. Q& B6 J
oneSideOutput.printToErr();/ U) V- \+ h# T9 ~) K) t- j
; K/ D! ]% s& b/ G7 q, A! E4 E/ z* Y
# b% }8 n/ U+ u6 V r, L
* |" A* Y4 T6 |
( F. l1 c1 Y, u H //打印结果" L' p2 D. Y5 o0 z( p6 M( M
& h/ R& N4 _# O% K% P3 k String jobName = "user defined streaming source";
. ]- d7 U" t& q- W# r! `2 H5 q e: Y1 ?
env.execute(jobName);) \ d& d2 R$ [# x- @ l
7 Y2 P0 A* F6 t
}# g: g' M$ W2 \1 ^. C
</code></pre>
+ A' ?" W) g$ g<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
5 {4 l" h$ `; C$ I/ d! B<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>0 ^' x7 \0 R5 ?' N. |8 L5 X, Q
<h3 id="总结">总结</h3>. i( l0 l3 L6 Y* H2 t' M* R
<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>! ~6 d: i1 U1 H" R
<blockquote>
, P* X: u f* ]<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>6 e4 d* {: S- ~1 f
</blockquote>
1 u" u5 e; t5 P$ P$ w* I5 p. e1 ]& T! G1 P- i5 l
|
|