在 Swift
的 Combine
框架中,Scheduler
是一个重要的概念,用于控制任务的调度和执行。本文将详细介绍 Scheduler
的作用、常见的 Scheduler
类型以及如何使用 Scheduler
来管理任务的执行。
Scheduler 的定义
Scheduler
用于管理任务的调度和执行,可以控制任务何时执行、在哪个线程或队列上执行以及执行的顺序。
你可以使用调度器来尽可能快地执行代码,或者在将来的某个时间执行。不同的调度器实现会根据自己的需要使用不同的时间保持系统。调度器将此表示为 SchedulerTimeType。由于该类型符合 SchedulerTimeIntervalConvertible,因此您可以始终使用 .milliseconds(500) 等方便函数来表示这些时间。调度器可以接受选项来控制它们执行传递给它们的操作的方式。这些选项可能控制哪些线程或分发队列执行操作等因素。
常见的 Scheduler 类型
常见的 Scheduler
类型包括一下几种:
DispatchQueue
: GCD(Grand Central Dispatch)中的调度器,用于在特定的调度队列上执行任务:串行、并发、主线程和全局队列。通常情况下,将后台任务分配到串行和全局队列上,而将与用户界面相关的任务分配到主线程队列上。RunLoop
:用于处理事件循环的机制,可以在特定的RunLoop
上执行任务,如主线程的RunLoop
。OperationQueue
:在特定的操作队列中执行任务。与DispatchQueue
类似,使用OperationQueue.main
进行UI操作,并使用其他队列进行后台操作。ImmediateScheduler
: 是一个立即执行任务的调度器,通常用于测试和调试目的。
使用RunLoop.main
, DispatchQueue.main
和 OperationQueue.main
都是和UI操作相关的,三者之间基本没什么区别。
默认Scheduler 类型
如果没有指定Scheduler
,那么系统会默认给添加一个,而这个Scheduler
的类型和Publisher
发送数据时的类型一致,比如主线程发送的数据,那么接收也在主线程。
class SchedulerViewModel: ObservableObject {let publisher = PassthroughSubject<String, Never>()private var cancellable = Set<AnyCancellable>()init() {setUpSubscription()}func setUpSubscription() {publisher.sink { value inprint("\(value) is on main thread: \(Thread.isMainThread)")}.store(in: &cancellable)}func sendMessage() {// on main threadpublisher.send("Text 1")// on other threadDispatchQueue.global().async { [weak self] inself?.publisher.send("Text 2")}}
}
上面的代码中,在sendMessage()
方法中直接调用了两次send
方法,第一个在主线程中调用,第二个在异步线程调用。sink
的闭包中输出结果为:
Text 1 is on main thread: true
Text 2 is on main thread: false
Scheduler的使用
Combine
框架提供了两个基本的操作符来使用调度器:
subscribe(on:)
和subscribe(on:options:)
在指定的Scheduler
上创建订阅(开始工作)。receive(on:)
和receive(on:options:)
在指定的Scheduler
上传递值。
subscribe(on:)
subscribe(on:)
会将当前订阅设置在你希望管理的调度器上。该操作符用于创建订阅、取消订阅和请求输入时使用的调度器。
从创建订阅到最后订阅者接收到数据的整个过程可以认为时一个管道,数据从上游流向下游。subscribe(on:)
会将调度器设置为订阅管道上游所使用的调度器。同时有个副作用是,subscribe(on:)
还会更改其下游调度器。
这里面需要说明一下什么时管道上游和管道下游,在stackoverflow中有人这么解释:
上游:
- The actual performance of the subscription (receive subscription)
- Requests from a subscriber to the upstream publisher asking for a new value
- Cancel messages (these percolate upwards from the final subscriber)
上游大概指的就是在创建订阅,建立连接,请求数据和取消订阅的过程。
下游:
- Values
- Completions, consisting of either a failure (error) or completion-in-good-order (reporting that the publisher emitted its last value)
下游大概指的是Publisher
发送数据一直到订阅者收到数据的过程。
当我们创建了Publisher
,然后添加operator
,最后sink
或者assign
的整个过程都应该算是下游。
subscribe(on:)
大多情况下影响的是管道上游,比如我们自定义一个Publisher
,起名MyPublisher
。
extension Publishers {struct MyPublisher: Publisher {typealias Output = Inttypealias Failure = Neverfunc receive<S>(subscriber: S) where S : Subscriber, Never == S.Failure, Int == S.Input {debugPrint("receive: \(Thread.isMainThread)")subscriber.receive(subscription: Subscriptions.empty)_ = subscriber.receive(666)}}
}
在MyPublisher
中,定义了输出类型和失败类型,另外还必须要实现receive(subscriber:)
方法。
receive(subscriber:)
方法用于接受订阅者,并创建一个订阅对象,然后将该订阅对象提供给订阅者,以建立发布者和订阅者之间的连接。那么该方法内做的事情都属于管道上游的事情。我们在方法中加上一个打印。并且直接想订阅者发送了一个数据666.
Publishers.MyPublisher().map { _ inprint("Map: \(Thread.isMainThread)")}.sink { _ inprint("Sink: \(Thread.isMainThread)")}.store(in: &cancellable)
当调用上面代码的时候,打印输出:
"receive: true"
Map: true
Sink: true
因为没有调用subscribe(on:)
设置,所有默认都是在主线程执行的。
下面加上subscribe(on:)
方法,并设置为异步线程。
Publishers.MyPublisher().map { _ inprint("Map: \(Thread.isMainThread)")}.subscribe(on: DispatchQueue.global()) // 设置为异步线程。.sink { _ inprint("Sink: \(Thread.isMainThread)")}.store(in: &cancellable)
执行后打印结果为:
"receive: false"
Map: false
Sink: false
看结果都是在异步线程执行的,.subscribe(on: DispatchQueue.global())
这段代码即是放在map
操作符前面结果也是一样的。
subscribe(on:)
就像上面说的不仅能影响管道上游的执行线程,也能影响管道下游的执行线程。
另外说一点,像是Just
,Sequence
这类的Publisher
的管道上游就是在主线程执行的,而像URLSession.DataTaskPublisher
就是在异步线程执行的。
如果sink
闭包中要处理UI相关的东西,那就不能让subscribe(on:)
影响到,除了在sink
方法中手动切换到主线程,还有就是要用到receive(on:)
方法了。
receive(on:)
针对Publisher
使用subscribe(on:)
并不多见,除非你的需求复杂一点,更多的时候我们使用receive(on:)
方法,它允许你指定应该使用哪个调度程序向订阅者传递值。
说的直白一些就是在整个Publisher
链中,在receive(on:)
方法后面执行的操作都在receive(on:)
方法决定的线程中执行。
比如刚才的代码中,在subscribe(on:)
方法后面加上了receive(on:)
方法。
Publishers.MyPublisher().map { _ inprint("Map: \(Thread.isMainThread)")}.subscribe(on: DispatchQueue.global()).receive(on: DispatchQueue.main).sink { _ inprint("Sink: \(Thread.isMainThread)")}.store(in: &cancellable)
执行打印结果:
"receive: false"
Map: false
Sink: true
可以看到sink
方法实在主线程中调用的。
在整个Publisher链中,如果使用了subscribe(on:)
方法,那么最好在后面也要使用receive(on:)
方法。
我们知道URLSession.DataTaskPublisher
时异步执行的,数据返回时通过一系列的转型操作等,最终还是要走到sink
方法里,那么就可以在sink
之前加上receive(on:)
方法,并传入主线程参数。
下面在看一个只有receive(on:)
方法的示例。
Publishers.MyPublisher().map { _ inprint("Map: \(Thread.isMainThread)")}.receive(on: DispatchQueue.global()).sink { _ inprint("Sink: \(Thread.isMainThread)")}.store(in: &cancellable)
在map后加上了receive(on:)
方法,并传入异步线程参数。打印结果如下:
"receive: true"
Map: true
Sink: false
在receive(on:)
方法方法之前都是在主线程执行,receive(on:)
方法之后就是异步线程执行了。
如果加一个receive(on:)
方法不过瘾,再看看加两个的。
Publishers.MyPublisher().receive(on: DispatchQueue.global()) // 1 异步线程.map { _ inprint("Map: \(Thread.isMainThread)")}.receive(on: DispatchQueue.main) // 2 主线程.sink { _ inprint("Sink: \(Thread.isMainThread)")}.store(in: &cancellable)
先看打印结果:
"receive: true"
Map: false
Sink: true
管道上游肯定是在主线程了,然后调用了.receive(on: DispatchQueue.global())
,之后的map
操作就是异步线程执行了,然后又调用了.receive(on: DispatchQueue.main)
,回到了主线程,所以最后的sink
就在主线程执行了。
在开发过程中使用receive(on:)
方法的频率会多余subscribe(on:)
方法。
写在最后
Scheduler
在 Combine
中扮演着重要的角色,用于控制任务的调度和执行。通过指定不同的 Scheduler
,可以控制任务在不同的线程或队列上执行,确保任务按照预期顺序执行。使用 Scheduler
可以帮助我们更好地管理任务的执行,提高代码的可读性和性能。
最后,希望能够帮助到有需要的朋友,如果觉得有帮助,还望点个赞,添加个关注,笔者也会不断地努力,写出更多更好用的文章。