使用spawn创建一个新进程,其第一个参数是模块名、第二个参数是函数名、第三个参数是参数列表。spawn会返回一个进程标识符,通常叫做PID。
defmodule Spawn1 dodef greet doreceive do{sender, msg} ->send sender, { :ok, "Hello #{msg}" }# codeendend endspawn(SpawnBasic, :greet, []) #"Hello"
进程间发送消息
使用send发送消息,第一个参数是接收方pid、第二个参数是要发送的消息,通常是原子或者元组。使用receive等待消息,它的用法比较像case。
#以下代码都在同一个文件中。
defmodule Spawn1 dodef greet doreceive do{sender, msg} ->send sender, { :ok, "Hello #{msg}" }# codeendend endpid = spawn(Spawn1, :greet, []) send pid, {self, "World!"}receive do{:ok, message} ->IO.puts message end
上述代码如果想要发送第二条数据,就会导致iex被挂起。因为greet函数在处理完receive后,就退出了。我们永远等不来receive的相应。我们可以使用 after 指定receive在规定时间内未到达就超时。
receive do{:ok, message} ->IO.puts messageafter 500 -> #receive在等待500毫秒后,消息未到达就会退出IO.puts "The greeter han gone away" end
我们可以使用递归来处理多条消息。greet:
def greet doreceive do{sender, msg} ->send, sender, { :ok, "Hello, #{msg}" }greetendend endpid = spawn(Spawn1, :greet, []) send pid, {self, "World!"}receive do{:ok, message} ->IO.puts message # "Hello, World!" endpid = spawn(Spawn1, :greet, []) send pid, {self, "kermit!"}receive do{:ok, message} ->IO.puts message # "Hello, Kermit!"after 500 ->IO.puts "The greeter has gone away" end
进程开销
很小。
进程调用exit(:boom)会以状态码99退出。
关联两个进程,使用spawn_link会创建一个进程并和调用者相关联。
defmodule Link2 do import :timer, only: [ sleep: 1 ]def sad_function dosleep 500exit(:boom)enddef run dospawn_link(Link2, :sad_function, [])receive domsg ->IO.puts "<MESSAGE RECEIVED: #{inspect msg}"after 1000 ->IO.puts "Nothing ... "endend end
子进程结束,然后它会结束整个应用,这是关联进程的默认行为,当其中一个进程非正常退出时会把另一个进程也杀死。
设置Peocess.flag(:trap_exit, true),可以捕获进程退出时的消息。
创建进程时,可以使用spawn_monitor开启监控(创建 + 监控),也可以使用Process.monitor监控已经存在的进程。当使用Process.monitor时,在调用监控完成前被监控进程死了,就不会受到通知。然而,spawn_link和spawn_monitor操作符合原子性,所以总能捕获到错误。
并行map
普通map返回列表,该列表是某个收集的每个元素应用于某个函数的结果。并行版本做同样的事情,但每个元素在独立的进程里应用于函数。
defmodule Parallel dodef pmap(collection, fun) dome = selfcollection|> Enum.map(fn (elem) -> #双重嵌套函数spawn_link fn -> (send me, { self, fun.(elem) } ) end #返回一组PID,每个PID内都运行了一个计算函数,将自身PID和结果发送给主进程end)|> Enum.map(fn (pid) -> #在主进程中等待结果receive do {^pid, resuet } -> result endend)end end
斐波拉契数服务器
编写一个服务程序来计算斐波拉契数。当计算器准备好接受下一个数字时,它会发送 :ready 消息给调度器,如果仍有任务为完成,调度器会发送 :fib 消息给计算器;否则发送shutdown 给计算器。当计算器接受到 :fib 消息就计算给定的斐波拉契数,并以 :answer 返回结果。如果收到shutdown就退出。
defmodule FibSolver do #计算模块def fib(scheduler) dosend scheduler, {:ready, self}receive do{ :fib, n, client} ->send client, { :answer, n, fib_calc(n), self}fib(scheduler){ :shutdown } ->exit(:normal)endenddefp fib_calc(0) do0enddefp fib_calc(1) do1enddefp fib_calc(n) dofib_calc(n - 1) + fib_calc(n - 2)end enddefmodule Scheduler do #调度模块def run(num_process, module, func, to_calculate) do(1..num_process)|> Enum.map(fn (_) -> spawn(module, func, [self]) end)|> scheduler_process(to_calculate, [])enddefp scheduler_process(processes, queue, results) doreceive do{:ready, pid} when length(queue) > 0 ->[next | tail] = queuesend pid, {:fib, next, self}scheduler_process(processes, tail, results){:ready, pid} ->send pid, {:shutdown}if length(processes) > 1 doscheduler_process(List.delete(processes, pid), queue, results)elseEnum.sort(results, fn {n1, _}, {n2, _} -> n1 <= n2 end)end{:answer, number, result, _pid}->scheduler_process(processes, queue, [ {number,result} | results])endend endto_process = [ 37, 37, 37, 37, 37, 37] #外部调用模块Enum.each 1..10, fn num_processes ->{time, result} = :timer.tc(Scheduler, :run, [num_processes, FibSolver, :fib, to_process])if num_processes == 1 doIO.puts inspect resultIO.puts "\n # time (s)"end:io.format "~2B ~.2f~n", [num_processes, time/1000000.0] end