Ch09_Streaming
Yang Haoran 4/1/2021 FlinkJavaBigData
# Streaming API (Source)
Reading data from a collection:
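```java
import java.util.Arrays;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CollectionSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read from a collection; objects of your own classes can be wrapped in a collection too
        DataStream<String> collectionSource = env.fromCollection(Arrays.asList("aaa", "bbb", "ccc"));
        // Read the given elements directly
        DataStream<String> eleSource = env.fromElements("as", "sss", "sda");
        // Print; the argument is a prefix that tells the two outputs apart
        collectionSource.print("collection");
        eleSource.print("element");
        // The argument is the job name
        env.execute("jobone");
    }
}
```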
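To get the output in order, set the global parallelism to 1 with `env.setParallelism(1)`. With the default parallelism, `print()` runs several sink subtasks, so the lines are interleaved and each one is prefixed with the sink name and subtask index (for example `collection:2> `).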
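Since the comment above notes that objects of your own classes can also be wrapped in a collection, here is a minimal sketch of that case combined with parallelism 1. It assumes Flink 1.12 or later on the classpath (for `DataStream.executeAndCollect(int)`); the names `PojoCollectionSource`, `SensorReading` and `readIds` are hypothetical and only serve the illustration.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PojoCollectionSource {

    // Hypothetical POJO: Flink treats it as a POJO type because it is public,
    // has a public no-arg constructor, and only public fields.
    public static class SensorReading {
        public String id;
        public double temperature;

        public SensorReading() {}

        public SensorReading(String id, double temperature) {
            this.id = id;
            this.temperature = temperature;
        }
    }

    // Builds a stream from a list of objects and returns the sensor ids in emission order
    public static List<String> readIds() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A single parallel instance keeps the records in collection order
        env.setParallelism(1);

        DataStream<SensorReading> readings = env.fromCollection(Arrays.asList(
                new SensorReading("sensor_1", 35.8),
                new SensorReading("sensor_6", 15.4),
                new SensorReading("sensor_7", 6.7)));

        // executeAndCollect(limit) (Flink 1.12+) runs the job and returns up to `limit` results
        return readings.map(r -> r.id).executeAndCollect(3);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readIds());
    }
}
```

Collecting the results instead of calling `print()` makes the ordering effect of `setParallelism(1)` easy to check programmatically.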
Reading data from a file:
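`readTextFile` emits each line of the file as one `String` record. The following is a runnable sketch that writes a temporary file and reads it back, assuming Flink 1.12 or later on the classpath (for `executeAndCollect`); `TextFileSourceDemo` and `roundTrip` are hypothetical names used only here.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TextFileSourceDemo {

    // Writes the given lines to a temporary file and reads them back through readTextFile.
    // executeAndCollect(limit) requires limit > 0, so `lines` must be non-empty.
    public static List<String> roundTrip(List<String> lines) throws Exception {
        Path file = Files.createTempFile("flink-source", ".txt");
        Files.write(file, lines, StandardCharsets.UTF_8);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // One parallel instance, so the lines come back in file order
        env.setParallelism(1);

        // readTextFile emits one String record per line of the file
        List<String> result = env.readTextFile(file.toUri().toString())
                .executeAndCollect(lines.size());
        Files.deleteIfExists(file);
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip(Arrays.asList("sensor_1,35.8", "sensor_6,15.4")));
    }
}
```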
```java
DataStream<String> fileSource = env.readTextFile("path");
```
Custom data source:
```java
DataStream<String> twitterSource = env.addSource(new V2source());
```
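The source emits records by calling `collect` on the `SourceContext` that Flink passes into `run`; `cancel` stops the loop: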
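```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class V2source implements SourceFunction<String> {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        // FilterTwitterStream is a helper defined elsewhere (not shown); getStream() returns an InputStream
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(FilterTwitterStream.getStream(), StandardCharsets.UTF_8))) {
            String line;
            // Emit one record per line until the stream ends or the source is cancelled
            while (running && (line = reader.readLine()) != null) {
                sourceContext.collect(line);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```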
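To see the `run`/`cancel` contract without an external stream, here is a minimal bounded source sketch. It assumes Flink 1.12 or later on the classpath (for `executeAndCollect`); `CountingSourceDemo`, `CountingSource` and `firstFive` are hypothetical names, not part of the original code.

```java
import java.util.List;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CountingSourceDemo {

    // Hypothetical source: emits 0, 1, 2, ... up to (but excluding) `limit`,
    // or until Flink calls cancel(), whichever comes first.
    public static class CountingSource implements SourceFunction<Long> {
        private final long limit;
        // volatile: cancel() is called from a different thread than run()
        private volatile boolean running = true;

        public CountingSource(long limit) {
            this.limit = limit;
        }

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            long next = 0;
            while (running && next < limit) {
                // Emit under the checkpoint lock so emission and any state update happen atomically
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(next);
                    next++;
                }
            }
            // Returning from run() ends this bounded source
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    // Runs the source with parallelism 1 and collects its five records
    public static List<Long> firstFive() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        return env.addSource(new CountingSource(5)).executeAndCollect(5);
    }
}
```

Unlike the Twitter source, this one ends by itself when `run` returns, which makes it convenient for local tests.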