Ch09_Streaming

4/1/2021 FlinkJavaBigData

# Streaming API (Source)

  1. Reading data from a collection:

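    import java.util.Arrays;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    // The statements below belong in a method declared `throws Exception`, since execute() throws
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Read from a collection; objects of your own classes can be wrapped into a collection as well
    DataStream<String> collectionSource = env.fromCollection(Arrays.asList("aaa", "bbb", "ccc"));
    // Read the given elements directly
    DataStream<String> eleSource = env.fromElements("as", "sss", "sda");
    // Print to stdout; the argument is a prefix that shows which stream each line came from
    collectionSource.print("collection");
    eleSource.print("element");
    
    // The argument is the job name
    env.execute("jobone");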
    

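    Set the global parallelism to 1 with `env.setParallelism(1)` so that each stream's elements are printed in their original order.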

  2. Reading data from a file:

    DataStream<String> fileSource = env.readTextFile("path");
    
  3. A custom data source:

    DataStream<String> twitterSource = env.addSource(new V2source());
    
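The ordering note under item 1 can be illustrated with a small plain-Java model. This is not Flink code.

The model rests on three assumptions:
- The source runs with parallelism 1, as the classic `fromCollection`/`fromElements` implementations do.
- `print()` runs at the environment's default parallelism.
- When the two parallelisms differ, Flink by default redistributes records round-robin ("rebalance") between them.

`RebalanceModel` and `distribute` are hypothetical names used only for this sketch. For determinism it always starts at subtask 0, whereas Flink's rebalance starts at a random subtask.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (no Flink dependency) of how records from a parallelism-1
// source are spread over the subtasks of a downstream operator such as print().
final class RebalanceModel {

    // Deal records round-robin to `parallelism` subtasks, starting at subtask 0
    // (simplification: Flink picks a random starting subtask).
    static List<List<String>> distribute(List<String> records, int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        List<List<String>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            subtasks.get(i % parallelism).add(records.get(i));
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<String> input = List.of("aaa", "bbb", "ccc");
        // Each inner list keeps input order, but subtasks print concurrently,
        // so lines from different inner lists may interleave arbitrarily.
        System.out.println(distribute(input, 2)); // [[aaa, ccc], [bbb]]
        // With parallelism 1 a single subtask receives the whole input in order.
        System.out.println(distribute(input, 1)); // [[aaa, bbb, ccc]]
    }
}
```

With parallelism 1 there is only one subtask, so there is nothing to interleave. With more subtasks, order is preserved only within each subtask's share of the records.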

`V2source` emits each record by calling `collect` on the `SourceContext` passed into `run`:

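import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class V2source implements SourceFunction<String> {
    // volatile: cancel() is invoked from a different thread than run()
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        // FilterTwitterStream.getStream() is the author's own helper that returns the raw input stream
        InputStream stream = FilterTwitterStream.getStream();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
            String line;
            // Emit one record per line; stop as soon as the source is cancelled or the stream ends
            // (the original nested loops spun forever at end of stream and ignored cancel())
            while (running && (line = reader.readLine()) != null) {
                sourceContext.collect(line);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}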
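The `run`/`cancel` contract that `V2source` follows can be exercised without a Flink cluster. The sketch below is a plain-Java model, not Flink's API. The `Consumer<String>` passed to `run` is a hypothetical stand-in for `SourceContext.collect`, and `LineSourceModel` is an illustrative name.

Like the corrected `V2source`, the model checks the `running` flag before reading each record, so a `cancel()` call stops emission promptly. It also returns once the input is exhausted instead of busy-looping.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of a line-reading source (no Flink dependency).
// Consumer<String> is a hypothetical stand-in for SourceContext.collect.
final class LineSourceModel {
    // volatile: in Flink, cancel() is called from a different thread than run()
    private volatile boolean running = true;
    private final Reader input;

    LineSourceModel(Reader input) {
        this.input = input;
    }

    void run(Consumer<String> collect) throws IOException {
        try (BufferedReader reader = new BufferedReader(input)) {
            String line;
            // Stop when cancelled or when the input ends
            while (running && (line = reader.readLine()) != null) {
                collect.accept(line);
            }
        }
    }

    void cancel() {
        running = false;
    }

    public static void main(String[] args) throws IOException {
        List<String> all = new ArrayList<>();
        new LineSourceModel(new StringReader("aaa\nbbb\nccc")).run(all::add);
        System.out.println(all); // [aaa, bbb, ccc]

        // Cancelling from inside the collector stops the loop after the first record.
        LineSourceModel source = new LineSourceModel(new StringReader("aaa\nbbb\nccc"));
        List<String> partial = new ArrayList<>();
        source.run(line -> {
            partial.add(line);
            source.cancel();
        });
        System.out.println(partial); // [aaa]
    }
}
```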
Last Updated: 11/19/2024, 1:54:38 PM