CSI-external-provisioner

main()

This Go code implements a CSI (Container Storage Interface) provisioner, used to dynamically provision persistent volumes in a Kubernetes cluster. It involves several components and steps; the key parts are explained below:

  1. Initialization and configuration
  • Command-line flags and environment variables: the code uses the flag package to handle command-line arguments such as feature-gates and kubeconfig, and reads further configuration, such as NODE_NAME, from environment variables.
  • Logging and metrics: klog is used for logging, and Prometheus metric collectors are configured.
  2. Kubernetes client configuration
  • Building the kube config: depending on whether the master or kubeconfig parameter was provided, the code chooses between the in-cluster configuration (rest.InClusterConfig) and an external configuration (clientcmd.BuildConfigFromFlags).
  • Client creation: the config is used to create a Kubernetes client (kubernetes.NewForConfig) and a CSI snapshot client (snapclientset.NewForConfig).
  3. CSI driver connection and validation
  • Connecting to the CSI driver: the code connects to the CSI driver over gRPC and performs a basic probe (ctrl.Probe) to make sure the driver is available.
  • Fetching the driver name and capabilities: the driver name (ctrl.GetDriverName) and its capabilities (ctrl.GetDriverCapabilities) are obtained from the CSI driver.
  4. Topology and node information
  • Topology support: if the CSI driver supports topology, the corresponding informers are created to watch Node and CSINode objects.
  • Node deployment: if node deployment is enabled (--enable-node-deployment), node information is fetched and a node deployment object is configured.
  5. Provisioner and controller creation
  • Provisioner creation: the gathered configuration and clients are used to create the CSI provisioner object, which implements the Provisioner interface.
  • Capacity controller: if the capacity feature is enabled (--enable-capacity), a capacity controller is created to publish storage capacity information.
  6. HTTP server and metrics
  • HTTP server: if a metrics address (--metrics-address) or HTTP endpoint (--http-endpoint) is specified, an HTTP server is started to expose metrics and, optionally, debug endpoints (such as pprof).
  7. Informers and queues
  • Informers and work queues: informers are created for various resources to watch for changes to Kubernetes objects, and work queues are used to process the resulting events.
  8. Run
  • Starting the informers and controllers: the informer factories and controllers are started, and the program begins watching and handling events.

Summary
This code is a fairly involved CSI provisioner implementation that ties together a Kubernetes client, the CSI driver, metrics collection, topology awareness, capacity management, and more. Thanks to its modular structure, it can efficiently provision and manage persistent volumes in a Kubernetes cluster.
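One small detail from main() worth calling out before reading the full source: the provisioner builds a unique identity string from a millisecond timestamp, a random number, and the detected driver name, with the node name appended in node-deployment mode. A standalone sketch of that logic (the function name makeIdentity is mine, not the upstream code's):

```go
package main

import (
	"fmt"
	"math/rand"
	"strconv"
	"time"
)

// makeIdentity mirrors the identity string built in main():
// "<unix-ms>-<random>-<driverName>", plus "-<node>" when node
// deployment is enabled. Illustrative sketch, not the upstream code.
func makeIdentity(driverName, node string) string {
	timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
	identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + driverName
	if node != "" {
		identity = identity + "-" + node
	}
	return identity
}

func main() {
	fmt.Println(makeIdentity("hostpath.csi.k8s.io", ""))
	fmt.Println(makeIdentity("hostpath.csi.k8s.io", "node-1"))
}
```

The timestamp plus random suffix keeps two provisioner replicas from colliding even when they start in the same millisecond.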

func main() {
	var config *rest.Config
	var err error

	flag.Var(utilflag.NewMapStringBool(&featureGates), "feature-gates", "A set of key=value pairs that describe feature gates for alpha/experimental features. "+
		"Options are:\n"+strings.Join(utilfeature.DefaultFeatureGate.KnownFeatures(), "\n"))

	klog.InitFlags(nil)
	flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
	flag.Set("logtostderr", "true")
	flag.Parse()

	ctx := context.Background()

	if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(featureGates); err != nil {
		klog.Fatal(err)
	}

	node := os.Getenv("NODE_NAME")
	if *enableNodeDeployment && node == "" {
		klog.Fatal("The NODE_NAME environment variable must be set when using --enable-node-deployment.")
	}

	if *showVersion {
		fmt.Println(os.Args[0], version)
		os.Exit(0)
	}
	klog.Infof("Version: %s", version)

	if *metricsAddress != "" && *httpEndpoint != "" {
		klog.Error("only one of `--metrics-address` and `--http-endpoint` can be set.")
		os.Exit(1)
	}
	addr := *metricsAddress
	if addr == "" {
		addr = *httpEndpoint
	}

	// get the KUBECONFIG from env if specified (useful for local/debug cluster)
	kubeconfigEnv := os.Getenv("KUBECONFIG")
	if kubeconfigEnv != "" {
		klog.Infof("Found KUBECONFIG environment variable set, using that..")
		kubeconfig = &kubeconfigEnv
	}

	if *master != "" || *kubeconfig != "" {
		klog.Infof("Either master or kubeconfig specified. building kube config from that..")
		config, err = clientcmd.BuildConfigFromFlags(*master, *kubeconfig)
	} else {
		klog.Infof("Building kube configs for running in cluster...")
		config, err = rest.InClusterConfig()
	}
	if err != nil {
		klog.Fatalf("Failed to create config: %v", err)
	}
	config.QPS = *kubeAPIQPS
	config.Burst = *kubeAPIBurst

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		klog.Fatalf("Failed to create client: %v", err)
	}

	// snapclientset.NewForConfig creates a new Clientset for VolumesnapshotV1Client
	snapClient, err := snapclientset.NewForConfig(config)
	if err != nil {
		klog.Fatalf("Failed to create snapshot client: %v", err)
	}

	var gatewayClient gatewayclientset.Interface
	if utilfeature.DefaultFeatureGate.Enabled(features.CrossNamespaceVolumeDataSource) {
		// gatewayclientset.NewForConfig creates a new Clientset for GatewayClient
		gatewayClient, err = gatewayclientset.NewForConfig(config)
		if err != nil {
			klog.Fatalf("Failed to create gateway client: %v", err)
		}
	}

	metricsManager := metrics.NewCSIMetricsManagerWithOptions("", /* driverName */
		// Will be provided via default gatherer.
		metrics.WithProcessStartTime(false),
		metrics.WithSubsystem(metrics.SubsystemSidecar),
	)

	grpcClient, err := ctrl.Connect(ctx, *csiEndpoint, metricsManager)
	if err != nil {
		klog.Error(err.Error())
		os.Exit(1)
	}

	err = ctrl.Probe(ctx, grpcClient, *operationTimeout)
	if err != nil {
		klog.Error(err.Error())
		os.Exit(1)
	}

	// Autodetect provisioner name
	provisionerName, err := ctrl.GetDriverName(grpcClient, *operationTimeout)
	if err != nil {
		klog.Fatalf("Error getting CSI driver name: %s", err)
	}
	klog.V(2).Infof("Detected CSI driver %s", provisionerName)
	metricsManager.SetDriverName(provisionerName)

	translator := csitrans.New()
	supportsMigrationFromInTreePluginName := ""
	if translator.IsMigratedCSIDriverByName(provisionerName) {
		supportsMigrationFromInTreePluginName, err = translator.GetInTreeNameFromCSIName(provisionerName)
		if err != nil {
			klog.Fatalf("Failed to get InTree plugin name for migrated CSI plugin %s: %v", provisionerName, err)
		}
		klog.V(2).Infof("Supports migration from in-tree plugin: %s", supportsMigrationFromInTreePluginName)

		// Create a new connection with the metrics manager with migrated label
		metricsManager = metrics.NewCSIMetricsManagerWithOptions(provisionerName,
			// Will be provided via default gatherer.
			metrics.WithProcessStartTime(false),
			metrics.WithMigration())
		migratedGrpcClient, err := ctrl.Connect(ctx, *csiEndpoint, metricsManager)
		if err != nil {
			klog.Error(err.Error())
			os.Exit(1)
		}
		grpcClient.Close()
		grpcClient = migratedGrpcClient

		err = ctrl.Probe(ctx, grpcClient, *operationTimeout)
		if err != nil {
			klog.Error(err.Error())
			os.Exit(1)
		}
	}

	// Prepare http endpoint for metrics + leader election healthz
	mux := http.NewServeMux()
	gatherers := prometheus.Gatherers{
		// For workqueue and leader election metrics, set up via the anonymous imports of:
		// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/component-base/metrics/prometheus/workqueue/metrics.go
		// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/component-base/metrics/prometheus/clientgo/leaderelection/metrics.go
		//
		// Also happens to include Go runtime and process metrics:
		// https://github.com/kubernetes/kubernetes/blob/9780d88cb6a4b5b067256ecb4abf56892093ee87/staging/src/k8s.io/component-base/metrics/legacyregistry/registry.go#L46-L49
		legacyregistry.DefaultGatherer,
		// For CSI operations.
		metricsManager.GetRegistry(),
	}

	pluginCapabilities, controllerCapabilities, err := ctrl.GetDriverCapabilities(grpcClient, *operationTimeout)
	if err != nil {
		klog.Fatalf("Error getting CSI driver capabilities: %s", err)
	}

	// Generate a unique ID for this provisioner
	timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
	identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + provisionerName
	if *enableNodeDeployment {
		identity = identity + "-" + node
	}

	factory := informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)
	var factoryForNamespace informers.SharedInformerFactory // usually nil, only used for CSIStorageCapacity

	// -------------------------------
	// Listers
	// Create informers to avoid hitting the API server for every resource request
	scLister := factory.Storage().V1().StorageClasses().Lister()
	claimLister := factory.Core().V1().PersistentVolumeClaims().Lister()

	var vaLister storagelistersv1.VolumeAttachmentLister
	if controllerCapabilities[csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME] {
		klog.Info("CSI driver supports PUBLISH_UNPUBLISH_VOLUME, watching VolumeAttachments")
		vaLister = factory.Storage().V1().VolumeAttachments().Lister()
	} else {
		klog.Info("CSI driver does not support PUBLISH_UNPUBLISH_VOLUME, not watching VolumeAttachments")
	}

	var nodeDeployment *ctrl.NodeDeployment
	if *enableNodeDeployment {
		nodeDeployment = &ctrl.NodeDeployment{
			NodeName:         node,
			ClaimInformer:    factory.Core().V1().PersistentVolumeClaims(),
			ImmediateBinding: *nodeDeploymentImmediateBinding,
			BaseDelay:        *nodeDeploymentBaseDelay,
			MaxDelay:         *nodeDeploymentMaxDelay,
		}
		nodeInfo, err := ctrl.GetNodeInfo(grpcClient, *operationTimeout)
		if err != nil {
			klog.Fatalf("Failed to get node info from CSI driver: %v", err)
		}
		nodeDeployment.NodeInfo = *nodeInfo
	}

	var nodeLister listersv1.NodeLister
	var csiNodeLister storagelistersv1.CSINodeLister
	if ctrl.SupportsTopology(pluginCapabilities) {
		if nodeDeployment != nil {
			// Avoid watching in favor of fake, static objects. This is particularly relevant for
			// Node objects, which can generate significant traffic.
			csiNode := &storagev1.CSINode{
				ObjectMeta: metav1.ObjectMeta{
					Name: nodeDeployment.NodeName,
				},
				Spec: storagev1.CSINodeSpec{
					Drivers: []storagev1.CSINodeDriver{
						{
							Name:   provisionerName,
							NodeID: nodeDeployment.NodeInfo.NodeId,
						},
					},
				},
			}
			node := &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Name: nodeDeployment.NodeName,
				},
			}
			if nodeDeployment.NodeInfo.AccessibleTopology != nil {
				for key := range nodeDeployment.NodeInfo.AccessibleTopology.Segments {
					csiNode.Spec.Drivers[0].TopologyKeys = append(csiNode.Spec.Drivers[0].TopologyKeys, key)
				}
				node.Labels = nodeDeployment.NodeInfo.AccessibleTopology.Segments
			}
			klog.Infof("using local topology with Node = %+v and CSINode = %+v", node, csiNode)

			// We make those fake objects available to the topology code via informers which
			// never change.
			stoppedFactory := informers.NewSharedInformerFactory(clientset, 1000*time.Hour)
			csiNodes := stoppedFactory.Storage().V1().CSINodes()
			nodes := stoppedFactory.Core().V1().Nodes()
			csiNodes.Informer().GetStore().Add(csiNode)
			nodes.Informer().GetStore().Add(node)
			csiNodeLister = csiNodes.Lister()
			nodeLister = nodes.Lister()
		} else {
			csiNodeLister = factory.Storage().V1().CSINodes().Lister()
			nodeLister = factory.Core().V1().Nodes().Lister()
		}
	}

	var referenceGrantLister referenceGrantv1beta1.ReferenceGrantLister
	var gatewayFactory gatewayInformers.SharedInformerFactory
	if utilfeature.DefaultFeatureGate.Enabled(features.CrossNamespaceVolumeDataSource) {
		gatewayFactory = gatewayInformers.NewSharedInformerFactory(gatewayClient, ctrl.ResyncPeriodOfReferenceGrantInformer)
		referenceGrants := gatewayFactory.Gateway().V1beta1().ReferenceGrants()
		referenceGrantLister = referenceGrants.Lister()
	}

	// -------------------------------
	// PersistentVolumeClaims informer
	rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)
	claimQueue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
	claimInformer := factory.Core().V1().PersistentVolumeClaims().Informer()

	// Setup options
	provisionerOptions := []func(*controller.ProvisionController) error{
		controller.LeaderElection(false), // Always disable leader election in provisioner lib. Leader election should be done here in the CSI provisioner level instead.
		controller.FailedProvisionThreshold(0),
		controller.FailedDeleteThreshold(0),
		controller.RateLimiter(rateLimiter),
		controller.Threadiness(int(*workerThreads)),
		controller.CreateProvisionedPVLimiter(workqueue.DefaultControllerRateLimiter()),
		controller.ClaimsInformer(claimInformer),
		controller.NodesLister(nodeLister),
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.HonorPVReclaimPolicy) {
		provisionerOptions = append(provisionerOptions, controller.AddFinalizer(true))
	}

	if supportsMigrationFromInTreePluginName != "" {
		provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
	}

	// Create the provisioner: it implements the Provisioner interface expected by
	// the controller
	csiProvisioner := ctrl.NewCSIProvisioner(
		clientset,
		*operationTimeout,
		identity,
		*volumeNamePrefix,
		*volumeNameUUIDLength,
		grpcClient,
		snapClient,
		provisionerName,
		pluginCapabilities,
		controllerCapabilities,
		supportsMigrationFromInTreePluginName,
		*strictTopology,
		*immediateTopology,
		translator,
		scLister,
		csiNodeLister,
		nodeLister,
		claimLister,
		vaLister,
		referenceGrantLister,
		*extraCreateMetadata,
		*defaultFSType,
		nodeDeployment,
		*controllerPublishReadOnly,
		*preventVolumeModeConversion,
	)

	var capacityController *capacity.Controller
	if *enableCapacity {
		// Publishing storage capacity information uses its own client
		// with separate rate limiting.
		config.QPS = *kubeAPICapacityQPS
		config.Burst = *kubeAPICapacityBurst
		clientset, err := kubernetes.NewForConfig(config)
		if err != nil {
			klog.Fatalf("Failed to create client: %v", err)
		}

		namespace := os.Getenv("NAMESPACE")
		if namespace == "" {
			klog.Fatal("need NAMESPACE env variable for CSIStorageCapacity objects")
		}

		var controller *metav1.OwnerReference
		if *capacityOwnerrefLevel >= 0 {
			podName := os.Getenv("POD_NAME")
			if podName == "" {
				klog.Fatal("need POD_NAME env variable to determine CSIStorageCapacity owner")
			}
			var err error
			controller, err = owner.Lookup(config, namespace, podName,
				schema.GroupVersionKind{
					Group:   "",
					Version: "v1",
					Kind:    "Pod",
				}, *capacityOwnerrefLevel)
			if err != nil {
				klog.Fatalf("look up owner(s) of pod: %v", err)
			}
			klog.Infof("using %s/%s %s as owner of CSIStorageCapacity objects", controller.APIVersion, controller.Kind, controller.Name)
		}

		var topologyInformer topology.Informer
		if nodeDeployment == nil {
			topologyInformer = topology.NewNodeTopology(
				provisionerName,
				clientset,
				factory.Core().V1().Nodes(),
				factory.Storage().V1().CSINodes(),
				workqueue.NewNamedRateLimitingQueue(rateLimiter, "csitopology"),
			)
		} else {
			var segment topology.Segment
			if nodeDeployment.NodeInfo.AccessibleTopology != nil {
				for key, value := range nodeDeployment.NodeInfo.AccessibleTopology.Segments {
					segment = append(segment, topology.SegmentEntry{Key: key, Value: value})
				}
			}
			klog.Infof("producing CSIStorageCapacity objects with fixed topology segment %s", segment)
			topologyInformer = topology.NewFixedNodeTopology(&segment)
		}
		go topologyInformer.RunWorker(ctx)

		managedByID := "external-provisioner"
		if *enableNodeDeployment {
			managedByID = getNameWithMaxLength(managedByID, node, validation.DNS1035LabelMaxLength)
		}

		// We only need objects from our own namespace. The normal factory would give
		// us an informer for the entire cluster. We can further restrict the
		// watch to just those objects with the right labels.
		factoryForNamespace = informers.NewSharedInformerFactoryWithOptions(clientset,
			ctrl.ResyncPeriodOfCsiNodeInformer,
			informers.WithNamespace(namespace),
			informers.WithTweakListOptions(func(lo *metav1.ListOptions) {
				lo.LabelSelector = labels.Set{
					capacity.DriverNameLabel: provisionerName,
					capacity.ManagedByLabel:  managedByID,
				}.AsSelector().String()
			}),
		)

		// We use the V1 CSIStorageCapacity API if available.
		clientFactory := capacity.NewV1ClientFactory(clientset)
		cInformer := factoryForNamespace.Storage().V1().CSIStorageCapacities()

		// This invalid object is used in a v1 Create call to determine
		// based on the resulting error whether the v1 API is supported.
		invalidCapacity := &storagev1.CSIStorageCapacity{
			ObjectMeta: metav1.ObjectMeta{
				Name: "%123-invalid-name",
			},
		}
		createdCapacity, err := clientset.StorageV1().CSIStorageCapacities(namespace).Create(ctx, invalidCapacity, metav1.CreateOptions{})
		switch {
		case err == nil:
			klog.Fatalf("creating an invalid v1.CSIStorageCapacity didn't fail as expected, got: %s", createdCapacity)
		case apierrors.IsNotFound(err):
			// We need to bridge between the v1beta1 API on the
			// server and the v1 API expected by the capacity code.
			klog.Info("using the CSIStorageCapacity v1beta1 API")
			clientFactory = capacity.NewV1beta1ClientFactory(clientset)
			cInformer = capacity.NewV1beta1InformerBridge(factoryForNamespace.Storage().V1beta1().CSIStorageCapacities())
		case apierrors.IsInvalid(err):
			klog.Info("using the CSIStorageCapacity v1 API")
		default:
			klog.Fatalf("unexpected error when checking for the V1 CSIStorageCapacity API: %v", err)
		}

		capacityController = capacity.NewCentralCapacityController(
			csi.NewControllerClient(grpcClient),
			provisionerName,
			clientFactory,
			// Metrics for the queue is available in the default registry.
			workqueue.NewNamedRateLimitingQueue(rateLimiter, "csistoragecapacity"),
			controller,
			managedByID,
			namespace,
			topologyInformer,
			factory.Storage().V1().StorageClasses(),
			cInformer,
			*capacityPollInterval,
			*capacityImmediateBinding,
			*operationTimeout,
		)
		legacyregistry.CustomMustRegister(capacityController)

		// Wrap Provision and Delete to detect when it is time to refresh capacity.
		csiProvisioner = capacity.NewProvisionWrapper(csiProvisioner, capacityController)
	}

	if addr != "" {
		// Start HTTP server, regardless whether we are the leader or not.
		// Register provisioner metrics manually to be able to add multiplexer in front of it
		m := libmetrics.New("controller")
		reg := prometheus.NewRegistry()
		reg.MustRegister([]prometheus.Collector{
			m.PersistentVolumeClaimProvisionTotal,
			m.PersistentVolumeClaimProvisionFailedTotal,
			m.PersistentVolumeClaimProvisionDurationSeconds,
			m.PersistentVolumeDeleteTotal,
			m.PersistentVolumeDeleteFailedTotal,
			m.PersistentVolumeDeleteDurationSeconds,
		}...)
		provisionerOptions = append(provisionerOptions, controller.MetricsInstance(m))
		gatherers = append(gatherers, reg)

		// This is similar to k8s.io/component-base/metrics HandlerWithReset
		// except that we gather from multiple sources. This is necessary
		// because both CSI metrics manager and component-base manage
		// their own registry. Probably could be avoided by making
		// CSI metrics manager a bit more flexible.
		mux.Handle(*metricsPath,
			promhttp.InstrumentMetricHandler(
				reg,
				promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{})))

		if *enableProfile {
			klog.InfoS("Starting profiling", "endpoint", httpEndpoint)

			mux.HandleFunc("/debug/pprof/", pprof.Index)
			mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
			mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
			mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
			mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
		}

		go func() {
			klog.Infof("ServeMux listening at %q", addr)
			err := http.ListenAndServe(addr, mux)
			if err != nil {
				klog.Fatalf("Failed to start HTTP server at specified address (%q) and metrics path (%q): %s", addr, *metricsPath, err)
			}
		}()
	}

	logger := klog.FromContext(ctx)
	provisionController = controller.NewProvisionController(
		logger,
		clientset,
		provisionerName,
		csiProvisioner,
		provisionerOptions...,
	)

	csiClaimController := ctrl.NewCloningProtectionController(
		clientset,
		claimLister,
		claimInformer,
		claimQueue,
		controllerCapabilities,
	)

	run := func(ctx context.Context) {
		factory.Start(ctx.Done())
		if factoryForNamespace != nil {
			// Starting is enough, the capacity controller will
			// wait for sync.
			factoryForNamespace.Start(ctx.Done())
		}
		cacheSyncResult := factory.WaitForCacheSync(ctx.Done())
		for _, v := range cacheSyncResult {
			if !v {
				klog.Fatalf("Failed to sync Informers!")
			}
		}

		if utilfeature.DefaultFeatureGate.Enabled(features.CrossNamespaceVolumeDataSource) {
			if gatewayFactory != nil {
				gatewayFactory.Start(ctx.Done())
			}
			gatewayCacheSyncResult := gatewayFactory.WaitForCacheSync(ctx.Done())
			for _, v := range gatewayCacheSyncResult {
				if !v {
					klog.Fatalf("Failed to sync Informers for gateway!")
				}
			}
		}

		if capacityController != nil {
			go capacityController.Run(ctx, int(*capacityThreads))
		}
		if csiClaimController != nil {
			go csiClaimController.Run(ctx, int(*finalizerThreads))
		}
		provisionController.Run(ctx)
	}

	if !*enableLeaderElection {
		run(ctx)
	} else {
		// this lock name pattern is also copied from sigs.k8s.io/sig-storage-lib-external-provisioner/controller
		// to preserve backwards compatibility
		lockName := strings.Replace(provisionerName, "/", "-", -1)

		// create a new clientset for leader election
		leClientset, err := kubernetes.NewForConfig(config)
		if err != nil {
			klog.Fatalf("Failed to create leaderelection client: %v", err)
		}

		le := leaderelection.NewLeaderElection(leClientset, lockName, run)
		if *httpEndpoint != "" {
			le.PrepareHealthCheck(mux, leaderelection.DefaultHealthCheckTimeout)
		}

		if *leaderElectionNamespace != "" {
			le.WithNamespace(*leaderElectionNamespace)
		}

		le.WithLeaseDuration(*leaderElectionLeaseDuration)
		le.WithRenewDeadline(*leaderElectionRenewDeadline)
		le.WithRetryPeriod(*leaderElectionRetryPeriod)
		le.WithIdentity(identity)

		if err := le.Run(); err != nil {
			klog.Fatalf("failed to initialize leader election: %v", err)
		}
	}
}

NewProvisionController()

  1. Get the hostname and generate a unique ID
    • os.Hostname() is used to get the current hostname; if that fails, an error is logged and the process exits.
    • The hostname is combined with a UUID to form a unique ID, so that multiple processes running on the same host do not conflict.
  2. Initialize the event recorder
    • record.NewBroadcaster() creates an event broadcaster, which is configured for structured logging and event recording.
    • An eventRecorder is created for emitting events.
  3. Create and initialize the ProvisionController instance
    • The ProvisionController struct is initialized with the client, provisioner name, provisioner implementation, ID, component name, event recorder, and other fields.
    • A series of defaults is set: the resync period, exponential backoff on error, thread count, failure thresholds, and so on.
    • Metrics-related configuration is initialized.
  4. Process the option functions
    • The list of option functions passed in is iterated over, and each one is invoked to configure the ProvisionController instance. If an option function fails, an error is logged and the process exits.
  5. Initialize the rate limiter and work queues
    • A rate limiter is created according to the configuration and used to initialize the two work queues, claimQueue and volumeQueue.
  6. Initialize the informers and event handlers
    • informers.NewSharedInformerFactory creates a shared informer factory.
    • Event handlers and indexers are set up for PersistentVolumeClaims (PVCs), PersistentVolumes (PVs), and StorageClasses.
    • The informers watch for changes to Kubernetes resources and trigger the corresponding event handlers.
  7. Initialize the VolumeStore
    • Depending on the configuration, either NewVolumeStoreQueue or NewBackoffStore is used to initialize volumeStore, which handles creating and saving PVs.
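Step 4 above relies on Go's functional-options pattern: each option is a func(*ProvisionController) error applied after the defaults are set. A minimal self-contained sketch of the pattern (the Controller type, field names, and defaults here are simplified stand-ins, not the library's own):

```go
package main

import "fmt"

// Controller is a stand-in for ProvisionController with two settings.
type Controller struct {
	threadiness         int
	resyncPeriodSeconds int
}

// Threadiness returns an option that sets the worker count, mirroring
// controller.Threadiness in sig-storage-lib-external-provisioner.
func Threadiness(n int) func(*Controller) error {
	return func(c *Controller) error {
		if n <= 0 {
			return fmt.Errorf("threadiness must be positive, got %d", n)
		}
		c.threadiness = n
		return nil
	}
}

// NewController sets defaults first, then applies each option in order.
func NewController(options ...func(*Controller) error) (*Controller, error) {
	c := &Controller{threadiness: 4, resyncPeriodSeconds: 15 * 60} // defaults
	for _, option := range options {
		if err := option(c); err != nil {
			return nil, err // the real code logs the error and exits
		}
	}
	return c, nil
}

func main() {
	c, err := NewController(Threadiness(10))
	fmt.Println(c.threadiness, err) // 10 <nil>
}
```

This is why main() can pass a plain slice of options (provisionerOptions) and append to it conditionally before constructing the controller.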
// NewProvisionController creates a new provision controller using
// the given configuration parameters and with private (non-shared) informers.
func NewProvisionController(
	logger klog.Logger,
	client kubernetes.Interface,
	provisionerName string,
	provisioner Provisioner,
	options ...func(*ProvisionController) error,
) *ProvisionController {
	id, err := os.Hostname()
	if err != nil {
		logger.Error(err, "Error getting hostname")
		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
	}
	// add a uniquifier so that two processes on the same host don't accidentally both become active
	id = id + "_" + string(uuid.NewUUID())
	component := provisionerName + "_" + id

	// TODO: Once the following PR is merged, change to use StartLogging and StartRecordingToSinkWithContext
	// https://github.com/kubernetes/kubernetes/pull/120729
	v1.AddToScheme(scheme.Scheme)
	broadcaster := record.NewBroadcaster()
	broadcaster.StartStructuredLogging(0)
	broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: client.CoreV1().Events(v1.NamespaceAll)})
	eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: component})

	controller := &ProvisionController{
		client:                    client,
		provisionerName:           provisionerName,
		provisioner:               provisioner,
		id:                        id,
		component:                 component,
		eventRecorder:             eventRecorder,
		resyncPeriod:              DefaultResyncPeriod,
		exponentialBackOffOnError: DefaultExponentialBackOffOnError,
		threadiness:               DefaultThreadiness,
		failedProvisionThreshold:  DefaultFailedProvisionThreshold,
		failedDeleteThreshold:     DefaultFailedDeleteThreshold,
		leaderElection:            DefaultLeaderElection,
		leaderElectionNamespace:   getInClusterNamespace(),
		leaseDuration:             DefaultLeaseDuration,
		renewDeadline:             DefaultRenewDeadline,
		retryPeriod:               DefaultRetryPeriod,
		metrics:                   metrics.New(controllerSubsystem),
		metricsPort:               DefaultMetricsPort,
		metricsAddress:            DefaultMetricsAddress,
		metricsPath:               DefaultMetricsPath,
		addFinalizer:              DefaultAddFinalizer,
		hasRun:                    false,
		hasRunLock:                &sync.Mutex{},
	}

	for _, option := range options {
		err := option(controller)
		if err != nil {
			logger.Error(err, "Error processing controller options")
			klog.FlushAndExit(klog.ExitFlushTimeout, 1)
		}
	}

	var rateLimiter workqueue.RateLimiter
	if controller.rateLimiter != nil {
		// rateLimiter set via parameter takes precedence
		rateLimiter = controller.rateLimiter
	} else if controller.exponentialBackOffOnError {
		rateLimiter = workqueue.NewMaxOfRateLimiter(
			workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 1000*time.Second),
			&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
		)
	} else {
		rateLimiter = workqueue.NewMaxOfRateLimiter(
			workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 15*time.Second),
			&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
		)
	}
	controller.claimQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
	controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "volumes")

	informer := informers.NewSharedInformerFactory(client, controller.resyncPeriod)

	// ----------------------
	// PersistentVolumeClaims

	claimHandler := cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { controller.enqueueClaim(obj) },
		UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueClaim(newObj) },
		DeleteFunc: func(obj interface{}) {
			// NOOP. The claim is either in claimsInProgress and in the queue, so it will be processed as usual
			// or it's not in claimsInProgress and then we don't care
		},
	}

	if controller.claimInformer != nil {
		controller.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.resyncPeriod)
	} else {
		controller.claimInformer = informer.Core().V1().PersistentVolumeClaims().Informer()
		controller.claimInformer.AddEventHandler(claimHandler)
	}
	err = controller.claimInformer.AddIndexers(cache.Indexers{uidIndex: func(obj interface{}) ([]string, error) {
		uid, err := getObjectUID(obj)
		if err != nil {
			return nil, err
		}
		return []string{uid}, nil
	}})
	if err != nil {
		logger.Error(err, "Error setting indexer for pvc informer", "indexer", uidIndex)
		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
	}
	controller.claimsIndexer = controller.claimInformer.GetIndexer()

	// -----------------
	// PersistentVolumes

	volumeHandler := cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { controller.enqueueVolume(obj) },
		UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueVolume(newObj) },
		DeleteFunc: func(obj interface{}) { controller.forgetVolume(obj) },
	}

	if controller.volumeInformer != nil {
		controller.volumeInformer.AddEventHandlerWithResyncPeriod(volumeHandler, controller.resyncPeriod)
	} else {
		controller.volumeInformer = informer.Core().V1().PersistentVolumes().Informer()
		controller.volumeInformer.AddEventHandler(volumeHandler)
	}
	controller.volumes = controller.volumeInformer.GetStore()

	// --------------
	// StorageClasses

	// no resource event handler needed for StorageClasses
	if controller.classInformer == nil {
		controller.classInformer = informer.Storage().V1().StorageClasses().Informer()
	}
	controller.classes = controller.classInformer.GetStore()

	if controller.createProvisionerPVLimiter != nil {
		logger.V(2).Info("Using saving PVs to API server in background")
		controller.volumeStore = NewVolumeStoreQueue(client, controller.createProvisionerPVLimiter, controller.claimsIndexer, controller.eventRecorder)
	} else {
		if controller.createProvisionedPVBackoff == nil {
			// Use linear backoff with createProvisionedPVInterval and createProvisionedPVRetryCount by default.
			if controller.createProvisionedPVInterval == 0 {
				controller.createProvisionedPVInterval = DefaultCreateProvisionedPVInterval
			}
			if controller.createProvisionedPVRetryCount == 0 {
				controller.createProvisionedPVRetryCount = DefaultCreateProvisionedPVRetryCount
			}
			controller.createProvisionedPVBackoff = &wait.Backoff{
				Duration: controller.createProvisionedPVInterval,
				Factor:   1, // linear backoff
				Steps:    controller.createProvisionedPVRetryCount,
				// Cap:      controller.createProvisionedPVInterval,
			}
		}
		logger.V(2).Info("Using blocking saving PVs to API server")
		controller.volumeStore = NewBackoffStore(client, controller.eventRecorder, controller.createProvisionedPVBackoff, controller)
	}

	return controller
}

syncClaim()

  1. Decide whether to provision:
    • ctrl.shouldProvision(ctx, claim) is called to decide whether this PVC needs provisioning. If it returns an error, the provisioning stats are updated and the error is returned.
    • If should is true, provisioning is required.
  2. Provision:
    • The start time of the provisioning operation is recorded.
    • A logger is obtained from the context.
    • ctrl.provisionClaimOperation(ctx, claim) performs the provisioning and returns a status and a possible error.
    • The provisioning stats are updated with the error and the start time.
  3. Handle the result:
    • If the operation returned no error, or the status is ProvisioningFinished, provisioning is complete or was not needed. The handling depends on the error:
      • No error: log and remove the PVC's entry from claimsInProgress.
      • errStopProvision: log and set the error to nil, because the caller would otherwise requeue the claim on this special error.
      • Any other error: log it.
    • If the status is ProvisioningInBackground, provisioning is still running in the background: log and add the PVC to claimsInProgress.
    • If the status is ProvisioningNoChange, nothing is modified and claimsInProgress is left as-is.
  4. Return value:
    • If no provisioning was needed, or it finished with no error left to report, nil is returned.
    • Otherwise, the error from the provisioning operation is returned.

The logic here revolves around the PVC's provisioning state: internal state such as claimsInProgress is updated according to the outcome, and the relevant log messages are emitted. This is how the ProvisionController manages provisioning for many PVCs and ensures each one is handled correctly.
func (ctrl *ProvisionController) syncClaim(ctx context.Context, obj interface{}) error {
	claim, ok := obj.(*v1.PersistentVolumeClaim)
	if !ok {
		return fmt.Errorf("expected claim but got %+v", obj)
	}

	should, err := ctrl.shouldProvision(ctx, claim)
	if err != nil {
		ctrl.updateProvisionStats(claim, err, time.Time{})
		return err
	} else if should {
		startTime := time.Now()
		logger := klog.FromContext(ctx)

		status, err := ctrl.provisionClaimOperation(ctx, claim)
		ctrl.updateProvisionStats(claim, err, startTime)
		if err == nil || status == ProvisioningFinished {
			// Provisioning is 100% finished / not in progress.
			switch err {
			case nil:
				logger.V(5).Info("Claim processing succeeded, removing PVC from claims in progress", "claimUID", claim.UID)
			case errStopProvision:
				logger.V(5).Info("Stop provisioning, removing PVC from claims in progress", "claimUID", claim.UID)
				// Our caller would requeue if we pass on this special error; return nil instead.
				err = nil
			default:
				logger.V(2).Info("Final error received, removing PVC from claims in progress", "claimUID", claim.UID)
			}
			ctrl.claimsInProgress.Delete(string(claim.UID))
			return err
		}
		if status == ProvisioningInBackground {
			// Provisioning is in progress in background.
			logger.V(2).Info("Temporary error received, adding PVC to claims in progress", "claimUID", claim.UID)
			ctrl.claimsInProgress.Store(string(claim.UID), claim)
		} else {
			// status == ProvisioningNoChange.
			// Don't change claimsInProgress:
			// - the claim is already there if previous status was ProvisioningInBackground.
			// - the claim is not there if previous status was ProvisioningFinished.
		}
		return err
	}
	return nil
}
shouldProvision()
  1. Check whether the PVC already names a volume
    • If claim.Spec.VolumeName is non-empty, the PVC is already bound to a concrete volume and no provisioning is needed. The method returns false, nil.
  2. Check whether the provisioner implements the Qualifier interface
    • The type assertion ctrl.provisioner.(Qualifier) checks whether ctrl.provisioner implements the Qualifier interface.
    • If it does and Qualifier's ShouldProvision method returns false, no provisioning is needed. The method returns false, nil.
  3. Check the PVC annotations for a provisioner
    • The value of the annStorageProvisioner annotation is read first.
    • If it is absent, the annBetaStorageProvisioner annotation is tried.
    • These two annotations designate the provisioner responsible for the volume.
  4. Check whether the annotated provisioner is known
    • If a provisioner annotation was found and that provisioner is known to this controller (checked via ctrl.knownProvisioner(provisioner)), processing continues.
  5. Check the StorageClass's VolumeBindingMode
    • util.GetPersistentVolumeClaimClass(claim) returns the name of the PVC's StorageClass.
    • ctrl.getStorageClass(claimClass) fetches the StorageClass details.
    • If the StorageClass's VolumeBindingMode is storage.VolumeBindingWaitForFirstConsumer (delayed binding), the PVC's annotations are further checked for annSelectedNode:
      • If annSelectedNode is present and non-empty, a node has been selected and provisioning can proceed. The method returns true, nil.
      • If it is absent or empty, no provisioning happens. The method returns false, nil.
  6. Provision by default
    • If the StorageClass's VolumeBindingMode is not the delayed-binding mode, or no VolumeBindingMode is set, provisioning is required by default. The method returns true, nil.
  7. No provisioner found
    • If no provisioner annotation is found on the PVC, no provisioning happens. The method returns false, nil.

In short, this code inspects the PVC's fields and annotations together with the configuration of its StorageClass to decide whether a volume should be provisioned for it: whether a volume is already assigned, whether the provisioning preconditions are met, whether delayed binding is in use, and so on.
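The decision tree above condenses to a few boolean checks. The following sketch models it with a simplified claim view (the claimView struct and its fields are illustrative stand-ins for the real PVC annotations and StorageClass lookup, and it omits the Qualifier hook and error paths):

```go
package main

import "fmt"

// claimView condenses the fields shouldProvision() inspects.
type claimView struct {
	volumeName            string // claim.Spec.VolumeName
	provisionerAnnotation string // annStorageProvisioner / annBetaStorageProvisioner
	selectedNode          string // annSelectedNode
	waitForFirstConsumer  bool   // StorageClass VolumeBindingMode
}

func shouldProvision(c claimView, knownProvisioner string) bool {
	if c.volumeName != "" {
		return false // already bound to a volume
	}
	if c.provisionerAnnotation != knownProvisioner {
		return false // annotation missing, or names a different provisioner
	}
	if c.waitForFirstConsumer {
		// Delayed binding: only provision once the scheduler has
		// picked a node and recorded it in annSelectedNode.
		return c.selectedNode != ""
	}
	return true // immediate binding: provision right away
}

func main() {
	c := claimView{provisionerAnnotation: "csi.example.com", waitForFirstConsumer: true}
	fmt.Println(shouldProvision(c, "csi.example.com")) // false: no selected node yet
	c.selectedNode = "node-1"
	fmt.Println(shouldProvision(c, "csi.example.com")) // true
}
```

This also shows why a provisioner can "un-provision" a delayed-binding claim: removing annSelectedNode flips the result back to false until the scheduler picks a node again.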

// shouldProvision returns whether a claim should have a volume provisioned for
// it, i.e. whether a Provision is "desired"
func (ctrl *ProvisionController) shouldProvision(ctx context.Context, claim *v1.PersistentVolumeClaim) (bool, error) {
	if claim.Spec.VolumeName != "" {
		return false, nil
	}

	if qualifier, ok := ctrl.provisioner.(Qualifier); ok {
		if !qualifier.ShouldProvision(ctx, claim) {
			return false, nil
		}
	}

	provisioner, found := claim.Annotations[annStorageProvisioner]
	if !found {
		provisioner, found = claim.Annotations[annBetaStorageProvisioner]
	}

	if found {
		if ctrl.knownProvisioner(provisioner) {
			claimClass := util.GetPersistentVolumeClaimClass(claim)
			class, err := ctrl.getStorageClass(claimClass)
			if err != nil {
				return false, err
			}
			if class.VolumeBindingMode != nil && *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer {
				// When claim is in delay binding mode, annSelectedNode is
				// required to provision volume.
				// Though PV controller set annStorageProvisioner only when
				// annSelectedNode is set, but provisioner may remove
				// annSelectedNode to notify scheduler to reschedule again.
				if selectedNode, ok := claim.Annotations[annSelectedNode]; ok && selectedNode != "" {
					return true, nil
				}
				return false, nil
			}
			return true, nil
		}
	}

	return false, nil
}
provisionClaimOperation()
  1. Get the PVC's class
    • util.GetPersistentVolumeClaimClass(claim) returns the name of the PVC's StorageClass.
  2. Logging
    • Kubernetes' klog library is used for logging, including PVC and StorageClass information.
  3. Check whether the PV already exists
    • ctrl.getProvisionedVolumeNameForClaim(claim) computes the expected PV name, and ctrl.volumes is checked for it. If it exists, the PV has already been provisioned and the function returns ProvisioningFinished and errStopProvision.
  4. Get a reference to the PVC
    • ref.GetReference(scheme.Scheme, claim) builds a reference to the PVC so that later steps can point back to it.
  5. Check whether provisioning is possible
    • ctrl.canProvision(ctx, claim) checks whether this ProvisionController can handle the PVC's provisioning request. If not, an event is recorded and an error is returned.
  6. Get the StorageClass
    • ctrl.getStorageClass(claimClass) fetches the StorageClass named by the PVC. If that fails, or the StorageClass's provisioner is not supported by this ProvisionController, an error is logged and returned.
  7. Get the selected node
    • If the PVC's annotations name a selected node (annSelectedNode or annAlphaSelectedNode), that node's information is fetched. If the node does not exist, ctrl.provisionVolumeErrorHandling handles the error.
  8. Prepare the provisioning options
    • A ProvisionOptions object is built with the StorageClass, PV name, PVC object, and selected-node information.
  9. Record a normal event
    • ctrl.eventRecorder.Event records a Normal event indicating that the external provisioner is provisioning a volume for the PVC.
  10. Call the provisioner
    • ctrl.provisioner.Provision(ctx, options) attempts to provision the volume. On failure, the error is handled according to its type.
  11. Set the claim reference and finalizer
    • On success, the PV's ClaimRef is set to the PVC reference, and a finalizer is added if needed.
  12. Update the PV's metadata and storage class
    • The PV's annotations and storage class information are updated.
  13. Store and add the PV
    • ctrl.volumeStore.StoreVolume saves the PV, which is then added to ctrl.volumes.
  14. Return
    • If everything succeeds, the function returns ProvisioningFinished and nil to signal that provisioning is complete.

The function covers the whole path from checking whether a PV already exists, through actually provisioning the volume, to updating internal state and recording events. It is a key part of the Kubernetes volume-provisioning flow, ensuring each PVC is handled and backed by storage correctly.

func (ctrl *ProvisionController) provisionClaimOperation(ctx context.Context, claim *v1.PersistentVolumeClaim) (ProvisioningState, error) {
	// Most code here is identical to that found in controller.go of kube's PV controller...
	claimClass := util.GetPersistentVolumeClaimClass(claim)
	logger := klog.LoggerWithValues(klog.FromContext(ctx), "PVC", klog.KObj(claim), "StorageClass", claimClass)
	logger.V(4).Info("Started")

	//  A previous doProvisionClaim may just have finished while we were waiting for
	//  the locks. Check that PV (with deterministic name) hasn't been provisioned
	//  yet.
	pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
	_, exists, err := ctrl.volumes.GetByKey(pvName)
	if err == nil && exists {
		// Volume has been already provisioned, nothing to do.
		logger.V(4).Info("PersistentVolume already exists, skipping", "PV", pvName)
		return ProvisioningFinished, errStopProvision
	}

	// Prepare a claimRef to the claim early (to fail before a volume is
	// provisioned)
	claimRef, err := ref.GetReference(scheme.Scheme, claim)
	if err != nil {
		logger.Error(err, "Unexpected error getting claim reference")
		return ProvisioningNoChange, err
	}

	// Check if this provisioner can provision this claim.
	if err = ctrl.canProvision(ctx, claim); err != nil {
		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
		logger.Error(err, "Failed to provision volume")
		return ProvisioningFinished, errStopProvision
	}

	// For any issues getting fields from StorageClass (including reclaimPolicy & mountOptions),
	// retry the claim because the storageClass can be fixed/(re)created independently of the claim
	class, err := ctrl.getStorageClass(claimClass)
	if err != nil {
		logger.Error(err, "Error getting claim's StorageClass's fields")
		return ProvisioningFinished, err
	}
	if !ctrl.knownProvisioner(class.Provisioner) {
		// class.Provisioner has either changed since shouldProvision() or
		// annDynamicallyProvisioned contains different provisioner than
		// class.Provisioner.
		logger.Error(nil, "Unknown provisioner requested in claim's StorageClass", "provisioner", class.Provisioner)
		return ProvisioningFinished, errStopProvision
	}

	var selectedNode *v1.Node
	// Get SelectedNode
	if nodeName, ok := getString(claim.Annotations, annSelectedNode, annAlphaSelectedNode); ok {
		if ctrl.nodeLister != nil {
			selectedNode, err = ctrl.nodeLister.Get(nodeName)
		} else {
			selectedNode, err = ctrl.client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) // TODO (verult) cache Nodes
		}
		if err != nil {
			// if node does not exist, reschedule and remove volume.kubernetes.io/selected-node annotation
			if apierrs.IsNotFound(err) {
				ctx2 := klog.NewContext(ctx, logger)
				return ctrl.provisionVolumeErrorHandling(ctx2, ProvisioningReschedule, err, claim)
			}
			err = fmt.Errorf("failed to get target node: %v", err)
			ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
			return ProvisioningNoChange, err
		}
	}

	options := ProvisionOptions{
		StorageClass: class,
		PVName:       pvName,
		PVC:          claim,
		SelectedNode: selectedNode,
	}

	ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, "Provisioning", fmt.Sprintf("External provisioner is provisioning volume for claim %q", klog.KObj(claim)))

	volume, result, err := ctrl.provisioner.Provision(ctx, options)
	if err != nil {
		if ierr, ok := err.(*IgnoredError); ok {
			// Provision ignored, do nothing and hope another provisioner will provision it.
			logger.V(4).Info("Volume provision ignored", "reason", ierr)
			return ProvisioningFinished, errStopProvision
		}
		ctx2 := klog.NewContext(ctx, logger)
		err = fmt.Errorf("failed to provision volume with StorageClass %q: %v", claimClass, err)
		return ctrl.provisionVolumeErrorHandling(ctx2, result, err, claim)
	}

	logger.V(4).Info("Volume is provisioned", "PV", volume.Name)

	// Set ClaimRef and the PV controller will bind and set annBoundByController for us
	volume.Spec.ClaimRef = claimRef

	// Add external provisioner finalizer if it doesn't already have it
	if ctrl.addFinalizer && !ctrl.checkFinalizer(volume, finalizerPV) {
		volume.ObjectMeta.Finalizers = append(volume.ObjectMeta.Finalizers, finalizerPV)
	}

	metav1.SetMetaDataAnnotation(&volume.ObjectMeta, annDynamicallyProvisioned, class.Provisioner)
	volume.Spec.StorageClassName = claimClass

	logger.V(4).Info("Succeeded")

	if err := ctrl.volumeStore.StoreVolume(logger, claim, volume); err != nil {
		return ProvisioningFinished, err
	}
	if err = ctrl.volumes.Add(volume); err != nil {
		utilruntime.HandleError(err)
	}
	return ProvisioningFinished, nil
}
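The "PV already exists" check at the top of the function only works because the PV name is deterministic: retrying the operation recomputes the same name and finds the earlier result. A minimal sketch, assuming the usual "pvc-&lt;claim UID&gt;" naming scheme; the helper name provisionedVolumeName is hypothetical, standing in for getProvisionedVolumeNameForClaim:

```go
package main

import "fmt"

// provisionedVolumeName derives a deterministic PV name from the claim UID.
// Because a PVC's UID never changes, every retry of provisionClaimOperation
// computes the same name, which makes the duplicate check an idempotent guard.
func provisionedVolumeName(claimUID string) string {
	return "pvc-" + claimUID
}

func main() {
	uid := "8f3b1c2d-0000-4000-8000-123456789abc" // example claim UID
	fmt.Println(provisionedVolumeName(uid))
	// prints "pvc-8f3b1c2d-0000-4000-8000-123456789abc"
}
```

This is why step 3 of the walkthrough can be a simple lookup in ctrl.volumes rather than a search over all PVs for a matching ClaimRef.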

This article was contributed by an internet user; the views expressed are the author's own and do not represent this site's position. If reprinting, please credit the source: http://www.mzph.cn/pingmian/76783.shtml