源码基于:Android U
0. 前言
最近在研究 Android 自带的系统数据指标采集功能,框架依旧很严谨、完美,这里做个分享。
1. Android S 之后变化
stats 的代码从 framework 或 system/core 中转移到了 packages/modules/StatsD 目录中。
2. 框架图
大的框架分两层:
-
system_sever;
-
statsd;
Java 层创建两个 SystemService:
-
StatsCompanion.Lifecycle:会启动两个Service 用以与 native daemon 通信;
-
StatsPullAtomService:SystemServer 中用来采集数据;
StatsPullAtomService 用以采集数据,并将数据填充到参数中,在StatsManager.PullAtomCallbackInternal 中会转换成 Parcel 格式传入到 native。
frameworks/base/services/core/java/com/android/server/stats/pull/StatsPullAtomService.javaprivate class StatsPullAtomCallbackImpl implements StatsManager.StatsPullAtomCallback {@Overridepublic int onPullAtom(int atomTag, List<StatsEvent> data) {...try {switch (atomTag) {...case FrameworkStatsLog.PROCESS_MEMORY_STATE:case FrameworkStatsLog.PROCESS_MEMORY_HIGH_WATER_MARK:case FrameworkStatsLog.PROCESS_MEMORY_SNAPSHOT:case FrameworkStatsLog.SYSTEM_ION_HEAP_SIZE:case FrameworkStatsLog.ION_HEAP_SIZE:case FrameworkStatsLog.PROCESS_SYSTEM_ION_HEAP_SIZE:case FrameworkStatsLog.PROCESS_DMABUF_MEMORY:case FrameworkStatsLog.SYSTEM_MEMORY:case FrameworkStatsLog.VMSTAT:...}} finally {}}
该函数是针对不同类型的 puller 的回调处理,在此之前会调用 registerPullers
函数将所有的puller 注册到 StatsManager 中:
private void registerProcessMemoryHighWaterMark() {int tagId = FrameworkStatsLog.PROCESS_MEMORY_HIGH_WATER_MARK;mStatsManager.setPullAtomCallback(tagId,null, // use default PullAtomMetadata valuesDIRECT_EXECUTOR,mStatsCallbackImpl);}
StatsCallbackPuller::PullInternal 中会回调StatsManager.PullAtomCallbackInternal 的onPullAtom
,该函数中会回调StatsCallbackPuller::PullInternal 中定义的PullResultReceiver
的pullFinished
函数,并将 StatsPullAtomService
端采集的数据存入StatsCallbackPuller::PullInternal
的入参中;
packages/modules/StatsD/framework/java/android/app/StatsManager.javapublic void onPullAtom(int atomTag, IPullAtomResultReceiver resultReceiver) {final long token = Binder.clearCallingIdentity();try {mExecutor.execute(() -> {List<StatsEvent> data = new ArrayList<>(); //上层的采集数据存入dataint successInt = mCallback.onPullAtom(atomTag, data); //callback,开始采集boolean success = successInt == PULL_SUCCESS;StatsEventParcel[] parcels = new StatsEventParcel[data.size()];for (int i = 0; i < data.size(); i++) {parcels[i] = new StatsEventParcel();parcels[i].buffer = data.get(i).getBytes(); //转换成parcel,传入native}try {resultReceiver.pullFinished(atomTag, success, parcels); //receiver回调} catch (RemoteException e) {...}});} finally {Binder.restoreCallingIdentity(token);}}
packages/modules/StatsD/statsd/src/external/StatsCallbackPuller.cppPullErrorCode StatsCallbackPuller::PullInternal(vector<shared_ptr<LogEvent>>* data) {...shared_ptr<vector<shared_ptr<LogEvent>>> sharedData =make_shared<vector<shared_ptr<LogEvent>>>();shared_ptr<PullResultReceiver> resultReceiver = SharedRefBase::make<PullResultReceiver>([cv_mutex, cv, pullFinish, pullSuccess, sharedData]( //receiver回调int32_t atomTag, bool success, const vector<StatsEventParcel>& output) {{lock_guard<mutex> lk(*cv_mutex);for (const StatsEventParcel& parcel: output) {shared_ptr<LogEvent> event = make_shared<LogEvent>(/*uid=*/-1, /*pid=*/-1);bool valid = event->parseBuffer((uint8_t*)parcel.buffer.data(),parcel.buffer.size());if (valid) {sharedData->push_back(event); //解析上层采集数据,存入sharedData} else {StatsdStats::getInstance().noteAtomError(event->GetTagId(),/*pull=*/true);}}*pullSuccess = success;*pullFinish = true;}cv->notify_one();});// Initiate the pull. This is a oneway call to a different process, except// in unit tests. In process calls are not oneway.Status status = mCallback->onPullAtom(mTagId, resultReceiver); //callback,等待receiver回调...{unique_lock<mutex> unique_lk(*cv_mutex);// Wait until the pull finishes, or until the pull timeout.cv->wait_for(unique_lk, chrono::nanoseconds(mPullTimeoutNs),[pullFinish] { return *pullFinish; });if (!*pullFinish) {...} else {if (*pullSuccess) {*data = std::move(*sharedData); //数据最终填充到入参data中}...}
}
StatsPullerManager::OnAlarmFired 最终会调用 StatsCallbackPuller::PullInternal
,参数也是这里传入的。所以采集的数据最终会回到 StatsPullerManager::OnAlarmFired
中,最终会将这些数据通过注册在StatsPullerManager中的 mReceivers
的回调函数 onDataPulled
处理。
packages/modules/StatsD/statsd/src/external/StatsPullerManager.cppvoid StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) {std::lock_guard<std::mutex> _l(mLock);int64_t wallClockNs = getWallClockNs();int64_t minNextPullTimeNs = NO_ALARM_UPDATE;vector<pair<const ReceiverKey*, vector<ReceiverInfo*>>> needToPull;for (auto& pair : mReceivers) { //最终数据的处理都是在mReceivers中vector<ReceiverInfo*> receivers;if (pair.second.size() != 0) {for (ReceiverInfo& receiverInfo : pair.second) {sp<PullDataReceiver> receiverPtr = receiverInfo.receiver.promote();const bool pullNecessary = receiverPtr != nullptr && receiverPtr->isPullNeeded();if (receiverInfo.nextPullTimeNs <= elapsedTimeNs && pullNecessary) {receivers.push_back(&receiverInfo);} else {if (receiverInfo.nextPullTimeNs <= elapsedTimeNs) {receiverPtr->onDataPulled({}, PullResult::PULL_NOT_NEEDED, elapsedTimeNs);int numBucketsAhead = (elapsedTimeNs - receiverInfo.nextPullTimeNs) /receiverInfo.intervalNs;receiverInfo.nextPullTimeNs +=(numBucketsAhead + 1) * receiverInfo.intervalNs;}minNextPullTimeNs = min(receiverInfo.nextPullTimeNs, minNextPullTimeNs);}}if (receivers.size() > 0) {needToPull.push_back(make_pair(&pair.first, receivers));}}}for (const auto& pullInfo : needToPull) {vector<shared_ptr<LogEvent>> data; //采集的数据根本是存在这里的临时变量PullResult pullResult =PullLocked(pullInfo.first->atomTag, pullInfo.first->configKey, elapsedTimeNs, &data) //调用PullLocked 将数据采集回来? PullResult::PULL_RESULT_SUCCESS: PullResult::PULL_RESULT_FAIL;if (pullResult == PullResult::PULL_RESULT_FAIL) {VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);}// Convention is to mark pull atom timestamp at request time.// If we pull at t0, puller starts at t1, finishes at t2, and send back// at t3, we mark t0 as its timestamp, which should correspond to its// triggering event, such as condition change at t0.// Here the triggering event is alarm fired from AlarmManager.// In ValueMetricProducer and GaugeMetricProducer we do same thing// when pull on condition change, etc.for (auto& event : data) {event->setElapsedTimestampNs(elapsedTimeNs);event->setLogdWallClockTimestampNs(wallClockNs);}for (const auto& receiverInfo : pullInfo.second) {sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();if (receiverPtr != nullptr) {receiverPtr->onDataPulled(data, pullResult, elapsedTimeNs); //发送给receiver处理// We may have just come out of a coma, compute next pull time.int numBucketsAhead =(elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs;minNextPullTimeNs = min(receiverInfo->nextPullTimeNs, minNextPullTimeNs);} else {VLOG("receiver already gone.");}}}VLOG("mNextPullTimeNs: %lld updated to %lld", (long long)mNextPullTimeNs,(long long)minNextPullTimeNs);mNextPullTimeNs = minNextPullTimeNs; //alarm时间点更新updateAlarmLocked(); //调用updateAlarmLocked 告知Java service 中的AlarmManagerService
}