Concepts
LiveKit core concepts:
- Room
- Participant
- Track (an audio or video stream published by a participant)
Agent architecture diagram
Stream subscription
Agent interaction flow
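Client operations
Joining a room
How rooms are created
Manual
Grant the user permission to create rooms, and create the room from the client when it joins.
Automatic
The client specifies a ws_url and a token and joins the room named in the token: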
val room = LiveKit.create(appContext = applicationContext)
lifecycleScope.launch {
    room.connect(wsUrl, token) // connect() is a suspend function, so call it from a coroutine
}
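Leaving a room
Call Room.disconnect() to notify LiveKit that the participant is leaving. If the application closes without notifying LiveKit, the participant will keep appearing in the room for 15 seconds.
On Swift, Room.disconnect is called automatically when the application exits.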
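Sending messages
How to send
The client uses the LocalParticipant.publishData API to publish arbitrary data messages to any participant in the room. Room data is published to the SFU over a WebRTC data channel, and the LiveKit server forwards it to one or more participants in the room.
To send a message to specific users, set destinationIdentities to the identities of the recipients.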
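// Send messages
coroutineScope.launch {
    val data: ByteArray = //...
    // Send a lossy message to everyone. LOSSY sends the data once with no ordering guarantee,
    // which is ideal for real-time updates where delivery speed comes first.
    room.localParticipant.publishData(data, DataPublishReliability.LOSSY)
    // Send a reliable message to specific participants. RELIABLE retries up to 3 times and preserves order,
    // which suits cases where guaranteed delivery matters more than low latency, such as in-room chat.
    val identities = listOf(
        Participant.Identity("alice"),
        Participant.Identity("bob"),
    )
    room.localParticipant.publishData(data, DataPublishReliability.RELIABLE, identities)
}

// Handle received messages
coroutineScope.launch {
    room.events.collect { event ->
        if (event is RoomEvent.DataReceived) {
            // Process data
        }
    }
}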
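Message size limits
Because of limits in the SCTP protocol, it is impractical to send messages larger than 16 KiB over a data channel, and that figure includes LiveKit's protocol wrapper. LiveKit recommends keeping each message under 15 KiB.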
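If a payload can exceed this limit, one option is to split it on the sender and reassemble it on the receiver. The sketch below assumes a hypothetical 8-byte header of our own design (message id, chunk index, chunk count); it is not a LiveKit protocol, and the names split_message and Reassembler are illustrative.

```python
import struct
from typing import Dict, List, Optional

MAX_MESSAGE_BYTES = 15 * 1024   # keep every packet under the recommended 15 KiB
# Hypothetical 8-byte header: message id (u32), chunk index (u16), chunk count (u16)
HEADER = struct.Struct("!IHH")
MAX_CHUNK_PAYLOAD = MAX_MESSAGE_BYTES - HEADER.size


def split_message(message_id: int, payload: bytes) -> List[bytes]:
    """Split payload into packets that each fit within MAX_MESSAGE_BYTES."""
    chunks = [payload[i:i + MAX_CHUNK_PAYLOAD]
              for i in range(0, len(payload), MAX_CHUNK_PAYLOAD)] or [b""]
    if len(chunks) > 0xFFFF:
        raise ValueError("payload needs more chunks than the header can count")
    return [HEADER.pack(message_id, index, len(chunks)) + chunk
            for index, chunk in enumerate(chunks)]


class Reassembler:
    """Collect chunks per message id and return the payload once all have arrived."""

    def __init__(self) -> None:
        self._parts: Dict[int, Dict[int, bytes]] = {}

    def feed(self, packet: bytes) -> Optional[bytes]:
        message_id, index, count = HEADER.unpack_from(packet)
        parts = self._parts.setdefault(message_id, {})
        parts[index] = packet[HEADER.size:]
        if len(parts) < count:
            return None  # still waiting for more chunks
        del self._parts[message_id]
        return b"".join(parts[i] for i in range(count))
```

Each packet should be sent with RELIABLE delivery: over a lossy channel a single dropped chunk would leave the message incomplete forever.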
Message topics
A message can carry a topic, and the receiver can filter on the topic to keep only the messages it is interested in.
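Receiver-side filtering by topic can be as simple as a table of handlers keyed by topic. In this minimal sketch, Packet is a hypothetical stand-in for whatever received-data object your SDK delivers (it only assumes a topic and a data field), and TopicRouter is an illustrative name.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, DefaultDict, List, Optional


@dataclass
class Packet:
    """Hypothetical stand-in for a received data packet."""
    topic: Optional[str]
    data: bytes


class TopicRouter:
    def __init__(self) -> None:
        self._handlers: DefaultDict[Optional[str], List[Callable[[bytes], None]]] = defaultdict(list)

    def on(self, topic: Optional[str], handler: Callable[[bytes], None]) -> None:
        # Register a handler for one topic
        self._handlers[topic].append(handler)

    def dispatch(self, packet: Packet) -> int:
        # Only handlers registered for this exact topic run; other packets are ignored.
        # .get() avoids creating empty entries in the defaultdict.
        handlers = self._handlers.get(packet.topic, [])
        for handler in handlers:
            handler(packet.data)
        return len(handlers)
```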
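Publishing tracks
Out of the box, LiveKit supports three track sources (camera, microphone, and screen share), and it also lets you customize how tracks are captured and published.
Audio and video tracks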
lifecycleScope.launch {
    // Turns camera track on (suspend function)
    room.localParticipant.setCameraEnabled(true)
    // Turns microphone track on (suspend function)
    room.localParticipant.setMicrophoneEnabled(true)
}
Screen share track
// Create an intent launcher for screen capture
// This *must* be registered prior to onCreate(), ideally as an instance val
val screenCaptureIntentLauncher = registerForActivityResult(
    ActivityResultContracts.StartActivityForResult()
) { result ->
    val resultCode = result.resultCode
    val data = result.data
    if (resultCode != Activity.RESULT_OK || data == null) {
        return@registerForActivityResult
    }
    lifecycleScope.launch {
        room.localParticipant.setScreenShareEnabled(true, data)
    }
}

// When it's time to enable the screen share, perform the following
val mediaProjectionManager =
    getSystemService(MEDIA_PROJECTION_SERVICE) as MediaProjectionManager
screenCaptureIntentLauncher.launch(mediaProjectionManager.createScreenCaptureIntent())
Custom track configuration
// Option 1: set room defaults
val options = RoomOptions(
    audioTrackCaptureDefaults = LocalAudioTrackOptions(
        noiseSuppression = true,
        echoCancellation = true,
        autoGainControl = true,
        highPassFilter = true,
        typingNoiseDetection = true,
    ),
    videoTrackCaptureDefaults = LocalVideoTrackOptions(
        deviceId = "",
        position = CameraPosition.FRONT,
        captureParams = VideoPreset169.H1080.capture,
    ),
    audioTrackPublishDefaults = AudioTrackPublishDefaults(
        audioBitrate = 20_000,
        dtx = true,
    ),
    videoTrackPublishDefaults = VideoTrackPublishDefaults(
        videoEncoding = VideoPreset169.H1080.encoding,
    )
)
val room = LiveKit.create(
    ...
    roomOptions = options,
)

// Option 2: create tracks manually
val localParticipant = room.localParticipant
val audioTrack = localParticipant.createAudioTrack("audio")
localParticipant.publishAudioTrack(audioTrack)

val videoTrack = localParticipant.createVideoTrack(
    "video",
    LocalVideoTrackOptions(
        position = CameraPosition.FRONT,
        captureParams = VideoPreset169.H1080.capture,
    )
)
localParticipant.publishVideoTrack(videoTrack)
Subscribing to tracks
By default, a user who joins a room subscribes to every track published in it.
coroutineScope.launch {
    room.events.collect { event ->
        when (event) {
            is RoomEvent.TrackSubscribed -> {
                // Audio tracks are automatically played.
                val videoTrack = event.track as? VideoTrack ?: return@collect
                videoTrack.addRenderer(videoRenderer)
            }
            else -> {}
        }
    }
}
Listening for events
Events come in two kinds: room events and participant events. The full list:
EVENT | DESCRIPTION | ROOM EVENT | PARTICIPANT EVENT |
ParticipantConnected | A RemoteParticipant joins after the local participant | ✔️ | |
ParticipantDisconnected | A RemoteParticipant leaves | ✔️ | |
Reconnecting | The connection to the server has been interrupted and it's attempting to reconnect | ✔️ | |
Reconnected | Reconnection has been successful | ✔️ | |
Disconnected | Disconnected from room due to the room closing or unrecoverable failure | ✔️ | |
TrackPublished | A new track is published to room after the local participant has joined | ✔️ | ✔️ |
TrackUnpublished | A RemoteParticipant has unpublished a track | ✔️ | ✔️ |
TrackSubscribed | The LocalParticipant has subscribed to a track | ✔️ | ✔️ |
TrackUnsubscribed | A previously subscribed track has been unsubscribed | ✔️ | ✔️ |
TrackMuted | A track was muted; fires for both local and remote tracks | ✔️ | ✔️ |
TrackUnmuted | A track was unmuted; fires for both local and remote tracks | ✔️ | ✔️ |
LocalTrackPublished | A local track was published successfully | ✔️ | ✔️ |
LocalTrackUnpublished | A local track was unpublished | ✔️ | ✔️ |
ActiveSpeakersChanged | The current active speakers have changed | ✔️ | |
IsSpeakingChanged | The current participant has changed speaking status | | ✔️ |
ConnectionQualityChanged | Connection quality was changed for a participant | ✔️ | ✔️ |
ParticipantMetadataChanged | A participant's metadata was updated via server API | ✔️ | ✔️ |
RoomMetadataChanged | Metadata associated with the room has changed | ✔️ | |
DataReceived | Data received from another participant or server | ✔️ | ✔️ |
TrackStreamStateChanged | Indicates whether a subscribed track has been paused due to bandwidth | ✔️ | ✔️ |
TrackSubscriptionPermissionChanged | One of the subscribed tracks has changed track-level permissions for the current participant | ✔️ | ✔️ |
ParticipantPermissionsChanged | The current participant's permissions have changed | ✔️ | ✔️ |
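Server operations
Generating user tokens
You need the LiveKit server's API key and API secret (LIVEKIT_API_KEY and LIVEKIT_API_SECRET); the LiveKit server API uses them to generate a JWT.
Read the user's information from the login JWT, then set: identity = user_id + scene; name = the user's nickname (default value); room name = scene name (user_id).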
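The naming convention above can be captured in a small helper. This is a sketch under stated assumptions: the "_" separator, the "guest" fallback nickname, and the function name session_fields are hypothetical, and the room name is read as the scene name joined with the user_id.

```python
from typing import Dict, Optional

DEFAULT_NICKNAME = "guest"  # hypothetical fallback when the login JWT has no nickname


def session_fields(user_id: str, scene: str, nickname: Optional[str] = None) -> Dict[str, str]:
    """Derive token fields from login info (hypothetical '_' separator)."""
    return {
        "identity": f"{user_id}_{scene}",      # identity = user_id + scene
        "name": nickname or DEFAULT_NICKNAME,  # name = nickname, with a default
        "room": f"{scene}_{user_id}",          # room = scene name + user_id
    }
```

Keeping identity unique per user and scene means the same user can sit in several scenes at once without identity collisions inside one room.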
# server.py
import os

from flask import Flask
from livekit import api

app = Flask(__name__)


@app.route('/getToken')
def getToken():
    token = api.AccessToken(os.getenv('LIVEKIT_API_KEY'), os.getenv('LIVEKIT_API_SECRET')) \
        .with_identity("identity") \
        .with_name("my name") \
        .with_grants(api.VideoGrants(
            room_join=True,
            room="my-room",
        ))
    return token.to_jwt()
In development, you can create a token quickly with the CLI:
livekit-cli token create --api-key devkey --api-secret secret --join --room test_room --identity test_user --valid-for 24h
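Token attributes
A token is a JWT that carries the user's identity, the room name, capabilities, permissions, and so on. Tokens are issued per scene, that is, per corresponding room.
Room permissions are specified in the video field of a decoded join token. It may contain one or more of the following attributes: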
FIELD | TYPE | DESCRIPTION |
roomCreate | bool | Permission to create or delete rooms |
roomList | bool | Permission to list available rooms |
roomJoin | bool | Permission to join a room |
roomAdmin | bool | Permission to moderate a room |
roomRecord | bool | Permission to use the Egress service |
ingressAdmin | bool | Permission to use the Ingress service |
room | string | Name of the room; required if join or admin is set |
canPublish | bool | Allow participant to publish tracks |
canPublishData | bool | Allow participant to publish data to the room |
canPublishSources | string[] | When set, only the listed sources can be published (camera, microphone, screen_share, screen_share_audio) |
canSubscribe | bool | Allow participant to subscribe to tracks |
canUpdateOwnMetadata | bool | Allow participant to update its own metadata |
hidden | bool | Hide participant from others in the room |
kind | string | Type of participant (standard, ingress, egress, sip, or agent); this field is typically set by LiveKit internals |
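To see where these fields live, here is a minimal standard-library sketch of a LiveKit-style join token: an HS256 JWT signed with the API secret, with the API key as iss, the identity as sub, and the permissions under video. It is meant for understanding and inspecting the claim layout; in production the SDK's api.AccessToken should be used instead. The helper names make_join_token and decode_claims and the one-hour default lifetime are assumptions.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(raw: bytes) -> str:
    # JWT segments use unpadded base64url encoding
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def make_join_token(api_key: str, api_secret: str, identity: str, name: str,
                    room: str, ttl_seconds: int = 3600) -> str:
    now = int(time.time())
    header = {"alg": "HS256", "typ": "JWT"}
    claims = {
        "iss": api_key,     # API key identifies the issuer
        "sub": identity,    # participant identity
        "name": name,       # display name
        "nbf": now,
        "exp": now + ttl_seconds,
        "video": {          # room permissions from the table
            "roomJoin": True,
            "room": room,
            "canPublish": True,
            "canSubscribe": True,
        },
    }
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    signature = hmac.new(api_secret.encode("utf-8"),
                         signing_input.encode("ascii"), hashlib.sha256).digest()
    return signing_input + "." + _b64url(signature)


def decode_claims(token: str) -> dict:
    # Decode the payload WITHOUT verifying the signature (inspection only)
    payload = token.split(".")[1]
    payload += "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(payload))
```

Decoding a token this way is a quick check that the identity, room, and grants are what the server intended before handing it to a client.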
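Handling session shutdown
When the user leaves the room, the session ends. Register a callback with add_shutdown_callback to run any follow-up work, for example sending a chat-ended event.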
from livekit.agents import JobContext


async def entrypoint(ctx: JobContext):
    async def my_shutdown_hook():
        # save user state...
        pass

    ctx.add_shutdown_callback(my_shutdown_hook)
Agent operations
Creating an agent worker
At the time of writing, LiveKit's Agents framework supports only the Python SDK. Documentation: https://docs.livekit.io/agents/quickstart/
This is the official demo:
import asyncio

from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli, llm
from livekit.agents.voice_assistant import VoiceAssistant
from livekit.plugins import deepgram, openai, silero


# This function is the entrypoint for the agent.
async def entrypoint(ctx: JobContext):
    # Create an initial chat context with a system prompt
    initial_ctx = llm.ChatContext().append(
        role="system",
        text=(
            "You are a voice assistant created by LiveKit. Your interface with users will be voice. "
            "You should use short and concise responses, and avoid unpronounceable punctuation."
        ),
    )

    # Connect to the LiveKit room,
    # indicating that the agent will only subscribe to audio tracks
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    # VoiceAssistant is a class that creates a full conversational AI agent.
    # See https://github.com/livekit/agents/tree/main/livekit-agents/livekit/agents/voice_assistant
    # for details on how it works.
    assistant = VoiceAssistant(
        vad=silero.VAD.load(),
        stt=deepgram.STT(),
        llm=openai.LLM(),
        tts=openai.TTS(),
        chat_ctx=initial_ctx,
    )

    # Start the voice assistant with the LiveKit room
    assistant.start(ctx.room)

    await asyncio.sleep(1)

    # Greets the user with an initial message
    await assistant.say("Hey, how can I help you today?", allow_interruptions=True)


if __name__ == "__main__":
    # Initialize the worker with the entrypoint
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
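Agent lifecycle
- When the worker program starts, it connects to the LiveKit server over WebSocket and registers itself as a worker. Each worker runs multiple child processes (agents) to handle requests.
- When a user joins a room, the LiveKit server uses load balancing to pick a worker to serve that user.
- The child process handles the user's messages and replies to them.
- When the user leaves the room, the room closes and the connection to the agent is dropped.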
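Agent execution flow
When handling a request, an agent goes through the following stages:
- request handler: decides whether this worker can take the job; if it cannot, LiveKit hands the job to another worker.
- entrypoint: initialization that runs before the agent joins the room.
- prewarm function: called when the agent process starts; use it for time-consuming work such as loading models.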
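The three stages can be illustrated with a toy model in plain Python. This is NOT the livekit-agents API; ToyWorker, dispatch, and can_accept are hypothetical names. It only shows the ordering: prewarm runs once per process to load slow resources, a request check accepts or declines the job, and a declined job moves on to the next worker.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional


@dataclass
class ToyWorker:
    """Toy model of one worker process; not the livekit-agents API."""
    name: str
    can_accept: Callable[[str], bool]   # plays the role of the request handler
    userdata: Dict[str, str] = field(default_factory=dict)
    log: List[str] = field(default_factory=list)

    def prewarm(self) -> None:
        # Runs once at process start: load slow resources (e.g. models) here
        self.userdata["model"] = f"model-loaded-by-{self.name}"
        self.log.append("prewarm")

    def entrypoint(self, room: str) -> None:
        # Per-job setup before joining the room; assumes prewarm() already ran
        self.log.append(f"entrypoint:{room}:{self.userdata['model']}")


def dispatch(room: str, workers: List[ToyWorker]) -> Optional[ToyWorker]:
    # Offer the job to each worker in turn; a declined job goes to the next one
    for worker in workers:
        if worker.can_accept(room):
            worker.entrypoint(room)
            return worker
    return None
```

Putting model loading in prewarm keeps that cost off the per-job path, so the entrypoint can start serving the user quickly.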
Worker types
opts = WorkerOptions(
    ...
    # when omitted, the default is JobType.JT_ROOM
    worker_type=JobType.JT_ROOM,
)
The JobType enum has two options:
- JT_ROOM: a new agent instance is created for each room.
- JT_PUBLISHER: a new agent instance is created for each participant in the room.
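The practical difference between the two options is how many agent instances run. As a back-of-the-envelope sketch (the function name and the room-to-participants mapping are illustrative, and the job types are passed as plain strings rather than the real enum):

```python
from typing import Dict, List


def agent_instances(rooms: Dict[str, List[str]], job_type: str) -> int:
    """Count agent instances for a room -> participants map (illustrative only)."""
    if job_type == "JT_ROOM":
        return len(rooms)                                 # one instance per room
    if job_type == "JT_PUBLISHER":
        return sum(len(people) for people in rooms.values())  # one per participant
    raise ValueError(f"unknown job type: {job_type}")
```

For two rooms holding three and one participants, JT_ROOM yields 2 instances and JT_PUBLISHER yields 4, so JT_PUBLISHER costs grow with room size.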
Handling requests in the agent
Handling audio tracks
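import asyncio

from livekit import rtc


async def handle_audio(track: rtc.Track):
    audio_stream = rtc.AudioStream(track)
    async for event in audio_stream:
        do_something(event.frame)


@ctx.room.on("track_subscribed")
def on_track_subscribed(
    track: rtc.Track,
    publication: rtc.TrackPublication,
    participant: rtc.RemoteParticipant,
):
    # Listen to audio tracks; the event handler is synchronous, so the async iteration runs in a task
    if track.kind == rtc.TrackKind.KIND_AUDIO:
        asyncio.create_task(handle_audio(track))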
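Publishing audio tracks
Publishing audio means splitting the stream into fixed-length audio frames. An internal buffer holds a 50 ms queue of audio, which is sent out in real time. The capture_frame method, used to send each new frame, blocks until the buffer has accepted the entire frame. This makes interruptions easier to handle.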
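The blocking behavior of capture_frame works like backpressure on a bounded queue. The toy model below uses asyncio.Queue as a stand-in for LiveKit's internal buffer; it is an analogy, not LiveKit's implementation, and simulate is a hypothetical name. A 50 ms buffer holds five 10 ms frames, the producer's await queue.put(...) waits whenever the buffer is full, and the consumer drains one frame every 10 ms.

```python
import asyncio
from typing import List, Tuple

FRAME_MS = 10                          # each frame carries 10 ms of audio
BUFFER_MS = 50                         # buffer depth described above
QUEUE_FRAMES = BUFFER_MS // FRAME_MS   # 5 frames fit in the buffer


async def simulate(num_frames: int) -> Tuple[List[int], int]:
    """Toy model of a blocking capture call; not LiveKit's implementation."""
    queue: "asyncio.Queue[int]" = asyncio.Queue(maxsize=QUEUE_FRAMES)
    sent: List[int] = []
    max_depth = 0

    async def producer() -> None:
        nonlocal max_depth
        for i in range(num_frames):
            # Like `await source.capture_frame(frame)`: waits while the buffer is full
            await queue.put(i)
            max_depth = max(max_depth, queue.qsize())
        await queue.put(-1)  # sentinel: no more frames

    async def consumer() -> None:
        while True:
            frame = await queue.get()
            if frame < 0:
                return
            await asyncio.sleep(FRAME_MS / 1000)  # "send" one frame per 10 ms
            sent.append(frame)

    await asyncio.gather(producer(), consumer())
    return sent, max_depth
```

In this model the producer can never get more than five frames (50 ms) ahead of playback, which suggests why a blocking capture call simplifies interruptions: very little queued audio is left to discard.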
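To publish an audio track, you must decide the sample rate, the number of channels, and the length of each frame (in samples) in advance. The following example transmits a constant 16-bit sine wave at 48 kHz in 10 ms frames: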
import numpy as np
from livekit import rtc
from livekit.agents import JobContext

SAMPLE_RATE = 48000
NUM_CHANNELS = 1  # mono audio
AMPLITUDE = 2 ** 8 - 1
SAMPLES_PER_CHANNEL = 480  # 10ms at 48kHz


async def entrypoint(ctx: JobContext):
    await ctx.connect()

    source = rtc.AudioSource(SAMPLE_RATE, NUM_CHANNELS)
    track = rtc.LocalAudioTrack.create_audio_track("example-track", source)
    # since the agent is a participant, our audio I/O is its "microphone"
    options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
    # ctx.agent is an alias for ctx.room.local_participant
    publication = await ctx.agent.publish_track(track, options)

    frequency = 440

    async def _sinewave():
        audio_frame = rtc.AudioFrame.create(SAMPLE_RATE, NUM_CHANNELS, SAMPLES_PER_CHANNEL)
        audio_data = np.frombuffer(audio_frame.data, dtype=np.int16)

        total_samples = 0
        while True:
            time = (total_samples + np.arange(SAMPLES_PER_CHANNEL)) / SAMPLE_RATE
            sinewave = (AMPLITUDE * np.sin(2 * np.pi * frequency * time)).astype(np.int16)
            np.copyto(audio_data, sinewave)

            # send this frame to the track
            await source.capture_frame(audio_frame)
            total_samples += SAMPLES_PER_CHANNEL

    await _sinewave()
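To check the frame arithmetic without LiveKit or NumPy installed, the same waveform can be generated with the standard library alone. Each 10 ms mono frame holds 480 int16 samples (960 bytes), and total_samples keeps the phase continuous across frame boundaries, mirroring the example above. The function name sine_frames is hypothetical.

```python
import math
from array import array
from typing import Iterator

SAMPLE_RATE = 48000
SAMPLES_PER_CHANNEL = 480   # 10 ms at 48 kHz
AMPLITUDE = 2 ** 8 - 1
FREQUENCY = 440


def sine_frames(num_frames: int) -> Iterator[array]:
    """Yield mono int16 frames with a phase that continues across frames."""
    total_samples = 0
    for _ in range(num_frames):
        # int() truncates toward zero, like numpy's astype(np.int16)
        frame = array("h", (
            int(AMPLITUDE * math.sin(2 * math.pi * FREQUENCY * (total_samples + n) / SAMPLE_RATE))
            for n in range(SAMPLES_PER_CHANNEL)
        ))
        yield frame
        total_samples += SAMPLES_PER_CHANNEL
```

Without the running total_samples offset, every frame would restart at phase zero and the output would click every 10 ms.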
Handling text messages
Listen for the data_received event to handle messages from users, and send messages to users with publish_data().
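import logging

from livekit import rtc


@room.on("data_received")
def on_data_received(data: rtc.DataPacket):
    # participant is None when the packet was sent by the server
    sender = data.participant.identity if data.participant else "server"
    logging.info("received data from %s: %s", sender, data.data)


async def send_message(room: rtc.Room):  # hypothetical helper name
    # string payload will be encoded to bytes with UTF-8
    await room.local_participant.publish_data(
        "my payload",
        reliable=True,
        destination_identities=["identity1", "identity2"],
        topic="topic1",
    )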