langgraph的工作流模式虽然方便直观,但习惯了普通函数式编程的数据流处理。刚开始接触时,确实容易试图用函数式编程的思维去适配它,特别是langgraph数据传递由状态字典管理,而非函数返回值,导致代码不够自然,甚至绕远路。
你如果习惯了函数式编程,建议:
- 别把
langgraph
里的节点当作纯函数,而是当作“数据流中的处理单元”。 - 用状态字典管理数据,而不是函数参数和返回值。
- 用显式的工作流结构,避免深层嵌套调用。
当然,langgraph 的优势是工作流这类的处理,对于常规的并不一定高效。以下这些例子只是为了练习 从函数式编程思维->工作流思维 转变。核心思想是
- 分步骤 -> 把每个步骤制作成一个节点。
- 数据传递以State作为载体通过edge进行流转。
- 对用户透明的那些逻辑可以放在管道中的meta_function处理。
例如计算(5+1)* 2 - 3 , 传统编程:
def step1(data):return data + 1def step2(data):return data * 2def step3(data):return data - 3result = step3(step2(step1(5))) # 先执行 step1, 再 step2, 最后 step3
print(result) # 9
LangGraph 方式:
from langgraph.graph import StateGraphclass WorkflowState:value: int # 共享状态def step1(state):state["value"] += 1return statedef step2(state):state["value"] *= 2return statedef step3(state):state["value"] -= 3return state# 定义工作流
workflow = StateGraph(WorkflowState)
workflow.add_node("step1", step1)
workflow.add_node("step2", step2)
workflow.add_node("step3", step3)# 连接节点
workflow.set_entry_point("step1")
workflow.add_edge("step1", "step2")
workflow.add_edge("step2", "step3")# 运行
app = workflow.compile()
result = app.invoke({"value": 5})
print(result["value"]) # 9
再举一个例子,小学数学应用题:
A、B两车在一条笔直的公路上同向行驶。A车在B车后方 1000 米处。A车以每秒 20 米的速度行驶,B车以每秒 16 米的速度行驶。两车同时出发,问多少秒后A车能够追上B车?
解题思路:分两步 1、计算A,B辆车的相对速度。 2、用距离除以相对速度
from langgraph.graph import StateGraph
from typing import TypedDict,Annotateddef guarantee_positive(existing_value, new_value):if new_value < 0:return new_value * -1return new_valueclass CarsState(TypedDict):distance: int # A 车与 B 车的初始距离speed_A: int # A 车速度speed_B: int # B 车速度relative_speed: Annotated[int, guarantee_positive] # 相对速度,管道中检查相对速度如果是负数则改为正数time: float # 追赶时间# 计算速度差的节点
def calculate_relative_speed(state):relative_speed = state["speed_A"] - state["speed_B"]return {**state, "relative_speed": relative_speed}# 计算追赶时间的节点
def calculate_time(state):if state["relative_speed"] == 0:time = float('inf') # 速度相同,则无法追上else:time = state["distance"] / state["relative_speed"]return {**state, "time": time}# 构建 LangGraph 计算图
graph = StateGraph(CarsState) # 注意这里要改为 dict
graph.add_node("node_a", calculate_relative_speed)
graph.add_node("node_b", calculate_time)graph.set_entry_point("node_a")
graph.add_edge("node_a", "node_b")graph = graph.compile()# 运行计算图
initial_state = {"distance": 1000,"speed_A": 20,"speed_B": 16
}
result = graph.invoke(initial_state)print(result)print(f"速度差: {result['relative_speed']} m/s")
print(f"追赶时间: {result['time']} 秒")