In her blog post "LLM Powered Autonomous Agents", Lilian Weng gives a systematic overview of LLM-based AI agents.

2. DB-GPT's Agent Solution Design in Practice

3. Source Code Architecture Analysis

1. Agent API

Sending a message:

@abstractmethod
async def send(
    self,
    message: AgentMessage,
    recipient: Agent,
    reviewer: Optional[Agent] = None,
    request_reply: Optional[bool] = True,
    is_recovery: Optional[bool] = False,
    silent: Optional[bool] = False,
    is_retry_chat: bool = False,
    last_speaker_name: Optional[str] = None,
) -> None:
    """Send a message to recipient agent.

    Args:
        message(AgentMessage): the message to be sent.
        recipient(Agent): the recipient agent.
        reviewer(Agent): the reviewer agent.
        request_reply(bool): whether to request a reply.
        is_recovery(bool): whether the message is a recovery message.

    Returns:
        None
    """


Receiving a message:

@abstractmethod
async def receive(
    self,
    message: AgentMessage,
    sender: Agent,
    reviewer: Optional[Agent] = None,
    request_reply: Optional[bool] = None,
    silent: Optional[bool] = False,
    is_recovery: Optional[bool] = False,
    is_retry_chat: bool = False,
    last_speaker_name: Optional[str] = None,
) -> None:
    """Receive a message from another agent.

    Args:
        message(AgentMessage): the received message.
        sender(Agent): the sender agent.
        reviewer(Agent): the reviewer agent.
        request_reply(bool): whether to request a reply.
        silent(bool): whether to be silent.
        is_recovery(bool): whether the message is a recovery message.

    Returns:
        None
    """

Generating a reply:

@abstractmethod
async def generate_reply(
    self,
    received_message: AgentMessage,
    sender: Agent,
    reviewer: Optional[Agent] = None,
    rely_messages: Optional[List[AgentMessage]] = None,
    is_retry_chat: bool = False,
    last_speaker_name: Optional[str] = None,
    **kwargs,
) -> AgentMessage:
    """Generate a reply based on the received messages.

    Args:
        received_message(AgentMessage): the received message.
        sender: sender of an Agent instance.
        reviewer: reviewer of an Agent instance.
        rely_messages: a list of messages received.

    Returns:
        AgentMessage: the generated reply. If None, no reply is generated.
    """

2. Agent Core Classes

class Agent(ABC)
class Role(ABC, BaseModel)
class ConversableAgent(Role, Agent)
class AgentManager(BaseComponent)
class Team(BaseModel)
class AgentMemoryFragment(MemoryFragment)
class Action(ABC, Generic[T])
class Resource(ABC, Generic[P])

3. Agent Registration Mechanism

Custom agents are registered with the global AgentManager so that the framework can discover and use them:

agent_manage = get_agent_manager(system_app)
agent_manage.register_agent(ApiDisplayAssistantAgent)

4. Agent Resource Binding Mechanism

# Base resource class
class Resource(ABC, Generic[P]):
    """Resource for the agent."""
# Database resource object
class DBResource(Resource[P], Generic[P]):

# Knowledge resource object (binds a retriever as a resource)
class RetrieverResource(Resource[ResourceParameters]):
# Knowledge-space resource object (binds a DB-GPT knowledge space as a resource)
class KnowledgeSpaceRetrieverResource(RetrieverResource):

# Resource pack (bundles multiple resources into one pack for binding and reference)
class ResourcePack(Resource[PackResourceParameters]):

# Built-in tool resources
class ToolPack(ResourcePack):
# Plugin tool packs; can load AutoGPT plugins
class PluginToolPack(ToolPack):
class AutoGPTPluginToolPack(ToolPack):

# Built-in tool definition and usage
@tool(description="List the supported models in DB-GPT project.")
def list_dbgpt_support_models(
    model_type: Annotated[
        str, Doc("The model type, LLM(Large Language Model) and EMBEDDING).")
    ] = "LLM",
) -> str:
    ...

@tool(description="Get current host CPU status.")
def get_current_host_cpu_status() -> str:
    ...

@tool(
    description="Baidu search and return the results as a markdown string. Please set "
    "number of results not less than 8 for rich search results.",
)
def baidu_search(
    query: Annotated[str, Doc("The search query.")],
    num_results: Annotated[int, Doc("The number of search results to return.")] = 8,
) -> str:
    ...

Resource definition and binding:

llm_client = OpenAILLMClient(model_alias="gpt-3.5-turbo")
context: AgentContext = AgentContext(conv_id="test456")

agent_memory = AgentMemory()
tools = ToolPack([simple_calculator, count_directory_files])
prompt_template: PromptTemplate = prompt_service.get_template(
    prompt_code=record.prompt_template
)
agent = (
    await ToolAssistantAgent()
    .bind(context)  # Agent runtime context: conversation id, app name, inference params, etc.
    .bind(LLMConfig(llm_client=llm_client))  # Model service used by this agent
    .bind(agent_memory)  # Memory object bound to this agent
    .bind(prompt_template)  # Prompt bound to the agent, overriding the role definition; currently depends on the Prompt module, to be reworked into an API-oriented design
    .bind(tools)  # Resources this agent will use
    .build()  # Agent pre-flight checks, preloading, etc.
)
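
The two tools bound above (simple_calculator, count_directory_files) are not defined in this snippet. A minimal sketch of what they might look like, assuming the @tool decorator and Doc annotation are imported as in the built-in tools shown earlier:

import os

from typing_extensions import Annotated, Doc

from dbgpt.agent.resource import tool


@tool(description="Simple calculator for two integers.")
def simple_calculator(
    first_number: Annotated[int, Doc("The first operand.")],
    second_number: Annotated[int, Doc("The second operand.")],
    operator: Annotated[str, Doc("The operator, one of +, -, *, /.")],
) -> float:
    """Do simple arithmetic on two numbers."""
    ops = {
        "+": lambda a, b: a + b,
        "-": lambda a, b: a - b,
        "*": lambda a, b: a * b,
        "/": lambda a, b: a / b,
    }
    return ops[operator](first_number, second_number)


@tool(description="Count the files directly under a directory.")
def count_directory_files(
    path: Annotated[str, Doc("The directory path to scan.")],
) -> int:
    """Count regular files directly under the given directory."""
    return sum(
        1 for name in os.listdir(path) if os.path.isfile(os.path.join(path, name))
    )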

5. Agent Memory and Message Caching Mechanism

# Default short-term memory: ShortTermMemory(buffer_size=5) with an in-memory queue as storage
agent_memory = AgentMemory(gpts_memory=self.memory)

# Short-term memory
class ShortTermMemory(Memory, Generic[T])

# Long-term memory
class LongTermMemory(Memory, Generic[T])

embedding_factory = EmbeddingFactory.get_instance(CFG.SYSTEM_APP)
embedding_fn = embedding_factory.create(
    model_name=EMBEDDING_MODEL_CONFIG[CFG.EMBEDDING_MODEL]
)
vstore_name = f"_chroma_agent_memory_{dbgpts_name}_{conv_id}"
# Just use chroma store now
vector_store_connector = VectorStoreConnector(
    vector_store_type=CFG.VECTOR_STORE_TYPE,
    vector_store_config=VectorStoreConfig(
        name=vstore_name, embedding_fn=embedding_fn
    ),
)
memory = HybridMemory[AgentMemoryFragment].from_chroma(
    vstore_name=vstore_name,
    embeddings=embedding_fn,
)

# Sensory memory
class SensoryMemory(Memory, Generic[T])

# Hybrid memory
class HybridMemory(Memory, Generic[T])
# Enhanced short-term memory
class EnhancedShortTermMemory(ShortTermMemory[T])
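
Once a HybridMemory has been built as above, it can replace the default short-term memory when constructing AgentMemory; a small sketch, assuming AgentMemory accepts the memory implementation alongside the shared GptsMemory:

# Use the chroma-backed hybrid memory instead of the default
# ShortTermMemory; gpts_memory remains the shared conversation cache.
agent_memory = AgentMemory(memory=memory, gpts_memory=self.memory)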
self.memory.init(conv_id)
try:
    # Start an agent conversation here
    await user_proxy.initiate_chat(
        recipient=tool_engineer,
        reviewer=user_proxy,
        message="Calculate the product of 10 and 99",
    )
finally:
    await self.memory.clear(conv_id)

## Externally, the agent's conversation messages are fetched through the collective
## memory object's channel; streaming output is supported.
async def chat_messages(
    self, conv_id: str, user_code: str = None, system_app: str = None,
):
    while True:
        queue = self.memory.queue(conv_id)
        if not queue:
            break
        item = await queue.get()
        if item == "[DONE]":
            queue.task_done()
            break
        else:
            yield item
        await asyncio.sleep(0.005)
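
A caller can consume this generator to stream the conversation out to a client. A minimal sketch (the multi_agents controller comes from dbgpt.serve.agent.agents.controller; the relay loop itself is illustrative):

async def stream_to_client(conv_id: str):
    # Relay each message chunk as it arrives; the generator stops
    # once the "[DONE]" sentinel has been consumed.
    async for chunk in multi_agents.chat_messages(conv_id):
        print(chunk)  # or write it to an SSE/WebSocket response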

6. Agent Intent Detection and App Linking Mechanism

In real deployments, complex scenarios are usually split into multiple application scenarios or multiple Flow branches. When multiple applications are involved, we can call up another application from a node in the Flow (see the reference implementation below).

# See this Action for reference
class StartAppAction(Action[LinkAppInput]):

    async def run(
        self,
        ai_message: str,
        resource: Optional[AgentResource] = None,
        rely_action_out: Optional[ActionOutput] = None,
        need_vis_render: bool = True,
        **kwargs,
    ) -> ActionOutput:
        conv_id = kwargs.get("conv_id")
        user_input = kwargs.get("user_input")
        paren_agent = kwargs.get("paren_agent")
        init_message_rounds = kwargs.get("init_message_rounds")

        # TODO: put the pre-launch logic for the target app here
        #       (e.g. resolve gpts_app and new_user_input)

        from dbgpt.serve.agent.agents.controller import multi_agents

        await multi_agents.agent_team_chat_new(
            new_user_input if new_user_input else user_input,
            conv_id,
            gpts_app,
            paren_agent.memory,
            False,
            link_sender=paren_agent,
            app_link_start=True,
            init_message_rounds=init_message_rounds,
        )
        return ActionOutput(
            is_exe_success=True, content="", view=None, have_retry=False
        )

How to use branches in an Agent Flow:

# See this Action for reference
class LinkAppAction(Action[LinkAppInput]):
    async def run(
        self,
        ai_message: str,
        resource: Optional[AgentResource] = None,
        rely_action_out: Optional[ActionOutput] = None,
        need_vis_render: bool = True,
        **kwargs,
    ) -> ActionOutput:

        # TODO: parse the role name of the next agent from the model output
        role = "xxxx"
        # When the current agent returns, specify who speaks next
        return ActionOutput(
            is_exe_success=True,
            content=json.dumps(app_link_param, ensure_ascii=False),
            view=await self.render_protocol.display(content=app_link_param),
            next_speakers=[role],
        )

7. Agent Message Output and Display Mechanism

Each Action renders its output through a vis render protocol; for example, a chart action holds a VisChart protocol and uses it to turn the parsed model output and the query result into a view:

self._render_protocol = VisChart()
view = await self.render_protocol.display(
    chart=json.loads(model_to_json(param)), data_df=data_df
)

8. Agent Identity Definition and Other Attributes

9. Agent Inference Model Selection Strategy

The model an agent uses for inference is decided by a model selection strategy, and each agent in a multi-agent setup can have its own strategy.

# Base class and interface
class LLMStrategy:
    # By default, use the default model of the current model service
    async def next_llm(self, excluded_models: Optional[List[str]] = None):

## A priority-based model selection strategy
class LLMStrategyPriority(LLMStrategy):
    # Select and retry according to the configured priority order
    async def next_llm(self, excluded_models: Optional[List[str]] = None) -> str:
        """Return next available llm model name."""
        try:
            if not excluded_models:
                excluded_models = []
            all_models = await self._llm_client.models()
            if not self._context:
                raise ValueError("No context provided for priority strategy!")
            priority: List[str] = json.loads(self._context)
            can_uses = self._excluded_models(all_models, excluded_models, priority)
            if can_uses and len(can_uses) > 0:
                return can_uses[0].model
            else:
                raise ValueError("No model service available!")

        except Exception as e:
            logger.error(f"{self.type} get next llm failed!{str(e)}")
            raise ValueError(f"Failed to allocate model service,{str(e)}!")

4. Agent Extension Development

Having covered the basics above, we can now try to develop our own Agent. When doing so, you mainly need to pay attention to the following aspects.

1. Role

Define the identity of the current agent, as follows:

class DataScientistAgent(ConversableAgent):
    """Data Scientist Agent."""

    profile: ProfileConfig = ProfileConfig(
        name=DynConfig(
            "Edgar",
            category="agent",
            key="dbgpt_agent_expand_dashboard_assistant_agent_profile_name",
        ),
        role=DynConfig(
            "DataScientist",
            category="agent",
            key="dbgpt_agent_expand_dashboard_assistant_agent_profile_role",
        ),
        goal=DynConfig(
            "Use correct {{dialect}} SQL to analyze and resolve user "
            "input targets based on the data structure information of the "
            "database given in the resource.",
            category="agent",
            key="dbgpt_agent_expand_dashboard_assistant_agent_profile_goal",
        ),
        constraints=DynConfig(
            [
                "Please ensure that the output is in the required format. "
                "Please ensure that each analysis only outputs one analysis "
                "result SQL, including as much analysis target content as possible.",
                "If there is a recent message record, pay attention to refer to "
                "the answers and execution results inside when analyzing, "
                "and do not generate the same wrong answer. Please check carefully "
                "to make sure the correct SQL is generated. Please strictly adhere "
                "to the data structure definition given. The use of non-existing "
                "fields is prohibited. Be careful not to confuse fields from "
                "different tables, and you can perform multi-table related queries.",
                "If the data and fields that need to be analyzed in the target are in "
                "different tables, it is recommended to use multi-table correlation "
                "queries first, and pay attention to the correlation between multiple "
                "table structures.",
                "It is prohibited to construct data yourself as query conditions. "
                "Only the data values given in the input can "
                "be used as query conditions.",
                "Please select an appropriate one from the supported display methods "
                "for data display. If no suitable display type is found, "
                "use 'response_table' as default value. Supported display types: \n"
                "{{ display_type }}",
            ],
            category="agent",
            key="dbgpt_agent_expand_dashboard_assistant_agent_profile_constraints",
        ),
        desc=DynConfig(
            "Use database resources to conduct data analysis, analyze SQL, and provide "
            "recommended rendering methods.",
            category="agent",
            key="dbgpt_agent_expand_dashboard_assistant_agent_profile_desc",
        ),
    )
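
Besides the profile, a custom agent usually binds its default actions in its constructor, modeled on the built-in agents (ChartAction is shown in the Action subsection below):

    def __init__(self, **kwargs):
        """Create a new DataScientistAgent and bind its default action."""
        super().__init__(**kwargs)
        self._init_actions([ChartAction])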

2. Reasoning

By default, an agent's reasoning is implemented uniformly by the base class, so nothing needs to be done. If you need special reasoning logic, override the following method:

async def thinking(
    self,
    messages: List[AgentMessage],
    sender: Optional[Agent] = None,
    prompt: Optional[str] = None,
) -> Tuple[Optional[str], Optional[str]]:
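
A sketch of such an override, assuming the base implementation can simply be delegated to via super() after adjusting the input:

class MyAssistantAgent(ConversableAgent):
    async def thinking(
        self,
        messages: List[AgentMessage],
        sender: Optional[Agent] = None,
        prompt: Optional[str] = None,
    ) -> Tuple[Optional[str], Optional[str]]:
        # Example tweak: enforce a terse answer style before delegating
        # to the default LLM call in the base class.
        extra = "Answer as concisely as possible."
        prompt = f"{prompt}\n{extra}" if prompt else extra
        return await super().thinking(messages, sender=sender, prompt=prompt)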

3. Memory

def get_or_build_agent_memory(self, conv_id: str, dbgpts_name: str) -> AgentMemory:
    from dbgpt.agent.core.memory.hybrid import HybridMemory
    from dbgpt.configs.model_config import EMBEDDING_MODEL_CONFIG
    from dbgpt.rag.embedding.embedding_factory import EmbeddingFactory

    memory_key = f"{dbgpts_name}_{conv_id}"
    if memory_key in self.agent_memory_map:
        return self.agent_memory_map[memory_key]

    # embedding_factory = EmbeddingFactory.get_instance(CFG.SYSTEM_APP)
    # embedding_fn = embedding_factory.create(
    #     model_name=EMBEDDING_MODEL_CONFIG[CFG.EMBEDDING_MODEL]
    # )
    # vstore_name = f"_chroma_agent_memory_{dbgpts_name}_{conv_id}"
    # Just use chroma store now
    # vector_store_connector = VectorStoreConnector(
    #     vector_store_type=CFG.VECTOR_STORE_TYPE,
    #     vector_store_config=VectorStoreConfig(
    #         name=vstore_name, embedding_fn=embedding_fn
    #     ),
    # )
    # memory = HybridMemory[AgentMemoryFragment].from_chroma(
    #     vstore_name=vstore_name,
    #     embeddings=embedding_fn,
    # )

    agent_memory = AgentMemory(gpts_memory=self.memory)
    self.agent_memory_map[memory_key] = agent_memory
    return agent_memory

4. Action

class SqlInput(BaseModel):
    """SQL input model."""

    display_type: str = Field(
        ...,
        description="The chart rendering method selected for SQL. If you don't know "
        "what to output, just output 'response_table' uniformly.",
    )
    sql: str = Field(
        ..., description="Executable sql generated for the current target/problem"
    )
    thought: str = Field(..., description="Summary of thoughts to the user")


class ChartAction(Action[SqlInput]):
    """Chart action class."""

    def __init__(self, **kwargs):
        """Chart action init."""
        super().__init__(**kwargs)
        self._render_protocol = VisChart()

    @property
    def out_model_type(self):
        """Return the output model type."""
        return SqlInput

    async def run(
        self,
        ai_message: str,
        resource: Optional[AgentResource] = None,
        rely_action_out: Optional[ActionOutput] = None,
        need_vis_render: bool = True,
        **kwargs,
    ) -> ActionOutput:
        """Perform the action."""
        try:
            param: SqlInput = self._input_convert(ai_message, SqlInput)
        except Exception as e:
            logger.exception(f"{str(e)}! \n {ai_message}")
            return ActionOutput(
                is_exe_success=False,
                content="Error:The answer is not output in the required format.",
            )
        try:
            if not self.resource_need:
                raise ValueError("The resource type is not found!")

            if not self.render_protocol:
                raise ValueError("The rendering protocol is not initialized!")

            db_resources: List[DBResource] = DBResource.from_resource(self.resource)
            if not db_resources:
                raise ValueError("The database resource is not found!")

            db = db_resources[0]
            data_df = await db.query_to_df(param.sql)
            view = await self.render_protocol.display(
                chart=json.loads(model_to_json(param)), data_df=data_df
            )

            return ActionOutput(
                is_exe_success=True,
                content=model_to_json(param),
                view=view,
                resource_type=self.resource_need.value,
                resource_value=db._db_name,
            )
        except Exception as e:
            logger.exception("Check your answers, the sql run failed!")
            return ActionOutput(
                is_exe_success=False,
                content=f"Error:Check your answers, the sql run failed!Reason:{str(e)}",
            )

When an Action needs extra parameters from the Agent: if the Action logic is relatively complex and needs content beyond the standard interface, such as historical messages or the reasoning prompt, implement the following method so the Agent can pass the required parameters to the Action:

class XXXAgent(ConversableAgent):
    ...

    # Extra execution parameters prepared for the Action
    def prepare_act_param(
        self,
        received_message: Optional[AgentMessage],
        sender: Agent,
        rely_messages: Optional[List[AgentMessage]] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        historical_dialogues = kwargs.get("historical_dialogues", None)
        return {
            "user_input": received_message.content,
            "conv_id": self.agent_context.conv_id,
            "paren_agent": self,
            "rely_messages": rely_messages,
            "historical_dialogues": historical_dialogues,
        }

5. Resources

# Resource loading method. By default, the bound resource pack is converted into
# LLM input via the resource classes' own methods.
async def load_resource(self, question: str, is_retry_chat: bool = False, **kwargs):
    logger.info(f"DomainApi load_resource:{question}")
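
A sketch of overriding it in a custom agent (DomainApiAgent is a hypothetical name here), assuming the base implementation returns a (resource prompt, references) pair that can be post-processed:

class DomainApiAgent(ConversableAgent):
    async def load_resource(self, question: str, is_retry_chat: bool = False, **kwargs):
        logger.info(f"DomainApi load_resource:{question}")
        # Delegate to the default resource-to-prompt conversion first
        # (the return shape is an assumption).
        resource_prompt, references = await super().load_resource(
            question, is_retry_chat=is_retry_chat, **kwargs
        )
        # ... custom filtering or augmentation of the resource prompt ...
        return resource_prompt, references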

6. User Interaction and Cross-Topic Multi-Turn Dialogue

class XXXAction(Action[xxInput]):
    async def run(
        self,
        ai_message: str,
        resource: Optional[AgentResource] = None,
        rely_action_out: Optional[ActionOutput] = None,
        need_vis_render: bool = True,
        **kwargs,
    ) -> ActionOutput:
        ...

        return ActionOutput(
            is_exe_success=False,  # Mark the current agent's progress as not successful
            content=json.dumps(intent.to_dict(), ensure_ascii=False),  # The question content
            view=intent.ask_user if intent.ask_user else ai_message,  # How the question is displayed (with GptVis this can send the user a dynamic-form-like message)
            have_retry=False,  # Do not retry; instead, actively ask the user
            ask_user=True,
        )

5. Multi-Agent Collaboration

For multi-agent collaboration, DB-GPT currently implements only automatic planning and Flow orchestration; ReAct-style dynamic planning will be considered later. Based on the Team base class mentioned above, manager classes for various collaboration modes are built, and a manager Agent hires multiple Agents to complete the task together:

manager = AutoPlanChatManager()
manager = (
    await manager.bind(context)
    .bind(agent_memory)
    .bind(llm_config)
    .build()
)
manager.hire(employees)
user_proxy: UserProxyAgent = (
    await UserProxyAgent().bind(context).bind(agent_memory).build()
)
await user_proxy.initiate_chat(
    recipient=manager,
    message=user_query,
    is_retry_chat=is_retry_chat,
    last_speaker_name=last_speaker_name,
    message_rounds=init_message_rounds,
    **ext_info,
)
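
The employees list above is simply the set of agents the manager coordinates. A sketch of how it might be assembled from the built-in expand agents (llm_client and db_resource are assumed to be defined as in the earlier binding example):

from dbgpt.agent.expand.code_assistant_agent import CodeAssistantAgent
from dbgpt.agent.expand.data_scientist_agent import DataScientistAgent

coder = (
    await CodeAssistantAgent()
    .bind(context)
    .bind(LLMConfig(llm_client=llm_client))
    .bind(agent_memory)
    .build()
)
analyst = (
    await DataScientistAgent()
    .bind(context)
    .bind(LLMConfig(llm_client=llm_client))
    .bind(agent_memory)
    .bind(db_resource)  # a DBResource for SQL analysis (hypothetical)
    .build()
)
employees = [coder, analyst]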

The manager Agent for the AutoPlan collaboration mode:

class AutoPlanChatManager(ManagerAgent):
    """A chat manager agent that can manage a team chat of multiple agents."""

The automatic planning mode currently uses a built-in task-planning Agent to split and assign tasks:

class PlannerAgent(ConversableAgent):
    """Planner Agent."""

The manager Agent for the AWEL collaboration mode:

class AWELBaseManager(ManagerAgent, ABC):
    """AWEL base manager."""

The integration of Agents and AWEL Flow is still somewhat rigid. Early on we tried making Agent a base operator class, but in practice the Agent's resource-binding model conflicted with the dynamic initialization of operator components. As a compromise, we built a set of Agent container operators: a Flow's Agent container operator binds a concrete Agent, and resources are bound inside the container at runtime. The Agent-related container operators and resource nodes are introduced below.

Operators

## Agent-related operators
### Agent Flow trigger; no real logic. By design, a Flow must start from a trigger.
class AgentDummyTrigger(Trigger):

### Agent operator container with uniform input/output, enabling free composition of Agent Flows
class AWELAgentOperator(
    MixinLLMOperator, MapOperator[AgentGenerateContext, AgentGenerateContext]
):

## Agent Flow feature operators
### Operator implementing Agent Flow branching
class AgentBranchOperator(BranchOperator[AgentGenerateContext, AgentGenerateContext]):
### Operator implementing Agent Flow branch merging
class AgentBranchJoinOperator(BranchJoinOperator[AgentGenerateContext]):
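
A rough sketch of how these operators might be wired together; the DAG context manager and >> chaining come from AWEL, while the operator constructor arguments here are assumptions:

from dbgpt.core.awel import DAG

with DAG("agent_awel_flow") as dag:
    # Every Flow must start from a trigger.
    trigger = AgentDummyTrigger()
    # Each operator container wraps one agent node; `awel_agent=` is an
    # assumed parameter name.
    first_op = AWELAgentOperator(awel_agent=sql_agent_node)
    second_op = AWELAgentOperator(awel_agent=report_agent_node)
    # All operators share AgentGenerateContext as input/output,
    # so they can be chained freely.
    trigger >> first_op >> second_op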

Resources

# The node that binds a concrete Agent in a Flow (the Agent becomes a resource of the Agent container operator)
class AWELAgent(BaseModel):

# The Agent's bound resources, exposed as resource nodes of the Agent node
### Agent resource
class AWELAgentResource(AgentResource):
    """AWEL Agent Resource."""
### Agent knowledge-base resource
class AWELAgentKnowledgeResource(AgentResource):
### Agent prompt resource
class AgentPrompt(BaseModel):
### Agent model configuration resource
class AWELAgentConfig(LLMConfig):

This article is reposted from the WeChat public account @EosphorosAI.
