
如何快速實現REST API集成以優化業務流程
Lilian Weng在其博文《LLM Powered Autonomous Agents》中提出,對基于LLM的AI Agent做了系統綜述。
發送消息
@abstractmethod
async def send(
    self,
    message: AgentMessage,
    recipient: Agent,
    reviewer: Optional[Agent] = None,
    request_reply: Optional[bool] = True,
    is_recovery: Optional[bool] = False,
    silent: Optional[bool] = False,
    is_retry_chat: bool = False,
    last_speaker_name: Optional[str] = None,
) -> None:
    """Send a message to recipient agent.

    Abstract method: it declares the interface only and carries no logic.

    Args:
        message(AgentMessage): the message to be sent.
        recipient(Agent): the recipient agent.
        reviewer(Agent): the reviewer agent.
        request_reply(bool): whether to request a reply. Defaults to True.
        is_recovery(bool): whether the message is a recovery message.
        silent(bool): whether to be silent.
        is_retry_chat(bool): NOTE(review): presumably marks the message as
            part of a retried chat - confirm against an implementation.
        last_speaker_name(str): NOTE(review): presumably the name of the
            agent that spoke last - confirm against an implementation.

    Returns:
        None
    """
接收消息
@abstractmethod
async def receive(
    self,
    message: AgentMessage,
    sender: Agent,
    reviewer: Optional[Agent] = None,
    request_reply: Optional[bool] = None,
    silent: Optional[bool] = False,
    is_recovery: Optional[bool] = False,
    is_retry_chat: bool = False,
    last_speaker_name: Optional[str] = None,
) -> None:
    """Receive a message from another agent.

    Abstract method: it declares the interface only and carries no logic.

    Args:
        message(AgentMessage): the received message.
        sender(Agent): the sender agent.
        reviewer(Agent): the reviewer agent.
        request_reply(bool): whether to request a reply. Defaults to None
            here (not True/False).
        silent(bool): whether to be silent.
        is_recovery(bool): whether the message is a recovery message.
        is_retry_chat(bool): NOTE(review): presumably marks the message as
            part of a retried chat - confirm against an implementation.
        last_speaker_name(str): NOTE(review): presumably the name of the
            agent that spoke last - confirm against an implementation.

    Returns:
        None
    """
@abstractmethod
async def generate_reply(
    self,
    received_message: AgentMessage,
    sender: Agent,
    reviewer: Optional[Agent] = None,
    rely_messages: Optional[List[AgentMessage]] = None,
    is_retry_chat: bool = False,
    last_speaker_name: Optional[str] = None,
    **kwargs,
) -> AgentMessage:
    """Generate a reply based on the received messages.

    Abstract method: it declares the interface only and carries no logic.

    Args:
        received_message(AgentMessage): the received message.
        sender: sender of an Agent instance.
        reviewer: reviewer of an Agent instance.
        rely_messages: a list of messages received.
        is_retry_chat(bool): NOTE(review): presumably marks this as a
            retried chat - confirm against an implementation.
        last_speaker_name(str): NOTE(review): presumably the name of the
            agent that spoke last - confirm against an implementation.
        **kwargs: additional keyword arguments; their meaning is not
            visible from this declaration.

    Returns:
        AgentMessage: the generated reply. If None, no reply is generated.
        NOTE(review): the annotation is ``AgentMessage`` (not Optional)
        although None is described as a possible result.
    """
