Scala Actor是Scala里多线程的基础,核心思想是用消息传递来进行线程间的信息共享和同步。

河曲ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为创新互联建站的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:18982081108(备注:SSL证书合作)期待与您的合作!
编辑推荐:Scala编程语言专题
Scala Actor线程模型可以这样理解:所有Actor共享一个线程池,总的线程个数可以配置,也可以根据CPU个数决定;当一个Actor启动之后,Scala分配一个线程给它使用,如果使用receive模型,这个线程就一直为该Actor所有,如果使用react模型,Scala执行完react方法后抛出异常,则该线程就可以被其它Actor使用。
下面看一些核心代码。
- def start(): Actor = synchronized {
 - // Reset various flags.
 - //
 - // Note that we do *not* reset `trapExit`. The reason is that
 - // users should be able to set the field in the constructor
 - // and before `act` is called.
 - exitReason = 'normal
 - exiting = false
 - shouldExit = false
 - scheduler execute {
 - ActorGC.newActor(Actor.this)
 - (new Reaction(Actor.this)).run()
 - }
 - this
 - }
 
其中Reaction实现Runnable接口,scheduler基本相当于是一个线程池,所以调用start方法之后会有一个线程来为该Actor服务。
使用receive模型。
- def receive[R](f: PartialFunction[Any, R]): R = {
 - assert(Actor.self == this, "receive from channel belonging to other actor")
 - this.synchronized {
 - if (shouldExit) exit() // links
 - val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
 - if (null eq qel) {
 - waitingFor = f.isDefinedAt
 - isSuspended = true
 - suspendActor()
 - } else {
 - received = Some(qel.msg)
 - sessions = qel.session :: sessions
 - }
 - waitingFor = waitingForNone
 - isSuspended = false
 - }
 - val result = f(received.get)
 - sessions = sessions.tail
 - result
 
如果当前mailbox里面没有可以处理的消息,调用suspendActor,该方法会调用wait;如果有消息,这调用PartialFunction进行处理。
使用react模型。
- def react(f: PartialFunction[Any, Unit]): Nothing = {
 - assert(Actor.self == this, "react on channel belonging to other actor")
 - this.synchronized {
 - if (shouldExit) exit() // links
 - val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
 - if (null eq qel) {
 - waitingFor = f.isDefinedAt
 - continuation = f
 - isDetached = true
 - } else {
 - sessions = List(qel.session)
 - scheduleActor(f, qel.msg)
 - }
 - throw new SuspendActorException
 - }
 
如果当前mailbox没有可以处理的消息,设置waitingFor和continuation,这两个变量会在接收到消息的时候使用;如果有消息,则调用scheduleActor,该方法会在线程池里选择一个新的线程来处理,具体的处理方法也是由PartialFunction决定。不管是哪条路径,react都会立即返回,或者说是立即抛出异常,结束该线程的执行,这样该线程就可以被其它Actor使用。
再来看看接收消息的处理代码。
- def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized {
 - if (waitingFor(msg)) {
 - received = Some(msg)
 - if (isSuspended)
 - sessions = replyTo :: sessions
 - else
 - sessions = List(replyTo)
 - waitingFor = waitingForNone
 - if (!onTimeout.isEmpty) {
 - onTimeout.get.cancel()
 - onTimeout = None
 - }
 - if (isSuspended)
 - resumeActor()
 - else // assert continuation != null
 - scheduler.execute(new Reaction(this, continuation, msg))
 - } else {
 - mailbox.append(msg, replyTo)
 - }
 
如果当前没有在等待消息或者接收到的消息不能处理,就丢到mailbox里去;相反,则进行消息的处理。这里对于receive模型和react模型就有了分支:如果isSuspended为true,表示是receive模型,并且线程在wait,就调用resumeActor,该方法会调用notify;否则就是react模型,同样在线程池里选择一个线程进行处理。
这样,相信大家对Scala Actor就有了一个基本的认识。
【相关阅读】
                网站题目:ScalaActor:多线程的基础学习
                
                标题URL:http://www.csdahua.cn/qtweb/news13/188513.html
            
网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网