GPars的Actor实现

Thu 28 April 2011
  • 手艺 tags:
  • groovy
  • java
  • opensource published: true comments: true

Actor是一种Continuation技术,可以在少量的线程运行大量Actor对象。Actor对象之间通过消息机制进行交互。而Actor本身线程安全,这样的模型使并发编程的复杂度降低,同时也在一定的场景下实现了可扩展性。

gpars是Java和Groovy都可以使用的并行编程库,他实现了Actor、Agent、DataFlow等模型,旨在为Groovy提供高层的并行编程模型。以下分析gpars 0.12中非阻塞Actor的实现。

[cc lang="groovy"]
@Grab("org.codehaus.gpars:gpars:0.11")
import groovyx.gpars.actor.Actors

def worker = Actors.actor {
loop {
react {
reply it.reverse()
}
}
}

def console = Actors.actor {
worker << "Hello GPars"
react {
println it
}
}

console.join()
[/cc]

首先,在工厂类中Actors里会初始化一个DefaultPGroup用来封装后台的线程池、管理actors。Actors默认使用ResizeablePool,他是JDK Concurrent Framework中的ThreadPoolExecutor的封装,coreSize和maxSize不同所以称Resizeable。

Actors的工厂方法actor生成的是DefaultActor,它是非阻塞actor的默认实现。(ActorGroup:67)

DefaultActor的构造方法接受一个Groovy的闭包对象,将其封装为DefaultActorClosure对象后,调用其父类AbstractLoopingActor的initialize方法(DefaultActor:73).

initialize方法创建一个Runnable对象AsyncMessagingCore,并将线程池传递给core对象。(AbstractLoopingActor:57)AsyncMessagingCore对象负责消息的传递和处理,是线程池处理的目标对象。

调用start启动actor后,actor会向自己发送一个start消息(AbstractLoopingActor:173).
core获得start消息后,调用DefaultActor覆盖的handleStart方法(DefaultActor:328)。

在handleStart中,actor会调用用户传入的闭包方法。上面的例子是一种典型的用法,loop是DefaultActor中的方法,loop也并不是无限空转的,他仅在收到消息被时被触发(DefaultActor:191)react也是DefaultActor中的方法,它将nextContinuation方法设为内部闭包对象,用来处理actor接收的消息。

向Actor发送消息,是通过actor的send方法和重载的leftShift运算符进行操作。(AbstractLoopingActor:236)actor调用core的store方法,将ActorMessage压入core的队列中。入队列之后,core会检查锁对象activeUpdater,判断当前core是否在线程处理中,如果不在,则将core加入线程池中处理。activeUpdater是一个AtomicIntegerFieldUpdater对象,他的compareAndSet可以保证原子性。而通过activeUpdater也可以保证同一时刻只有一个core被线程池处理,从而使actor的线程不安全代码也线程安全地运行。

进入线程池后,core首先将自己放进threadlocal对象中,并保存当前线程的引用。然后会循环消费MessageQueue中的消息直至Queue的可处理部分为空。(AsyncMessagingCore:126)。handleMessage在AbstractLoopingActor中被覆盖,会根据消息的类型进行分发调用(前面提到的start消息就是一种)。默认的业务消息,在DefaultActorClojure中调用DefaultActor的onMessage方法处理。

onMessage中,react的闭包会被调用来处理业务。之后nextContinuation被置为null,这时loop闭包被重新调用,react闭包重新被赋给nextContinuation。这部分代码就是前面所说的loop并非空转,而是在消息处理完成后重新准备而已。

此外,core的MessageQueue的实现是DefaultMessageQueue。它使用两个LinkedList作为输入(向actor输入)队列和输出队列,当输入队列为空时,通过同步方法swap交换输入输出队列。swap是整个actor系统里唯一一个同步方法。这样的机制保证actor的core在线程池中处理时,外界仍然可以向actor发送消息,消息会在actor被调度出线程池之前全部处理掉。不过,他的前提是只有一个线程读这个队列,这个条件在actor系统里,通过core对象的activeUpdater可以有效的保证。

Actor模式采用这种onDemand方式的线程使用,允许大量的actor共存,并只有活跃的actor会占用线程,非活跃状态的actor处在dettach状态,并不消耗计算资源,取消了空转的loop。