class Agent(ABC)
class Role(ABC, BaseModel)
class ConversableAgent(Role, Agent)
class AgentManager(BaseComponent)
class Team(BaseModel)
class AgentMemoryFragment(MemoryFragment)
class Action(ABC, Generic[T])
class Resource(ABC, Generic[P])
# Obtain the agent manager for this system app, then register the custom
# agent class with it.
agent_manage = get_agent_manager(system_app)
agent_manage.register_agent(ApiDisplayAssistantAgent)
# 資源基礎(chǔ)類
class Resource(ABC, Generic[P]):
"""Resource for the agent."""
# 數(shù)據(jù)庫資源對(duì)象
class DBResource(Resource[P], Generic[P]):
#知識(shí)資源對(duì)象(將召回對(duì)象作為資源綁定)
class RetrieverResource(Resource[ResourceParameters]):
#知識(shí)庫空間資源對(duì)象(將DBGPT的知識(shí)庫空間作為資源對(duì)象)
class KnowledgeSpaceRetrieverResource(RetrieverResource):
# 資源包(將多個(gè)資源變成一個(gè)資源包的方式綁定引用)
class ResourcePack(Resource[PackResourceParameters]):
# 內(nèi)置工具資源
class ToolPack(ResourcePack):
# 插件工具資源包,可加載Autogpt插件
class PluginToolPack(ToolPack):
class AutoGPTPluginToolPack(ToolPack):
# Built-in tool definitions and how to use them
@tool(description="List the supported models in DB-GPT project.")
def list_dbgpt_support_models(
    model_type: Annotated[
        str, Doc("The model type, LLM(Large Language Model) and EMBEDDING).")
    ] = "LLM",
) -> str:
    # Implementation elided in the article; only the tool signature is shown.
    ...
@tool(description="Get current host CPU status.")
def get_current_host_cpu_status() -> str:
    # Implementation elided in the article; only the tool signature is shown.
    ...
@tool(
    description="Baidu search and return the results as a markdown string. Please set "
    "number of results not less than 8 for rich search results.",
)
def baidu_search(
    query: Annotated[str, Doc("The search query.")],
    num_results: Annotated[int, Doc("The number of search results to return.")] = 8,
) -> str:
    # Implementation elided in the article; only the tool signature is shown.
    ...
資源定義:
llm_client = OpenAILLMClient(model_alias="gpt-3.5-turbo")
context: AgentContext = AgentContext(conv_id="test456")
agent_memory = AgentMemory()
tools = ToolPack([simple_calculator, count_directory_files])
prompt_template: PromptTemplate = prompt_service.get_template(
prompt_code=record.prompt_template
)
await ToolAssistantAgent()
.bind(context) #agent 運(yùn)行上下文 會(huì)話id、應(yīng)用名、推理參數(shù)等
.bind(LLMConfig(llm_client=llm_client)) #當(dāng)前agent使用的模型服務(wù)
.bind(agent_memory) # 綁定當(dāng)前agent的記憶對(duì)象
.bind(prompt_template) # 綁定Agent的prompt 覆蓋角色定義 暫時(shí)依賴Prompt模塊,后續(xù)改造為面向API
.bind(tools) # 綁定當(dāng)前agent要使用的資源
.build() #Agent準(zhǔn)備檢查和預(yù)加載等工作
# Default short-term memory: by default ShortTermMemory(buffer_size=5), an in-memory queue, is used as storage
agent_memory = AgentMemory(gpts_memory=self.memory)
# Short-term memory
class ShortTermMemory(Memory, Generic[T])
# Long-term memory
class LongTermMemory(Memory, Generic[T])
# Build the embedding function from the configured embedding model.
embedding_factory = EmbeddingFactory.get_instance(CFG.SYSTEM_APP)
embedding_fn = embedding_factory.create(
    model_name=EMBEDDING_MODEL_CONFIG[CFG.EMBEDDING_MODEL]
)
# Store name is derived from the app name and the conversation id.
vstore_name = f"_chroma_agent_memory_{dbgpts_name}_{conv_id}"
# Just use chroma store now
vector_store_connector = VectorStoreConnector(
    vector_store_type=CFG.VECTOR_STORE_TYPE,
    vector_store_config=VectorStoreConfig(
        name=vstore_name, embedding_fn=embedding_fn
    ),
)
# Hybrid memory built from a chroma store using the same store name and
# embedding function.
memory = HybridMemory[AgentMemoryFragment].from_chroma(
    vstore_name=vstore_name,
    embeddings=embedding_fn,
)
# Sensory memory
class SensoryMemory(Memory, Generic[T])
# Hybrid memory
class HybridMemory(Memory, Generic[T])
# Enhanced short-term memory
class EnhancedShortTermMemory(ShortTermMemory[T])
self.memory.init({conv_id})
try:
# 這里開始一個(gè)Agent的對(duì)話
await user_proxy.initiate_chat(
recipient=tool_engineer,
reviewer=user_proxy,
message="Calculate the product of 10 and 99",
)
finally:
await self.memory.clear({conv_id}))
# External callers read an agent's conversation messages through the channel
# of the collective memory object; streaming output is supported.
async def chat_messages(
    self,
    conv_id: str,
    user_code: Optional[str] = None,
    system_app: Optional[str] = None,
):
    """Stream the messages queued for a conversation until it is finished.

    Args:
        conv_id: conversation id used to look up the message queue via
            ``self.memory.queue``.
        user_code: not used by this method.
        system_app: not used by this method.

    Yields:
        Each item taken from the queue, in order, up to (but excluding) the
        ``"[DONE]"`` sentinel.
    """
    while True:
        # The queue is looked up again on every pass; a falsy result ends
        # the stream.
        queue = self.memory.queue(conv_id)
        if not queue:
            break
        item = await queue.get()
        if item == "[DONE]":
            # NOTE(review): task_done() is only called for the sentinel, not
            # for regular items - confirm nothing relies on queue.join().
            queue.task_done()
            break
        yield item
        # Short pause after each item; presumably to yield control to other
        # tasks - TODO confirm the intent of the 5 ms value.
        await asyncio.sleep(0.005)
