本文使用 Dify v0.9.2 版本,主要介绍 Dify 通过导入 DSL(或 URL)文件创建(或导出)Workflow 的操作过程及源码分析实现过程。Dify通过导入DSL文件创建Workflow过程及实现:https://z0yrmerhgi8.feishu.cn/wiki/KVIWwrcPMiOdDhk4FngcHrlgnJc
一.导入 DSL 文件创建 Workflow 过程
1.导入操作过程
点击"导入 DSL 文件"后,会弹出"导入 DSL"框,如下所示:
可以选择从文件导入,也可以选择从 URL 导入,如下所示:
2.导入操作 API
通过浏览器 Network 可看到,本子调用为 http://localhost:5001/console/api/apps/import 接口,如下所示:
该接口具体返回结果(Preview),如下所示:
该接口具体返回结果(json),如下所示:
{"id": "24a1f8d6-aad0-4099-b724-0cdd15d39dde","name": "20240930_\u6a21\u578b\u7ade\u6280\u573a\uff08\u7b80\u5355\u5e76\u884c\uff09_v1","description": "20240930_\u6a21\u578b\u7ade\u6280\u573a\uff08\u7b80\u5355\u5e76\u884c\uff09_v1","mode": "advanced-chat","icon_type": null,"icon": "\ud83e\udd16","icon_background": "#FFEAD5","icon_url": null,"enable_site": true,"enable_api": true,"model_config": null,"workflow": {"id": "c9721caf-6d43-413d-8447-136b1ba8ede0","created_by": "12ab7619-6406-4df2-b2a4-c581e033d7fb","created_at": 1729144453,"updated_by": null,"updated_at": null},"site": {"access_token": "PZ63qIcMMjL3W1Aa","code": "PZ63qIcMMjL3W1Aa","title": "20240930_\u6a21\u578b\u7ade\u6280\u573a\uff08\u7b80\u5355\u5e76\u884c\uff09_v1","icon_type": null,"icon": "\ud83e\udd16","icon_background": "#FFEAD5","icon_url": null,"description": null,"default_language": "en-US","chat_color_theme": null,"chat_color_theme_inverted": false,"customize_domain": null,"copyright": null,"privacy_policy": null,"custom_disclaimer": null,"customize_token_strategy": "not_allow","prompt_public": false,"app_base_url": "http://127.0.0.1:3000","show_workflow_steps": true,"use_icon_as_answer_icon": false,"created_by": "12ab7619-6406-4df2-b2a4-c581e033d7fb","created_at": 1729144453,"updated_by": "12ab7619-6406-4df2-b2a4-c581e033d7fb","updated_at": 1729144453},"api_base_url": "http://127.0.0.1:5001/v1","use_icon_as_answer_icon": false,"created_by": "12ab7619-6406-4df2-b2a4-c581e033d7fb","created_at": 1729144453,"updated_by": "12ab7619-6406-4df2-b2a4-c581e033d7fb","updated_at": 1729144453,"deleted_tools": []
}
3.导出操作过程
因为导出 DSL 文件比较简单就不再详细介绍,主要实现为 api.add_resource(AppExportApi, "/apps/<uuid:app_id>/export")
和 {"data": AppDslService.export_dsl(app_model=app_model, include_secret=args["include_secret"])}
。
二.导入 DSL 文件创建 Workflow 实现
0.整体实现流程
首先通过(伪)流程图方式分析"导入 DSL 文件"的函数调用流程,如下所示:
1.第 1 部分源码分析
主要是通过 DSL 文件导入,以及 URL 导入的路由,如下所示:
2.第 2 部分源码分析
最核心的是根据 AppMode 类型的不同,导入 DSL 文件,然后创建 App。如下所示:
# import dsl and create app
app_mode = AppMode.value_of(app_data.get("mode"))
if app_mode in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}:app = cls._import_and_create_new_workflow_based_app(tenant_id=tenant_id,app_mode=app_mode,workflow_data=import_data.get("workflow"),account=account,name=name,description=description,icon_type=icon_type,icon=icon,icon_background=icon_background,use_icon_as_answer_icon=use_icon_as_answer_icon,)
elif app_mode in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.COMPLETION}:app = cls._import_and_create_new_model_config_based_app(tenant_id=tenant_id,app_mode=app_mode,model_config_data=import_data.get("model_config"),account=account,name=name,description=description,icon_type=icon_type,icon=icon,icon_background=icon_background,use_icon_as_answer_icon=use_icon_as_answer_icon,)
3.第 3 部分源码分析
针对 AppMode.ADVANCED_CHAT, AppMode.WORKFLOW 的 AppMode 类型,导入 DSL 文件,创建 App 过程。主要是创建 App,并且初始化草稿 Workflow,如下所示:
(1)创建 App
app = cls._create_app(tenant_id=tenant_id,app_mode=app_mode,account=account,name=name,description=description,icon_type=icon_type,icon=icon,icon_background=icon_background,use_icon_as_answer_icon=use_icon_as_answer_icon,
)
首先创建 app,然后在 apps 数据表中插入记录,如下所示:
当 app 创建时,发送 app_was_created 事件,如下所示:
具体 app_was_created 事件执行逻辑,如下所示:
这段代码是一个事件处理器,用于在应用程序创建时自动创建一个已安装的应用实例。具体功能如下:
- 监听
app_was_created
事件,当新应用被创建时触发。 - 通过事件的发送者(即新创建的应用)获取相关信息(如租户 ID 和应用 ID)。
- 创建一个
InstalledApp
实例,包含应用的租户 ID、应用 ID 及应用拥有者的租户 ID。 - 将该实例添加到数据库会话中,并提交更改,以保存新创建的已安装应用记录。
这段代码实现了在应用程序创建时,自动生成一个站点记录的功能。具体概述如下:
- 监听
app_was_created
事件:当一个新的应用被创建时触发。 - 获取发送者和账户信息:
sender
是触发事件的应用,kwargs
用于获取传递的账户信息(如界面语言)。 - 创建
Site
实例:使用发送者(应用)的信息创建一个站点记录,包含应用的 ID、名称、图标信息、语言设置、自定义令牌策略等。 - 生成站点代码:调用
Site.generate_code(16)
生成 16 位长度的唯一代码。 - 记录创建者和更新者:存储应用的创建者和更新者信息。
- 保存到数据库:将创建的站点记录添加到数据库会话中,并提交保存,以确保站点信息被持久化。
(2)初始化草稿 Workflow(同步工作流)
draft_workflow = workflow_service.sync_draft_workflow(app_model=app,graph=workflow_data.get("graph", {}),features=workflow_data.get("../core/app/features", {}),unique_hash=None,account=account,environment_variables=environment_variables,conversation_variables=conversation_variables,
) # 同步草稿工作流
在 sync_draft_workflow()方法中,判断 draft workflow 是否存在,若不存在则创建,若存在则更新,并保存到 workflows 数据表中。然后触发 app_draft_workflow_was_synced 事件,如下所示:
app_draft_workflow_was_synced 事件的具体执行逻辑,如下所示:
这段代码实现了在草稿工作流同步完成后,对工具节点的管理功能。具体功能概述如下:
- 监听
app_draft_workflow_was_synced
事件:当草稿工作流同步完成时触发。 - 获取应用和节点数据:
sender
代表触发事件的应用,kwargs
中包含同步的草稿工作流节点数据。 - 遍历节点:从同步的工作流中获取所有节点,逐个检查其类型。
- 处理工具节点:如果节点类型为工具节点:尝试将节点数据转换为
ToolEntity
实例;使用工具实体的信息获取工具的运行时;创建一个ToolParameterConfigurationManager
实例,用于管理该工具的参数配置;删除该工具的参数缓存,以确保使用最新的参数配置。 - 异常处理:如果在处理过程中发生异常(如工具不存在),捕获并忽略该异常。
(3)初始化草稿 Workflow(发布工作流)
workflow_service.publish_workflow(app_model=app, account=account, draft_workflow=draft_workflow) # 发布工作流
首先创建新的 workflow,记录到数据表 workflows 中。然后触发 app_published_workflow_was_updated 事件。如下所示:
app_published_workflow_was_updated 事件的具体执行逻辑,如下所示:
这段代码实现了在应用发布工作流更新时,对工作流中数据集的管理和同步功能。具体功能概述如下:
- 监听
app_published_workflow_was_updated
事件:当应用发布的工作流被更新时触发。 - 获取应用和工作流信息:
sender
代表触发事件的应用,published_workflow
是更新后的工作流数据,并将其类型转换为Workflow
。 - 提取数据集 ID:通过
get_dataset_ids_from_workflow
函数,从更新后的工作流中提取与知识检索节点相关的数据集 ID,并返回一个包含数据集 ID 的集合。 - 查询应用数据集关联:从数据库中查询该应用当前的
AppDatasetJoin
关联数据集。 - 计算新增和移除的数据集 ID。如果应用没有任何关联数据集,则所有提取的 ID 为新增数据集;如果有现有的数据集关联,则计算出新增的数据集 ID(
added_dataset_ids
)和需要移除的数据集 ID(removed_dataset_ids
)。 - 更新数据库。对于需要移除的数据集 ID,从
AppDatasetJoin
中删除相应记录;对于需要添加的数据集 ID,创建新的AppDatasetJoin
记录,并添加到数据库。 - 提交更改:最终将所有数据集的变更(新增或删除)提交到数据库以完成更新。
4.第 4 部分源码分析
针对 AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.COMPLETION 的 AppMode 类型,导入 DSL 文件,创建 App 过程。如下所示:
(1)创建 App
app = cls._create_app(tenant_id=tenant_id,app_mode=app_mode,account=account,name=name,description=description,icon_type=icon_type,icon=icon,icon_background=icon_background,use_icon_as_answer_icon=use_icon_as_answer_icon,
)
首先创建 app,然后在 apps 数据表中插入记录。当 app 创建时,发送 app_was_created 事件。该部分代码逻辑和第 3 部分源码分析(创建 App)相同,就不再赘述。
(2)触发事件
app_model_config_was_updated.send(app, app_model_config=app_model_config)
触发 app_model_config_was_updated 事件,如下所示:
app_model_config_was_updated 事件的具体执行逻辑,如下所示:
这段代码的功能是在应用程序的模型配置更新时,根据新的模型配置同步与数据集的关联。具体功能概述如下:
- 监听
app_model_config_was_updated
事件:当应用的模型配置更新时触发。 - 获取应用和模型配置信息:
sender
代表触发事件的应用,kwargs
中的app_model_config
包含更新后的模型配置。 - 提取数据集 ID:通过
get_dataset_ids_from_model_config
函数,从模型配置中提取与工具和数据集配置相关的 ID,包括代理模式中的工具以及数据集配置中的数据集,返回一个包含数据集 ID 的集合。 - 查询现有的应用数据集关联:从数据库中查询该应用当前的
AppDatasetJoin
关联的数据集。 - 计算新增和移除的数据集 ID。如果没有现有数据集关联,则所有提取的 ID 为新增数据集;如果有现有数据集关联,则计算出新增的数据集 ID(
added_dataset_ids
)和需要移除的数据集 ID(removed_dataset_ids
)。 - 更新数据库。对于需要移除的数据集 ID,从
AppDatasetJoin
表中删除相应的关联记录;对于需要添加的数据集 ID,创建新的AppDatasetJoin
记录并添加到数据库。 - 提交数据库会话:最终将所有对数据集关联的变更(新增或删除)提交到数据库。
三.删除 Workflow 过程
1.删除操作过程
(1)删除操作
因为 workflow 本质还是 app,所以删除 workflow 本质上还是删除 app,如下所示:
(2)调用接口
删除 workflow 调用接口为 http://localhost:5001/console/api/apps/3c32956e-96bd-4e19-9c47-fd06bb4c5890,如下所示:
(3)接口日志
在 PyCharm 控制台上可看到调用接口的日志,如下所示:
(4)任务日志
在 PyCharm 控制台上可看到执行异步任务(remove_app_and_related_data_task)的日志,如下所示:
2.删除操作实现
(1)任务执行逻辑
触发 remove_app_and_related_data_task 异步任务,具体执行逻辑如下所示:
(2)任务日志记录
执行异步任务(remove_app_and_related_data_task)的过程,本质上就是从相关数据表中删除 app 及相关数据,这个过程中产生的日志记录(日志级别为 INFO),如下所示:
[2024-10-18 13:46:30,516: INFO/MainProcess] Task tasks.remove_app_and_related_data_task.remove_app_and_related_data_task[e401ada0-bd53-4626-ac6d-fd69ba58f686] received
[2024-10-18 13:46:30,517: INFO/MainProcess] Start deleting app and related data: 7bdb51d6-f390-4b23-a59e-9cdbe2e93136:3c32956e-96bd-4e19-9c47-fd06bb4c5890
[2024-10-18 13:46:30,739: INFO/MainProcess] Deleted site cf995800-1808-4109-960c-f6246cbdb26f
[2024-10-18 13:46:30,758: INFO/MainProcess] Deleted installed app e125a5c8-b4a5-462d-bc9f-254abd1ab8eb
[2024-10-18 13:46:30,784: INFO/MainProcess] Deleted workflow 592fd414-204a-48aa-9ad5-7c1ac814d85d
[2024-10-18 13:46:30,789: INFO/MainProcess] Deleted workflow 782f46e7-4ee8-4064-9941-afcda22a6397
[2024-10-18 13:46:30,821: INFO/MainProcess] Deleted conversation variables for app 3c32956e-96bd-4e19-9c47-fd06bb4c5890
[2024-10-18 13:46:30,821: INFO/MainProcess] App and related data deleted: 3c32956e-96bd-4e19-9c47-fd06bb4c5890 latency: 0.30396559997461736
[2024-10-18 13:46:30,822: INFO/MainProcess] Task tasks.remove_app_and_related_data_task.remove_app_and_related_data_task[e401ada0-bd53-4626-ac6d-fd69ba58f686] succeeded in 0.3130000000819564s: None
(3)@shared_task
装饰器
@shared_task(queue="app_deletion", bind=True, max_retries=3)
代码是使用 Celery 定义一个共享任务的装饰器。定义了一个 Celery 任务,指定了其在处理应用删除操作时的队列、上下文绑定以及最大重试次数,便于实现异步任务管理和失败处理。具体含义和功能如下:
- @shared_task:这是 Celery 的一个装饰器,用于将一个函数定义为可以被 Celery 调用的任务。它可在不同的模块中被共享和调用。
- queue=“app_deletion”:指定任务将被发送到名为
app_deletion
的队列。这样可通过队列管理任务的执行,适用于需要异步处理的操作。队列包括 dataset、generation、mail、ops_trace 和 app_deletion。 - bind=True:允许任务访问其上下文信息,特别是可以访问任务实例本身(通过
self
参数)。这对于任务重试、状态更新等操作很有用。 - max_retries=3:定义任务最大重试次数。如果任务执行失败,Celery 会自动重新调度该任务,最多重试 3 次。如果超过这个次数,任务将标记为失败。
参考文献
[1] Dify 编排节点:https://docs.dify.ai/zh-hans/guides/workflow/orchestrate-node
[2] Dify通过导入DSL文件创建Workflow过程及实现:https://z0yrmerhgi8.feishu.cn/wiki/KVIWwrcPMiOdDhk4FngcHrlgnJc