Using broadcast variables in Flink batch jobs, and the FlinkML mapWithBcVariable method



2020-7-5

Method 1: use the Flink DataSet API

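// Attach the broadcast set to the map operator under the name "centroids"
points.map(new SelectNearestCenter).withBroadcastSet(currentCentroids, "centroids")

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._
// DenseVector is assumed to be org.apache.flink.ml.math.DenseVector

final class SelectNearestCenter extends RichMapFunction[DenseVector, (Int, DenseVector)] with Serializable {
  private var centroids: Traversable[DenseVector] = null
  // open() runs once per parallel task, before any call to map()
  override def open(parameters: Configuration): Unit = {
    // Look the broadcast set up by the name given to withBroadcastSet
    centroids = getRuntimeContext.getBroadcastVariable[DenseVector]("centroids").asScala
  }
  override def map(p: DenseVector): (Int, DenseVector) = {
    // use centroids ...
    ???
  }
}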
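
The body of map is left open above. As a rough illustration of what "use centroids" could mean in a K-means style job, the sketch below picks the index of the nearest centroid in plain Scala, with no Flink dependency. Array[Double] stands in for DenseVector, and the names NearestCenterSketch, squaredDistance and selectNearest are hypothetical helpers, not part of Flink or of the original code.

```scala
object NearestCenterSketch {
  // Assumption: a plain Array[Double] stands in for Flink's DenseVector
  type Vec = Array[Double]

  // Squared Euclidean distance between two vectors of equal dimension
  def squaredDistance(a: Vec, b: Vec): Double = {
    require(a.length == b.length, "vectors must have the same dimension")
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum
  }

  // Returns (index of the nearest centroid, the point itself),
  // mirroring the (Int, DenseVector) result type of SelectNearestCenter
  def selectNearest(p: Vec, centroids: Seq[Vec]): (Int, Vec) = {
    require(centroids.nonEmpty, "need at least one centroid")
    val nearestIdx = centroids.indices.minBy(i => squaredDistance(p, centroids(i)))
    (nearestIdx, p)
  }

  def main(args: Array[String]): Unit = {
    val centroids = Seq(Array(0.0, 0.0), Array(10.0, 10.0))
    val (idx, _) = selectNearest(Array(9.0, 8.0), centroids)
    println(idx) // prints 1: (9, 8) is closer to (10, 10) than to (0, 0)
  }
}
```

Inside the rich function, the same logic would run against the centroids collection that open() filled from the broadcast set.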

Method 2: use the FlinkML mapWithBcVariable method

points.mapWithBcVariable(currentCentroids) {
          (point, center) => {
            // use the broadcast variable center directly here
          }
        }
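
A minimal end-to-end sketch of Method 2 follows. It assumes the legacy FlinkML library (flink-ml, shipped with Flink 1.8 and earlier) is on the classpath, and that importing org.apache.flink.ml._ brings mapWithBcVariable into scope as an extension on DataSet. As far as I recall, that implementation hands only the first element of the broadcast DataSet to the function, so the broadcast side here is deliberately a single-element DataSet. The names BcVariableSketch, shift and offset are hypothetical.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml._ // assumed to provide mapWithBcVariable on DataSet

object BcVariableSketch {
  // Pure function applied to each element together with the broadcast value
  def shift(p: Double, o: Double): Double = p + o

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val points = env.fromElements(1.0, 2.0, 3.0)
    // Single-element DataSet used as the broadcast variable
    val offset = env.fromElements(10.0)

    // No RichMapFunction or open() needed: the broadcast value arrives
    // as the second argument of the function
    val shifted = points.mapWithBcVariable(offset) { (p, o) => shift(p, o) }

    shifted.print()
  }
}
```

Compared with Method 1, this trades flexibility for brevity: there is no named broadcast set to look up, but the function sees a single broadcast value rather than the whole collection.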

Author: ch123