在實際落地的時候,復雜場景基本會拆分為多個應用場景,或者多個分支Flow。如果是多個應用場景,我們會在Flow的某個節點call起來另一個應用(參考如下實現方案):
# Reference implementation: see this Action
class StartAppAction(Action[LinkAppInput]):
    async def run(
        self,
        ai_message: str,
        resource: Optional[AgentResource] = None,
        rely_action_out: Optional[ActionOutput] = None,
        need_vis_render: bool = True,
        **kwargs,
    ) -> ActionOutput:
        # Execution parameters are read from kwargs; the keys "conv_id",
        # "user_input", "paren_agent" and "init_message_rounds" are expected.
        conv_id = kwargs.get("conv_id")
        user_input = kwargs.get("user_input")
        paren_agent = kwargs.get("paren_agent")
        init_message_rounds = kwargs.get("init_message_rounds")
        # TODO: put the logic that must run before the application starts here.
        # NOTE(review): ``new_user_input`` and ``gpts_app`` used below are not
        # defined in this snippet; presumably the elided pre-start logic
        # assigns them - confirm.
        from dbgpt.serve.agent.agents.controller import multi_agents
        # Start the linked application's team chat, passing the parent
        # agent's memory and the parent agent as the link sender.
        await multi_agents.agent_team_chat_new(
            new_user_input if new_user_input else user_input,
            conv_id,
            gpts_app,
            paren_agent.memory,
            False,
            link_sender=paren_agent,
            app_link_start=True,
            init_message_rounds=init_message_rounds,
        )
        # Report success with empty content and no view.
        return ActionOutput(
            is_exe_success=True, content="", view=None, have_retry=False
        )
Agent Flow的分支使用方法:
# Reference implementation: see this Action
class LinkAppAction(Action[LinkAppInput]):
    async def run(
        self,
        ai_message: str,
        resource: Optional[AgentResource] = None,
        rely_action_out: Optional[ActionOutput] = None,
        need_vis_render: bool = True,
        **kwargs,
    ) -> ActionOutput:
        """Pick the next speaker for an Agent Flow branch.

        Returns:
            ActionOutput: a successful output whose ``next_speakers`` names
            the agent role that should speak next.
        """
        # TODO: parse the model output here to obtain the name of the agent
        # role to go to next.
        # NOTE(review): ``app_link_param`` used below is not defined in this
        # snippet; presumably the elided parsing logic assigns it - confirm.
        role = "xxxx"
        # When the current agent returns, specify who speaks next.
        return ActionOutput(
            is_exe_success=True,
            content=json.dumps(app_link_param, ensure_ascii=False),
            # Attribute name spelled ``render_protocol``, consistent with the
            # other actions in this document.
            view=await self.render_protocol.display(content=app_link_param),
            next_speakers=[role],
        )
