由于AKka的核心是Actor,而Actor是按照Actor模型进行实现的,所以在使用Akka之前,有必要弄清楚什么是Actor模型。
Actor模型最早是1973年Carl Hewitt、Peter Bishop和Richard Seiger的论文中出现的,受物理学中的广义相对论(general relativity)和量子力学(quantum mechanics)所启发,为解决并发计算的一个数学模型。
Actor模型所推崇的哲学是”一切皆是Actor“,这与面向对象编程的”一切皆是对象“类似。但不同的是,在模型中,Actor是一个运算实体,它遵循以下规则: 接受外部消息,不占用调用方(消息发送者)的CPU时间片 通过消息改变自身的状态 创建有限数量的新Actor 发送有限数量的消息给其他Actor 很多语言都实现了Actor模型,而其中最出名的实现要属Erlang的。Akka的实现借鉴了不少Erlang的经验。
< dependency> < groupId> com. typesafe. akka< / groupId> < artifactId> akka- actor_2. 11 < / artifactId> < version> 2.4 .7 < / version>
< / dependency>
tell 发送一个消息到目标Actor后立刻返回
public class C extends AbstractActor { @Override public Receive createReceive ( ) { return receiveBuilder ( ) . match ( Object . class , obj-> { if ( obj instanceof String ) { System . out. println ( "C: D你回复给我的消息我收到了!" ) ; return ; } SomeOne someOne = ( SomeOne ) obj; System . out. println ( "C: C接收到消息:" + someOne. toString ( ) ) ; ActorRef actorRef = this . getContext ( ) . actorOf ( Props . create ( D . class , D :: new ) ) ; actorRef. tell ( someOne, self ( ) ) ; } ) . build ( ) ; } public static void main ( String [ ] args) { ActorSystem ok = ActorSystem . create ( "ok" ) ; ActorRef actorRef = ok. actorOf ( Props . create ( C . class , C :: new ) ) ; Scanner sc = new Scanner ( System . in) ; System . out. print ( "请输入:" ) ; String s = sc. nextLine ( ) ; actorRef. tell ( new SomeOne ( 1 , s, 0 ) , ActorRef . noSender ( ) ) ; }
} public class D extends AbstractActor { @Override public Receive createReceive ( ) { return receiveBuilder ( ) . match ( Object . class , obj-> { SomeOne someOne = ( SomeOne ) obj; System . out. println ( "D: D接收到C 传过来的消息:" + someOne. toString ( ) ) ; Thread . sleep ( 2000 ) ; sender ( ) . tell ( "D: 我再把消息发给你C" , self ( ) ) ; } ) . build ( ) ; }
}
注意:
ActorSystem 是一个较重的存在,一般一个应用里,只需要一个ActorSystem 。
在同一个ActorySystem 中,Actor 不能重名。
ask 发送一个消息到目标Actor,并返回一个Future对象,可以通过该对象获取结果。但前提是目标Actor会有Reply才行,如果没有Reply,则抛出超时异常
public class A extends AbstractActor { @Override public Receive createReceive ( ) { return receiveBuilder ( ) . match ( Object . class , obj -> { if ( obj instanceof SomeOne ) { SomeOne someOne = ( SomeOne ) obj; System . out. println ( " A 收到 SomeOne 对象:" + someOne. toString ( ) ) ; someOne. setAge ( someOne. getAge ( ) + 1 ) ; Thread . sleep ( 1000 ) ; this . getSender ( ) . tell ( "xxx" , getSelf ( ) ) ; } } ) . build ( ) ; } ## Await 同步阻塞等待结果public static void main ( String [ ] args) { ActorSystem test = ActorSystem . create ( "test" ) ; ActorRef actorRefA = test. actorOf ( Props . create ( A . class , A :: new ) ) ; SomeOne someOne = new SomeOne ( 1 , "哈哈哈ok" , 10 ) ; Timeout timeout = new Timeout ( Duration . create ( 2 , TimeUnit . SECONDS ) ) ; Future < Object > future = Patterns . ask ( actorRefA, someOne, timeout) ; try { String reply = ( String ) Await . result ( future, timeout. duration ( ) ) ; System . out. println ( "回复的消息: " + reply) ; } catch ( Exception e) { e. printStackTrace ( ) ; } }
public class A extends AbstractActor { @Override public Receive createReceive ( ) { return receiveBuilder ( ) . match ( Object . class , obj -> { if ( obj instanceof SomeOne ) { SomeOne someOne = ( SomeOne ) obj; System . out. println ( " A 收到 SomeOne 对象:" + someOne. toString ( ) ) ; someOne. setAge ( someOne. getAge ( ) + 1 ) ; Thread . sleep ( 1000 ) ; this . getSender ( ) . tell ( "xxx" , getSelf ( ) ) ; } } ) . build ( ) ; } ## future 异步等待结果。
public static void main ( String [ ] args) { ActorSystem test = ActorSystem . create ( "test" ) ; ActorRef actorRefA = test. actorOf ( Props . create ( A . class , A :: new ) ) ; SomeOne someOne = new SomeOne ( 1 , "哈哈哈ok" , 10 ) ; Timeout timeout = new Timeout ( Duration . create ( 2 , TimeUnit . SECONDS ) ) ; Future < Object > future = Patterns . ask ( actorRefA, someOne, timeout) ; future. onComplete ( new OnComplete < Object > ( ) { @Override public void onComplete ( Throwable throwable, Object o) throws Throwable { if ( throwable != null ) { System . out. println ( "返回结果异常:" + throwable. getMessage ( ) ) ; } else { System . out. println ( "返回消息:" + o) ; } } } , test. dispatcher ( ) ) ; future. onSuccess ( new OnSuccess < Object > ( ) { @Override public void onSuccess ( Object msg) throws Throwable { System . out. println ( "回复的消息:" + msg) ; } } , test. dispatcher ( ) ) ; future. onFailure ( new OnFailure ( ) { @Override public void onFailure ( Throwable throwable) throws Throwable { if ( throwable instanceof TimeoutException ) { System . out. println ( "服务超时" ) ; } else { System . out. println ( "未知错误" ) ; } } } , test. dispatcher ( ) ) ; }
tell 前置后置处理,销毁线程 的例子
public class MessageSendAndAccept extends AbstractActor { @Override public void preStart ( ) { System . out. println ( "--------- 接收到消息 start" ) ; } @Override public void postStop ( ) { System . out. println ( "--------- 消息处理完毕 end" ) ; } @Override public Receive createReceive ( ) { return receiveBuilder ( ) . match ( String . class , result -> { consoleLog ( result) ; } ) . build ( ) ; } public void consoleLog ( String log) { System . out. println ( "接收到内容:" + log) ; getContext ( ) . stop ( self ( ) ) ; } public static void main ( String [ ] args) { ActorSystem actorSystem = ActorSystem . create ( "demo" ) ; ActorRef my_actor = actorSystem. actorOf ( Props . create ( MessageSendAndAccept . class ) , "my_actor" ) ; my_actor. tell ( "哈哈哈a" , ActorRef . noSender ( ) ) ; }
}
并发 执行方法 例子
创建多个actor 同时执行就好了
public class G extends AbstractActor { @Override public Receive createReceive ( ) { return receiveBuilder ( ) . match ( Object . class , obj-> { if ( obj instanceof String ) { System . out. println ( obj + ",time=" + new SimpleDateFormat ( "yyyy-MM-dd HH:mm:ss" ) . format ( new Date ( ) ) + "--- Thread ---" + Thread . currentThread ( ) . getName ( ) ) ; Thread . sleep ( 3000L ) ; System . out. println ( Thread . currentThread ( ) . getName ( ) + "---END" ) ; return ; } } ) . build ( ) ; } public static void main ( String [ ] args) { ActorSystem ok = ActorSystem . create ( "ok" ) ; ActorRef actorRef_0 = ok. actorOf ( Props . create ( G . class , G :: new ) ) ; actorRef_0. tell ( "a" , ActorRef . noSender ( ) ) ; ActorRef actorRef_1 = ok. actorOf ( Props . create ( G . class , G :: new ) ) ; actorRef_1. tell ( "b" , ActorRef . noSender ( ) ) ; ActorRef actorRef_2 = ok. actorOf ( Props . create ( G . class , G :: new ) ) ; actorRef_2. tell ( "c" , ActorRef . noSender ( ) ) ; }
}