LiveKit的agent介绍

概念

LiveKit核心概念:

  • Room(房间)
  • Participant(参会人)
  • Track(信息流追踪)

Agent 架构图

订阅信息流

agent交互流程

客户端操作

加入房间

房间创建方式

手动

赋予用户创建房间的权限,在客户的加入并创建房间。

自动

客户的指定ws_url和token,加入指定房间。

room = LiveKit.create(appContext = applicationContext)
room.connect(wsUrl, token)

离开房间

调用 Room.disconnect() 通知 LiveKit 离开事件。如果应用程序在未通知 LiveKit 的情况下关闭,则将继续显示参与者在 Room 中 15 秒。

Swift上,当应用程序退出时,会自动调用 Room.disconnect

发送消息

发送方式

客户端通过LocalParticipant.publishData API 向房间中的任何参与者发布任意数据消息。房间数据通过 WebRTC 数据通道发布到SFU;LiveKit 服务器会将该数据转发给聊天室中的一个或多个参与者。

给指定用户发消息,通过设置destinationIdentities ,它表示用户的身份。

// 发送消息
coroutineScope.launch {val data: ByteArray = //...// 发送有损消息给全员,LOSSY表示数据发送一次,无顺序保证。这对于优先考虑交付速度的实时更新来说是理想的选择。room.localParticipant.publishData(data, DataPublishReliability.LOSSY)// 发送可靠的消息给指定成员,RELIABLE表示发送数据时最多重试3次并保证顺序。适合优先保证交付而不是抵延迟的场景,例如室内聊天。val identities = listOf(Participant.Identity("alice"),Participant.Identity("bob"),)room.localParticipant.publishData(data, DataPublishReliability.RELIABLE, identities)
}// 处理接收到的消息
coroutineScope.launch {room.events.collect { event ->if(event is RoomEvent.DataReceived) {// Process data}}
}

消息大小限制

由于 SCTP 协议的限制,对大于 16 KiB 的消息使用数据通道是不切实际的,包括 LiveKit 的协议包装器。我们建议将消息大小保持在 15 KiB 以下。详细了解数据通道大小限制。

消息的topic

消息可以指定topic,在接收方通过topic进行过滤出感兴趣的消息。

发送信息流

livekit默认支持摄像头、麦克风、录屏3个流,也支持用户发布自定义流的配置。

音视频流

// Turns camera track on
room.localParticipant.setCameraEnabled(true)// Turns microphone track on
room.localParticipant.setMicrophoneEnabled(true)

录屏流

// Create an intent launcher for screen capture
// This *must* be registered prior to onCreate(), ideally as an instance val
val screenCaptureIntentLauncher = registerForActivityResult(ActivityResultContracts.StartActivityForResult()
) { result ->val resultCode = result.resultCodeval data = result.dataif (resultCode != Activity.RESULT_OK || data == null) {return@registerForActivityResult}lifecycleScope.launch {room.localParticipant.setScreenShareEnabled(true, data)}
}// When it's time to enable the screen share, perform the following
val mediaProjectionManager =getSystemService(MEDIA_PROJECTION_SERVICE) as MediaProjectionManager
screenCaptureIntentLauncher.launch(mediaProjectionManager.createScreenCaptureIntent())

自定义流配置

// Option 1: set room defaults
val options = RoomOptions(audioTrackCaptureDefaults = LocalAudioTrackOptions(noiseSuppression = true,echoCancellation = true,autoGainControl = true,highPassFilter = true,typingNoiseDetection = true,),videoTrackCaptureDefaults = LocalVideoTrackOptions(deviceId = "",position = CameraPosition.FRONT,captureParams = VideoPreset169.H1080.capture,),audioTrackPublishDefaults = AudioTrackPublishDefaults(audioBitrate = 20_000,dtx = true,),videoTrackPublishDefaults = VideoTrackPublishDefaults(videoEncoding = VideoPreset169.H1080.encoding,)
)
var room = LiveKit.create(...roomOptions = options,
)// Option 2: create tracks manually
val localParticipant = room.localParticipant
val audioTrack = localParticipant.createAudioTrack("audio")
localParticipant.publishAudioTrack(audioTrack)val videoTrack = localParticipant.createVideoTrack("video", LocalVideoTrackOptions(CameraPosition.FRONT,VideoPreset169.H1080.capture
))
localParticipant.publishVideoTrack(videoTrack)

订阅信息流

默认用户进入房间,会监听所有信息流。

coroutineScope.launch {room.events.collect { event ->when(event) {is RoomEvent.TrackSubscribed -> {// Audio tracks are automatically played.val videoTrack = event.track as? VideoTrack ?: return@collectvideoTrack.addRenderer(videoRenderer)}else -> {}}}
}

监听事件

事件分为:room事件和参与者事件。这是事件列表:

EVENT

DESCRIPTION

ROOM EVENT

PARTICIPANT EVENT

ParticipantConnected 参与者Connected

A RemoteParticipant joins after the local participant.

✔️

RemoteParticipant 在本地参与者之后加入。

ParticipantDisconnected 参与者断开连接

A RemoteParticipant leaves

✔️

RemoteParticipant 离开

Reconnecting 重新连接

The connection to the server has been interrupted and it's attempting to reconnect.

✔️

与服务器的连接已中断,它正在尝试重新连接。

Reconnected 重新

Reconnection has been successful

✔️

重新连接成功

Disconnected 断开

Disconnected from room due to the room closing or unrecoverable failure

✔️

由于会议室关闭或无法恢复的故障而与会议室断开连接

TrackPublished 轨迹已发布

A new track is published to room after the local participant has joined

✔️

✔️

本地参加者加入后,新轨道将发布到聊天室

TrackUnpublished TrackUnpublished (未发布)

A RemoteParticipant has unpublished a track

✔️

✔️

RemoteParticipant 已取消发布轨道

TrackSubscribed

The LocalParticipant has subscribed to a track

✔️

✔️

LocalParticipant 已订阅跟踪

TrackUnsubscribed 跟踪Unsubscribed

A previously subscribed track has been unsubscribed

✔️

✔️

之前订阅的曲目已取消订阅

TrackMuted TrackMuted (轨道静音)

A track was muted, fires for both local tracks and remote tracks

✔️

✔️

轨道已静音,本地轨道和远程轨道均触发

TrackUnmuted TrackUnmuted (轨道未静音)

A track was unmuted, fires for both local tracks and remote tracks

✔️

✔️

轨道已取消静音,本地轨道和远程轨道均触发

LocalTrackPublished LocalTrack已发布

A local track was published successfully

✔️

✔️

已成功发布本地轨道

LocalTrackUnpublished

A local track was unpublished

✔️

✔️

本地曲目未发布

ActiveSpeakersChanged ActiveSpeakers已更改

Current active speakers has changed

✔️

当前当前活跃的发言人已更改

IsSpeakingChanged

The current participant has changed speaking status

✔️

当前参与者已更改发言状态

ConnectionQualityChanged 连接质量已更改

Connection quality was changed for a Participant

✔️

✔️

参与者的连接质量已更改

ParticipantMetadataChanged

A participant's metadata was updated via server API

✔️

✔️

参与者的元数据已通过服务器 API 更新

RoomMetadataChanged RoomMetadataChanged 的

Metadata associated with the room has changed

✔️

与聊天室关联的元数据已更改

DataReceived 已接收数据

Data received from another participant or server

✔️

✔️

从其他参与者或服务器接收的数据

TrackStreamStateChanged TrackStreamStateChanged (已更改)

Indicates if a subscribed track has been paused due to bandwidth

✔️

✔️

指示订阅的曲目是否因带宽而暂停

TrackSubscriptionPermissionChanged

One of subscribed tracks have changed track-level permissions for the current participant

✔️

✔️

其中一个已订阅的轨道已更改当前参与者的轨道级别权限

ParticipantPermissionsChanged

When the current participant's permissions have changed

✔️

✔️

ParticipantPermissions已更改

当前参与者的权限发生更改时

服务端操作

生成用户token

需要LiveKit服务的API_KEY和API-SECRET,通过LiveKit API生成JWT令牌。

通过登录JWT获取到用户的信息,identify=user_id+场景,name=用户昵称(默认值),room名称=场景名(user_id)

# server.py
import os
from livekit import api
from flask import Flaskapp = Flask(__name__)@app.route('/getToken')
def getToken():token = api.AccessToken(os.getenv('LIVEKIT_API_KEY'), os.getenv('LIVEKIT_API_SECRET')) \.with_identity("identity") \.with_name("my name") \.with_grants(api.VideoGrants(room_join=True,room="my-room",))return token.to_jwt()

开发环境可以通过CLI快速创建token:

livekit-cli token create   --api-key devkey --api-secret secret   --join --room test_room --identity test_user   --valid-for 24h

token属性

基于JWT的令牌,包含用户身份、放假名称、功能、权限等。按照场景颁发token,也就是对应的房间。

聊天室权限在解码的加入令牌的 video 字段中指定。它可能包含以下一个或多个属性:

FIELD

TYPE

DESCRIPTION

roomCreate room创建

bool

Permission to create or delete rooms

创建或删除聊天室的权限

roomList roomList 会议室

bool

Permission to list available rooms

列出可用会议室的权限

roomJoin room加入

bool

Permission to join a room

加入聊天室的权限

roomAdmin roomAdmin 管理员

bool

Permission to moderate a room

管理聊天室的权限

roomRecord roomRecord (房间记录)

bool

Permissions to use Egress service

使用 Egress 服务的权限

ingressAdmin 入口管理员

bool 布尔

Permissions to use Ingress service

Ingress 服务使用权限

room 房间

string 字符串

Name of the room, required if join or admin is set

聊天室的名称,如果设置了 join 或 admin,则为必填项

canPublish 可以发布

bool 布尔

Allow participant to publish tracks

允许参与者发布轨迹

canPublishData

bool 布尔

Allow participant to publish data to the room

允许参与者将数据发布到聊天室

canPublishSources

string[] 字符串[]

When set, only listed source can be published. (camera, microphone, screen_share, screen_share_audio)

设置后,只能发布列出的源。(摄像头、麦克风、screen_share、screen_share_audio)

canSubscribe canSubscribe 订阅

bool 布尔

Allow participant to subscribe to tracks

允许参加者订阅曲目

canUpdateOwnMetadata

bool 布尔

Allow participant to update its own metadata

允许参与者更新自己的元数据

hidden 隐藏

bool 布尔

Hide participant from others in the room

对聊天室中的其他人隐藏参与者

kind 类

string 字符串

Type of participant (standard, ingress, egress, sip, or agent). this field is typically set by LiveKit internals.

参与者类型(标准、入口、出口、SIP 或代理)。此字段通常由 LiveKit 内部设置。

session断开操作

用户离开房间后,回话会结束,通过add_shutdown_callback回调,可以处理后续操作。例如:发送聊天结束事件。

async def entrypoint(ctx: JobContext):async def my_shutdown_hook():# save user state...ctx.add_shutdown_callback(my_shutdown_hook)

Agent操作

创建Agent服务节点

LiveKit的Agent框架现在只支持python的SDK,文档地址如下:https://docs.livekit.io/agents/quickstart/

这是官方给的demo:

import asynciofrom livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli, llm
from livekit.agents.voice_assistant import VoiceAssistant
from livekit.plugins import deepgram, openai, silero# This function is the entrypoint for the agent.
async def entrypoint(ctx: JobContext):# Create an initial chat context with a system promptinitial_ctx = llm.ChatContext().append(role="system",text=("You are a voice assistant created by LiveKit. Your interface with users will be voice. ""You should use short and concise responses, and avoiding usage of unpronouncable punctuation."),)# Connect to the LiveKit room# indicating that the agent will only subscribe to audio tracksawait ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)# VoiceAssistant is a class that creates a full conversational AI agent.# See https://github.com/livekit/agents/tree/main/livekit-agents/livekit/agents/voice_assistant# for details on how it works.assistant = VoiceAssistant(vad=silero.VAD.load(),stt=deepgram.STT(),llm=openai.LLM(),tts=openai.TTS(),chat_ctx=initial_ctx,)# Start the voice assistant with the LiveKit roomassistant.start(ctx.room)await asyncio.sleep(1)# Greets the user with an initial messageawait assistant.say("Hey, how can I help you today?", allow_interruptions=True)if __name__ == "__main__":# Initialize the worker with the entrypointcli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))

Agent的生命周期

  1. 当worker程序启动时,会通过websocket连接到LiveKit服务器,将自己注册成worker。一个worker下会有多个子进程(Agent)来处理请求。
  2. 当用户进入房间时,LiveKit服务器通过负载均衡选择一个worker,为用户提供服务。
  3. 子进程处理来自用户的消息,并给出回复。
  4. 当用户退出房间时,房间滚啊比,并且断开与agent的连接。

Agent内部执行流程

agent在处理请求时,包含几个节点:

  1. request handler:判断能否处理请求,不能请求则LiveKit会讲任务交给其他worker
  2. entrypoint:agent进入房间之前,执行的初始化操作
  3. prewarm function:agent进程启动时调用,可以执行加载模型等耗时的操作

Worker类型

opts = WorkerOptions(...# when omitted, the default is JobType.JT_ROOMworker_type=JobType.JT_ROOM,
)

JobType 枚举有两个选项:

  • JT_ROOM:将为每个房间创建一个新的代理实例。
  • JT_PUBLISHER:将为房间里的每个参与者创建一个新的代理实例。

Agent处理请求

处理音频流

@ctx.room.on("track_subscribed")
def on_track_subscribed(track: rtc.Track,publication: rtc.TrackPublication,participant: rtc.RemoteParticipant,
):# 监听音频流if track.kind == rtc.TrackKind.KIND_AUDIO:audio_stream = rtc.AudioStream(track)async for event in audio_stream:do_something(event.frame)

发布音频流

发布音频涉及将流拆分为长度固定的音频帧。内部缓冲区保存 50 毫秒长的音频队列,实时发送。用于发送新帧的 capture_frame 方法是阻塞的,在缓冲区接收整个帧之前阻塞在那里。这样可以更轻松地处理中断。

为了发布音轨,需要事先确定采样率和声道数,以及每帧的长度(样本数)。下面的示例是在 10ms 长帧中以 48kHz 传输恒定的 16 位正弦波:

SAMPLE_RATE = 48000
NUM_CHANNELS = 1 # mono audio
AMPLITUDE = 2 ** 8 - 1
SAMPLES_PER_CHANNEL = 480 # 10ms at 48kHzasync def entrypoint(ctx: JobContext):await ctx.connect()source = rtc.AudioSource(SAMPLE_RATE, NUM_CHANNELS)track = rtc.LocalAudioTrack.create_audio_track("example-track", source)# since the agent is a participant, our audio I/O is its "microphone"options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)# ctx.agent is an alias for ctx.room.local_participantpublication = await ctx.agent.publish_track(track, options)frequency = 440async def _sinewave():audio_frame = rtc.AudioFrame.create(SAMPLE_RATE, NUM_CHANNELS, SAMPLES_PER_CHANNEL)audio_data = np.frombuffer(audio_frame.data, dtype=np.int16)time = np.arange(SAMPLES_PER_CHANNEL) / SAMPLE_RATEtotal_samples = 0while True:time = (total_samples + np.arange(SAMPLES_PER_CHANNEL)) / SAMPLE_RATEsinewave = (AMPLITUDE * np.sin(2 * np.pi * frequency * time)).astype(np.int16)np.copyto(audio_data, sinewave)# send this frame to the trackawait source.capture_frame(frame)total_samples += samples_per_channel

处理文本消息

监听data_received事件,处理用户发来的消息;通过publish_data()发送消息给用户。

@room.on("data_received")
def on_data_received(data: rtc.DataPacket):logging.info("received data from %s: %s", data.participant.identity, data.data)# string payload will be encoded to bytes with UTF-8
await room.local_participant.publish_data("my payload",reliable=True,destination_identities=["identity1", "identity2"],topic="topic1")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/53681.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JavaScript - Api学习 Day03 (日期对象、节点操作、两种定时器、本地存储)

文章目录 一、日期对象1.1 实例化1.2 日期对象方法 二、节点操作2.1 父子兄弟节点1. 父节点查找2. 子节点查找3. 兄弟关系查找 2.2 增删节点1. 创建节点 - createElement2. 添加节点2.1 appendChild() 方法2.2 insertBefore() 方法2.3. 克隆节点 - cloneNode 3. 删除节点3.1 re…

[数据集][目标检测]抽烟检测数据集VOC+YOLO格式22559张2类别

数据集格式:Pascal VOC格式YOLO格式(不包含分割路径的txt文件,仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数):22559 标注数量(xml文件个数):22559 标注数量(txt文件个数):22559 标…

VisualStudio环境搭建C++

Visual Studio环境搭建 说明 C程序编写中,经常需要链接头文件(.h/.hpp)和源文件(.c/.cpp)。这样的好处是:控制主文件的篇幅,让代码架构更加清晰。一般来说头文件里放的是类的申明,函数的申明,全局变量的定义等等。源…

echarts图 图例跑上面去了 不在右边

legend 中缺少 "right": "1%", "top": "center",

PyTorch----模型运维与实战

一、PyTorch是什么 PyTorch 由Facebook开源的神经网络框架,专门针对 GPU 加速的深度神经网络(DNN)编程。 二、PyTorch安装 首先确保你已经安装了GPU环境,即Anaconda、CUDA和CUDNN 随后进入Pytorch官网​​​​​​PyTorch 官…

【机器学习】高斯网络的基本概念和应用领域以及在python中的实例

引言 高斯网络(Gaussian Network)通常指的是一个概率图模型,其中所有的随机变量(或节点)都遵循高斯分布 文章目录 引言一、高斯网络(Gaussian Network)1.1 高斯过程(Gaussian Proces…

细说STM32F407通用定时器的基础知识

目录 一、通用定时器功能概述 二、细说2通道定时器的功能 1.时钟信号和触发控制器 2.时基单元工作原理 (1)计数寄存器(CNT) (2)预分频寄存器(PSC) (3)自动重载寄存器(ARR) (4)时基单元的控制位 3.捕获/比较通道 三、生成PWM波 1.生成PWM波的原理 2.与生成PWM波相关的HA…

2023下半年软考网络规划

【考情分析】2023下半年软考网络规划设计师机考考情分析-真题解析公开课视频!_哔哩哔哩_bilibili2023年11月软考网络规划设计师案例分析解析与考后复盘_哔哩哔哩_bilibili全网首发!2023年下半年软考【高级】网规真题试卷--案例分析(部分回忆版…

表观遗传系列1:DNA 甲基化以及组蛋白修饰

1. 表观遗传 表观遗传信息很多为化学修饰,包括 DNA 甲基化以及组蛋白修饰,即DNA或蛋白可以通过化学修饰添加附加信息。 DNA位于染色质(可视为微环境)中,并不是裸露的,因此DNA分子研究需要跟所处环境结合起…

Tuxera NTFS for Mac破解版下载 Tuxera NTFS for Mac2023激活码 mac电脑ntfs磁盘软件

Tuxera NTFS for Mac是一款优秀的Mac系统完全读写软件,提供Fat32、NTFS、Exfat、mac os扩展格式的转换,稳定性好,传输速度极快。Tuxera NTFS for Mac功能丰富,能修复NTFS卷、创建NTFS磁盘映像、创建NTFS分区等等。同时软件支持所有…

【EI会议末轮截稿通知】第三届电子信息技术国际学术会议(EIT 2024)

第三届电子信息技术国际学术会议(EIT 2024) The 3rd International Conference on Electronic Information Technology 重要信息 大会官网:www.ic-eit.net 三轮截稿时间:2024年9月16日23:59分(后续不再征稿&#x…

【C++登堂入室】类和对象(中)——类的6个默认成员函数

目录 一、类的6个默认成员函数 ​编辑二、构造函数 2.1 概念 2.2 特性 三、析构函数 3.1 概念 3.2 特性 四、拷贝构造函数 4.1 概念 4.2 特征 五、赋值运算符重载 5.1 运算符重载 5.2 赋值运算符重载 5.3 前置和后置重载 六、日期类的实现 七、const成员 八、…

Tranformer分布式特辑

随着大模型的发展,如何进行分布式训练也成了每位开发者必备的技能。 1. 单机训练 CPU OffloadingGradient Checkpointing 正向传播时,不存储当前节点的中间结果,在反向传播时重新计算,从而起到降低显存占用的作用 Low Precision…

跨境独立站支付收款常见问题排雷篇1.0丨出海笔记

最近小伙伴们在社群讨论挺多关于独立站支付问题的,鉴于不少朋友刚接触独立站,我整理了一些独立站支付相关的问题和解决方案,供大家参考,百度网上一堆媒体的那些软文大家就别看了,都是软广或者抄来抄去,让大…

语义分割数据集|河流湖泊分割|水灾预警

江河湖泊自然水灾检测数据集,数据集整理不易,获取地址在最后,具体信息如下: 总数:290张 类别:1类 数据集大小:约106M 数据整理不易,数据集获取地址如下: https://…

基于JAVA+SpringBoot+Vue的前后端分离企业oa管理系统

基于JAVASpringBootVue的前后端分离企业oa管理系统 前言 ✌全网粉丝20W,csdn特邀作者、博客专家、CSDN[新星计划]导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末附源码下载链接&#x1…

springboot项目中 前端浏览器访问时遇到跨域请求问题CORS怎么解决?has been blocked by CORS policy

文章目录 现象解决方案1. **全局配置 CORS**2. **使用 CrossOrigin 注解**3. **配置 Spring Security**4. **自定义 CORS 过滤器** Spring Security 6.x 及其后续版本解决方案1. 使用 SecurityFilterChain 配置 CORS2. 重要配置说明3. 在生产环境中的最佳实践 现象 前端浏览器…

【linux】进程控制(2)

3. 进程等待 1. 是什么 通过系统调用 wait/waitpid 对子进程的退出状态进行检测和回收的功能 2. 为什么 僵尸进程无法杀死,通过进程等待来杀掉它,进而解决内存泄漏的问题 (一)进程等待的方法 a. wait : 代码 wait : 等待任意一…

某仿soul欲音社交系统存在任意文件读取漏洞

1 阅读须知 技术文章仅供参考,此文所提供的信息只为网络安全人员对自己所负责的网站、服务器等(包括但不限于)进行检测或维护参考,未经授权请勿利用文章中的技术资料对任何计算机系统进行入侵操作。利用此文所提供的信息而造成的直…

医院管理|基于java的医院管理系统小程序(源码+数据库+文档)

医院管理系统小程序 目录 基于java的医院管理系统小程序 一、前言 二、系统设计 三、系统功能设计 医生信息管理 排班信息管理 科室信息管理 科室预约 病历信息 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取:…