# Set the action's rendering protocol to a VisChart instance.
self._render_protocol = VisChart()
# Render the view: the parameter model is serialised to JSON and parsed back
# into plain Python data for ``chart``; the data frame is passed as ``data_df``.
view = await self.render_protocol.display(
    chart=json.loads(model_to_json(param)), data_df=data_df
)
Agent推理使用的模型可以根據模型選擇策略來決定,多個Agent每個Agent都可以有自己的模型選擇策略。
# Base class and interface
class LLMStrategy:
    # By default, use the default model of the current model service
    async def next_llm(self, excluded_models: Optional[List[str]] = None):
## Model-selection strategy implementation based on priority
class LLMStrategyPriority(LLMStrategy):
    # Select and retry according to the configured priority order
    async def next_llm(self, excluded_models: Optional[List[str]] = None) -> str:
        """Return next available llm model name.

        Args:
            excluded_models: model names that must not be selected; a falsy
                value is treated as an empty list.

        Returns:
            str: the ``model`` attribute of the first usable candidate.

        Raises:
            ValueError: if no context is configured, no candidate is
                available, or any other error occurs while selecting. The
                original exception is attached as ``__cause__``.
        """
        try:
            if not excluded_models:
                excluded_models = []
            all_models = await self._llm_client.models()
            if not self._context:
                raise ValueError("No context provided for priority strategy!")
            # The context holds the priority list encoded as JSON.
            priority: List[str] = json.loads(self._context)
            can_uses = self._excluded_models(all_models, excluded_models, priority)
            if not can_uses:
                raise ValueError("No model service available!")
            return can_uses[0].model
        except Exception as e:
            logger.error(f"{self.type} get next llm failed!{str(e)}")
            # Chain the cause so the original traceback is not lost.
            raise ValueError(f"Failed to allocate model service,{str(e)}!") from e
了解了上面關于Agent的一些基礎知識后,我們可以嘗試開發一個自己的Agent。開發自己的Agent時,主要關注下面幾個方面就可以了。
定義當前Agent的身份信息,如下:
class DataScientistAgent(ConversableAgent):
    """Data Scientist Agent."""

    # Identity of the agent. Each field is a DynConfig holding a default
    # value plus a category and a lookup key.
    # NOTE(review): every key below contains "dashboard_assistant_agent"
    # although the class is DataScientistAgent - confirm this is intended.
    profile: ProfileConfig = ProfileConfig(
        # Agent name.
        name=DynConfig(
            "Edgar",
            category="agent",
            key="dbgpt_agent_expand_dashboard_assistant_agent_profile_name",
        ),
        # Agent role.
        role=DynConfig(
            "DataScientist",
            category="agent",
            key="dbgpt_agent_expand_dashboard_assistant_agent_profile_role",
        ),
        # Agent goal; contains the template placeholder {{dialect}}.
        goal=DynConfig(
            "Use correct {{dialect}} SQL to analyze and resolve user "
            "input targets based on the data structure information of the "
            "database given in the resource.",
            category="agent",
            key="dbgpt_agent_expand_dashboard_assistant_agent_profile_goal",
        ),
        # Constraints: a list of five strings (adjacent literals are
        # concatenated). The last one contains the template placeholder
        # {{ display_type }}.
        # NOTE(review): "the famous songs in the input" in the fourth
        # constraint reads like a mistranslation - confirm the intended wording.
        constraints=DynConfig(
            [
                "Please ensure that the output is in the required format. "
                "Please ensure that each analysis only outputs one analysis "
                "result SQL, including as much analysis target content as possible.",
                "If there is a recent message record, pay attention to refer to "
                "the answers and execution results inside when analyzing, "
                "and do not generate the same wrong answer.Please check carefully "
                "to make sure the correct SQL is generated. Please strictly adhere "
                "to the data structure definition given. The use of non-existing "
                "fields is prohibited. Be careful not to confuse fields from "
                "different tables, and you can perform multi-table related queries.",
                "If the data and fields that need to be analyzed in the target are in "
                "different tables, it is recommended to use multi-table correlation "
                "queries first, and pay attention to the correlation between multiple "
                "table structures.",
                "It is prohibited to construct data yourself as query conditions. "
                "Only the data values given by the famous songs in the input can "
                "be used as query conditions.",
                "Please select an appropriate one from the supported display methods "
                "for data display. If no suitable display type is found, "
                "use 'response_table' as default value. Supported display types: \n"
                "{{ display_type }}",
            ],
            category="agent",
            key="dbgpt_agent_expand_dashboard_assistant_agent_profile_constraints",
        ),
        # Agent description.
        desc=DynConfig(
            "Use database resources to conduct data analysis, analyze SQL, and provide "
            "recommended rendering methods.",
            category="agent",
            key="dbgpt_agent_expand_dashboard_assistant_agent_profile_desc",
        ),
    )
