|
|
. a4 ^4 g3 ]* g6 x: X, o' t0 b6 n
<h4 id="flink系列文章">Flink系列文章</h4>
* Z1 d& D) x6 K* \) A" U/ ]! b<ol>
, ?, ~9 N. |. J( K* R, H<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>% C$ m. z3 `3 o; L# ^; `5 g
<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>- z1 O, s' u- W( O6 M
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
# O2 E0 g# V& Y) f" \0 B<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
+ A6 b# c& z; t+ `; N<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>8 j5 s0 z- F: {6 q
<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>
- {8 C8 e% L: s- g; Q" I<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>
: Y2 l- \4 W+ f" o) E<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
: h$ u7 A0 K/ Q8 G. P7 l4 V& `<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
6 r* g3 B2 L+ B</ol>
- @/ V( A! U2 E. a8 @<blockquote>
6 R6 t. K! a1 L# d<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
5 Y# q8 h$ V; O5 e" z1 _</blockquote>. D9 a8 P$ v+ d! B
<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>" C8 w9 T) k- n; [
<h3 id="分流场景">分流场景</h3>
& M6 O, y7 [% v% @. T& A<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>* M0 ^$ [ q+ P+ ~! `. z
<h3 id="分流的方法">分流的方法</h3>1 X4 j1 i, G: x
<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
. r% @' w. t: d+ ^1 J<h4 id="filter-分流">Filter 分流</h4>' }3 d( e, D, H' \' g
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>
* e+ `5 C1 J. V" L<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
/ ?) Q* z9 L: B: q1 t- |<p>来看下面的例子:</p>2 q; Z$ O% h# @/ w
<p>复制代码</p>
3 H, B2 C2 f1 @( u# ~9 b. T, u<pre><code class="language-java">public static void main(String[] args) throws Exception {2 y6 ~$ J8 ~0 s3 I% _( c; c- S
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
6 L3 ?' k" ^) \ l //获取数据源% w- j3 D( F3 X5 m3 d. {, u1 Q6 Z
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
" o" ?7 P) P: d* |4 Z( j9 U' v data.add(new Tuple3<>(0,1,0));3 E- H0 i2 K6 z% H: n1 u5 F
data.add(new Tuple3<>(0,1,1));, l2 |7 n; g4 z" l
data.add(new Tuple3<>(0,2,2));
! k) J1 x4 \& W! J data.add(new Tuple3<>(0,1,3));
8 t9 }8 [% W2 B4 \0 a, T( l/ T- \+ V data.add(new Tuple3<>(1,2,5));, `. \6 f, F" g
data.add(new Tuple3<>(1,2,9));
- T" A4 |* v! Y1 e, W& u. F, R7 ?! C data.add(new Tuple3<>(1,2,11));
3 Y, o; y+ x" w' t3 m' L) D data.add(new Tuple3<>(1,2,13));- L" k6 w' a/ A
C. ^. _. F, j9 g" H DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);; F+ D0 F: j6 P9 b7 J/ G( ^
6 [, l" C' S8 M: o6 S& p. x2 W) K) m1 c% }" j$ u0 j( c
2 z1 s2 n# f4 V. Z4 ] SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
9 Z- Z1 B0 {' s }' G$ @+ R8 J% Z) o7 E" B/ X% R% \9 B( l2 V
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);# g$ T7 }$ a$ w3 L3 r+ Y2 ^% T
% P- J8 D4 j2 N" j& L3 m6 E2 Z( H/ m9 N4 P7 L+ `2 h+ R
# d% c" V F8 I. Z' w( {% Q0 ^ zeroStream.print();
* \$ @% k- J/ F6 e" M/ S# M* ^. [8 c. V# j5 a9 W, y0 h
oneStream.printToErr();
' d4 a, J( Y9 Z" P7 s* g4 `/ E' O8 `0 @
+ J5 H5 X9 Y8 L4 Y
3 o x( o/ l: P4 P" M: |
7 {& t2 ^) C7 |, x# G7 H% `9 n
' H) b' y* ^6 O( ?4 k //打印结果. j- u' f# @) H6 o
2 N/ N- a: @7 l. |
String jobName = "user defined streaming source";
0 H& ?! a4 C* m0 O2 W% n' O* l* J( u$ ^) W3 K
env.execute(jobName);
, a {( V* T0 I, ]0 F$ ]3 @0 w1 v, e2 F4 q! a6 N6 B# ]# I H
}
. F0 O8 @ u: z* y5 u</code></pre>6 d1 S: j" _$ c0 G6 q) N
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>
, e/ W! |) Q/ {$ |<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>
' D( {5 K6 v/ ]/ Q# _# T<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
8 h" o, x/ i5 d) M! A: M2 J/ U<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p># S2 j! z1 q# Z5 J# k
<h4 id="split-分流">Split 分流</h4>' u; a7 X+ Z3 J* m6 V: g5 O' ^
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
' k7 F; d6 f! B# K& f) a o% f<p>我们来看下面的例子:</p>; H/ M' e- g) Z+ D! {4 o1 C
<p>复制代码</p>
- J' L l4 f3 Z* t: a0 v4 N<pre><code class="language-java">public static void main(String[] args) throws Exception {
- g2 O. W$ y' o( o8 |: j! r0 Y8 r, b! _: a, e0 E) g
' J" b" S' ^8 {! H# c( R p$ z! H. r! q/ u& q
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
! K) \% M' X( R# f& o; W4 ^9 [6 w
/ a+ V1 u2 I9 O! q+ Y3 J' V //获取数据源
+ ?& a$ c7 q$ S' e0 a% P, S4 ?) x, [* ?/ T( u3 @0 ~
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
6 w- v! }& `0 ?0 Z L2 P/ e) k& e
; `6 q3 c4 ~# t9 ~ data.add(new Tuple3<>(0,1,0));
5 J6 ?& L! `4 M3 V$ f
4 O' o3 E# J9 w2 N" ?9 g- v data.add(new Tuple3<>(0,1,1));8 Y- [2 x# [5 W$ u- } [
6 g4 f5 Z- u6 n data.add(new Tuple3<>(0,2,2));
! t( w- G w" V- K" G$ I* Q# C% z c+ K4 w2 h; R
data.add(new Tuple3<>(0,1,3));! G# G7 I0 ?. i) U* G1 h$ ]7 U
& l. {7 Q2 w7 s) U data.add(new Tuple3<>(1,2,5));: d' G& K+ q$ ~: h) X( f+ ?
: |2 t) ]* x4 c7 D
data.add(new Tuple3<>(1,2,9));
3 v+ [8 I. t' a1 G* z1 p4 C3 B' G7 Y! } W
data.add(new Tuple3<>(1,2,11));3 ?4 S( h) ^% y+ L
; g) o" t9 t U" e) V$ Z/ X4 y2 T! n
data.add(new Tuple3<>(1,2,13));
' A; v: N! h. b- X
. C% X/ m3 }8 \& \# P( H/ T
2 J% ?- ^. u, O/ L) v. f, t% |/ D5 I7 D; D
; ^$ D1 U O+ v$ Q' V: S
2 A4 V% }. c7 \3 v6 g
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);* ~2 \8 b& Z9 ]
* m4 f( a. T- }; e2 [0 [' V! f' X9 k( u( ~
( k8 y$ h: M8 `! X7 C5 H
" `% t7 R3 x4 C
' M) L. o) K) [9 ?3 Q/ S SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {# @# K! X2 e r$ y: W
% @0 S% R# I$ {$ M6 ?4 ?7 F
@Override
1 d) c4 U- B6 s
9 \" f) F, J2 e9 @9 [* ]: g public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {5 n m# x; a8 ?
/ B0 g4 D4 ^! U+ I7 a. T9 J
List<String> tags = new ArrayList<>();
+ n- B; f% }8 G4 A% H; y
7 u3 h- U2 e) {! y2 o* Y' W4 ? if (value.f0 == 0) {
0 w3 b0 R* A5 P( q0 J6 u# ]* X
+ ]: @3 `. | | tags.add("zeroStream");2 c& N' }' A/ P1 [
* L* [' I1 Z* N3 M ~ } else if (value.f0 == 1) {# s; x3 B6 b# B8 I" z4 L0 z
( g5 }; n. H" n/ z, l tags.add("oneStream");: L" U' X2 ? b
$ Z1 K T6 P7 x
}! l7 p; y1 R; V4 l
* i; _- @) V+ p" v4 \ return tags;& L) z/ x# Q; y% ~5 A0 T8 }5 r; F
7 X6 n1 Q% t+ y/ _
}8 u0 a! D( q5 _0 [ L8 o
# f/ g: _$ u$ `5 ]3 h });5 |* l* X& P( h. r/ O3 y# W/ X
5 ~) z& M% T+ M) T
4 D% [. x+ G5 y6 g
5 I; t7 f" ? _6 r1 L" E; L splitStream.select("zeroStream").print();* v C, u4 Z4 T' N
' C, k9 `4 J$ u+ o% @7 i, j" I
splitStream.select("oneStream").printToErr();
1 C. z, x. K/ R6 x; h- G2 X1 u* p, ^/ \
X- f; s B. x. H+ i( g
0 F. u* h# H- A P( K) P* l0 N& e# _ //打印结果3 L! x3 ?% ?2 h! X# {: |7 j. m, H
/ B' I9 Y1 l8 O6 l9 D* F9 t5 ~
String jobName = "user defined streaming source"; ~" ^* S+ K: t2 a6 z- f0 K2 _
7 k$ B G4 F$ {3 D. C$ }8 W
env.execute(jobName);
1 V/ P& @% X( P4 _0 b* ?$ R+ q
5 A6 t: j6 c7 s; M8 W}
* F- |7 y6 n/ T. [</code></pre>) v% N' j& I) G0 N- A# l8 a
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>6 j6 x1 ^7 B- j! H: O
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>
6 |& J2 p5 w( T<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>
$ ?9 A1 ^4 f1 s6 C<p>复制代码</p>; a2 V+ B! n! |4 P2 u5 I o' x6 q
<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.: u. ~3 d: R6 T2 g5 _* G I; @
</code></pre>
: B/ L' c% d( v! @( B% r0 Q( b8 ~- T% ]<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
6 d" Y6 {' }9 G* V- I<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
. |' x/ {5 [. I) X. i<h4 id="sideoutput-分流">SideOutPut 分流</h4>
, B& \$ ^4 t0 A3 g* i<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>+ _# S6 a4 B' `3 E
<ul>$ j4 ]0 ^5 y: ]1 C: c; m; X/ `
<li>定义 OutputTag</li>4 U7 |% t2 n$ r5 N( Z0 \
<li>调用特定函数进行数据拆分$ E8 p6 `8 t; _* M: o
<ul>$ S% e: Q8 }9 S4 c$ f& G4 h
<li>ProcessFunction</li>
5 A1 f0 {, j; I& S) }7 U2 K) H9 x( F<li>KeyedProcessFunction</li>
8 T. {9 Y& y. F8 H<li>CoProcessFunction</li>5 \) N' [: X3 A- j
<li>KeyedCoProcessFunction</li>
" U3 h- x' w) w4 F, J! N% h: P<li>ProcessWindowFunction</li>) I4 ]9 [: M" Z- I
<li>ProcessAllWindowFunction</li>1 t' G! D! B, H3 C
</ul>
8 b; h+ i2 P A1 S W9 e</li>7 ^' K- p9 {9 g; f* R
</ul># d) A) p% Q. u8 i% y" p
<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
/ F5 B% b8 w7 P4 a<p>复制代码</p>$ N3 o9 Q! ]- q! Y# t6 B( z
<pre><code class="language-java">public static void main(String[] args) throws Exception {
& G# i3 h1 }1 C/ p( ^8 i' F% j: ]
+ I" m+ V) J1 Z+ U- j8 j+ s& q' A1 B4 U
( c) z1 E6 d. \) X* A9 _
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
# [% {. Y3 m% p! ]# c; ?$ }2 e; m; `; m a8 N: s
//获取数据源
+ M4 ?) n4 }; k( W$ `) T
7 p$ {4 I. ]7 W1 ? i& s, m/ | List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();1 M! U3 i, R$ f' `, |4 Y2 q
, q' G- ~ H% X( h N
data.add(new Tuple3<>(0,1,0));2 p2 f ^9 F7 `1 H
* Y) r3 N2 t: e# M% t' [5 Q$ w
data.add(new Tuple3<>(0,1,1));/ z4 c5 E+ k6 X( W6 n3 p" X; V
! @6 \$ n, |) L- S# l& G- [$ j data.add(new Tuple3<>(0,2,2));1 u* N8 x7 e' y( M/ D9 E) w7 }6 h+ e* a
6 d" x/ |& f5 I. L' N9 h data.add(new Tuple3<>(0,1,3));8 v3 W/ h# C& s! B
& {$ R0 [- C @
data.add(new Tuple3<>(1,2,5));
% {& m' L# z; ]1 f4 I+ A! K' F- D7 Z1 j1 h# z' r1 G4 }
data.add(new Tuple3<>(1,2,9));
* Y$ E- k Y6 C- W+ B6 F" t9 `6 v# l* h2 ?
data.add(new Tuple3<>(1,2,11));; e# p7 Y7 G# P% j8 S
, ]/ [# f! O$ z
data.add(new Tuple3<>(1,2,13));
+ R8 y3 M: _- A
) }; |$ E# `6 |! x$ g( n2 N7 ~ R" J' P4 B* O, S: ~
) T/ ~4 N' {% {9 |
8 S9 `# k, e. n
6 J& s4 T! ^9 g( ` DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);+ x! ?; C! t6 ]9 c {8 g$ T
6 [7 k( c: M9 K! N0 ^
( T2 i9 L% E1 j6 P5 j# O( o9 I3 I. P5 O
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
* G. q4 A0 Y/ A2 }7 X' C5 _% ~( e7 o9 v5 W7 r) O# b* v- k2 P
OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};( e# {+ e8 d: L& q
) W& I* X& c8 s; ^7 f4 Z5 i$ [# i6 y1 r
$ k: B3 l1 d" C; r* o5 ~; V: d, C' R2 u( I5 C3 X
; X) a$ E5 f6 Q' ? @6 e& m# T SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
: W+ Z( d* Z2 y1 J" a
8 D) A) t( i, _ @Override
4 t2 X, B% p4 s
+ q/ h. h! y }0 |. G/ T! _2 o# T public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
: R2 W$ o6 p3 n6 f+ x1 p, \4 ?) ^$ K! S" K
8 ~( G% J& r# S7 K6 k- h
' {3 `( ^8 p8 |; `3 n/ }1 ~ if (value.f0 == 0) {
% r. K0 x$ g+ h/ w" P; X6 h1 |* l5 D" E- K, v* `; t3 X
ctx.output(zeroStream, value);
w0 H9 U# H) J5 `) p# i5 ]2 F
8 C. D$ z$ o* N/ p } else if (value.f0 == 1) {7 T2 f! B& C" U- b, a, H4 m
4 \( |" ?9 O, I) q L
ctx.output(oneStream, value);
5 G; b3 M- g$ V; a7 ^
+ \; r" I- A1 W$ s: q }
1 \: N& |9 X& {2 i& X7 X$ @8 L% M- | \; x. a3 F
}( c, {% a+ H- t8 Z6 `
) a) S1 g. M# M$ ~" D7 o });8 M3 R% s3 l u: T
& R" j' g, }; v/ f3 ^
' U# t- }3 x' G1 g7 N1 s
' ]4 ~! ?7 E* M8 m DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);. ~% z( V/ p. L& m( }* T3 i+ A
3 \5 A% H- l- k3 u8 ]
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
! _% X6 `- Q* w2 q+ X( P( S( G/ [1 n6 h3 M+ M
& J# v8 c+ A6 k( G9 i$ h3 w5 x' e7 `* t5 v- ?& ^/ b" f
zeroSideOutput.print();* M1 S: T. ^/ p
8 ]0 v" W/ k( ~2 p3 s3 M* z0 M
oneSideOutput.printToErr();" m* [- X+ ^, [
6 ^+ o. S% X9 X5 J$ V9 G: T( }" X" R
4 P0 J4 ~# P. p8 D( D0 e5 C% Z
|& Y/ \' p3 }! e, e6 R7 r. A) W% {& \! q% q% J+ l- o1 g2 U, A
1 y" N& P9 b/ o# [ M# M
//打印结果
! d% i7 A+ a w- }# @$ o5 \4 N+ i4 T4 y; Z- ?- i" V& d' q
String jobName = "user defined streaming source";+ n' w; W* ]5 [/ V3 g/ }
$ X1 \( o% K" W" L8 a env.execute(jobName);
* b& K( p( m0 K" P7 w' p @+ M" s
}
! X: O2 `' _- [1 j</code></pre>- _" _7 u3 S2 m5 v
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
% k* {5 l) m* k# e0 Z8 T, D<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>
4 ~8 C( K; A% K( W5 U<h3 id="总结">总结</h3>' b( d! S- Y$ z: g% h& t! S
<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>* l/ Z! f- O! J9 U' Z
<blockquote>
; z0 ^( Z( h! B1 }3 ~* M<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
% R* g( W3 }6 G! L</blockquote> {7 g7 ^1 j% r- \5 j) [
4 R: ?9 T5 E9 {8 d. Y, R; O" D
|
|