Building a simple agent with the LangGraph framework
I've recently wanted to learn about agents, and LangGraph looked promising, so I decided to try it out. From the official docs, its core idea is to model the roles and tools of an agent as nodes of a graph; the overall agent flow is then defined by adding edges between those nodes.
The official docs use the Claude API; here I use the OpenAI API instead.
import json
import operator
import os
from typing import TypedDict, Annotated, Sequence

from langchain_core.messages import BaseMessage, FunctionMessage, HumanMessage  # HumanMessage marks a message as sent by the user
from langchain_core.tools import tool
from langchain.tools.render import format_tool_to_openai_function
from langchain.chat_models import ChatOpenAI  # creates the LLM object (langchain 0.1-style import path)
from langgraph.prebuilt import ToolExecutor, ToolInvocation
from langgraph.graph import StateGraph, END

os.environ['OPENAI_API_KEY'] = "sk-....."
model_name = "gpt-3.5-turbo"
model = ChatOpenAI(model_name=model_name, temperature=0)

# Custom tools
# @tool
# def search(query: str) -> str:
#     """Look up things online."""
#     print(f"search: {query}")
#     return "sunny"

@tool
def search(query: str):
    """Call to surf the web."""
    # This is a placeholder, but don't tell the LLM that...
    if "sf" in query.lower() or "san francisco" in query.lower():
        return "It's 60 degrees and foggy."
    return "It's 90 degrees and sunny."

@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers."""
    return a * b

tools = [search, multiply]
tool_executor = ToolExecutor(tools)

# We will set streaming=True so that we can stream tokens.
# See the streaming section for more information on this.
# model = ChatOpenAI(temperature=0, streaming=True)
functions = [format_tool_to_openai_function(t) for t in tools]
model = model.bind_functions(functions)

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]

# Define the function that determines whether to continue or not
def should_continue(state):
    messages = state['messages']
    last_message = messages[-1]
    # If there is no function call, then we finish
    if "function_call" not in last_message.additional_kwargs:
        return "end"
    # Otherwise if there is, we continue
    else:
        return "continue"

# Define the function that calls the model
def call_model(state):
    messages = state['messages']
    response = model.invoke(messages)
    # We return a list, because this will get added to the existing list
    return {"messages": [response]}

# Define the function to execute tools
def call_tool(state):
    messages = state['messages']
    # Based on the continue condition
    # we know the last message involves a function call
    last_message = messages[-1]
    # We construct a ToolInvocation from the function_call
    action = ToolInvocation(
        tool=last_message.additional_kwargs["function_call"]["name"],
        tool_input=json.loads(last_message.additional_kwargs["function_call"]["arguments"]),
    )
    # We call the tool_executor and get back a response
    response = tool_executor.invoke(action)
    # We use the response to create a FunctionMessage
    function_message = FunctionMessage(content=str(response), name=action.tool)
    # We return a list, because this will get added to the existing list
    return {"messages": [function_message]}

# Define a new graph
workflow = StateGraph(AgentState)

# Define the two nodes we will cycle between
workflow.add_node("agent", call_model)
workflow.add_node("action", call_tool)

# Set the entrypoint as `agent`
# This means that this node is the first one called
workflow.set_entry_point("agent")

# We now add a conditional edge
workflow.add_conditional_edges(
    # First, we define the start node. We use `agent`.
    # This means these are the edges taken after the `agent` node is called.
    "agent",
    # Next, we pass in the function that will determine which node is called next.
    should_continue,
    # Finally we pass in a mapping.
    # The keys are strings, and the values are other nodes.
    # END is a special node marking that the graph should finish.
    # What will happen is we will call `should_continue`, and then the output of that
    # will be matched against the keys in this mapping.
    # Based on which one it matches, that node will then be called.
    {
        # If `continue`, then we call the tool node.
        "continue": "action",
        # Otherwise we finish.
        "end": END,
    },
)

# We now add a normal edge from `action` to `agent`.
# This means that after `action` is called, the `agent` node is called next.
workflow.add_edge('action', 'agent')

# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable
app = workflow.compile()

# inputs = {"messages": [HumanMessage(content="what is the weather in Beijing?")]}
# inputs = {"messages": [HumanMessage(content="3乘以5等于多少,输出最终的结果")]}
response = app.invoke(
    # {"messages": [HumanMessage(content="3乘以5等于多少,输出最终的结果")]},
    {"messages": [HumanMessage(content="what is the weather in sf")]},
    config={"configurable": {"thread_id": 42}},
)
# print(type(response))
# print(f"last result:{response}")
# The output looks like this:
# {'messages': [HumanMessage(content='3乘以5等于多少'), AIMessage(content='', additional_kwargs={'function_call': {'arguments': '{\n "a": 3,\n "b": 5\n}', 'name': 'multiply'}}, response_metadata={'finish_reason': 'function_call'}, id='run-bbf18160-747f-48ac-9a81-6c1ee3b70b07-0'), FunctionMessage(content='15', name='multiply'), AIMessage(content='3乘以5等于15。', response_metadata={'finish_reason': 'stop'}, id='run-0d1403cf-4ddb-4db2-8cfa-d0965666e62d-0')]}
print(response['messages'][-1].content)
The output is:
The weather in San Francisco is currently 60 degrees and foggy.
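The commented-out streaming=True hint in the code above points at a nicer way to watch the agent work. Besides invoke(), a compiled LangGraph graph also exposes stream(). The sketch below is an assumption about this langgraph version's behavior: each step yields a dict keyed by the name of the node that just ran.

# Hedged sketch: watch each node's output as the graph runs, instead of
# waiting for the final state. Assumes `app` and `HumanMessage` from above.
for step in app.stream(
    {"messages": [HumanMessage(content="what is the weather in sf")]}
):
    for node_name, node_output in step.items():
        # node_name is "agent" or "action" (plus a final end marker,
        # depending on the langgraph version)
        print(f"--- output from node: {node_name} ---")
        print(node_output)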
Overall, the code breaks down into: calling the model (call_model), calling tools (call_tool), defining the tools, defining the termination condition, and defining the workflow.
The key piece is the workflow, workflow = StateGraph(AgentState). The core concept in LangGraph is the state: it holds the variables of the whole environment, such as the messages and status information. I think of it as the agent's information hub; each node decides its next action based on what is currently in this hub.
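To make the "information hub" idea concrete, here is a minimal sketch of what the operator.add reducer on AgentState.messages does. The values are invented for illustration; the point is that each node's returned list is appended to the shared history rather than replacing it, which is exactly why call_model and call_tool return {"messages": [...]}.

import operator
from langchain_core.messages import HumanMessage, AIMessage

# Conceptually, after a node returns a partial state, LangGraph applies the
# reducer declared in AgentState (operator.add) to merge it into the old state:
old_messages = [HumanMessage(content="what is the weather in sf")]
node_output = {"messages": [AIMessage(content="It's 60 degrees and foggy.")]}

merged = operator.add(old_messages, node_output["messages"])
print(len(merged))  # 2 -- the history grew; nothing was overwritten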
Limitations
1. This is still a very bare-bones agent skeleton. For the complex agents described in papers, with components such as RAG, how should they be built with LangGraph? (A rough idea is sketched after this list.)
2. This example uses the GPT API. What if you want to use a local model? How do you plug a local model into the LangGraph framework? (Also sketched below.)
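Two hedged sketches toward these questions, under stated assumptions rather than verified recipes. For question 1, retrieval can be modeled as just another node that writes context into the shared state before the agent node runs; my_retriever below is a hypothetical LangChain retriever (e.g. built from a vector store). For question 2, if the local model is served behind an OpenAI-compatible endpoint (vLLM, Ollama, FastChat and similar servers can do this), ChatOpenAI can simply be pointed at it and the rest of the graph code stays unchanged; the URL and model name below are placeholders.

from langchain_core.messages import SystemMessage

# --- Question 1 (sketch): RAG as one more node in the graph ---
def retrieve(state):
    question = state["messages"][-1].content
    docs = my_retriever.get_relevant_documents(question)  # hypothetical retriever
    context = "\n".join(d.page_content for d in docs)
    # Write the retrieved context into the shared state as an extra message
    return {"messages": [SystemMessage(content=f"Context:\n{context}")]}

# workflow.add_node("retrieve", retrieve)
# workflow.set_entry_point("retrieve")
# workflow.add_edge("retrieve", "agent")

# --- Question 2 (sketch): a local model behind an OpenAI-compatible server ---
local_model = ChatOpenAI(
    model_name="qwen-7b-chat",                  # whatever name the server exposes
    openai_api_base="http://localhost:8000/v1", # hypothetical local endpoint
    openai_api_key="EMPTY",                     # many local servers ignore the key
    temperature=0,
)
# Note: bind_functions only helps if the local server implements OpenAI-style
# function calling; otherwise should_continue needs a different stop signal.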