Apache Flink的所有java api中文文档 有更新!

  Janix520

Flink中的DataStream程序是实现数据流转换的常规程序(例如,过滤,更新状态,定义窗口,聚合)。最初从各种源(例如,消息队列,套接字流,文件)创建数据流。结果通过接收器返回,接收器可以例如将数据写入文件或标准输出(例如命令行终端)。Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。

DataStream转换

数据转换将一个或多个DataStream转换为新的DataStream。程序可以将多个转换组合到复杂的拓扑中。

本节介绍了所有可用的转换。

  • 采用一个元素并生成一个元素。一个map函数,它将输入流的值加倍:
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});
  • 采用一个元素并生成零个,一个或多个元素。将句子分割为单词的flatmap函数:
dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});
  • 计算每个元素的布尔函数,并保留函数返回true的元素。过滤掉零值的过滤器:
dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});
  • 逻辑上将流分区为不相交的分区,每个分区包含相同密钥的元素。在内部,这是通过散列分区实现的。见如何指定键。此转换返回KeyedDataStream。
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
  • 键控数据流上的“滚动”减少。将当前元素与最后一个减少的值组合并发出新值。
    reduce函数,用于创建部分和的流:
keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});
  • 具有初始值的键控数据流上的“滚动”折叠。将当前元素与最后折叠的值组合并发出新值。
    折叠函数,当应用于序列(1,2,3,4,5)时,发出序列“start-1”,“start-1-2”,“start-1-2-3”,. ..
DataStream<String> result =
  keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
  });
  • 在键控数据流上滚动聚合。min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的元素(max和maxBy相同)。
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
  • 可以在已经分区的KeyedStream上定义Windows。Windows根据某些特征(例如,在最后5秒内到达的数据)对每个密钥中的数据进行分组。有关窗口的完整说明,请参见windows。
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
  • Windows可以在常规DataStream上定义。Windows根据某些特征(例如,在最后5秒内到达的数据)对所有流事件进行分组。有关窗口的完整说明,请参见windows。
    警告:在许多情况下,这是非并行转换。所有记录将收集在windowAll运算符的一个任务中。
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
  • 将一般功能应用于整个窗口。下面是一个手动求和窗口元素的函数。
    注意:如果您正在使用windowAll转换,则需要使用AllWindowFunction。
windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
    public void apply (Tuple tuple,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
    public void apply (Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});
  • 将功能缩减功能应用于窗口并返回缩小的值。
windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
    }
});
  • 将功能折叠功能应用于窗口并返回折叠值。示例函数应用于序列(1,2,3,4,5)时,将序列折叠为字符串“start-1-2-3-4-5”:
windowedStream.fold("start", new FoldFunction<Integer, String>() {
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
});
  • 聚合窗口的内容。min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的元素(max和maxBy相同)。
windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");
  • 两个或多个数据流的联合,创建包含来自所有流的所有元素的新流。注意:如果将数据流与自身联合,则会在结果流中获取两次元素。
dataStream.union(otherStream1, otherStream2, ...);
  • 在给定密钥和公共窗口上连接两个数据流。
dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...});
  • 在给定密钥和公共窗口上对两个数据流进行Cogroup。
dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});
  • “连接”两个保留其类型的数据流。连接允许两个流之间的共享状态。
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
  • 类似于连接数据流上的map和flatMap
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
});
  • 根据某些标准将流拆分为两个或更多个流。
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});
  • 从拆分流中选择一个或多个流。
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");
  • 通过将一个运算符的输出重定向到某个先前的运算符,在流中创建“反馈”循环。这对于定义不断更新模型的算法特别有用。以下代码以流开头并连续应用迭代体。大于0的元素将被发送回反馈通道,其余元素将向下游转发。有关完整说明,请参阅迭代
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Integer value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Integer value) throws Exception {
        return value <= 0;
    }
});
  • 从记录中提取时间戳,以便使用使用事件时间语义的窗口。查看活动时间
stream.assignTimestamps (new TimeStampExtractor() {...});
  • 从元组中选择字段的子集
DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);

物理分区

Flink还通过以下函数对转换后的精确流分区进行低级控制(如果需要)。

  • 使用用户定义的分区程序为每个元素选择目标任务。
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
  • 根据均匀分布随机分配元素。
dataStream.shuffle();
  • 分区元素循环,每个分区创建相等的负载。在存在数据偏斜时用于性能优化。
dataStream.rebalance();
  • 分区元素,循环,到下游操作的子集。如果您希望拥有管道,例如,从源的每个并行实例扇出到多个映射器的子集以分配负载但又不希望发生rebalance()会产生完全重新平衡,那么这非常有用。这将仅需要本地数据传输而不是通过网络传输数据,具体取决于其他配置值,例如TaskManagers的插槽数。
    上游操作发送元素的下游操作的子集取决于上游和下游操作的并行度。例如,如果上游操作具有并行性2并且下游操作具有并行性6,则一个上游操作将分配元件到三个下游操作,而另一个上游操作将分配到其他三个下游操作。另一方面,如果下游操作具有并行性2而上游操作具有并行性6,则三个上游操作将分配到一个下游操作,而其他三个上游操作将分配到另一个下游操作。
    在不同并行度不是彼此的倍数的情况下,一个或多个下游操作将具有来自上游操作的不同数量的输入。
    请参阅此图以获取上例中连接模式的可视化:

dataStream.rescale();
  • 向每个分区广播元素。
dataStream.broadcast();

任务链和资源组

  • 从这个运算符开始,开始一个新的链。两个映射器将被链接,并且过滤器将不会链接到第一个映射器。
someStream.filter(...).map(...).startNewChain().map(...);
  • 不要链接地图运算符
someStream.map(...).disableChaining();
  • 设置操作的插槽共享组。Flink将把具有相同插槽共享组的操作放入同一个插槽,同时保持其他插槽中没有插槽共享组的操作。这可用于隔离插槽。如果所有输入操作都在同一个插槽共享组中,则插槽共享组将继承输入操作。默认插槽共享组的名称为“default”,可以通过调用slotSharingGroup(“default”)将操作显式放入此组中。
someStream.filter(...).slotSharingGroup("name");

------你的笑你的泪,是我筑梦路上,最美的太阳。