默認情況下Agent的推理由基類統一實現,不需要做任何事情。如果對推理有特殊邏輯改動,可以重載如下方法:
async def thinking(
    self,
    messages: List[AgentMessage],
    sender: Optional[Agent] = None,
    prompt: Optional[str] = None,
) -> Tuple[Optional[str], Optional[str]]:
    """Reasoning step of the agent; override it to customise inference.

    Only the signature is shown in the article; the body is omitted.

    Args:
        messages: the messages to reason over.
        sender: the sending agent, if any.
        prompt: an optional prompt string.

    Returns:
        A tuple of two optional strings. NOTE(review): presumably the
        model output and the model name - confirm against the base class.
    """
def get_or_build_agent_memory(self, conv_id: str, dbgpts_name: str) -> AgentMemory:
    """Return the agent memory for an app/conversation pair, creating it once.

    Memories are cached in ``self.agent_memory_map`` under the key
    ``f"{dbgpts_name}_{conv_id}"``; a cached entry is returned as-is.

    Args:
        conv_id: conversation id.
        dbgpts_name: application name.

    Returns:
        AgentMemory: the cached memory, or a new one built on
        ``self.memory`` and stored in the cache.
    """
    memory_key = f"{dbgpts_name}_{conv_id}"
    if memory_key in self.agent_memory_map:
        return self.agent_memory_map[memory_key]

    # The disabled chroma-backed HybridMemory construction (and the
    # function-scope imports only it needed) has been dropped; the memory is
    # built directly on the shared gpts memory.
    agent_memory = AgentMemory(gpts_memory=self.memory)
    self.agent_memory_map[memory_key] = agent_memory
    return agent_memory
class SqlInput(BaseModel):
    """SQL input model."""

    # All three fields are required (``...`` is passed as the default).

    # Chart rendering method chosen for the SQL result.
    display_type: str = Field(
        ...,
        description="The chart rendering method selected for SQL. If you don’t know "
        "what to output, just output 'response_table' uniformly.",
    )
    # The SQL statement to execute.
    sql: str = Field(
        ..., description="Executable sql generated for the current target/problem"
    )
    # Summary of the reasoning, addressed to the user.
    thought: str = Field(..., description="Summary of thoughts to the user")
