如何使用client-go构建pod web shell

代码示例及原理

  • 原理是利用websocket协议实现对pod的exec登录,利用client-go构造与远程apiserver的长连接,将对pod容器的输入和pod容器的输出重定向到我们的io方法中,从而实现浏览器端的虚拟终端的效果
  • 消息体结构如下
type Connection struct {WsSocket    *websocket.Conn // 主websocket连接OutWsSocket *websocket.ConnInChan      chan *WsMessage // 输入消息管道OutChan     chan *WsMessage // 输出消息管道Mutex     sync.Mutex // 并发控制IsClosed  bool // 是否关闭CloseChan chan byte // 关闭连接管道
}
// 消息体
type WsMessage struct {MessageType int    `json:"messageType"`Data        []byte `json:"data"`
}
// terminal的行宽和列宽
type XtermMessage struct {Rows uint16 `json:"rows"`Cols uint16 `json:"cols"`
}
  • 下面需要一个handler来控制终端和接收消息,ResizeEvent用来控制终端变更的事件
type ContainerStreamHandler struct {WsConn      *ConnectionResizeEvent chan remotecommand.TerminalSize
}
  • 为了控制终端,我们需要重写TerminalSize的Next方法,这个方法是client-go定义的接口,如下所示
/*
Copyright 2017 The Kubernetes Authors.Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License athttp://www.apache.org/licenses/LICENSE-2.0Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/package remotecommand// TerminalSize and TerminalSizeQueue was a part of k8s.io/kubernetes/pkg/util/term
// and were moved in order to decouple client from other term dependencies// TerminalSize represents the width and height of a terminal.
type TerminalSize struct {Width  uint16Height uint16
}// TerminalSizeQueue is capable of returning terminal resize events as they occur.
type TerminalSizeQueue interface {// Next returns the new terminal size after the terminal has been resized. It returns nil when// monitoring has been stopped.Next() *TerminalSize
}
  • 我们重写的Next方法如下
func (handler *ContainerStreamHandler) Next() (size *remotecommand.TerminalSize) {select {case ret := <-handler.ResizeEvent:size = &retcase <-handler.WsConn.CloseChan:return nil // 这里很重要, 具体见最后的解释}return
}
  • 当我们从ResizeEvent管道中接收到调整终端大小的事件之后,这个事件会被client-go接收到,源码如下
func (p *streamProtocolV3) handleResizes() {if p.resizeStream == nil || p.TerminalSizeQueue == nil {return}go func() {defer runtime.HandleCrash()encoder := json.NewEncoder(p.resizeStream)for {size := p.TerminalSizeQueue.Next() // 接收到我们的调整终端大小的事件if size == nil {return}if err := encoder.Encode(&size); err != nil {runtime.HandleError(err)}}}()
}
  • 最后我们给出核心的实现(只是一个大概的框架,具体细节有问题可以留言)
func ExecCommandInContainer(ctx context.Context, conn *tty.Connection, podName, namespace, containerName string) (err error) {kubeClient, err := k8s.CreateClientFromConfig([]byte(env.Kubeconfig))if err != nil {return}restConfig, err := clientcmd.RESTConfigFromKubeConfig([]byte(env.Kubeconfig))if err != nil {return}// 构造请求req := kubeClient.CoreV1().RESTClient().Post().Resource("pods").Name(podName).Namespace(namespace).SubResource("exec").VersionedParams(&corev1.PodExecOptions{Command:   []string{"/bin/sh", "-c", "export LANG=\"en_US.UTF-8\"; [ -x /bin/bash ] && exec /bin/bash || exec /bin/sh"},Container: containerName,Stdin:     true,Stdout:    true,Stderr:    true,TTY:       true,}, scheme.ParameterCodec)// 使用spdy协议对http协议进行增量升级 exec, err := remotecommand.NewSPDYExecutor(restConfig, "POST", req.URL())if err != nil {return err}handler := &tty.ContainerStreamHandler{WsConn:      conn,ResizeEvent: make(chan remotecommand.TerminalSize),}// 核心函数,重定向标准输入和输出err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{Stdin:             handler,Stdout:            handler,Stderr:            handler,TerminalSizeQueue: handler,Tty:               true,})return
}
  • 整个函数的核心是StreamWithContext函数,这个函数是client-go的一个方法,接下来我们详细分析一下
// StreamWithContext opens a protocol streamer to the server and streams until a client closes
// the connection or the server disconnects or the context is done.
func (e *spdyStreamExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {conn, streamer, err := e.newConnectionAndStream(ctx, options)if err != nil {return err}defer conn.Close()panicChan := make(chan any, 1) // panic管道errorChan := make(chan error, 1) // error管道go func() {defer func() {if p := recover(); p != nil {panicChan <- p}}()errorChan <- streamer.stream(conn)}()select {case p := <-panicChan:panic(p)case err := <-errorChan:return errcase <-ctx.Done():return ctx.Err()}
}
  • 这个方法首先初始化了一个连接,然后定义了两个管道,分别接收panic事件和error事件,核心是streamer.stream()方法,接下来我们分析一下这个方法
func (p *streamProtocolV4) stream(conn streamCreator) error {// 创建一个与apiserver的连接传输流if err := p.createStreams(conn); err != nil {return err}// now that all the streams have been created, proceed with reading & copying// 观察流中的错误errorChan := watchErrorStream(p.errorStream, &errorDecoderV4{})// 监听终端调整的事件p.handleResizes()// 将我们的标准输入拷贝到remoteStdin也就是远端的标准输入当中p.copyStdin()var wg sync.WaitGroup// 将远端的标准输出拷贝到我们的标准输出当中p.copyStdout(&wg)p.copyStderr(&wg)// we're waiting for stdout/stderr to finish copyingwg.Wait()// waits for errorStream to finish reading with an error or nilreturn <-errorChan
}
  • 整体逻辑还是很清晰的,具体实现细节看源码吧

OOM 问题

  • 这个功能上线之后,发现内存不断攀升,如下图(出现陡降是因为我重启了服务)
    在这里插入图片描述
  • 使用pprof进行问题排查,在你的main.go文件中加入下面的内容
import _ "net/http/pprof"func main(){go func() {http.ListenAndServe("localhost:6060", nil)}()
}
  • 然后你可以在http://localhost:6060/debug/pprof/中看到下面的页面
    在这里插入图片描述
  • 点击full goroutine stack dump,你会看到goroutine的堆栈存储情况,如下图
    在这里插入图片描述
  • 查看之后发现出现了很多的残留goroutine,出现在Next方法中,如下图
    在这里插入图片描述
  • 这个问题出现的原因是我们重写的Next方法,当断开连接的时候必须要主动返回一个nil,否则会残留一个go func,具体看上面的handleResizes方法中有一个for循环,必须收到一个sizenil,才能跳出此func,一开始我写的方法如下,这样写的话当浏览器退出的时候,是不会给我一个nil的终端调整的事件的
// 错误写法
func (handler *ContainerStreamHandler) Next() (size *remotecommand.TerminalSize) {ret := <-handler.ResizeEvent:size = &retreturn
}
// 正确写法
func (handler *ContainerStreamHandler) Next() (size *remotecommand.TerminalSize) {select {case ret := <-handler.ResizeEvent:size = &retcase <-handler.WsConn.CloseChan:return nil // 当发现管道关闭的时候,主动返回一个nil}return
}

有问题欢迎交流

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/8714.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Meta更低的训练成本取得更好的性能: 多token预测(Multi-Token Prediction)

Meta提出了一种透过多token预测(Multi-token Prediction)来训练更好、更快的大型语言模型的方法。这篇论文的重点如下: 训练语言模型同时预测多个未来的token,可以提高样本效率(sample efficiency)。 在推论阶段,使用多token预测可以达到最高3倍的加速。 论文的主要贡献包括: …

Django中如何使用WebSocket实时更新数据?

在Django中使用WebSocket实时更新数据&#xff0c;可以通过使用第三方库Django Channels实现。Django Channels是基于WebSocket的实时通信框架&#xff0c;它使得Django应用可以处理实时的、异步的任务。 下面是使用Django Channels实时更新数据的一般步骤&#xff1a; 安装D…

ES集群数据备份与迁移

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、文章涉及概念讲解二、操作步骤1.创建 snapshot repository操作主机hadoop1分别操作从机hadoop2和hadoop3 2. 查看仓库信息3. 备份索引&#xff0c;生成快照…

【S32K UDS BootLoader】-1.1-Unified bootloader Demo和ECUBus工具的使用

<--返回「Autosar_MCAL高阶配置」专栏主页--> 目录 1 下载S32K1/S32K3/S12Z Unified bootloader Demo 1.1 在S32DS中编译S32K312_CAN_bootloader_RTD2d0工程并烧录 2 ECUBus工具使用 2.1 PCAN环境搭建 1.1.1 安装PCAN驱动 1.1.2 安装PCAN-View 2.2 下载并安装ECU…

蓝桥杯 BASIC-26 基础练习 报时助手

蓝桥杯 BASIC-26 基础练习 报时助手 问题描述 给定当前的时间&#xff0c;请用英文的读法将它读出来。 时间用时h和分m表示&#xff0c;在英文的读法中&#xff0c;读一个时间的方法是&#xff1a; 如果m为0&#xff0c;则将时读出来&#xff0c;然后加上“o’clock”&#xff…

嵌入式C语言的变量和函数存储类型

目录 概述 1 嵌入式C的数据类型 2 嵌入式C语言存储类型 2.1 auto存储类型 2.2 extern存储类型 2.3 register存储类型 2.4 static存储类型 概述 本文主要介绍嵌入式C语言中的数据变量的类型&#xff0c;包括其数据长度&#xff0c;在内存中的存储方式。还介绍了数据的存储…

C语言 | Leetcode C语言题解之第77题组合

题目&#xff1a; 题解&#xff1a; int** combine(int n, int k, int* returnSize, int** returnColumnSizes) {int* temp malloc(sizeof(int) * (k 1));int tempSize 0;int** ans malloc(sizeof(int*) * 200001);int ansSize 0;// 初始化// 将 temp 中 [0, k - 1] 每个…

Vue项目中使用echarts教程

Vue项目中使用echarts教程 步骤npm 安装ECharts引入 ECharts老版本引入方式 &#xff08;v4版本&#xff09;新版本引入方式 &#xff08;v5版本&#xff09; ECharts初体验ECharts组件化&#xff08;进阶写法&#xff09; 步骤 npm 安装ECharts npm install echarts --save引…

回答篇:测试开发高频面试题目

引用之前文章&#xff1a;《测试开发高频面试题目》 https://blog.csdn.net/qq_41214208/article/details/138193469?spm1001.2014.3001.5502 本篇文章是回答篇&#xff08;持续更新中&#xff09; 1. 什么是测试开发以及其在软件开发流程中的作用。 a. 测试开发是指测试人员或…

关于Anaconda常用的命令

常用命令 查看当前环境下的环境&#xff1a;conda env list查看当前conda的版本&#xff1b;conda --version conda create -n your_env_name pythonX.X&#xff08;2.7、3.6等)命令创建python版本为X.X。名字为your_env_name的虚拟环境。your_env_name文件可以在Anaconda安装…

收银系统源码--什么是千呼智慧新零售系统?

千呼智慧新零售系统是一套针对零售行业线上线下一体化收银系统。给门店提供线下称重收银、o2o线上商城、erp进销存、精细化会员管理、丰富营销插件等一体化解决方案。多端数据打通&#xff0c;实现线上线下一体化&#xff0c;提升门店工作效率&#xff0c;实现数字化升级&#…

前端项目加载离线的百度地图,利用工具进行切指定区域的地图影像,自定义图层getTilesUrl

百度地图在开发中我们经常使用&#xff0c;但是有些项目是需要在内网进行&#xff0c;这时候我们不得不考虑项目中一些功能需要请求外网静态资源&#xff0c;比如百度地图。只有把包下载到本地&#xff0c;才能让静态资源文件的正常的访问。 目录 获取百度地图开发秘钥 引入在…

设计模式——装饰者模式(Decorator)

装饰者模式&#xff08;Decorator Pattern&#xff09;是一种结构型设计模式&#xff0c;它允许你动态地给一个对象添加一些额外的职责&#xff0c;就增加功能来说&#xff0c;装饰者模式相比生成子类更为灵活。在装饰者模式中&#xff0c;一个装饰类会包装一个对象&#xff08…

Transformer优化加速--xformers

一、定义 1 作用 2 优化创新点 3. 使用demo 二、实现 作用 facebook 提出&#xff0c; xformers能够有效加速attention计算并降低显存。 参考&#xff1a; https://github.com/facebookresearch/xformers https://zhuanlan.zhihu.com/p/688745007 接口&#xff1a;https://f…

Java | Leetcode Java题解之第78题子集

题目&#xff1a; 题解&#xff1a; class Solution {List<Integer> t new ArrayList<Integer>();List<List<Integer>> ans new ArrayList<List<Integer>>();public List<List<Integer>> subsets(int[] nums) {dfs(0, nums…

C++容器——map和pair对组

pair&#xff08;对组&#xff09; 是一种模板类&#xff0c;允许将两个不同类型的值组合在一起。它由两个数据成员first和second组成&#xff0c;分别用来保存这两个值。 头文件 加头文件 #include<utility> 对于 C11 及以上标准&#xff0c;pair 类型可以在不包含头…

牛客网刷题 | BC81 KiKi求质数个数

目前主要分为三个专栏&#xff0c;后续还会添加&#xff1a; 专栏如下&#xff1a; C语言刷题解析 C语言系列文章 我的成长经历 感谢阅读&#xff01; 初来乍到&#xff0c;如有错误请指出&#xff0c;感谢&#xff01; 描述 KiKi知道了什么是质…

【离散数学】集合上二元关系性质判定的实现(c语言实现)

实验要求 关系矩阵的初始化和打印 我们将关系矩阵存入一个二维数组中&#xff0c;因为集合元素个数不会超过5个所以就用一个5行5列二维数组来表示。 在我们得到了集合元素个数之后我们就可以对数组进行0,1随机赋值 //初始关系矩阵 void init_matrix(int array[][5], int n) {…

python使用f-string时如何保留原始的{}

如果想在 f-string 中使用 {} 符号&#xff0c;但又不想让它被解释成 f-string 的占位符&#xff0c;可以使用两个连续的 {} 来表示一个单独的 {} 符号&#xff0c;从而使其保留原始的形式。 例如&#xff1a; name "John" age 30 text f"{{Hello {name}, …

力扣:1005. K 次取反后最大化的数组和

1005. K 次取反后最大化的数组和 给你一个整数数组 nums 和一个整数 k &#xff0c;按以下方法修改该数组&#xff1a; 选择某个下标 i 并将 nums[i] 替换为 -nums[i] 。 重复这个过程恰好 k 次。可以多次选择同一个下标 i 。 以这种方式修改数组后&#xff0c;返回数组 可能…