Configuring a custom VPC
Enabling the custom VPC gateway
The custom VPC gateway must be enabled through ConfigMap resources.
- ovn-vpc-nat-config specifies the image used by the custom VPC gateway pod. The gateway of a custom VPC is just a pod; inside it, iptables rules implement EIP, SNAT, DNAT and FIP.
- ovn-vpc-nat-gw-config enables the custom VPC gateway feature.
kind: ConfigMap
apiVersion: v1
metadata:
  name: ovn-vpc-nat-config
  namespace: kube-system
data:
  image: 'docker.io/kubeovn/vpc-nat-gateway:v1.12.4'
---
kind: ConfigMap
apiVersion: v1
metadata:
  name: ovn-vpc-nat-gw-config
  namespace: kube-system
data:
  enable-vpc-nat-gw: 'true'
Configuring the external subnet
The following configuration creates the external subnet.
- cidrBlock is the CIDR of the external subnet.
- gateway is the gateway of the external subnet.
- excludeIps excludes addresses that must not be used, preventing allocated EIPs from conflicting with existing hosts on the external network.
- The NetworkAttachmentDefinition resource defines a macvlan network; macvlan virtualizes one physical NIC into multiple virtual NICs.
- Therefore, each time an EIP is created, a virtual NIC is created on top of the physical NIC ens37, configured with the IP address allocated by kube-ovn, and moved into the custom VPC gateway pod.
apiVersion: kubeovn.io/v1
kind: Subnet
metadata:
  name: ovn-vpc-external-network
spec:
  protocol: IPv4
  provider: ovn-vpc-external-network.kube-system
  cidrBlock: 192.168.100.0/24
  gateway: 192.168.100.1  # IP address of the physical gateway
  excludeIps:
  - 192.168.100.1..192.168.100.220
---
apiVersion: "k8s.cni.cncf.io/v1"
kind: NetworkAttachmentDefinition
metadata:
  name: ovn-vpc-external-network
  namespace: kube-system
spec:
  config: '{
    "cniVersion": "0.3.0",
    "type": "macvlan",
    "master": "ens37",
    "mode": "bridge",
    "ipam": {
      "type": "kube-ovn",
      "server_socket": "/run/openvswitch/kube-ovn-daemon.sock",
      "provider": "ovn-vpc-external-network.kube-system"
    }
  }'
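The effect of excludeIps on the EIP pool can be sketched with a bit of address arithmetic. This is a simplified model of the allocation range only, not kube-ovn's IPAM:

```python
import ipaddress

# Sketch: how excludeIps narrows the EIP pool of the external subnet above.
# kube-ovn's IPAM is more involved; this only illustrates the arithmetic.
cidr = ipaddress.ip_network("192.168.100.0/24")
base = ipaddress.ip_address("192.168.100.0")
excluded = {base + i for i in range(1, 221)}  # 192.168.100.1 .. 192.168.100.220

usable = [ip for ip in cidr.hosts() if ip not in excluded]
print(usable[0], usable[-1], len(usable))  # 192.168.100.221 192.168.100.254 34
```

So with this configuration at most 34 EIPs can be allocated before the pool is exhausted.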
Creating a custom VPC
Create a custom VPC. For every VPC created, kube-ovn creates an OVN logical router, and the staticRoutes defined in the VPC are added to that logical router. The static route defined below sends all traffic to the next hop 10.0.1.254, an IP inside the VPC's internal subnet that serves as the VPC's gateway IP.
kind: Vpc
apiVersion: kubeovn.io/v1
metadata:
  name: test-vpc-1
spec:
  namespaces:
  - ns1
  staticRoutes:
  - cidr: 0.0.0.0/0
    nextHopIP: 10.0.1.254
    policy: policyDst
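What this staticRoutes entry means can be sketched with a tiny longest-prefix-match model (a hypothetical helper, not kube-ovn code): every destination falls under the 0.0.0.0/0 route, so the next hop is always 10.0.1.254.

```python
import ipaddress

# Hypothetical route lookup over the staticRoutes defined above.
routes = [("0.0.0.0/0", "10.0.1.254")]

def next_hop(dst: str) -> str:
    # pick the matching route with the longest prefix, like a router would
    matches = [(ipaddress.ip_network(cidr), hop) for cidr, hop in routes
               if ipaddress.ip_address(dst) in ipaddress.ip_network(cidr)]
    return max(matches, key=lambda m: m[0].prefixlen)[1]

print(next_hop("8.8.8.8"))   # 10.0.1.254
print(next_hop("10.0.1.7"))  # 10.0.1.254 -- the default route catches everything
```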
Creating a subnet under the custom VPC
The following creates a subnet and a pod under the custom VPC.
kind: Subnet
apiVersion: kubeovn.io/v1
metadata:
  name: net1
spec:
  vpc: test-vpc-1
  cidrBlock: 10.0.1.0/24
  protocol: IPv4
  namespaces:
  - ns1
---
apiVersion: v1
kind: Pod
metadata:
  annotations:
    ovn.kubernetes.io/logical_switch: net1
  namespace: ns1
  name: vpc1-pod
spec:
  containers:
  - name: vpc1-pod
    image: docker.io/library/nginx:alpine
Creating the custom VPC gateway
The following creates the gateway for the custom VPC.
- vpc specifies that this gateway belongs to test-vpc-1.
- subnet is a subnet under test-vpc-1.
- lanIp is an address from the subnet created in step 2 that serves as the VPC's gateway IP; it must match the next hop of the static route defined in test-vpc-1.
- externalSubnets specifies the external subnet; when the gateway pod is created, an EIP is allocated from this subnet to connect to the external network.
kind: VpcNatGateway
apiVersion: kubeovn.io/v1
metadata:
  name: gw1
spec:
  vpc: test-vpc-1
  subnet: net1
  lanIp: 10.0.1.254
  externalSubnets:
  - ovn-vpc-external-network
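The consistency constraints between these fields can be stated mechanically. The check below is a hypothetical sanity check written for this article, not part of kube-ovn:

```python
import ipaddress

# Hypothetical consistency check for the VpcNatGateway fields above:
# lanIp must lie inside the gateway's subnet, and must equal the nextHopIP
# of the VPC's default static route.
subnet_cidr = "10.0.1.0/24"   # cidrBlock of spec.subnet (net1)
lan_ip = "10.0.1.254"         # spec.lanIp
vpc_next_hop = "10.0.1.254"   # test-vpc-1 staticRoutes[0].nextHopIP

ok = (ipaddress.ip_address(lan_ip) in ipaddress.ip_network(subnet_cidr)
      and lan_ip == vpc_next_hop)
print(ok)  # True
```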
Adding EIPs to the custom VPC
The following configuration creates two EIPs for the custom VPC gateway pod and configures SNAT rules.
- A single custom VPC gateway pod can hold multiple EIPs; here two are created.
- v4ip specifies the address assigned to the EIP.
- natGwDp specifies which gateway the EIP is added to.
- In IptablesSnatRule, eip and internalCIDR describe the SNAT rule created inside the custom VPC gateway pod: an iptables rule that rewrites source IPs in 10.1.1.0/24 to 192.168.100.230.
kind: IptablesEIP
apiVersion: kubeovn.io/v1
metadata:
  name: eip1
spec:
  v4ip: 192.168.100.230
  natGwDp: gw1
---
kind: IptablesSnatRule
apiVersion: kubeovn.io/v1
metadata:
  name: snat01
spec:
  eip: eip1
  internalCIDR: 10.1.1.0/24
---
kind: IptablesEIP
apiVersion: kubeovn.io/v1
metadata:
  name: eip2
spec:
  v4ip: 192.168.100.231
  natGwDp: gw1
---
kind: IptablesSnatRule
apiVersion: kubeovn.io/v1
metadata:
  name: snat02
spec:
  eip: eip2
  internalCIDR: 10.1.1.0/24
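The effect of the two SNAT rules can be modeled as follows. This is a sketch of the resulting iptables behavior (first matching rule wins), not the gateway's actual script:

```python
import ipaddress

# Ordered SNAT rules as (internalCIDR, eip); first match wins, as in iptables.
snat_rules = [
    ("10.1.1.0/24", "192.168.100.230"),  # snat01 -> eip1
    ("10.1.1.0/24", "192.168.100.231"),  # snat02 -> eip2 (same CIDR, so shadowed here)
]

def snat_source(src: str) -> str:
    for cidr, eip in snat_rules:
        if ipaddress.ip_address(src) in ipaddress.ip_network(cidr):
            return eip
    return src  # no rule matched: source unchanged

print(snat_source("10.1.1.7"))    # 192.168.100.230
print(snat_source("172.16.0.9"))  # 172.16.0.9
```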
Adding a FIP to the custom VPC
A FIP (floating IP) binds an internal IP inside the custom VPC to an EIP, so that the external network can reach the pod directly.
- Creating the EIP works the same as above; the key part is the FIP rule.
- The FIP rule specifies an eip and an internalIp; iptables rules are added inside the custom VPC gateway pod to bind the EIP and the internal IP one to one.
kind: IptablesEIP
apiVersion: kubeovn.io/v1
metadata:
  name: eip3
spec:
  v4ip: 192.168.100.232
  natGwDp: gw1
---
kind: IptablesFIPRule
apiVersion: kubeovn.io/v1
metadata:
  name: fip01
spec:
  eip: eip3
  internalIp: 10.0.1.5
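The one-to-one binding a FIP establishes can be sketched as two pure functions, one per direction. This is a model of the iptables effect, not the actual rules:

```python
fip = {"eip": "192.168.100.232", "internalIp": "10.0.1.5"}  # eip3 / fip01 above

def dnat(dst: str) -> str:
    # inbound: packets addressed to the EIP are redirected to the pod
    return fip["internalIp"] if dst == fip["eip"] else dst

def snat(src: str) -> str:
    # outbound: replies from the pod appear to come from the EIP
    return fip["eip"] if src == fip["internalIp"] else src

print(dnat("192.168.100.232"))  # 10.0.1.5
print(snat("10.0.1.5"))         # 192.168.100.232
```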
Custom VPC source code analysis
Code that enables the custom VPC gateway
In the configuration above, the following ConfigMaps must be added before the custom VPC gateway can be used; the code below shows what this configuration does.
kind: ConfigMap
apiVersion: v1
metadata:
  name: ovn-vpc-nat-config
  namespace: kube-system
data:
  image: 'docker.io/kubeovn/vpc-nat-gateway:v1.12.4'
---
kind: ConfigMap
apiVersion: v1
metadata:
  name: ovn-vpc-nat-gw-config
  namespace: kube-system
data:
  enable-vpc-nat-gw: 'true'
- resyncVpcNatGwConfig runs periodically; it checks whether the ovn-vpc-nat-config and ovn-vpc-nat-gw-config ConfigMaps have been created and obtains from them the pod image used by the custom VPC gateway.
- resyncVpcNatImage reads the custom VPC gateway image from the configuration.
- Once both ConfigMaps are present, the custom VPC gateway resources are re-enqueued, because gateways may have been created before the ConfigMaps existed.
// Runs periodically; checks whether the ConfigMaps that enable the VPC gateway exist.
func (c *Controller) resyncVpcNatGwConfig() {
	// Fetch ovn-vpc-nat-gw-config
	cm, err := c.configMapsLister.ConfigMaps(c.config.PodNamespace).Get(util.VpcNatGatewayConfig)
	if err != nil && !k8serrors.IsNotFound(err) {
		klog.Errorf("failed to get ovn-vpc-nat-gw-config, %v", err)
		return
	}
	if k8serrors.IsNotFound(err) || cm.Data["enable-vpc-nat-gw"] == "false" {
		if vpcNatEnabled == "false" {
			return
		}
		klog.Info("start to clean up vpc nat gateway")
		if err := c.cleanUpVpcNatGw(); err != nil {
			klog.Errorf("failed to clean up vpc nat gateway, %v", err)
			return
		}
		vpcNatEnabled = "false"
		VpcNatCmVersion = ""
		klog.Info("finish clean up vpc nat gateway")
		return
	}
	if vpcNatEnabled == "true" && VpcNatCmVersion == cm.ResourceVersion {
		return
	}
	gws, err := c.vpcNatGatewayLister.List(labels.Everything())
	if err != nil {
		klog.Errorf("failed to get vpc nat gateway, %v", err)
		return
	}
	// Read the VPC gateway image from ovn-vpc-nat-config
	if err = c.resyncVpcNatImage(); err != nil {
		klog.Errorf("failed to resync vpc nat config, err: %v", err)
		return
	}
	vpcNatEnabled = "true"
	VpcNatCmVersion = cm.ResourceVersion
	for _, gw := range gws {
		c.addOrUpdateVpcNatGatewayQueue.Add(gw.Name)
	}
	klog.Info("finish establishing vpc-nat-gateway")
}
// This ConfigMap must be created when deploying the VPC gateway; the code reads the image from it.
func (c *Controller) resyncVpcNatImage() error {
	cm, err := c.configMapsLister.ConfigMaps(c.config.PodNamespace).Get(util.VpcNatConfig)
	if err != nil {
		err = fmt.Errorf("failed to get ovn-vpc-nat-config, %v", err)
		klog.Error(err)
		return err
	}
	image, exist := cm.Data["image"]
	if !exist {
		err = fmt.Errorf("%s should have image field", util.VpcNatConfig)
		klog.Error(err)
		return err
	}
	vpcNatImage = image
	return nil
}
Code that creates a custom VPC
kind: Vpc
apiVersion: kubeovn.io/v1
metadata:
  name: test-vpc-1
spec:
  namespaces:
  - ns1
  staticRoutes:
  - cidr: 0.0.0.0/0
    nextHopIP: 10.0.1.254
    policy: policyDst
- When creating a custom VPC, static routes and policy routes can be added.
- OVN creates a logical router for the VPC.
- Static routes and policy routes are added to that logical router.
- nextHopIP is important: it is the gateway's IP. kube-ovn adds a static route on the custom VPC's logical router that sends all traffic to 10.0.1.254, an internal address of the gateway.
func (c *Controller) handleAddOrUpdateVpc(key string) error {
	c.vpcKeyMutex.LockKey(key)
	defer func() { _ = c.vpcKeyMutex.UnlockKey(key) }()

	klog.Infof("handle add/update vpc %s", key)

	// get latest vpc info
	var (
		vpc, cachedVpc *kubeovnv1.Vpc
		err            error
	)
	cachedVpc, err = c.vpcsLister.Get(key)
	if err != nil {
		if k8serrors.IsNotFound(err) {
			return nil
		}
		klog.Error(err)
		return err
	}
	vpc = cachedVpc.DeepCopy()

	// Apply defaults to the new vpc and validate its fields
	if err = formatVpc(vpc, c); err != nil {
		klog.Errorf("failed to format vpc %s: %v", key, err)
		return err
	}
	// Create an OVN logical router for the new vpc; a kube-ovn vpc maps to an OVN logical router
	if err = c.createVpcRouter(key); err != nil {
		return err
	}

	// VPC peering: connects this vpc to the peer vpc on the other side; not covered here
	var newPeers []string
	for _, peering := range vpc.Spec.VpcPeerings {
		if err = util.CheckCidrs(peering.LocalConnectIP); err != nil {
			klog.Errorf("invalid cidr %s", peering.LocalConnectIP)
			return err
		}
		newPeers = append(newPeers, peering.RemoteVpc)
		if err := c.OVNNbClient.CreatePeerRouterPort(vpc.Name, peering.RemoteVpc, peering.LocalConnectIP); err != nil {
			klog.Errorf("create peer router port for vpc %s, %v", vpc.Name, err)
			return err
		}
	}
	for _, oldPeer := range vpc.Status.VpcPeerings {
		if !util.ContainsString(newPeers, oldPeer) {
			if err = c.OVNNbClient.DeleteLogicalRouterPort(fmt.Sprintf("%s-%s", vpc.Name, oldPeer)); err != nil {
				klog.Errorf("delete peer router port for vpc %s, %v", vpc.Name, err)
				return err
			}
		}
	}

	// handle static route
	var (
		staticExistedRoutes []*ovnnb.LogicalRouterStaticRoute
		staticTargetRoutes  []*kubeovnv1.StaticRoute
		staticRouteMapping  map[string][]*kubeovnv1.StaticRoute
	)
	// Existing static route entries on the vpc's logical router
	staticExistedRoutes, err = c.OVNNbClient.ListLogicalRouterStaticRoutes(vpc.Name, nil, nil, "", nil)
	if err != nil {
		klog.Errorf("failed to get vpc %s static route list, %v", vpc.Name, err)
		return err
	}

	// Example spec:
	//   kind: Vpc
	//   apiVersion: kubeovn.io/v1
	//   metadata:
	//     name: test-vpc-1
	//   spec:
	//     staticRoutes:
	//     - cidr: 0.0.0.0/0
	//       nextHopIP: 10.0.1.254
	//       policy: policyDst
	//     - cidr: 172.31.0.0/24
	//       nextHopIP: 10.0.1.253
	//       policy: policySrc
	//       routeTable: "rtb1"

	// Build a map of route table name to route entries in this vpc
	staticRouteMapping = c.getRouteTablesByVpc(vpc)
	// Static routes declared in the vpc spec
	staticTargetRoutes = vpc.Spec.StaticRoutes

	// Special handling for the default vpc
	if vpc.Name == c.config.ClusterRouter {
		if _, ok := staticRouteMapping[util.MainRouteTable]; !ok {
			staticRouteMapping[util.MainRouteTable] = nil
		}
		joinSubnet, err := c.subnetsLister.Get(c.config.NodeSwitch)
		if err != nil {
			if !k8serrors.IsNotFound(err) {
				klog.Errorf("failed to get node switch subnet %s: %v", c.config.NodeSwitch, err)
				return err
			}
		}
		gatewayV4, gatewayV6 := util.SplitStringIP(joinSubnet.Spec.Gateway)
		if gatewayV4 != "" {
			for table := range staticRouteMapping {
				staticTargetRoutes = append(staticTargetRoutes,
					&kubeovnv1.StaticRoute{
						Policy:     kubeovnv1.PolicyDst,
						CIDR:       "0.0.0.0/0",
						NextHopIP:  gatewayV4,
						RouteTable: table,
					},
				)
			}
		}
		if gatewayV6 != "" {
			for table := range staticRouteMapping {
				staticTargetRoutes = append(staticTargetRoutes,
					&kubeovnv1.StaticRoute{
						Policy:     kubeovnv1.PolicyDst,
						CIDR:       "::/0",
						NextHopIP:  gatewayV6,
						RouteTable: table,
					},
				)
			}
		}
		if c.config.EnableEipSnat {
			cm, err := c.configMapsLister.ConfigMaps(c.config.ExternalGatewayConfigNS).Get(util.ExternalGatewayConfig)
			if err == nil {
				nextHop := cm.Data["external-gw-addr"]
				if nextHop == "" {
					externalSubnet, err := c.subnetsLister.Get(c.config.ExternalGatewaySwitch)
					if err != nil {
						klog.Errorf("failed to get subnet %s, %v", c.config.ExternalGatewaySwitch, err)
						return err
					}
					nextHop = externalSubnet.Spec.Gateway
					if nextHop == "" {
						klog.Errorf("no available gateway address")
						return fmt.Errorf("no available gateway address")
					}
				}
				if strings.Contains(nextHop, "/") {
					nextHop = strings.Split(nextHop, "/")[0]
				}

				lr, err := c.OVNNbClient.GetLogicalRouter(vpc.Name, false)
				if err != nil {
					klog.Errorf("failed to get logical router %s: %v", vpc.Name, err)
					return err
				}
				for _, nat := range lr.Nat {
					info, err := c.OVNNbClient.GetNATByUUID(nat)
					if err != nil {
						klog.Errorf("failed to get nat ip info for vpc %s, %v", vpc.Name, err)
						return err
					}
					if info.LogicalIP != "" {
						for table := range staticRouteMapping {
							staticTargetRoutes = append(staticTargetRoutes,
								&kubeovnv1.StaticRoute{
									Policy:     kubeovnv1.PolicySrc,
									CIDR:       info.LogicalIP,
									NextHopIP:  nextHop,
									RouteTable: table,
								},
							)
						}
					}
				}
			}
		}
	}

	// Diff the routes: which to delete, which to add
	routeNeedDel, routeNeedAdd, err := diffStaticRoute(staticExistedRoutes, staticTargetRoutes)
	if err != nil {
		klog.Errorf("failed to diff vpc %s static route, %v", vpc.Name, err)
		return err
	}
	// Delete stale routes from the vpc
	for _, item := range routeNeedDel {
		klog.Infof("vpc %s del static route: %+v", vpc.Name, item)
		policy := convertPolicy(item.Policy)
		if err = c.OVNNbClient.DeleteLogicalRouterStaticRoute(vpc.Name, &item.RouteTable, &policy, item.CIDR, item.NextHopIP); err != nil {
			klog.Errorf("del vpc %s static route failed, %v", vpc.Name, err)
			return err
		}
	}
	// Add new static routes to the vpc
	for _, item := range routeNeedAdd {
		if item.BfdID != "" {
			klog.Infof("vpc %s add static ecmp route: %+v", vpc.Name, item)
			if err = c.OVNNbClient.AddLogicalRouterStaticRoute(
				vpc.Name, item.RouteTable, convertPolicy(item.Policy), item.CIDR, &item.BfdID, item.NextHopIP,
			); err != nil {
				klog.Errorf("failed to add bfd static route to vpc %s , %v", vpc.Name, err)
				return err
			}
		} else {
			klog.Infof("vpc %s add static route: %+v", vpc.Name, item)
			if err = c.OVNNbClient.AddLogicalRouterStaticRoute(
				vpc.Name, item.RouteTable, convertPolicy(item.Policy), item.CIDR, nil, item.NextHopIP,
			); err != nil {
				klog.Errorf("failed to add normal static route to vpc %s , %v", vpc.Name, err)
				return err
			}
		}
	}

	// handle policy route
	var (
		policyRouteExisted, policyRouteNeedDel, policyRouteNeedAdd []*kubeovnv1.PolicyRoute
		policyRouteLogical                                         []*ovnnb.LogicalRouterPolicy
		externalIDs                                                = map[string]string{"vendor": util.CniTypeName}
	)
	// Diff the policy routes: which to delete, which to add
	if vpc.Name == c.config.ClusterRouter {
		policyRouteExisted = reversePolicies(vpc.Annotations[util.VpcLastPolicies])
		// diff list
		policyRouteNeedDel, policyRouteNeedAdd = diffPolicyRouteWithExisted(policyRouteExisted, vpc.Spec.PolicyRoutes)
	} else {
		if vpc.Spec.PolicyRoutes == nil {
			// do not clean default vpc policy routes
			if err = c.OVNNbClient.ClearLogicalRouterPolicy(vpc.Name); err != nil {
				klog.Errorf("clean all vpc %s policy route failed, %v", vpc.Name, err)
				return err
			}
		} else {
			policyRouteLogical, err = c.OVNNbClient.ListLogicalRouterPolicies(vpc.Name, -1, nil)
			if err != nil {
				klog.Errorf("failed to get vpc %s policy route list, %v", vpc.Name, err)
				return err
			}
			// diff vpc policy route
			policyRouteNeedDel, policyRouteNeedAdd = diffPolicyRouteWithLogical(policyRouteLogical, vpc.Spec.PolicyRoutes)
		}
	}
	// delete policies that no longer exist
	for _, item := range policyRouteNeedDel {
		klog.Infof("delete policy route for router: %s, priority: %d, match %s", vpc.Name, item.Priority, item.Match)
		if err = c.OVNNbClient.DeleteLogicalRouterPolicy(vpc.Name, item.Priority, item.Match); err != nil {
			klog.Errorf("del vpc %s policy route failed, %v", vpc.Name, err)
			return err
		}
	}
	// add new policies
	for _, item := range policyRouteNeedAdd {
		klog.Infof("add policy route for router: %s, match %s, action %s, nexthop %s, externalID %v", c.config.ClusterRouter, item.Match, string(item.Action), item.NextHopIP, externalIDs)
		if err = c.OVNNbClient.AddLogicalRouterPolicy(vpc.Name, item.Priority, item.Match, string(item.Action), []string{item.NextHopIP}, externalIDs); err != nil {
			klog.Errorf("add policy route to vpc %s failed, %v", vpc.Name, err)
			return err
		}
	}

	vpc.Status.Router = key
	vpc.Status.Standby = true
	vpc.Status.VpcPeerings = newPeers
	// Are these the OVN load balancers?
	if c.config.EnableLb {
		vpcLb, err := c.addLoadBalancer(key)
		if err != nil {
			klog.Error(err)
			return err
		}
		vpc.Status.TCPLoadBalancer = vpcLb.TCPLoadBalancer
		vpc.Status.TCPSessionLoadBalancer = vpcLb.TCPSessLoadBalancer
		vpc.Status.UDPLoadBalancer = vpcLb.UDPLoadBalancer
		vpc.Status.UDPSessionLoadBalancer = vpcLb.UDPSessLoadBalancer
		vpc.Status.SctpLoadBalancer = vpcLb.SctpLoadBalancer
		vpc.Status.SctpSessionLoadBalancer = vpcLb.SctpSessLoadBalancer
	}
	bytes, err := vpc.Status.Bytes()
	if err != nil {
		klog.Error(err)
		return err
	}
	// Patch the vpc status
	vpc, err = c.config.KubeOvnClient.KubeovnV1().Vpcs().Patch(context.Background(), vpc.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, "status")
	if err != nil {
		klog.Error(err)
		return err
	}

	if len(vpc.Annotations) != 0 && strings.ToLower(vpc.Annotations[util.VpcLbAnnotation]) == "on" {
		if err = c.createVpcLb(vpc); err != nil {
			return err
		}
	} else if err = c.deleteVpcLb(vpc); err != nil {
		return err
	}

	subnets, err := c.subnetsLister.List(labels.Everything())
	if err != nil {
		klog.Error(err)
		return err
	}
	for _, subnet := range subnets {
		if subnet.Spec.Vpc == key {
			c.addOrUpdateSubnetQueue.Add(subnet.Name)
		}
	}

	if vpc.Name != util.DefaultVpc {
		if cachedVpc.Spec.EnableExternal {
			if !cachedVpc.Status.EnableExternal {
				// connect vpc to default external
				klog.Infof("connect external network with vpc %s", vpc.Name)
				if err := c.handleAddVpcExternalSubnet(key, c.config.ExternalGatewaySwitch); err != nil {
					klog.Errorf("failed to add default external connection for vpc %s, error %v", key, err)
					return err
				}
			}
			if vpc.Spec.EnableBfd {
				klog.Infof("remove normal static ecmp route for vpc %s", vpc.Name)
				// auto remove normal type static route, if using ecmp based bfd
				if err := c.reconcileCustomVpcDelNormalStaticRoute(vpc.Name); err != nil {
					klog.Errorf("failed to reconcile del vpc %q normal static route", vpc.Name)
					return err
				}
			}
			if !vpc.Spec.EnableBfd {
				// auto add normal type static route, if not use ecmp based bfd
				klog.Infof("add normal external static route for enable external vpc %s", vpc.Name)
				if err := c.reconcileCustomVpcAddNormalStaticRoute(vpc.Name); err != nil {
					klog.Errorf("failed to reconcile vpc %q bfd static route", vpc.Name)
					return err
				}
			}
			if cachedVpc.Spec.ExtraExternalSubnets != nil {
				sort.Strings(vpc.Spec.ExtraExternalSubnets)
			}
			// add external subnets only in spec and delete external subnets only in status
			if !reflect.DeepEqual(vpc.Spec.ExtraExternalSubnets, vpc.Status.ExtraExternalSubnets) {
				for _, subnetStatus := range cachedVpc.Status.ExtraExternalSubnets {
					if !slices.Contains(cachedVpc.Spec.ExtraExternalSubnets, subnetStatus) {
						klog.Infof("delete external subnet %s connection for vpc %s", subnetStatus, vpc.Name)
						if err := c.handleDelVpcExternalSubnet(vpc.Name, subnetStatus); err != nil {
							klog.Errorf("failed to delete external subnet %s connection for vpc %s, error %v", subnetStatus, vpc.Name, err)
							return err
						}
					}
				}
				for _, subnetSpec := range cachedVpc.Spec.ExtraExternalSubnets {
					if !slices.Contains(cachedVpc.Status.ExtraExternalSubnets, subnetSpec) {
						klog.Infof("connect external subnet %s with vpc %s", subnetSpec, vpc.Name)
						if err := c.handleAddVpcExternalSubnet(key, subnetSpec); err != nil {
							klog.Errorf("failed to add external subnet %s connection for vpc %s, error %v", subnetSpec, key, err)
							return err
						}
					}
				}
				if err := c.updateVpcAddExternalStatus(key, true); err != nil {
					klog.Errorf("failed to update additional external subnets status, %v", err)
					return err
				}
			}
		}
		if !cachedVpc.Spec.EnableBfd && cachedVpc.Status.EnableBfd {
			lrpEipName := fmt.Sprintf("%s-%s", key, c.config.ExternalGatewaySwitch)
			if err := c.OVNNbClient.DeleteBFD(lrpEipName, ""); err != nil {
				klog.Error(err)
				return err
			}
			if err := c.handleDeleteVpcStaticRoute(key); err != nil {
				klog.Errorf("failed to delete bfd route for vpc %s, error %v", key, err)
				return err
			}
		}
		if !cachedVpc.Spec.EnableExternal && cachedVpc.Status.EnableExternal {
			// disconnect vpc from default external
			if err := c.handleDelVpcExternalSubnet(key, c.config.ExternalGatewaySwitch); err != nil {
				klog.Errorf("failed to delete external connection for vpc %s, error %v", key, err)
				return err
			}
		}
		if cachedVpc.Status.ExtraExternalSubnets != nil && !cachedVpc.Spec.EnableExternal {
			// disconnect vpc from extra external subnets
			for _, subnet := range cachedVpc.Status.ExtraExternalSubnets {
				klog.Infof("disconnect external network %s to vpc %s", subnet, vpc.Name)
				if err := c.handleDelVpcExternalSubnet(key, subnet); err != nil {
					klog.Error(err)
					return err
				}
			}
			if err := c.updateVpcAddExternalStatus(key, false); err != nil {
				klog.Errorf("failed to update additional external subnets status, %v", err)
				return err
			}
		}
	}
	return nil
}
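The reconcile pattern in handleAddOrUpdateVpc (list existing routes, compute the desired set, diff, then delete and add) can be sketched as follows. This is a simplified model of what diffStaticRoute computes, keyed on (policy, cidr, nextHopIP); it is not the real implementation:

```python
def diff_static_routes(existing, target):
    # routes are (policy, cidr, nextHopIP) tuples
    existing, target = set(existing), set(target)
    need_del = sorted(existing - target)  # present on the router, absent from spec
    need_add = sorted(target - existing)  # present in spec, absent from router
    return need_del, need_add

existing = [("policyDst", "0.0.0.0/0", "10.0.1.1")]
target = [("policyDst", "0.0.0.0/0", "10.0.1.254")]
need_del, need_add = diff_static_routes(existing, target)
print(need_del)  # [('policyDst', '0.0.0.0/0', '10.0.1.1')]
print(need_add)  # [('policyDst', '0.0.0.0/0', '10.0.1.254')]
```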
Code that creates the custom VPC gateway
kind: VpcNatGateway
apiVersion: kubeovn.io/v1
metadata:
  name: gw1
spec:
  vpc: test-vpc-1
  subnet: net1
  lanIp: 10.0.1.254
  externalSubnets:
  - ovn-vpc-external-network
- Checks that the VPC exists.
- Checks that the VPC's gateway resource exists.
- Checks that the custom VPC's subnet exists.
- lanIp here must match the nextHopIP specified when creating the custom VPC above; a NIC with lanIp as its address is created in the gateway pod and serves as the gateway's internal address for this VPC.
- externalSubnets specifies the external network to use; an IP address is allocated from that subnet as the external gateway address.
- A StatefulSet is then built from the gateway image specified above and created for the custom VPC; it manages the gateway pod.
func (c *Controller) handleAddOrUpdateVpcNatGw(key string) error {
	// create nat gw statefulset
	c.vpcNatGwKeyMutex.LockKey(key)
	defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }()

	klog.Infof("handle add/update vpc nat gateway %s", key)

	if vpcNatEnabled != "true" {
		return fmt.Errorf("iptables nat gw not enable")
	}
	// Check that the vpc nat gateway resource exists
	gw, err := c.vpcNatGatewayLister.Get(key)
	if err != nil {
		if k8serrors.IsNotFound(err) {
			return nil
		}
		klog.Error(err)
		return err
	}
	// Check that the vpc referenced by the gateway exists
	if _, err := c.vpcsLister.Get(gw.Spec.Vpc); err != nil {
		err = fmt.Errorf("failed to get vpc '%s', err: %v", gw.Spec.Vpc, err)
		klog.Error(err)
		return err
	}
	// Check that the subnet exists; a NIC gatewayed by an IP from this subnet is created in the gateway pod
	if _, err := c.subnetsLister.Get(gw.Spec.Subnet); err != nil {
		err = fmt.Errorf("failed to get subnet '%s', err: %v", gw.Spec.Subnet, err)
		klog.Error(err)
		return err
	}

	// check or create statefulset
	needToCreate := false
	needToUpdate := false
	// If the gateway pod's StatefulSet already exists this is an update; otherwise it must be created
	oldSts, err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace).
		Get(context.Background(), util.GenNatGwStsName(gw.Name), metav1.GetOptions{})
	if err != nil {
		if k8serrors.IsNotFound(err) {
			needToCreate = true
		} else {
			klog.Error(err)
			return err
		}
	}
	// Build the StatefulSet spec for the gateway pod
	newSts := c.genNatGwStatefulSet(gw, oldSts.DeepCopy())
	if !needToCreate && isVpcNatGwChanged(gw) {
		needToUpdate = true
	}

	switch {
	case needToCreate:
		// Create the gateway StatefulSet; its pod acts as the vpc's egress gateway.
		// if pod create successfully, will add initVpcNatGatewayQueue
		if _, err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace).
			Create(context.Background(), newSts, metav1.CreateOptions{}); err != nil {
			err := fmt.Errorf("failed to create statefulset '%s', err: %v", newSts.Name, err)
			klog.Error(err)
			return err
		}
		if err = c.patchNatGwStatus(key); err != nil {
			klog.Errorf("failed to patch nat gw sts status for nat gw %s, %v", key, err)
			return err
		}
		return nil
	case needToUpdate:
		if _, err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace).
			Update(context.Background(), newSts, metav1.UpdateOptions{}); err != nil {
			err := fmt.Errorf("failed to update statefulset '%s', err: %v", newSts.Name, err)
			klog.Error(err)
			return err
		}
		if err = c.patchNatGwStatus(key); err != nil {
			klog.Errorf("failed to patch nat gw sts status for nat gw %s, %v", key, err)
			return err
		}
	default:
		// check if need to change qos
		if gw.Spec.QoSPolicy != gw.Status.QoSPolicy {
			if gw.Status.QoSPolicy != "" {
				if err = c.execNatGwQoS(gw, gw.Status.QoSPolicy, QoSDel); err != nil {
					klog.Errorf("failed to del qos for nat gw %s, %v", key, err)
					return err
				}
			}
			if gw.Spec.QoSPolicy != "" {
				if err = c.execNatGwQoS(gw, gw.Spec.QoSPolicy, QoSAdd); err != nil {
					klog.Errorf("failed to add qos for nat gw %s, %v", key, err)
					return err
				}
			}
			if err := c.updateCrdNatGwLabels(key, gw.Spec.QoSPolicy); err != nil {
				err := fmt.Errorf("failed to update nat gw %s: %v", gw.Name, err)
				klog.Error(err)
				return err
			}
			// if update qos success, will update nat gw status
			if err = c.patchNatGwQoSStatus(key, gw.Spec.QoSPolicy); err != nil {
				klog.Errorf("failed to patch nat gw qos status for nat gw %s, %v", key, err)
				return err
			}
		}
	}
	return nil
}
- After the custom VPC gateway pod has been created, the following function runs.
func (c *Controller) handleInitVpcNatGw(key string) error {
	if vpcNatEnabled != "true" {
		return fmt.Errorf("iptables nat gw not enable")
	}

	c.vpcNatGwKeyMutex.LockKey(key)
	defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }()

	klog.Infof("handle init vpc nat gateway %s", key)

	gw, err := c.vpcNatGatewayLister.Get(key)
	if err != nil {
		if k8serrors.IsNotFound(err) {
			return nil
		}
		klog.Error(err)
		return err
	}

	// subnet for vpc-nat-gw has been checked when create vpc-nat-gw
	oriPod, err := c.getNatGwPod(key)
	if err != nil {
		err := fmt.Errorf("failed to get nat gw %s pod: %v", gw.Name, err)
		klog.Error(err)
		return err
	}
	pod := oriPod.DeepCopy()
	if pod.Status.Phase != corev1.PodRunning {
		time.Sleep(10 * time.Second)
		return fmt.Errorf("failed to init vpc nat gateway, pod is not ready")
	}

	// Already initialized: return immediately
	if _, hasInit := pod.Annotations[util.VpcNatGatewayInitAnnotation]; hasInit {
		return nil
	}

	// Not initialized yet: start initialization
	natGwCreatedAT = pod.CreationTimestamp.Format("2006-01-02T15:04:05")
	klog.V(3).Infof("nat gw pod '%s' inited at %s", key, natGwCreatedAT)

	// Run inside the vpc gateway pod to install the nat rules; examined in detail later
	if err = c.execNatGwRules(pod, natGwInit, []string{fmt.Sprintf("%s,%s", c.config.ServiceClusterIPRange, pod.Annotations[util.GatewayAnnotation])}); err != nil {
		err = fmt.Errorf("failed to init vpc nat gateway, %v", err)
		klog.Error(err)
		return err
	}
	// Run inside the vpc gateway pod to apply QoS
	if gw.Spec.QoSPolicy != "" {
		if err = c.execNatGwQoS(gw, gw.Spec.QoSPolicy, QoSAdd); err != nil {
			klog.Errorf("failed to add qos for nat gw %s, %v", key, err)
			return err
		}
	}
	// if update qos success, will update nat gw status
	if gw.Spec.QoSPolicy != gw.Status.QoSPolicy {
		if err = c.patchNatGwQoSStatus(key, gw.Spec.QoSPolicy); err != nil {
			klog.Errorf("failed to patch status for nat gw %s, %v", key, err)
			return err
		}
	}
	if err := c.updateCrdNatGwLabels(gw.Name, gw.Spec.QoSPolicy); err != nil {
		err := fmt.Errorf("failed to update nat gw %s: %v", gw.Name, err)
		klog.Error(err)
		return err
	}

	c.updateVpcFloatingIPQueue.Add(key)
	c.updateVpcDnatQueue.Add(key)
	c.updateVpcSnatQueue.Add(key)
	c.updateVpcSubnetQueue.Add(key)
	c.updateVpcEipQueue.Add(key)

	pod.Annotations[util.VpcNatGatewayInitAnnotation] = "true"
	patch, err := util.GenerateStrategicMergePatchPayload(oriPod, pod)
	if err != nil {
		klog.Error(err)
		return err
	}
	if _, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name,
		types.StrategicMergePatchType, patch, metav1.PatchOptions{}, ""); err != nil {
		err := fmt.Errorf("patch pod %s/%s failed %v", pod.Name, pod.Namespace, err)
		klog.Error(err)
		return err
	}
	return nil
}
Code that creates an EIP and SNAT rules for the custom VPC
kind: IptablesEIP
apiVersion: kubeovn.io/v1
metadata:
  name: eip1
spec:
  v4ip: 192.168.100.230
  natGwDp: gw1
---
kind: IptablesSnatRule
apiVersion: kubeovn.io/v1
metadata:
  name: snat01
spec:
  eip: eip1
  internalCIDR: 10.1.1.0/24
- The code below creates an EIP.
- It first checks whether the EIP already exists.
- It checks whether the IptablesEIP resource specifies an IP; if not, one is allocated from the external subnet.
- It creates a NIC inside the custom VPC gateway pod and configures the IP address on it.
func (c *Controller) handleAddIptablesEip(key string) error {
	if vpcNatEnabled != "true" {
		return fmt.Errorf("iptables nat gw not enable")
	}

	c.vpcNatGwKeyMutex.LockKey(key)
	defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }()

	klog.Infof("handle add iptables eip %s", key)

	cachedEip, err := c.iptablesEipsLister.Get(key)
	if err != nil {
		if k8serrors.IsNotFound(err) {
			return nil
		}
		klog.Error(err)
		return err
	}
	// The eip resource is already ready: return immediately
	if cachedEip.Status.Ready && cachedEip.Status.IP != "" {
		// already ok
		return nil
	}

	var v4ip, v6ip, mac, eipV4Cidr, v4Gw string
	// Defaults to ovn-vpc-external-network if no external subnet was specified
	externalNetwork := util.GetExternalNetwork(cachedEip.Spec.ExternalSubnet)
	// e.g. ovn-vpc-external-network.kube-system
	externalProvider := fmt.Sprintf("%s.%s", externalNetwork, attachmentNs)

	portName := ovs.PodNameToPortName(cachedEip.Name, cachedEip.Namespace, externalProvider)
	if cachedEip.Spec.V4ip != "" {
		// An IP was specified on the eip: allocate that address from ovn-vpc-external-network
		if v4ip, v6ip, mac, err = c.acquireStaticEip(cachedEip.Name, cachedEip.Namespace, portName, cachedEip.Spec.V4ip, externalNetwork); err != nil {
			klog.Errorf("failed to acquire static eip, err: %v", err)
			return err
		}
	} else {
		// Random allocation from ovn-vpc-external-network
		if v4ip, v6ip, mac, err = c.acquireEip(cachedEip.Name, cachedEip.Namespace, portName, externalNetwork); err != nil {
			klog.Errorf("failed to allocate eip, err: %v", err)
			return err
		}
	}
	// The eip plus the prefix length of the ovn-vpc-external-network subnet
	if eipV4Cidr, err = c.getEipV4Cidr(v4ip, externalNetwork); err != nil {
		klog.Errorf("failed to get eip cidr, err: %v", err)
		return err
	}
	// The gateway of the ovn-vpc-external-network subnet
	if v4Gw, _, err = c.GetGwBySubnet(externalNetwork); err != nil {
		klog.Errorf("failed to get gw, err: %v", err)
		return err
	}
	// Example:
	//   kind: IptablesEIP
	//   apiVersion: kubeovn.io/v1
	//   metadata:
	//     name: eip-static
	//   spec:
	//     natGwDp: gw1
	//     v4ip: 10.0.1.111
	// Arguments: name of the vpc gateway pod, gateway of ovn-vpc-external-network,
	// eip with prefix length. This creates the NIC carrying the eip inside the gateway pod.
	if err = c.createEipInPod(cachedEip.Spec.NatGwDp, v4Gw, eipV4Cidr); err != nil {
		klog.Errorf("failed to create eip '%s' in pod, %v", key, err)
		return err
	}

	if cachedEip.Spec.QoSPolicy != "" {
		if err = c.addEipQoS(cachedEip, v4ip); err != nil {
			klog.Errorf("failed to add qos '%s' in pod, %v", key, err)
			return err
		}
	}
	if err = c.createOrUpdateCrdEip(key, v4ip, v6ip, mac, cachedEip.Spec.NatGwDp, cachedEip.Spec.QoSPolicy, externalNetwork); err != nil {
		klog.Errorf("failed to update eip %s, %v", key, err)
		return err
	}
	return nil
}
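The value getEipV4Cidr produces can be sketched as: combine the allocated EIP with the external subnet's prefix length, giving the ip/len string used to configure the NIC inside the gateway pod. The helper below is hypothetical, written for illustration only:

```python
import ipaddress

def eip_v4_cidr(v4ip: str, subnet_cidr: str) -> str:
    # reuse the external subnet's prefix length for the EIP address
    prefix_len = ipaddress.ip_network(subnet_cidr).prefixlen
    return f"{v4ip}/{prefix_len}"

print(eip_v4_cidr("192.168.100.230", "192.168.100.0/24"))  # 192.168.100.230/24
```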
- The function below creates SNAT rules.
- It fetches the EIP bound to the SNAT rule, the CIDR whose sources need translation, and the address the sources are rewritten to.
- It then executes iptables inside the custom VPC gateway pod to install the rule; this is done through a script whose contents are listed later.
// Handles the IptablesSnatRule resource
func (c *Controller) handleAddIptablesSnatRule(key string) error {
	if vpcNatEnabled != "true" {
		return fmt.Errorf("iptables nat gw not enable")
	}

	c.vpcNatGwKeyMutex.LockKey(key)
	defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }()

	klog.Infof("handle add iptables snat rule %s", key)

	snat, err := c.iptablesSnatRulesLister.Get(key)
	if err != nil {
		if k8serrors.IsNotFound(err) {
			return nil
		}
		klog.Error(err)
		return err
	}
	// The snat resource is already ready: return immediately
	if snat.Status.Ready && snat.Status.V4ip != "" {
		// already ok
		return nil
	}
	klog.V(3).Infof("handle add iptables snat %s", key)

	// The eip provides the address that source IPs are rewritten to
	eipName := snat.Spec.EIP
	if eipName == "" {
		return fmt.Errorf("failed to create snat rule, should set eip")
	}
	// Fetch the eip object
	eip, err := c.GetEip(eipName)
	if err != nil {
		klog.Errorf("failed to get eip, %v", err)
		return err
	}

	// create snat
	// The CIDR whose sources are translated
	v4Cidr, _ := util.SplitStringIP(snat.Spec.InternalCIDR)
	if v4Cidr == "" {
		// only support IPv4 snat
		err = fmt.Errorf("failed to get snat v4 internal cidr, original cidr is %s", snat.Spec.InternalCIDR)
		return err
	}
	// Add the snat rule inside the vpc gateway pod
	if err = c.createSnatInPod(eip.Spec.NatGwDp, eip.Status.IP, v4Cidr); err != nil {
		klog.Errorf("failed to create snat, %v", err)
		return err
	}
	// Mark the IptablesSnatRule object as ready
	if err = c.patchSnatStatus(key, eip.Status.IP, eip.Spec.V6ip, eip.Spec.NatGwDp, "", true); err != nil {
		klog.Errorf("failed to update status for snat %s, %v", key, err)
		return err
	}
	// Update the IptablesSnatRule object's labels
	if err = c.patchSnatLabel(key, eip); err != nil {
		klog.Errorf("failed to patch label for snat %s, %v", key, err)
		return err
	}
	if err = c.handleAddIptablesSnatFinalizer(key); err != nil {
		klog.Errorf("failed to handle add finalizer for snat, %v", err)
		return err
	}
	// Update the eip object's status
	if err = c.patchEipStatus(eipName, "", "", "", true); err != nil {
		// refresh eip nats
		klog.Errorf("failed to patch snat use eip %s, %v", key, err)
		return err
	}
	return nil
}
Creating a FIP for the custom VPC
First, what a FIP does: FIP stands for floating IP. It binds one external IP to one internal OVN IP one to one, so the internal IP can be reached directly through the FIP.
As the configuration below shows, a FIP is really the combination of an EIP and a FIP rule.
kind: IptablesEIP
apiVersion: kubeovn.io/v1
metadata:
  name: eip3
spec:
  v4ip: 192.168.100.232
  natGwDp: gw1
---
kind: IptablesFIPRule
apiVersion: kubeovn.io/v1
metadata:
  name: fip01
spec:
  eip: eip3
  internalIp: 10.0.1.5
- EIP creation was covered above, so the focus here is the FIP rule. It has two key parameters, eip and internalIp; the rule binds the internal OVN IP address to an external IP address one to one.
- The binding of internalIp and eip is also implemented through iptables rules executed inside the custom VPC gateway pod; those rules are described below.
# Bind internalIp and eip one to one using iptables rules.
function add_floating_ip() {
    # make sure inited
    check_inited
    for rule in $@
    do
        arr=(${rule//,/ })
        eip=(${arr[0]//\// })
        internalIp=${arr[1]}
        # check if already exist
        iptables-save | grep "EXCLUSIVE_DNAT" | grep -w "\-d $eip/32" | grep "destination" && exit 0
        exec_cmd "iptables -t nat -A EXCLUSIVE_DNAT -d $eip -j DNAT --to-destination $internalIp"
        exec_cmd "iptables -t nat -A EXCLUSIVE_SNAT -s $internalIp -j SNAT --to-source $eip"
    done
}
As shown, two iptables rules are added:
EXCLUSIVE_DNAT and EXCLUSIVE_SNAT are two custom chains defined by kube-ovn.
You can simply think of EXCLUSIVE_DNAT as the PREROUTING chain of the nat table and EXCLUSIVE_SNAT as the POSTROUTING chain; together they add the DNAT and SNAT rules.
iptables -t nat -A EXCLUSIVE_DNAT -d $eip -j DNAT --to-destination $internalIp
iptables -t nat -A EXCLUSIVE_SNAT -s $internalIp -j SNAT --to-source $eip
We could achieve the same effect ourselves, e.g.:
iptables -t nat -A PREROUTING -d $eip -j DNAT --to-destination $internalIp
iptables -t nat -A POSTROUTING -s $internalIp -j SNAT --to-source $eip
Let's trace the actual traffic flow:
- When a pod in the custom VPC needs to reach the internet, its packets are first forwarded to the custom VPC gateway pod. As the traffic enters the gateway pod's network stack it first hits the PREROUTING chain, where no rule matches. The routing decision then determines the destination is not local, so the packet passes through the FORWARD chain, again matching nothing, and reaches the POSTROUTING chain. There the rule we added matches, so SNAT is performed: the source IP is rewritten to the EIP address and the packet can leave.
- When the internet needs to reach the internalIp inside the OVN network, the externally advertised address is the EIP, so external clients will connect to the EIP. When the traffic enters the custom VPC gateway pod it hits the PREROUTING chain, where the DNAT rule we added matches and rewrites the destination IP to internalIp. The routing decision then determines the destination is not a local address, so the packet passes through the FORWARD chain (no match) and the POSTROUTING chain (no match), and is finally delivered into the OVN network.
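The two traversals above can be condensed into one sketch of the gateway pod's nat hooks for forwarded traffic. This is a toy model of the chain order, not real netfilter:

```python
EIP, INTERNAL = "192.168.100.232", "10.0.1.5"

def forward(src: str, dst: str):
    # PREROUTING (nat): the DNAT rule matches traffic addressed to the EIP
    if dst == EIP:
        dst = INTERNAL
    # routing decision + FORWARD: destination is not local, packet is forwarded
    # POSTROUTING (nat): the SNAT rule matches traffic sourced from the pod
    if src == INTERNAL:
        src = EIP
    return src, dst

# internet -> pod: destination rewritten on the way in
print(forward("203.0.113.9", EIP))       # ('203.0.113.9', '10.0.1.5')
# pod -> internet: source rewritten on the way out
print(forward(INTERNAL, "203.0.113.9"))  # ('192.168.100.232', '203.0.113.9')
```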