class ChartAction(Action[SqlInput]):
    """Chart action class."""

    def __init__(self, **kwargs):
        """Chart action init."""
        super().__init__(**kwargs)
        # Rendering protocol used by ``run`` to build the view.
        self._render_protocol = VisChart()

    @property
    def out_model_type(self):
        """Return the output model type."""
        return SqlInput

    async def run(
        self,
        ai_message: str,
        resource: Optional[AgentResource] = None,
        rely_action_out: Optional[ActionOutput] = None,
        need_vis_render: bool = True,
        **kwargs,
    ) -> ActionOutput:
        """Perform the action.

        Parses ``ai_message`` into a ``SqlInput``, runs its SQL against the
        first database resource and renders the result.

        Returns:
            ActionOutput: a successful output carrying the serialised
            parameters and the rendered view, or a failed output whose
            content describes the error. Exceptions are not propagated.
        """
        # Step 1: convert the model output into the structured input.
        try:
            param: SqlInput = self._input_convert(ai_message, SqlInput)
        except Exception as e:
            logger.exception(f"{str(e)}! \n {ai_message}")
            return ActionOutput(
                is_exe_success=False,
                content="Error:The answer is not output in the required format.",
            )
        # Step 2: validate prerequisites, query the database and render.
        # Any exception in this step is turned into a failed ActionOutput.
        try:
            if not self.resource_need:
                raise ValueError("The resource type is not found!")

            if not self.render_protocol:
                raise ValueError("The rendering protocol is not initialized!")

            # NOTE(review): this reads ``self.resource``; the ``resource``
            # parameter is not used in this method - confirm that is intended.
            db_resources: List[DBResource] = DBResource.from_resource(self.resource)
            if not db_resources:
                raise ValueError("The database resource is not found!")

            # Only the first database resource is used.
            db = db_resources[0]
            data_df = await db.query_to_df(param.sql)
            view = await self.render_protocol.display(
                chart=json.loads(model_to_json(param)), data_df=data_df
            )

            return ActionOutput(
                is_exe_success=True,
                content=model_to_json(param),
                view=view,
                resource_type=self.resource_need.value,
                resource_value=db._db_name,
            )
        except Exception as e:
            logger.exception("Check your answers, the sql run failed!")
            return ActionOutput(
                is_exe_success=False,
                content=f"Error:Check your answers, the sql run failed!Reason:{str(e)}",
            )
