|
|
' s% y) I9 \3 v: _4 A5 i<h4 id="flink系列文章">Flink系列文章</h4>
/ y B0 `* m1 }0 u }0 g<ol>
9 }) k( y5 T4 s- I* ^1 L ~<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
* u1 P! q+ [+ x$ V1 b<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>5 x0 c7 o0 p4 U* E# Y0 e6 l
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
/ K, @4 t# R* _- q8 \. v+ R<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>9 M; p2 ~% D) {
<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>0 e; R$ W2 ^: ~& D8 d1 Q" Q
<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>8 q8 Z# [4 }( U
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>
' Q. Q2 b" o3 G" e0 t+ u5 u( P<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
$ [) l/ |+ L( u2 P A<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
! n/ {& j! r3 `- V. b$ j</ol>
" }. \; }+ p+ I$ |<blockquote>
7 X5 A2 {( c* \8 u6 r( N<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>8 }( v: S* b+ v7 i- ~' w: O& n
</blockquote>- A$ L ^: z( ~ ]" z* D
<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
7 b8 y0 c1 a e3 i/ \6 W* v# i<h3 id="分流场景">分流场景</h3>
9 A% _3 z# p2 Z! v2 N3 c2 i/ E<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
' w1 K4 v" B1 W7 r" D# e' W3 r<h3 id="分流的方法">分流的方法</h3>
% D' A9 B$ H0 J K- ^9 A9 B<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>" U% ~+ ^: ]4 P, t
<h4 id="filter-分流">Filter 分流</h4>4 F7 y" Y3 ^ ^/ w0 f3 U% X! b2 t
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>
8 f; y: ?" c, m<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
8 o4 k6 n/ ^0 Q8 R! |! f( y! O<p>来看下面的例子:</p> W; x, H- s$ Q5 d7 }7 H/ i+ \# r
<p>复制代码</p>
! L( k4 |4 O- {4 V<pre><code class="language-java">public static void main(String[] args) throws Exception {
- p6 T8 U5 L8 ]& K. P+ @ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
& M# @% K) {2 W; _1 }+ } //获取数据源
8 v/ p- _6 t0 g4 q" n( D" ^ List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
) e4 y8 _9 g1 O; t0 n data.add(new Tuple3<>(0,1,0));
8 P( q5 X( s5 S( s* |. t+ E; F, D& ^ data.add(new Tuple3<>(0,1,1));
/ d) i" {7 S i data.add(new Tuple3<>(0,2,2));
: u' X& r4 V# e4 [ data.add(new Tuple3<>(0,1,3));
, y9 M& b5 V* X4 L data.add(new Tuple3<>(1,2,5));
' h% i6 E& f1 _7 | F data.add(new Tuple3<>(1,2,9));1 d3 R9 g1 X9 j! n4 B
data.add(new Tuple3<>(1,2,11));
/ Z% V) r% j( @( y data.add(new Tuple3<>(1,2,13));
9 p4 m* Y- c: r3 a' }$ o0 ~5 z! ]* i; @1 q
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);# C) U* B" _+ q9 u
' x# y8 W, y( F0 b* c: x/ V# s- m$ b* N; e+ ]* l$ h+ ~
" v! e* V6 S. p1 F9 i4 ^
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);6 Z5 _5 @7 |; Y* v# A# `
) j8 K g! l5 R. y, \0 s9 m SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
9 B, F: P( c0 Q8 |
5 ]1 Q ~0 l# `& T
4 P* c1 e4 e/ D1 m- k* C6 y" A
! ^1 M$ m9 h. Q zeroStream.print();
" f: A2 O1 |9 e' q* R) p; }
* O. O$ p6 G2 `' a3 `1 W" ` oneStream.printToErr();& ?6 O9 D, e4 G
x3 Z9 U) v$ J/ x; C2 h" c$ W! z7 z: Z' @; _3 P |( r
' }% l7 q \. R" e# }. O9 Q/ v! v7 L" t4 u9 Y
: z. u* ^1 H2 |% [: b; Y- O
//打印结果. ^9 X) t: T4 M# l+ ^: U- Z2 t
% S8 j* H. k6 M) w( E6 P+ V* s String jobName = "user defined streaming source";. G0 T1 l7 e5 h
9 I) C4 U4 g/ l4 G5 s8 K
env.execute(jobName);
( N4 P4 V& P, a, S& Q; x
! c4 A1 h/ U5 z% b7 `: V}$ r- T( B+ p- A! p1 R
</code></pre>
( m9 Y, L% q5 I<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>, X) S2 J$ i! Q8 G3 {; n
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>9 P; o+ v; g! P8 y
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
3 L* x' i5 w- t2 }, i<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>, c6 Y G5 {) J4 b* w& h
<h4 id="split-分流">Split 分流</h4>
3 A, |7 T( l% E<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>* B; o# e4 k# b( C
<p>我们来看下面的例子:</p>7 ?% z; W5 H( K" x) u, L
<p>复制代码</p>) d7 M7 i+ s3 [* g0 G7 H
<pre><code class="language-java">public static void main(String[] args) throws Exception {
C7 _' s$ G; W7 [; e; C$ e! W
" z" b) I7 L# \8 C/ S8 i5 c* k* @( e) @* k, w' C, p
0 Y0 x4 F7 I+ `" I StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();1 X5 q% H3 b* X: F! }- |4 f
$ }' s) Q: Q# l `* G: R, J2 O
//获取数据源
5 s/ n9 R1 K" r' ^9 E( B4 C# S3 |/ @( C4 k
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();4 ^! }; z1 T; ^* }3 y$ k0 o
% a1 l& b9 @+ ] M9 }* M9 } data.add(new Tuple3<>(0,1,0));$ }9 @4 @$ C9 R5 N" R
+ t: l; h: c) }. w# X# v1 w" g: Z data.add(new Tuple3<>(0,1,1));4 R" j6 ]# A& ^( r
7 H& q* `- a9 z7 m& r
data.add(new Tuple3<>(0,2,2));
3 e% R4 \4 _$ U. O8 _. a/ f% _9 o* E% T; a+ _0 `4 B
data.add(new Tuple3<>(0,1,3));7 G0 `$ U) a6 T' F$ o* j1 P
5 F+ F) Q2 Y7 ?; l1 X6 _( S data.add(new Tuple3<>(1,2,5));
/ K3 N, @# }: R
0 j& ^6 C" J6 \' ^ data.add(new Tuple3<>(1,2,9));
% @+ K. y/ ?! E& T: m
2 l% a1 u- o' o data.add(new Tuple3<>(1,2,11));9 N0 H$ o! Y6 P7 r2 L! P& T, g2 ]; h
E B5 O# W Y1 n* P5 g. E: T data.add(new Tuple3<>(1,2,13));
9 A+ Q0 I3 K% B
: o3 k+ ]' s0 }3 ~
3 _0 ?% _$ a2 Q! a B7 k; ~8 `# b
4 b; d# A1 j5 {: r: j) @3 Z4 @0 P; C! R9 K$ W" F/ y$ r( K, ^& L" A
+ ?4 L, i5 C. k9 P+ E DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
- y: Z, R* E+ S( M$ T! w
}* T* ~- N- z4 l- f5 K, V
/ ?; v4 B3 e, ~3 [% S( y, M5 c3 L( D8 \/ n: E
/ ^( [' Q3 N4 b5 g8 l0 O9 `( c
* b9 v8 _: G6 S" N; k0 _' R' H: p
SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
# j/ O6 S/ {4 q% U8 e1 Q* B; A) w7 V; c) c! u8 |
@Override
+ N, r! }' S9 ?: C A2 g" t- P$ A: Q B0 @5 v( {$ r _6 l1 k3 c4 f
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {. S1 \1 p+ m: }: V# L
% H3 f3 c8 \. d List<String> tags = new ArrayList<>();. N1 W' m1 p; b+ ?0 L a* d
( G, z" n2 Y1 g* U* g5 ? if (value.f0 == 0) {. l1 D9 r# P2 b2 _5 c2 K( X& F
X: H8 ]+ x/ k; l0 c, }) J2 X
tags.add("zeroStream");5 y* F% {+ t# r$ Y: b: z
1 u4 M( v9 _0 T9 ]+ R5 g Y7 K
} else if (value.f0 == 1) {5 X2 n: r5 J) n, E [9 G8 a
6 a. b: R$ R$ O tags.add("oneStream");
7 V" C$ R. P6 Q& [0 r1 O5 \1 I
' o1 W- {( i( K }1 I: i+ G" S* g, v
! Y4 ]/ Q! D( Z8 V& f" n
return tags;
9 s; Z- @& z! [- G1 _7 u( U3 D' |% { S
}; U/ s, o1 H' s
0 W6 ?3 ^8 h& a* y$ t$ G+ r: j });
A4 Q$ P# B# J x& I0 G
( N+ T6 e1 P) r& S( ^4 n# m6 S
. O* J# R `$ A/ D/ g/ x. w2 H3 Y7 \" h* U
splitStream.select("zeroStream").print();; I( ^! y) C m$ N0 T1 a: U' F
" R2 P! C9 \+ @# V( M% y
splitStream.select("oneStream").printToErr();
/ S% P: _# l- C3 l
2 M5 b2 k% G1 f% ?: z! ]0 h, w3 ^8 a3 g) ^
2 H0 I/ J$ T3 f/ _ //打印结果& R- q# [& o4 m0 x1 p+ O6 P5 N
; O. p7 o7 H0 u) M3 r
String jobName = "user defined streaming source";
: n% e4 |7 ?+ X9 ?
. ?' P3 { l- Z- j/ E* ~ env.execute(jobName);
/ l! F. S7 L0 C& E+ S$ ~
1 [: O! D1 l. F6 [}1 r8 W- C* [# V. P$ s) I( r
</code></pre>
6 D; V+ A4 @9 [<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>/ N+ J- x! A8 w/ H0 h$ z( ~
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>/ r8 B/ ~( i, h; T
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>
1 V; L9 j2 D7 j2 t0 g8 A3 X<p>复制代码</p>
9 R6 {8 f, |4 A) z; U<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.. M6 f1 z6 y4 {: ?2 j; i
</code></pre>
' v# S# |. J1 x5 e<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
" ?; H2 s: x$ z0 B- T9 R<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>: S# c0 z' r! h" z" ]7 q- c
<h4 id="sideoutput-分流">SideOutPut 分流</h4>0 [* J) I T9 g. F9 A2 H* ]
<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>: ]% ]- I8 |: Z ]) T7 L
<ul>/ n- _1 }) ~8 j f
<li>定义 OutputTag</li>$ e$ }9 b* ~* \# M" y: h$ W8 z' a
<li>调用特定函数进行数据拆分# ^( a$ Z2 d6 L# O
<ul>
& [) Z- Y. l0 T$ Q1 ~2 S% x2 g1 Y<li>ProcessFunction</li>4 A* t+ b$ O8 c% N% L( |" g" q* S% K
<li>KeyedProcessFunction</li>; r( V; p, B! C4 r5 s7 t6 B, j
<li>CoProcessFunction</li>
& [ @" T% s$ v<li>KeyedCoProcessFunction</li>1 i% U9 E3 M1 B9 {
<li>ProcessWindowFunction</li>' N& ]0 b' Z$ e' ]8 A
<li>ProcessAllWindowFunction</li>* T, I5 ^3 S. z. Y
</ul>, R& u2 _% y1 b5 H( V
</li>3 q0 o, h* ?! {$ W s- B
</ul>
8 B4 X0 C/ Y# t, i% s3 b, x<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
1 S5 w M: e6 k. h<p>复制代码</p>
) F; C" B5 C% n1 t<pre><code class="language-java">public static void main(String[] args) throws Exception {
& A3 z. E; u# t4 R _/ I& }
" [* F0 D2 {- K% m
+ J' {. d: U9 l7 M! F. o& U/ x' N" q' u, I% ?9 T3 C3 O. B5 Z1 o
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
) P6 Y7 {$ k' J; Q5 x" }4 \* G0 |& ^' }: h) j4 M
//获取数据源; M! y2 k0 e1 H' _/ M
0 ~5 Q# ^* g# G% I
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
7 Y Y& P Y3 m0 k& @- d
1 ]+ D: Z' ` O3 k% S- ? data.add(new Tuple3<>(0,1,0));
, [7 x- y8 [( B( W
' O6 p5 b, c% ]6 E data.add(new Tuple3<>(0,1,1));
7 z, r) J' q( d- \+ G9 N& t( A# `5 ~2 t l. k
data.add(new Tuple3<>(0,2,2));
, R- c3 M- r5 l6 Y, j2 l3 X, a
5 {$ K. X; y6 |: }, d data.add(new Tuple3<>(0,1,3));
- W& E+ |8 W2 U( Z1 j8 J
% {% f& `9 h+ _( T data.add(new Tuple3<>(1,2,5));" w( ]4 s& f2 C# F& g" K
2 P& y O9 U# G: @( J
data.add(new Tuple3<>(1,2,9));
) J# X2 P9 h: f/ c+ Z
. q+ H" p/ H3 F. b" H data.add(new Tuple3<>(1,2,11));
; r- g( U" _1 w8 }6 N% h3 i
6 Q/ d* c5 p/ N) g data.add(new Tuple3<>(1,2,13));
) i, u6 q: a; G; @9 R& I( y5 b4 N0 c) ^
% p7 l( K( N0 Y9 s
( w; a+ O }+ q, x( e) B0 F
, e7 H% m* Q: k2 K$ d: f; A9 C$ a' \& l) @
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
5 D! C* b/ @! [$ v# s' R5 k! Y9 |1 ~3 ^3 m9 v
# q7 h {8 K# V% J! e: ~2 w7 @$ d G2 u3 o
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
, B& V: l1 P' e' h, N2 f6 I7 O# @' T7 J; K3 `6 |' {. q1 a
OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};
& _0 C. D3 u' j$ `, ~& l7 Q3 g* |5 v) F9 K- Y) Q4 ]2 W" O
( n, b. O' `8 c4 t U
, d {0 j- }) S6 e6 i# I4 }) M0 m P1 E$ r6 B
* U1 G! M. A; A6 B( x B- M SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
2 H. F9 `4 L3 \% N2 c& B% k) F6 @. O1 d2 l. G m- j
@Override; b7 l* y' j2 N, a [
2 U+ b6 z/ B8 B: t7 n: @1 q% n# d$ @ public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
3 y8 ]7 o* n6 W8 t+ S. y1 V- D* V; ~1 |
& C& U) R+ r4 E. E
2 s; Z/ Y1 z2 Q% O* z$ g, u if (value.f0 == 0) {
, g$ N$ u) L9 z4 |# j8 l5 i
7 |7 Z& Y2 i" Z0 `7 ^ ctx.output(zeroStream, value); W4 B' o* A2 ?/ e) O1 a
% w8 ]7 T) }( A' n& j' J } else if (value.f0 == 1) {
# h6 S# a4 |/ @5 Z- m; A1 a p$ q$ `8 u1 v
ctx.output(oneStream, value);
/ @) u, r. L% Q4 V; ?
& H: m8 k9 B7 s; n" F2 t }
I# q9 o7 u: s' k' @5 m' |
0 \) |* U( @% M$ y3 F }
6 z) L) e1 N- J( W1 o
4 a! F" Y! j8 U' v });
, `/ h& |1 O- \ K6 W6 q/ P
# b# T8 J4 d: v( x1 O( |0 P0 j# z) @: V4 Y, n; Z1 N8 A
+ ]4 s6 e) g3 A" P, K5 X
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);+ h1 M9 L9 T" H1 V9 k& c( U% J* P
, n! l. @& [. m: @ _5 Y8 w
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
$ r2 g9 L8 ~4 D: M) K% L* B# W# s5 R/ F, |; |) H+ G
/ M4 W6 ]$ y4 h1 N( z6 U! ~ F8 H" d& k6 v3 Q; a
zeroSideOutput.print();3 j6 J( @6 ?! m v, V5 @6 v
" w, ^8 F# K4 O
oneSideOutput.printToErr();
( p# I( O& _5 P/ R' c6 Z) r) T' @7 @
/ }, t4 |/ D: @
6 m1 _( |! P2 e' u3 i
0 ]8 ]$ b w9 Q% u1 a ~- p: Q. S' ~$ D7 y5 m
//打印结果: a# E. c0 t v5 v
% |/ f+ Q" x X* a5 u- G String jobName = "user defined streaming source";
7 T4 G, u$ J+ |) R l5 ?. u" r4 ]8 w q: ?, V3 P, `5 \
env.execute(jobName);
, F0 y0 ~8 K. N) H7 k5 O/ c+ C2 [1 c4 u% f g$ u) f3 S7 @
}
! }; B, z8 Y" D& g, f z</code></pre>0 c, m. K/ d3 q+ j$ Y. T
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
4 K- X& T) K# \9 M0 l) }<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>; L% l- [: q# a2 T; F+ v: v
<h3 id="总结">总结</h3>
% }. B0 G& ^( {5 O) V; U6 l# [<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>+ k; g! \5 o$ K
<blockquote>
J0 C7 j+ W1 f( ?) y7 Q<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
6 P; c, _/ {+ `) X# Y' s' j</blockquote>" r9 c6 T2 M1 F
& [) q6 y& ^5 I1 H
|
|