To understand the concepts of connection and session, a good starting point is ConnectionState and SessionState:
// Some code omitted
public class ConnectionState {
    ConnectionInfo info;
    private final ConcurrentHashMap<TransactionId, TransactionState> transactions = new ConcurrentHashMap<TransactionId, TransactionState>();
    private final ConcurrentHashMap<SessionId, SessionState> sessions = new ConcurrentHashMap<SessionId, SessionState>();
    private final List<DestinationInfo> tempDestinations = Collections.synchronizedList(new ArrayList<DestinationInfo>());
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private boolean connectionInterruptProcessingComplete = true;
    private HashMap<ConsumerId, ConsumerInfo> recoveringPullConsumers;

    public ConnectionState(ConnectionInfo info) {
        this.info = info;
        // Add the default session id.
        addSession(new SessionInfo(info, -1));
    }
}
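As the code shows, a connection holds a map of transactions, a map of sessions, a list of temporary destinations (the tempDestinations field), and so on. This tells us that: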
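1. A transaction belongs to a connection.
2. A session belongs to a connection.
3. A temporary destination lives only as long as its connection.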
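The ownership relationships above can be illustrated with a minimal, self-contained sketch. The class `ConnectionScope` and its methods (`openSession`, `beginTransaction`, `createTempDestination`, `close`) are hypothetical names invented for this illustration and are not part of ActiveMQ; plain strings stand in for the real ID and info types. The sketch only shows the general idea: when state is held per connection, closing the connection discards everything scoped to it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for connection-scoped state; this is NOT ActiveMQ code.
class ConnectionScope {
    // Same collection choices as the excerpt: concurrent maps plus a synchronized list.
    private final Map<String, String> sessions = new ConcurrentHashMap<>();
    private final Map<String, String> transactions = new ConcurrentHashMap<>();
    private final List<String> tempDestinations =
            Collections.synchronizedList(new ArrayList<>());

    void openSession(String sessionId) {
        sessions.put(sessionId, "open");
    }

    void beginTransaction(String transactionId) {
        transactions.put(transactionId, "in-progress");
    }

    void createTempDestination(String name) {
        tempDestinations.add(name);
    }

    // Closing the connection drops all state that was scoped to it.
    void close() {
        sessions.clear();
        transactions.clear();
        tempDestinations.clear();
    }

    int sessionCount() { return sessions.size(); }
    int transactionCount() { return transactions.size(); }
    int tempDestinationCount() { return tempDestinations.size(); }

    public static void main(String[] args) {
        ConnectionScope connection = new ConnectionScope();
        connection.openSession("session-1");
        connection.beginTransaction("tx-1");
        connection.createTempDestination("temp-queue-1");
        System.out.println("before close: " + connection.tempDestinationCount());
        connection.close();
        System.out.println("after close: " + connection.tempDestinationCount());
    }
}
```

Keeping all three collections inside one object means a single `close()` call is enough to release them, which mirrors why a temporary destination cannot outlive its connection.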
// Some code omitted
public class SessionState {
    final SessionInfo info;
    private final Map<ProducerId, ProducerState> producers = new ConcurrentHashMap<ProducerId, ProducerState>();
    private final Map<ConsumerId, ConsumerState> consumers = new ConcurrentHashMap<ConsumerId, ConsumerState>();
    private final AtomicBoolean shutdown = new AtomicBoolean(false);

    public SessionState(SessionInfo info) {
        this.info = info;
    }
}
As shown above, producers and consumers belong to a session, and each one is keyed by a unique ID (ProducerId or ConsumerId).
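To make the "session owns its producers and consumers, each under a unique ID" idea concrete, here is a small sketch. `SessionScope`, `addProducer`, `addConsumer`, and the ID format (session ID plus a per-session counter) are assumptions made for this illustration; they are not ActiveMQ's actual classes or ID scheme.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for session-scoped state; this is NOT ActiveMQ code.
class SessionScope {
    private final String sessionId;
    // Per-session counters make every generated ID unique within the session.
    private final AtomicLong producerSeq = new AtomicLong();
    private final AtomicLong consumerSeq = new AtomicLong();
    // Maps from ID to the destination the producer or consumer is attached to.
    private final Map<String, String> producers = new ConcurrentHashMap<>();
    private final Map<String, String> consumers = new ConcurrentHashMap<>();

    SessionScope(String sessionId) {
        this.sessionId = sessionId;
    }

    // Registers a producer and returns its newly generated ID.
    String addProducer(String destination) {
        String id = sessionId + ":p" + producerSeq.incrementAndGet();
        producers.put(id, destination);
        return id;
    }

    // Registers a consumer and returns its newly generated ID.
    String addConsumer(String destination) {
        String id = sessionId + ":c" + consumerSeq.incrementAndGet();
        consumers.put(id, destination);
        return id;
    }

    // Lookups return null when the ID is unknown to this session.
    String producerDestination(String id) { return producers.get(id); }
    String consumerDestination(String id) { return consumers.get(id); }

    public static void main(String[] args) {
        SessionScope session = new SessionScope("conn-1:1");
        String p1 = session.addProducer("orders");
        String p2 = session.addProducer("orders");
        String c1 = session.addConsumer("orders");
        System.out.println(p1 + " " + p2 + " " + c1);
    }
}
```

Prefixing each ID with the session ID is one simple way to keep IDs distinct across sessions as well as within one; other schemes would work equally well for the purposes of this sketch.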
// Some code omitted
public class ProducerState {
    final ProducerInfo info;
    private TransactionState transactionState;
}

public class ConsumerState {
    final ConsumerInfo info;
}
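ProducerState and ConsumerState are thin wrappers: each holds the corresponding info object, and ProducerState additionally keeps a reference to a TransactionState.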
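ConnectionInfo, SessionInfo, ProducerInfo, and ConsumerInfo are all command types; each extends BaseCommand, which is an abstract class (not an interface) that implements the Command interface.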