Action對于Agent其他參數信息的需求:如果因為Action邏輯相對復雜,需要使用到類似歷史消息、推理的prompt等標準接口定義外的內容,可以通過實現如下方法,在Agent里給Action傳入需要的參數:
class XXXAgent(ConversableAgent):
    ...
    ...

    # Extra execution parameters prepared for the Action
    def prepare_act_param(
        self,
        received_message: Optional[AgentMessage],
        sender: Agent,
        rely_messages: Optional[List[AgentMessage]] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Build the extra keyword arguments handed to the Action.

        Args:
            received_message: the message being handled; may be None.
            sender: the sending agent (not used here).
            rely_messages: messages the reply relies on, passed through.
            **kwargs: ``historical_dialogues`` is read if present.

        Returns:
            A dict with the keys ``user_input``, ``conv_id``,
            ``paren_agent``, ``rely_messages`` and ``historical_dialogues``.
            ``user_input`` is None when no message was received.
        """
        historical_dialogues = kwargs.get("historical_dialogues", None)
        # ``received_message`` is Optional, so guard the attribute access
        # instead of failing with AttributeError on None.
        user_input = received_message.content if received_message is not None else None
        return {
            "user_input": user_input,
            "conv_id": self.agent_context.conv_id,
            "paren_agent": self,
            "rely_messages": rely_messages,
            "historical_dialogues": historical_dialogues,
        }

    # Resource loading hook; by default the resource pack is converted through
    # the resource class's methods into resource input for the LLM
    async def load_resource(self, question: str, is_retry_chat: bool = False, **kwargs):
        """Log the question; this example override does nothing else."""
        logger.info(f"DomainApi load_resource:{question}")
class XXXAction(Action[xxInput]):
    async def run(
        self,
        ai_message: str,
        resource: Optional[AgentResource] = None,
        rely_action_out: Optional[ActionOutput] = None,
        need_vis_render: bool = True,
        **kwargs,
    ) -> ActionOutput:
        # Logic elided in the article.
        # NOTE(review): ``intent`` used below is not defined in this snippet;
        # presumably the elided logic assigns it - confirm.
        ...
        return ActionOutput(
            is_exe_success=False,  # signal that the current agent's progress failed
            content=json.dumps(intent.to_dict(), ensure_ascii=False),  # the question content
            view=intent.ask_user if intent.ask_user else ai_message,  # how the question is displayed (can be combined with GptVis to send the user something like a dynamic form)
            have_retry=False,  # and proactively ask the user a question
            ask_user=True
        )
目前DB-GPT對于多Agent協作暫時只實現了自動拆分規劃和Flow編排,后續會考慮ReAct動態規劃。基于上文核心類提到的Team基礎類,構建各種協作模式的管理者類,然后通過一個管理者Agent角色雇傭多個Agent來合作完成任務回答:
# Create the manager agent, bind its context, memory and LLM config, then
# build it.
manager = AutoPlanChatManager()
manager = (
    await manager.bind(context)
    .bind(agent_memory)
    .bind(llm_config)
    .build()
)
# Hand the manager the agents it coordinates.
manager.hire(employees)
# The user proxy is bound to the same context and memory.
user_proxy: UserProxyAgent = (
    await UserProxyAgent().bind(context).bind(agent_memory).build()
)
# Start the conversation with the manager as the recipient.
await user_proxy.initiate_chat(
    recipient=manager,
    message=user_query,
    is_retry_chat=is_retry_chat,
    last_speaker_name=last_speaker_name,
    message_rounds=init_message_rounds,
    **ext_info,
)
class AutoPlanChatManager(ManagerAgent):
    """A chat manager agent that can manage a team chat of multiple agents."""
    # Class body omitted in the article.
自動規劃模式暫時使用了一個內置的任務規劃Agent實現任務拆分和分配
class PlannerAgent(ConversableAgent):
    """Planner Agent."""
    # Class body omitted in the article; the docstring is closed here so the
    # snippet no longer swallows the lines that follow it.
class AWELBaseManager(ManagerAgent, ABC):
    """AWEL base manager."""
    # Class body omitted in the article.
Agent和AWEL Flow的結合目前相對比較生硬。早期嘗試過讓Agent繼承算子類,但是實際情況是Agent的資源綁定體系思路和算子的動態組件在初始化的時候會沖突,所以折中后構建了一套Agent容器算子:Flow的Agent容器算子綁定具體Agent,實際運行的時候在容器里綁定資源。下面介紹下Agent相關容器算子和資源節點。
算子
## Agent-related operators
### Agent Flow trigger; carries no real logic - by design a Flow must start from a trigger
class AgentDummyTrigger(Trigger):
### Agent operator container; has uniform input and output, so Agent Flows can be chained freely
class AWELAgentOperator(
    MixinLLMOperator, MapOperator[AgentGenerateContext, AgentGenerateContext]
):
## Agent Flow feature operators
### Operator that implements Agent Flow branching
class AgentBranchOperator(BranchOperator[AgentGenerateContext, AgentGenerateContext]):
### Operator that implements merging of Agent Flow branches
class AgentBranchJoinOperator(BranchJoinOperator[AgentGenerateContext]):
資源
# Node that binds an actual Agent inside a Flow (the Agent is treated as a resource of the Agent operator container)
class AWELAgent(BaseModel):
# Resources bound to an Agent; each is modelled as a resource node of the Agent resource node
### Agent resource
class AWELAgentResource(AgentResource):
    """AWEL Agent Resource."""
### Agent knowledge-base resource
class AWELAgentKnowledgeResource(AgentResource):
### Agent prompt resource
class AgentPrompt(BaseModel):
### Agent model-configuration resource
class AWELAgentConfig(LLMConfig):
文章轉自微信公眾號@EosphorosAI
對比大模型API的內容創意新穎性、情感共鳴力、商業轉化潛力
一鍵對比試用API 限時免費