之前的文章已经介绍过Publisher
和Subscriber
,对于概念类的东西这里就不多介绍了,在介绍Publisher
和Subscriber
的交互流程之前,先补充一下前面没有提到过的Subscription
。
Subscription
Subscription
是一个协议,实现该协议的对象负责将订阅者链接到发布者。只要它在内存中,订阅者就会继续接收值。它只包含一个方法:
public protocol Subscription : Cancellable, CustomCombineIdentifierConvertible {/// Tells a publisher that it may send more values to the subscriber.func request(_ demand: Subscribers.Demand)
}
当订阅者在Publisher
中接收到subscription
对象后,便开始调用request
方法,demand
参数决定了订阅者要从发布者那里获取多少个值。
demand
参数有几个可选的参数值:
none
:表示订阅者一个值都不会收到。max(value)
: 表示订阅者要接收value个值。unlimited
:表示订阅者要接受无限个值。
Subscription
的实例对象中包含了一个Subscriber
的引用,以使其保持最新状态。
Subscription
协议有继承了Cancellable
协议,所以有了cancel
方法,而在自定义Subscription
的时候,request
和cancel
方法都是必须实现的。
Publisher和Subscriber的交互流程
介绍完了Subscription
协议,现在看看Publisher
和Subscriber
是如何建立的联系。
- 由
Publisher
调用subscribe(_:)
方法开启链接申请,同时参数传入Subscriber
实例对象。 - 在第一步调用
subscribe(_:)
方法后,即触发Publisher
内部调用receive(subscriber:)
方法,在该方法中创建一个连接Publisher
和Subscriber
的Subscription
对象,然后调用Subscriber
的receive(subscription:)
方法,将Subscription
对象传给Subscriber
。 - 在
Subscriber
的receive(subscription:)
方法中,使用传进来的subscription
对象调用request
方法,并设置Subscriber
的请求次数。 - 在
Subscription
的request
方法中,知道了Subscriber
的请求次数,经过相关的逻辑处理后,在此方法中给Subscriber
发送数据。 - 通过
Subscriber
的receive(_:)
方法向Subscriber
发送数据。 - 通过
Subscriber
的receive(completion:)
方法向Subscriber
发送结束或者失败信息。
因为Subscription
是起了一个桥梁的作用,属于幕后,所以上面第5条、第6条从语义上来说相当于Publisher
通过receive(_:)
方法或receive(completion:)
方法向Subscriber
发送数据或者结束信息。实际上Subscription
替Publisher
做了向下游发送数据的事情。
自定义Subscriber
首先看一下Subscriber
协议的定义:
public protocol Subscriber<Input, Failure> : CustomCombineIdentifierConvertible {associatedtype Inputassociatedtype Failure : Errorfunc receive(subscription: any Subscription)func receive(_ input: Self.Input) -> Subscribers.Demandfunc receive(completion: Subscribers.Completion<Self.Failure>)
}
协议中有两个类型,三个方法。自定义的Subscriber
需要使用class
定义,而非struct
,否则会报错,另外struct
是值类型,Subscription
没有持有最初的那个Subscriber
对象。
// 自定义Subscriber
class CustomSubscriber: Subscriber {// 确定输入类型,需要和Publisher的输出类型一致。typealias Input = Int// 确定失败类型,需要和Publisher的失败类型一致,永远不会失败就定义为Never。typealias Failure = Never/** 交互流程中第3步* 接收subscription对象的方法。* 方法内subscription对象调用request方法,设置请求次数。*/func receive(subscription: any Subscription) {debugPrint("CustomSubscriber subscription.request")subscription.request(.max(5))}/** 交互流程中第5步* 接收Publisher发送数据的方法。* 该方法返回`Subscribers.Demand`,用于在request方法中计算请求次数。*/func receive(_ input: Int) -> Subscribers.Demand {print("New value \(input)")return .none}/** 交互流程中第6步* 接收Publisher发送结束的方法,或者正常结束,或者失败。*/func receive(completion: Subscribers.Completion<Never>) {print("Completion: \(completion)")}
}
自定义Publisher
在自定义Publisher
前,再看一下Publisher
协议的定义:
public protocol Publisher<Output, Failure> {associatedtype Outputassociatedtype Failure : Errorfunc receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}
自定义的Publisher
需要继承这个协议,比如:
// 自定义Publisher
class CustomPublisher: Publisher {// 确定输出类型,需要和Subscriber的输入类型一致。typealias Output = Int// 确定失败类型,需要和Subscriber的失败类型一致,永远不会失败就定义为Never。typealias Failure = Never/** 交互流程中第2步* 接收subscriber对象的方法。方法传入Subscriber实例对象,开始建立联系。* 方法内创建Subscription对象,然后调用Subscriber的receive(subscription:)方法,将Subscription对象传给Subscriber。*/func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {// 创建Subscription对象let subscription = CustomSubscription(subscriber: subscriber)debugPrint("CustomPublisher subscriber.receive")// 将Subscription对象传给Subscribersubscriber.receive(subscription: subscription)}
}
自定义Subscription
先看一下Subscription
协议:
public protocol Subscription : Cancellable, CustomCombineIdentifierConvertible {/// Tells a publisher that it may send more values to the subscriber.func request(_ demand: Subscribers.Demand)
}
该协议中规定了要实现request
方法,因为继承了Cancellable
,所以还需要实现一个cancel
方法。
public protocol Cancellable {func cancel()
}
同Subscriber
一样,自定义的Subscription
需要使用class
定义,而非struct
,否则会报错,另外创建的Subscription
实例对象需要在内存中保持,否则订阅就失效了。
下面是自定义Subscription
:
// 自定义Subscription
class CustomSubscription<S: Subscriber>: Subscription where S.Input == Int, S.Failure == Never {// 持有传入进来的Subscriber对象。private var subscriber: Sprivate var counter = 0private var isCompleted = false// 初始化的时候将Subscriber对象传入进来,并持有,待后续发送数据使用。init(subscriber: S) {self.subscriber = subscriber}/** 交互流程中第4步* 该方法传入请求数据的次数,并给Subscriber发送数据。*/func request(_ demand: Subscribers.Demand) {debugPrint("CustomSubscription request")guard !isCompleted else { return }for _ in 0..<(demand.max ?? 10) {_ = subscriber.receive(counter) // 给Subscriber发送数据counter += 1}if counter >= 5 {subscriber.receive(completion: .finished) // 通知Subscriber结束。isCompleted = true}}// 该方法中执行一些取消订阅的操作。func cancel() {isCompleted = true}
}
如何使用
定义完了上面的,现在看看怎么使用吧。还是依托SwiftUI
的界面,我们在对应的ViewModel
中添加方法,使用上面自定义的类。
首先定义一个ViewModel
。
class CustomCombineViewModel: ObservableObject {var subscription: AnyCancellable?func testMethod1() {// 创建自定义的Publisherlet publisher = CustomPublisher()// 创建自定义的Subscriberlet subscriber = CustomSubscriber()debugPrint("Begin subscribe")/** 交互流程中第1步,申请订阅。* 由Publisher对象调用subscribe方法,传入Subscriber对象开始。*/publisher.subscribe(subscriber)}func testMethod2() {// 创建自定义的Publisherlet publisher = CustomPublisher()// 通过sink方法申请订阅,并将创建的subscription持有,否则订阅失败,sink方法返回的时AnyCancellable,这里做了类型抹除。subscription = publisher.sink { completion inprint("sink completion: \(completion)")} receiveValue: { value inprint("sink new value \(value)")}}
}
在上面代码中的testMethod1
方法中,分别创建了Publisher
和Subscriber
,并用Publisher
对象调用subscribe
方法开启订阅,这也是订阅的开启入口。
当执行testMethod1
时候,输出打印:
"Begin subscribe"
"CustomPublisher subscriber.receive"
"CustomSubscriber subscription.request"
"CustomSubscription request"
New value 0
New value 1
New value 2
New value 3
New value 4
Completion: finished
上面的输出也反应了从开始订阅到发送数据结束的过程。打印了5个数据是应为我们在Subscriber
类中调用request
方法的时候传入了.max(5)
,最多发送5个数据。
再看一下第二个方法testMethod2()
,这个方法中没有明确的Publisher
调用subscribe
方法呢?
Subscribers
有两个内置的Subscriber
,分别为Subscribers.Sink
和Subscribers.Assign
。当调用sink
或者assign
方法的时候,就开启了订阅流程。
当执行testMethod2
时候,输出打印:
"CustomPublisher subscriber.receive"
"CustomSubscription request"
sink new value 0
sink new value 1
sink new value 2
sink new value 3
sink new value 4
sink new value 5
sink new value 6
sink new value 7
sink new value 8
sink new value 9
sink completion: finished
因为sink
请求的是无限次数数据,所以将我们在Subscription
中的数据都打印出来了。
Subscribers.Sink
Sink
创建的时候会立即调用 Subscription
对象的 request(.unlimited)
。
Publisher
有两个 sink
扩展方法:
sink(receiveCompletion:receiveValue:)
sink(receiveValue:)
Subscribers.Assign
Assign
会将接收到的值赋值给一个类对象的属性或者一个另一个 @Published
publisher 上,它对 publisher 的 demand
也是 .unlimited
。
Publisher
有两个 assign
扩展方法:
assign<Root>(to keyPath: ReferenceWritableKeyPath<Root, Self.Output>, on object: Root)
assign(to published: inout Published<Self.Output>.Publisher)
写在最后
现在我们完全理解了Combine
订阅交互流程,是不是对Combine
框架有了进一步的认识呢?
在实际开发过程中,不建议我们自己去实现Publisher
,Subscriber
和Subscription
,因为一个逻辑错误可能会破坏发布者和订阅者之间的所有连接,这可能会导致意想不到的结果。
最后,希望能够帮助到有需要的朋友,如果觉得有帮助,还望点个赞,添加个关注,笔者也会不断地努力,写出更多更好用的文章。