一个简单的示例,说明如何使用2个入口和3个出口定义自定义形状。
case class TwoThreeShape[-In1, -In2, +Out1, +Out2, +Out3]( in1: Inlet[In1@uncheckedVariance], in2: Inlet[In2@uncheckedVariance], out1: Outlet[Out1@uncheckedVariance], out2: Outlet[Out2@uncheckedVariance], out3: Outlet[Out3@uncheckedVariance]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = List(in1, in2) override val outlets: immutable.Seq[Outlet[_]] = List(out1, out2, out3) override def deepCopy(): TwoThreeShape[In1, In2, Out1, Out2, Out3] = TwoThreeShape(in1.carbonCopy(), in2.carbonCopy(), out1.carbonCopy(), out2.carbonCopy(), out3.carbonCopy()) override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = { require(inlets.size == 2, s"proposed inlets [${inlets.mkString(", ")}] do not fit TwoThreeShape") require(outlets.size == 3, s"proposed outlets [${outlets.mkString(", ")}] do not fit TwoThreeShape") TwoThreeShape(inlets(0), inlets(1), outlets(0), outlets(1), outlets(2)) } }
这种怪异形状的用法示例:一个阶段,该阶段将通过2个流的元素,同时保持流中有多少个元素通过的比率:
def ratioCount[X,Y]: Graph[TwoThreeShape[X,Y,X,Y,(Int,Int)],NotUsed] = { GraphDSL.create() { implicit b => import GraphDSL.Implicits._ val x = b.add(Broadcast[X](2)) val y = b.add(Broadcast[Y](2)) val z = b.add(Zip[Int,Int]) x.out(1).conflateWithSeed(_ => 1)((count,_) => count + 1) ~> z.in0 y.out(1).conflateWithSeed(_ => 1)((count,_) => count + 1) ~> z.in1 TwoThreeShape(x.in,y.in,x.out(0),y.out(0),z.out) } }