์ฌ๊ธฐ์์๋ LangGraph๋ฅผ ์ด์ฉํ์ฌ ๊ธฐ๋ณธ RAG๋ฅผ ๊ตฌํํ๊ณ , reflection๊ณผ query transformation์ ์ด์ฉํ์ฌ RAG์ ์ฑ๋ฅ์ ํฅ์์ํค๋ ๋ฐฉ๋ฒ์ ๋น๊ตํ์ฌ ์ค๋ช ํฉ๋๋ค. RAG๋ฅผ ์ด์ฉํจ์ผ๋ก์จ ํ์ํ ์์ ์ ์ ์ ํ ๋น์ฉ์ผ๋ก ๊ธฐ์ ์ ๋ฐ์ดํฐ๋ฅผ ํ์ฉํ ์ ํ๋ฆฌ์ผ์ด์ ์ ๊ฐ๋ฐํ ์ ์์ต๋๋ค. ํ์ง๋ง, ์์ฐ์ผ๋ก ์ง์๋๋ ์ฌ์ฉ์์ ์์ฒญ์ผ๋ก๋ถํฐ ์ฌ์ฉ์์ ์๋๋ฅผ ์ ํํ ํ์ ํ๊ณ , ๊ด๋ จ๋ ๋ฌธ์๋ค๋ก ๋ถํฐ ๊ผญ ํ์ํ ๋ฌธ์๋ง์ ์ ํํ๊ณ , ํ๊ตญ์ด์ ์์ด ๋ฌธ์๋ฅผ ๋ชจ๋ ์กฐํํ๋ ค๋ฉด ๋ค์ํ ๋ ธ๋ ฅ์ด ํ์ํฉ๋๋ค.
Reflection๊ณผ transformation์ ์ด์ฉํ์์ ๋์ activity diagram์ ์๋์ ๊ฐ์ต๋๋ค. ์ฌ๊ธฐ์์ "(a) RAG with reflection"์ RAG๋ฅผ ์กฐํํ์ฌ ์ป์ ๋ฌธ์๋ค์ ์ด์ฉํ์ฌ ๋ต๋ณ์ ๊ตฌํ ํ์, reflection์ ํตํ์ฌ ๋ต๋ณ์์ ๊ฐ์ ํ์ฌ์ผ ํ ๋ชฉ๋ก ๋ฐ ์ถ๊ฐ๋ก ๊ฒ์์ ์ํํ์ฌ ์ป์ ๋ฌธ์๋ค๋ก ํฅ์๋ ๋ต๋ณ์ ์์ฑํฉ๋๋ค. "(b) RAG with Transformation"์ ์ฌ์ฉ์์ ์ง๋ฌธ์ ๊ฐฑ์ (rewrite)ํ ํ์ ์ถ๊ฐ์ ์ผ๋ก ๊ฒ์ํ ์ง๋ฌธ๋ค์ ์์ฑ(decompose)ํ์ฌ RAG๋ฅผ ์กฐํํฉ๋๋ค.
Reflection์ ์์ฑํ ๋ต๋ณ์ผ๋ก ๋ถํฐ ์ถ๊ฐ ์ง๋ฌธ์ ์ถ์ถํ์ฌ RAG๋ฅผ ์กฐํํ๋ฏ๋ก, ์ฒ์ ๋ต๋ณ๋ณด๋ค ๋ ๋ง์ ์ ๋ณด๋ฅผ ํ์ฉํ ์ ์์ต๋๋ค. ํ์ง๋ง, RAG๋ฅผ ํตํด ์ป์ ๋ต๋ณ์ ๊ฐฑ์ ํ๋ ๊ณผ์ ์์ ๋ค์์ ์๋ก์ด ์ถ๊ฐ ์ง๋ฌธ์ ์์ฑํ์ฌ ์กฐํํ๋ฏ๋ก, RAG ์กฐํ ๋น์ฉ์ด ๋ํญ ๋์ด๋๊ณ , ์ฒ๋ฆฌ ์๊ฐ ์ฆ๊ฐ๊ฐ ๋ถ๊ฐํผํฉ๋๋ค.
Transformation์ RAG๋ฅผ ์กฐํํ๊ธฐ ์ํด ์ง๋ฌธ์ ๋ช ํํ ํ๊ณ ๊ด๋ จ๋ ์ธ๋ถ ์ง๋ฌธ์ ๋ฏธ๋ฆฌ ์์ฑํ์ฌ ๊ฒ์ํ๋ฏ๋ก ์ฌ์ฉ์์ ์ง๋ฌธ๊ณผ ์ข๋ ๊ฐ๊น์ด ๋ฌธ์๋ค์ ๊ฒ์ํ์ฌ ํ์ฉํ ์ ์์ต๋๋ค. Query tansformation์ ์ง๋ฌธ(query)์ด ์งง์ ๊ฒฝ์ฐ์๋ ์ง๋ฌธ์ rewriteํ๊ฑฐ๋ decomposeํ๋ ํจ๊ณผ๊ฐ ๋์ง ์์ผ๋ฉฐ, chatbot๊ฐ์ ์ ํ๋ฆฌ์ผ์ด์ ์์๋ ์ด์ history๋ฅผ ์ด์ฉํด ์ง๋ฌธ์ rephrase์ ์ํํ๋ฏ๋ก query transformation์ ํตํด ์ง๋ฌธ์ ๋ช ํํ๊ฒ(rewrite) ํ๋ ๊ณผ์ ์ด ์ค๋ณต ๋์์ผ ์ ์์ต๋๋ค.
์ฌ๊ธฐ์ ๊ตฌํ๋ architecture๋ ์๋์ ๊ฐ์ต๋๋ค. ์ ์ง๋ณด์ ๋ฐ ๋ณํํ๋ ํธ๋ํฝ์ ๊ฐ์ ์ด ์๋ serverless architecture๋ฅผ ์ฌ์ฉํ์๊ณ , RAG๋ก๋ Amazon Bedrock Knowledge Base๋ฅผ ํ์ฉํ์์ต๋๋ค.
๊ธฐ๋ณธ RAG๋ฅผ ๊ตฌํํ๊ธฐ ์ํ activity diagram ์๋์ ๊ฐ์ต๋๋ค. Retrieve ๋ ธ๋์์ RAG๋ก ์ฌ์ฉ์์ ์ง๋ฌธ์ ์ ๋ฌํ์ฌ ๊ด๋ จ๋ ๋ฌธ์(relevant document)๋ค์ ๊ฐ์ ธ์ต๋๋ค. RAG์ ์ง๋ฌธ๊ณผ ๊ด๋ จ๋ ๋ฌธ์๋ค์ด ์๋ ๊ฒฝ์ฐ์๋ ๊ด๋ จ๋๊ฐ ๋จ์ด์ง๋ ๋ฌธ์๋ค์ด ์ ํ๋ ์ ์์ผ๋ฏ๋ก grading์ ํตํด ๋ฌธ์๋ฅผ ์ ํํฉ๋๋ค. ์ดํ generate ๋ ธ๋์์ ์ฌ์ฉ์์ ์ง๋ฌธ๊ณผ ๊ด๋ จ๋ ๋ฌธ์๋ฅผ ์ด์ฉํ์ฌ ๊ฒฐ๊ณผ๋ฅผ ์ป์ต๋๋ค.
LangGraph๋ก ์๋์ ๊ฐ์ด workflow๋ฅผ ๊ตฌ์ฑํฉ๋๋ค. ์ฌ๊ธฐ ๋ ธ๋(node)์๋ retrieve_node, parallel_grader, generate_node๊ฐ ์์ต๋๋ค. retrieve_node๋ RAG์ ์ง์ํ๊ณ , parallel_grader๋ ๊ฐ์ ธ์จ ๊ด๋ จ๋ ๋ฌธ์๋ฅผ ๊ฒ์ฆํ๊ณ , generate_node์์ ๋ต๋ณ์ ์์ฑํฉ๋๋ค.
class State(TypedDict):
query: str
draft: str
relevant_docs: List[str]
filtered_docs: List[str]
reflection : List[str]
sub_queries : List[str]
revision_number: int
def buildRagBasic():
workflow = StateGraph(State)
# Add nodes
workflow.add_node("retrieve_node", retrieve_node)
workflow.add_node("parallel_grader", parallel_grader)
workflow.add_node("generate_node", generate_node)
# Set entry point
workflow.set_entry_point("retrieve_node")
workflow.add_edge("retrieve_node", "parallel_grader")
workflow.add_edge("parallel_grader", "generate_node")
return workflow.compile()
๊ธฐ๋ณธ RAG๋ฅผ ์คํํ ๋์๋ ์๋์ ๊ฐ์ด ์ ๋ ฅ์ผ๋ก query๋ฅผ ์ด์ฉํ์ฌ LangGraph๋ก ์์ฑํ ์ฑ์ ์ํํฉ๋๋ค. ์ํ์ด ์๋ฃ๋๋ฉด State์ draft๋ฅผ ์ถ์ถํ์ฌ ๋ต๋ณ์ผ๋ก ์ ๋ฌํฉ๋๋ค.
def run_rag_basic(connectionId, requestId, query):
app = buildRagBasic()
# Run the workflow
isTyping(connectionId, requestId)
inputs = {
"query": query
}
config = {
"recursion_limit": 50
}
output = app.invoke(inputs, config)
return output['draft']
RAG์์ ๊ด๋ จ๋ ๋ฌธ์๋ฅผ ๊ฐ์ ธ์ค๋ ํจ์๋ ์๋์ ๊ฐ์ต๋๋ค. State์์ query๋ฅผ ์ถ์ถํ์ฌ Bedrock Knowledge Base๋ฅผ ์ด์ฉํด ๊ตฌ์ฑํ RAG์ ๊ด๋ จ๋ ๋ฌธ์๋ฅผ ์ถ์ถํด์ ์ ๋ฌํ๋๋ก ์์ฒญํฉ๋๋ค.
def retrieve_node(state: State):
print("###### retrieve ######")
query = state['query']
relevant_docs = retrieve_from_knowledge_base(query)
return {
"relevant_docs": relevant_docs
}
์๋์ ๊ฐ์ด RAG์ ์ง์์ ์ฅ์๋ก๋ถํฐ ์ป์ด์ง ๊ด๋ จ๋ ๋ฌธ์๋ฅผ gradingํฉ๋๋ค. ์ฌ๋ฌ๋ฒ grading์ ์ํํ์ฌ์ผ ํ๋ฏ๋ก Process-based parallelism์ ์ฌ์ฉํ์์ต๋๋ค.
def parallel_grader(state: State):
print("###### parallel_grader ######")
query = state['query']
relevant_docs = state['relevant_docs']
global selected_chat
filtered_docs = []
processes = []
parent_connections = []
for i, doc in enumerate(relevant_docs):
parent_conn, child_conn = Pipe()
parent_connections.append(parent_conn)
process = Process(target=grade_document_based_on_relevance, args=(child_conn, query, doc, multi_region_models, selected_chat))
processes.append(process)
selected_chat = selected_chat + 1
if selected_chat == len(multi_region_models):
selected_chat = 0
for process in processes:
process.start()
for parent_conn in parent_connections:
doc = parent_conn.recv()
if doc is not None:
filtered_docs.append(doc)
for process in processes:
process.join()
return {
"filtered_docs": filtered_docs
}
์ฌ๊ธฐ์์๋ ๊ด๋ จ๋๋ฅผ ํ๋จํ๊ธฐ ์ํ์ฌ LLM์ ์ด์ฉํ grading์ "yes/no"๋ก ํ์ ํ์์ต๋๋ค.
def get_retrieval_grader(chat):
system = """You are a grader assessing relevance of a retrieved document to a user question. \n
If the document contains keyword(s) or semantic meaning related to the question, grade it as relevant. \n
Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."""
grade_prompt = ChatPromptTemplate.from_messages(
[
("system", system),
("human", "Retrieved document: \n\n {document} \n\n User question: {question}"),
]
)
structured_llm_grader = chat.with_structured_output(GradeDocuments)
retrieval_grader = grade_prompt | structured_llm_grader
return retrieval_grader
def grade_document_based_on_relevance(conn, question, doc, models, selected):
chat = get_multi_region_chat(models, selected)
retrieval_grader = get_retrieval_grader(chat)
score = retrieval_grader.invoke({"question": question, "document": doc.page_content})
grade = score.binary_score
if grade == 'yes':
print("---GRADE: DOCUMENT RELEVANT---")
conn.send(doc)
else: # no
print("---GRADE: DOCUMENT NOT RELEVANT---")
conn.send(None)
conn.close()
๊ธฐ๋ณธ RAG์ reflection์ ์ถ๊ฐํ์ฌ, RAG๋ก ๋ถํฐ ์์ฑ๋ ๋ต๋ณ์ ๊ฐํํฉ๋๋ค.
ย ย์๋๋ reflection์ ์ํ workflow์ ๋๋ค. reflect_node์์๋ ์ด์ ๋ต๋ณ(draft)๋ก ๋ถํฐ ๊ฐ์ ์ ์ ์ถ์ถํ๊ณ , ๊ด๋ จ๋ 3๊ฐ์ query๋ฅผ ์์ฑํฉ๋๋ค. parallel_retriever๋ 3๊ฐ์ query๋ฅผ ๋ณ๋ ฌ๋ก ์กฐํํ์ฌ ๊ด๋ จ๋ ๋ฌธ์(relevant documents)๋ฅผ ์ป๊ณ , parallel_grader๋ฅผ ์ด์ฉํ์ฌ gradingํ ํ์ revise_node๋ฅผ ์ด์ฉํ์ฌ ํฅ์๋ ๋ต๋ณ์ ์ป์ต๋๋ค.
def buildRagWithReflection():
workflow = StateGraph(State)
# Add nodes
workflow.add_node("retrieve_node", retrieve_node)
workflow.add_node("parallel_grader", parallel_grader)
workflow.add_node("generate_node", generate_node)
workflow.add_node("reflect_node", reflect_node)
workflow.add_node("parallel_retriever", parallel_retriever)
workflow.add_node("parallel_grader_subqueries", parallel_grader)
workflow.add_node("revise_node", revise_node)
# Set entry point
workflow.set_entry_point("retrieve_node")
workflow.add_edge("retrieve_node", "parallel_grader")
workflow.add_edge("parallel_grader", "generate_node")
workflow.add_edge("generate_node", "reflect_node")
workflow.add_edge("reflect_node", "parallel_retriever")
workflow.add_edge("parallel_retriever", "parallel_grader_subqueries")
workflow.add_edge("parallel_grader_subqueries", "revise_node")
workflow.add_conditional_edges(
"revise_node",
continue_reflection,
{
"end": END,
"continue": "reflect_node"}
)
return workflow.compile()
Reflection์ ์ด์(draft)๋ก ๋ถํฐ ์๋์ ๊ฐ์ด structured output์ ์ด์ฉํ์ฌ ์ถ์ถํฉ๋๋ค. ์ถ์ถ๋ ๊ฒฐ๊ณผ์๋ reflection๊ณผ ๊ด๋ จํ์ฌ missing, advisable, superfluous์ ์ป์ด์ ๋ฌธ์์ ๊ฐ์ ์ ๋์์ ์ค ์ ์์ผ๋ฉฐ, sub_queries๋ฅผ ์ด์ฉํด 1-3๊ฐ์ ์๋ก์ด ์ง๋ฌธ์ ์์ฑํฉ๋๋ค. RAG์์๋ embedding์ ํตํด ํ๊ตญ์ด๋ก ์์ด๋ก๋ ๋ฌธ์๋ฅผ ์ฐพ์ ์ ์์ผ๋ ์ ์ฌํ ํ๊ธ ๋ฌธ์๊ฐ ๋ง์ ๊ฒฝ์ฐ์๋ ์ ์ฉํ ์์ด ๋ฌธ์๋ฅผ ์ฐธ์กฐํ์ง ๋ชปํ ์ ์์ต๋๋ค. ๋ฐ๋ผ์ ์ฌ๊ธฐ์์๋ ์์ฑ๋ sub_query๊ฐ ํ๊ตญ์ด์ผ ๊ฒฝ์ฐ์๋ ๋ฒ์ญํ์ฌ ์์ด๋ก ๋ sub_query๋ฅผ ์ถ๊ฐํฉ๋๋ค. ๋ํ reflection์ผ๋ก ๋ต๋ณ์ ๋ถ์กฑํ ๋ถ๋ถ์ด๋, ์ถ๊ฐ๋์ด์ผํ ๋ด์ฉ, ๊ธธ์ด/์คํ์ผ์ ๋ํ ์ ๋ณด๋ฅผ ์ถ์ถํ์ฌ ๋ต๋ณ์ ํฅ์์ํฌ ์ ์์ต๋๋ค.
class Reflection(BaseModel):
missing: str = Field(description="Critique of what is missing.")
advisable: str = Field(description="Critique of what is helpful for better writing")
superfluous: str = Field(description="Critique of what is superfluous")
class Research(BaseModel):
"""Provide reflection and then follow up with search queries to improve the question/answer."""
reflection: Reflection = Field(description="Your reflection on the initial answer.")
sub_queries: list[str] = Field(
description="1-3 search queries for researching improvements to address the critique of your current answer."
)
class ReflectionKor(BaseModel):
missing: str = Field(description="๋ต๋ณ์ ์์ด์ผํ๋๋ฐ ๋น ์ง ๋ด์ฉ์ด๋ ๋จ์ ")
advisable: str = Field(description="๋ ์ข์ ๋ต๋ณ์ด ๋๊ธฐ ์ํด ์ถ๊ฐํ์ฌ์ผ ํ ๋ด์ฉ")
superfluous: str = Field(description="๋ต๋ณ์ ๊ธธ์ด๋ ์คํ์ผ์ ๋ํ ๋นํ")
class ResearchKor(BaseModel):
"""๋ต๋ณ์ ๊ฐ์ ํ๊ธฐ ์ํ ๊ฒ์ ์ฟผ๋ฆฌ๋ฅผ ์ ๊ณตํฉ๋๋ค."""
reflection: ReflectionKor = Field(description="๋ต๋ณ์ ๋ํ ํ๊ฐ")
sub_queries: list[str] = Field(
description="๋ต๋ณ๊ณผ ๊ด๋ จ๋ 3๊ฐ ์ด๋ด์ ๊ฒ์์ด"
)
def reflect_node(state: State):
print("###### reflect ######")
query = state['query']
draft = state['draft']
reflection = []
sub_queries = []
for attempt in range(5):
chat = get_chat()
if isKorean(draft):
structured_llm = chat.with_structured_output(ResearchKor, include_raw=True)
qa = f"์ง๋ฌธ: {query}\n\n๋ต๋ณ: {draft}"
else:
structured_llm = chat.with_structured_output(Research, include_raw=True)
qa = f"Question: {query}\n\nAnswer: {draft}"
info = structured_llm.invoke(qa)
if not info['parsed'] == None:
parsed_info = info['parsed']
reflection = [parsed_info.reflection.missing, parsed_info.reflection.advisable]
sub_queries = parsed_info.sub_queries
if isKorean(draft):
translated_search = []
for q in sub_queries:
chat = get_chat()
if isKorean(q):
search = traslation(chat, q, "Korean", "English")
else:
search = traslation(chat, q, "English", "Korean")
translated_search.append(search)
sub_queries += translated_search
break
return {
"reflection": reflection,
"sub_queries": sub_queries,
}
parallel_retriever๋ sub_queries๋งํผ ๋ณ๋ ฌ์ฒ๋ฆฌ๋ฅผ ์ํํ์ฌ ์๋๋ฅผ ๊ฐ์ ํฉ๋๋ค. ์ด๋ retriever๋ ์์ ผ๊ด๋ฆฌํ์ธ RAG ์๋น์ค์ธ knowledge base์์ ๊ด๋ จ๋ ๋ฌธ์๋ฅผ ๊ฐ์ ธ์ต๋๋ค.
def retriever(conn, query):
relevant_docs = retrieve_from_knowledge_base(query)
print("---RETRIEVE: RELEVANT DOCUMENT---")
conn.send(relevant_docs)
conn.close()
return relevant_docs
def parallel_retriever(state: State):
print("###### parallel_retriever ######")
sub_queries = state['sub_queries']
print('sub_queries: ', sub_queries)
relevant_docs = []
processes = []
parent_connections = []
for i, query in enumerate(sub_queries):
print(f"retrieve sub_queries[{i}]: {query}")
parent_conn, child_conn = Pipe()
parent_connections.append(parent_conn)
process = Process(target=retriever, args=(child_conn, query))
processes.append(process)
for process in processes:
process.start()
for parent_conn in parent_connections:
docs = parent_conn.recv()
for doc in docs:
relevant_docs.append(doc)
for process in processes:
process.join()
return {
"relevant_docs": relevant_docs
}
revise_node์์๋ reflection์ผ๋ก ์ป์ด์ง reflection critique์ sub-quries๋ฅผ ์ด์ฉํด ์กฐํํ ๊ด๋ จ๋ ๋ฌธ์๋ค์ ์ด์ฉํ์ฌ ์๋์ ๊ฐ์ด ์ด์(draft)๋ฅผ ํฅ์์ํต๋๋ค.
def revise_node(state: State):
print("###### revise ######")
draft = state['draft']
reflection = state['reflection']
if isKorean(draft):
revise_template = (
"๋น์ ์ ์ฅ๋ฌธ ์์ฑ์ ๋ฅ์ํ ์ ๋ฅํ ๊ธ์ฐ๊ธฐ ๋์ฐ๋ฏธ์
๋๋ค."
"draft์ critique๊ณผ information ์ฌ์ฉํ์ฌ ์์ ํ์ญ์์ค."
"์ต์ข
๊ฒฐ๊ณผ๋ ํ๊ตญ์ด๋ก ์์ฑํ๊ณ <result> tag๋ฅผ ๋ถ์ฌ์ฃผ์ธ์."
"<draft>"
"{draft}"
"</draft>"
"<critique>"
"{reflection}"
"</critique>"
"<information>"
"{content}"
"</information>"
)
else:
revise_template = (
"You are an excellent writing assistant."
"Revise this draft using the critique and additional information."
"Provide the final answer with <result> tag."
"<draft>"
"{draft}"
"</draft>"
"<critique>"
"{reflection}"
"</critique>"
"<information>"
"{content}"
"</information>"
)
revise_prompt = ChatPromptTemplate([
('human', revise_template)
])
filtered_docs = state['filtered_docs']
content = []
if len(filtered_docs):
for d in filtered_docs:
content.append(d.page_content)
chat = get_chat()
reflect = revise_prompt | chat
res = reflect.invoke(
{
"draft": draft,
"reflection": reflection,
"content": content
}
)
output = res.content
revised_draft = output[output.find('<result>')+8:len(output)-9]
revision_number = state["revision_number"] if state.get("revision_number") is not None else 1
return {
"draft": revised_draft,
"revision_number": revision_number + 1
}
์ด๋, ์ด์์ MAX_REVISIONS๋งํผ ๋ฐ๋ณตํ์ฌ refection์ ์ ์ฉํ ์ ์์ต๋๋ค.
MAX_REVISIONS = 1
def continue_reflection(state: State, config):
print("###### continue_reflection ######")
max_revisions = config.get("configurable", {}).get("max_revisions", MAX_REVISIONS)
if state["revision_number"] > max_revisions:
return "end"
return "continue"
RAG์ ์ง๋ฌธํ๊ธฐ ์ ์ ์ ๋ ฅ๋ query๋ฅผ ๋ณํํ์ฌ ์ฑ๋ฅ์ ํฅ์์ํค๊ธฐ ์ํด์๋ ์๋์ ๊ฐ์ด rewrite์ decompse ๊ณผ์ ์ด ํ์ํฉ๋๋ค. rewrite_node๋ RAG์์ ์ข๋ ์ข์ ๊ฒฐ๊ณผ๋ฅผ ์ป๋๋ก query์ ๋ด์ฉ์ ์์ธํ๊ฒ ํ์ด ์ ์ต๋๋ค. ์ดํ decompse_node์์๋ RAG์์ ์กฐํํ ๋ ์ฌ์ฉํ sub-quries๋ค์ ์์ฑํฉ๋๋ค. ์ฌ๊ธฐ์์๋ query_transformations.ipynb์ ์ฐธ์กฐํ์ฌ query transformation์ ์ํ prompt๋ฅผ ์์ฑํฉ๋๋ค.
Transformation์ ์ํ workflow๋ ์๋์ ๊ฐ์ต๋๋ค. RAG๋ฅผ ์กฐํํ๊ธฐ ์ ์ rewrite_node๋ก ์ง๋ฌธ์ ํ์ด์ ์ฐ๊ณ , decompose_node๋ก ์์ธํ ์ง๋ฌธ๋ค์ ์์ฑํฉ๋๋ค.
def buildRagWithTransformation():
workflow = StateGraph(State)
# Add nodes
workflow.add_node("rewrite_node", rewrite_node)
workflow.add_node("decompose_node", decompose_node)
workflow.add_node("parallel_retriever", parallel_retriever)
workflow.add_node("parallel_grader", parallel_grader)
workflow.add_node("generate_node", generate_node)
# Set entry point
workflow.set_entry_point("rewrite_node")
# Add edges
workflow.add_edge("rewrite_node", "decompose_node")
workflow.add_edge("decompose_node", "parallel_retriever")
workflow.add_edge("parallel_retriever", "parallel_grader")
workflow.add_edge("parallel_grader", "generate_node")
workflow.add_edge("generate_node", END)
rewrite_node์์๋ ์ง๋ฌธ์ ๊ฒ์์ ๋ง๊ฒ ์์ธํ๊ฒ ํ์ด์ค๋๋ค.
def rewrite_node(state: State):
print("###### rewrite ######")
query = state['query']
query_rewrite_template = (
"You are an AI assistant tasked with reformulating user queries to improve retrieval in a RAG system."
"Given the original query, rewrite it to be more specific,"
"detailed, and likely to retrieve relevant information."
"Put it in <result> tags."
"Original query: {original_query}"
"Rewritten query:"
)
rewrite_prompt = ChatPromptTemplate([
('human', query_rewrite_template)
])
chat = get_chat()
rewrite = rewrite_prompt | chat
res = rewrite.invoke({"original_query": query})
revised_query = res.content
revised_query = revised_query[revised_query.find('<result>')+8:len(revised_query)-9] # remove <result> tag
return {
"query": revised_query
}
์ง๋ฌธ์ด ์ฌ๋ฌ๊ฐ์ Subquery๋ฅผ ๊ฐ์ง์ ์์ผ๋ฏ๋ก ์๋์ ๊ฐ์ด Decompose๋ฅผ ์ํํฉ๋๋ค.
def decompose_node(state: State):
print("###### decompose ######")
query = state['query']
if isKorean(query):
subquery_decomposition_template = (
"๋น์ ์ ๋ณต์กํ ์ฟผ๋ฆฌ๋ฅผ RAG ์์คํ
์ ๋ ๊ฐ๋จํ ํ์ ์ฟผ๋ฆฌ๋ก ๋ถํดํ๋ AI ์ด์์คํดํธ์
๋๋ค. "
"์ฃผ์ด์ง ์๋ ์ฟผ๋ฆฌ๋ฅผ 1-3๊ฐ์ ๋ ๊ฐ๋จํ ํ์ ์ฟผ๋ฆฌ๋ก ๋ถํดํ์ธ์. "
"์ต์ข
๊ฒฐ๊ณผ์ <result> tag๋ฅผ ๋ถ์ฌ์ฃผ์ธ์."
"<query>"
"{original_query}"
"</query>"
"๋ค์์ ์์ ๋ฅผ ์ฐธ์กฐํ์ฌ ์ฟผ๋ฆฌ๋ฅผ ์์ฑํฉ๋๋ค. ๊ฐ ์ฟผ๋ฆฌ๋ ํ ์ค์ ์ฐจ์งํฉ๋๋ค:"
"<example>"
"์ง๋ฌธ: ๊ธฐํ ๋ณํ๊ฐ ํ๊ฒฝ์ ๋ฏธ์น๋ ์ํฅ์ ๋ฌด์์
๋๊น? "
"ํ์ ์ง๋ฌธ:"
"1. ๊ธฐํ ๋ณํ๊ฐ ํ๊ฒฝ์ ๋ฏธ์น๋ ์ฃผ์ ์ํฅ์ ๋ฌด์์
๋๊น?"
"2. ๊ธฐํ ๋ณํ๋ ์ํ๊ณ์ ์ด๋ค ์ํฅ์ ๋ฏธ์นฉ๋๊น? "
"3. ๊ธฐํ ๋ณํ๊ฐ ํ๊ฒฝ์ ๋ฏธ์น๋ ๋ถ์ ์ ์ธ ์ํฅ์ ๋ฌด์์
๋๊น?"
"</example>"
)
else:
subquery_decomposition_template = (
"You are an AI assistant tasked with breaking down complex queries into simpler sub-queries for a RAG system."
"Given the original query, decompose it into 1-3 simpler sub-queries."
"Provide the final answer with <result> tag."
"<query>"
"{original_query}"
"</query>"
"Create queries referring to the following example. Each query occupies one line."
"<example>"
"Query: What are the impacts of climate change on the environment?"
"Sub-queries:"
"1. What are the impacts of climate change on biodiversity?"
"2. How does climate change affect the oceans?"
"3. What are the effects of climate change on agriculture?"
"</example>"
)
decomposition_prompt = ChatPromptTemplate([
('human', subquery_decomposition_template)
])
chat = get_chat()
decompose = decomposition_prompt | chat
response = decompose.invoke({"original_query": query})
print('response: ', response.content)
result = response.content[response.content.find('<result>')+8:len(response.content)-9]
print('result: ', result)
result = result.strip().replace('\n\n', '\n')
decomposed_queries = result.split('\n')
print('decomposed_queries: ', decomposed_queries)
sub_queries = []
if len(decomposed_queries):
sub_queries = decomposed_queries
else:
sub_queries = [query]
return {
"sub_queries": [query] + sub_queries
}
์ด ์๋ฃจ์ ์ ์ฌ์ฉํ๊ธฐ ์ํด์๋ ์ฌ์ ์ ์๋์ ๊ฐ์ ์ค๋น๊ฐ ๋์ด์ผ ํฉ๋๋ค.
- AWS Account ์์ฑ์ ๋ฐ๋ผ ๊ณ์ ์ ์ค๋นํฉ๋๋ค.
๋ณธ ์ค์ต์์๋ us-west-2 ๋ฆฌ์ ์ ์ฌ์ฉํฉ๋๋ค. ์ธํ๋ผ ์ค์น์ ๋ฐ๋ผ CDK๋ก ์ธํ๋ผ ์ค์น๋ฅผ ์งํํฉ๋๋ค.
์๋์ ๊ฐ์ด ๋ฉ๋ด์์ "RAG (Basic)"์ ์ ํํฉ๋๋ค.
์ฑํ ์ฐฝ์ "Advanced RAG์ ๋ํด ์ค๋ช ํด์ฃผ์ธ์"๋ผ๊ณ ์ ๋ ฅํ๊ณ ๊ฒฐ๊ณผ๋ฅผ ํ์ธํฉ๋๋ค. ์ด๋, RAG์ ํฌํจ๋ ๋ฌธ์์ ๋ฐ๋ผ ๊ฒฐ๊ณผ๋ ๋ฌ๋ผ์ง๋๋ค. ์ฌ๊ธฐ์์๋ RAG์ ๊ด๋ จ๋ ๊ฐ์ข PPT๋ฅผ ๋ฃ์์๋์ ๊ฒฐ๊ณผ์ ๋๋ค. ํ์ผ์ ํ๋ฉด ํ๋จ์ ํ์ผ ์์ด์ฝ์ ์ด์ฉํด ๋ฃ๊ฑฐ๋, Amazon S3 Console์์ ์ง์ pushํด์ ๋ฃ์ ์ ์์ต๋๋ค.
๋ฉ๋ด์์ "RAG with Transformation"์ ์ ํํฉ๋๋ค. ์๋๋ "Advanced RAG์ ๋ํด ์ค๋ช ํด์ฃผ์ธ์"์ ๋ํ ๋ต๋ณ์ ๋๋ค. ๋ต๋ณ์ ๊ธธ์ด๋ ์ค๋ช ์ ์ข์ผ๋ Advanced RAG๊ฐ ์๋ ์ผ๋ฐ RAG์ ๋ํด ์ค๋ช ํ๊ณ ์์ต๋๋ค.
๋ก๊ทธ๋ฅผ ํ์ธํด๋ณด๋ฉด rewriteํ์ ์๋์ ๊ฐ์ด ์๋ก์ด ์ง๋ฌธ(revised_query)๊ฐ ์์ฑ๋์์ผ๋ฉฐ, ์ด๋์ advanced ๋จ์ด๊ฐ ์ ์ธ๋์ด ์์ต๋๋ค. ์ฌ๊ธฐ์ ์ฌ์ฉํ "Advanced RAG"๋ ๋ฌธ์์์ ๊ณ ์ ๋ช ์ฌ์ฒ๋ผ ์ฌ์ฉ๋๊ณ ์์ผ๋ rewriteํ๋ ๊ณผ์ ์์ ์ ์ธ๋๊ฒ์ผ๋ก ๋ณด์ฌ์ง๋๋ค.
RAG(Retrieval-Augmented Generation) ๋ชจ๋ธ์ ์ ๋ณด ๊ฒ์๊ณผ ์์ฐ์ด ์์ฑ์ ๊ฒฐํฉํ ์ต์ ๊ธฐ์ ์
๋๋ค.
RAG ๋ชจ๋ธ์ ๊ตฌ์กฐ์ ์๋ ์๋ฆฌ, ์ฅ๋จ์ , ์ฃผ์ ์์ฉ ๋ถ์ผ์ ์ฌ๋ก์ ๋ํด ์์ธํ ์ค๋ช
ํด์ฃผ์๊ธฐ ๋ฐ๋๋๋ค.
๋ํ RAG ๋ชจ๋ธ์ ๋ฐ์ ๋ฐฉํฅ๊ณผ ํฅํ ์ ๋ง์ ๋ํด์๋ ์ธ๊ธํด์ฃผ์๋ฉด ๊ฐ์ฌํ๊ฒ ์ต๋๋ค
๋ฐ๋ผ์ ์ธ๋ถ ์ง๋ฌธ(sub-queries)์๋ ์๋์ ๊ฐ์ด "advanced"๊ฐ ํฌํจ๋์ด ์์ง ์์ผ๋ฏ๋ก, Advanced RAG๊ฐ ์๋ ์ผ๋ฐ RAG์ ๋ํ ๋ต๋ณ์ด ์์ฑ๋์์ต๋๋ค.
1. RAG ๋ชจ๋ธ์ ๊ตฌ์กฐ์ ์๋ ์๋ฆฌ๋ ๋ฌด์์
๋๊น?
2. RAG ๋ชจ๋ธ์ ์ฅ๋จ์ ์ ๋ฌด์์
๋๊น?
3. RAG ๋ชจ๋ธ์ ์ฃผ์ ์์ฉ ๋ถ์ผ์ ์ฌ๋ก๋ ๋ฌด์์
๋๊น?
4. RAG ๋ชจ๋ธ์ ๋ฐ์ ๋ฐฉํฅ๊ณผ ํฅํ ์ ๋ง์ ์ด๋ ํฉ๋๊น?
๋๋ถ๋ถ์ ๊ฒฝ์ฐ์ transformation์ ์ง๋ฌธ์ ๋ช ํํํ๊ณ ๋ค์ํ ๊ด์ ์์ RAG๋ฅผ ๊ฒ์ํ ์ ์๋๋ก ๋์์ฃผ์ด์ ์ฑ๋ฅ์ ๋์์ด ๋ฉ๋๋ค. ํ์ง๋ง, ์ผ๋ถ ์ฃ์ง ์ผ์ด์ค์์๋ ์๋ ์๋ํ๋ ๋ด์ฉ๊ณผ ๋ค๋ฅธ ๋ต๋ณ์ ์ป์ ์ ์์ต๋๋ค.
๋ฉ๋ด์์ "RAG with Reflection"์ ์ ํํ ํ์ ์๋์ ๊ฐ์ด "Advanced RAG์ ๋ํด ์ค๋ช ํด์ฃผ์ธ์"๋ผ๊ณ ์ง๋ฌธํฉ๋๋ค. ๊ฒฐ๊ณผ๋ฅผ ๋ณด๋ฉด, reflection ๋์์ ํตํด ๋ต๋ณ๊ณผ ๊ด๋ จ๋ ๋ค์ํ query๊ฐ ์ํ๋์์ผ๋ฏ๋ก, ๊ธฐ๋ณธ RAG ๋ณด๋ค ๋ ์์ธํ ๋ด์ฉ์ ๋ต๋ณ์ ์ป์ ์ ์์ต๋๋ค.
LangGraph๋ฅผ ์ด์ฉํ์ฌ ๊ธฐ๋ณธ RAG๋ฅผ ๊ตฌํํ๊ณ ,ย Reflection๊ณผย Query Transformation์ ์ด์ฉํ์ฌ RAG์ ์ฑ๋ฅ์ ํฅ์์ํค๋ ๋ฐฉ๋ฒ์ ๋น๊ตํ์์ต๋๋ค. ์ด๋ฅผ ํตํด ํ๊ตญ์ด/์์ด ๋ฌธ์๋ค์ ๋์์ ๊ฒ์ํ๊ณ , ์์ฑ๋ ๋ต๋ณ์ ๊ฐํํ ์ ์์ต๋๋ค. ๋ํ ๋ค์์ query๋ก ์ธํด ๊ฐ์ ๋ฌธ์๊ฐ ๊ฒ์๋์๋์ ๋ํ ์ค๋ณต์ฒ๋ฆฌ๋ฅผ ์ํํ๊ณ , ๋ณ๋ ฌ์ฒ๋ฆฌ๋ฅผ ํตํ์ฌ ์ํ์๊ฐ์ ๊ฐ์ ํ์์ต๋๋ค. ์ฌ๊ธฐ์์๋ prompt์ ์ด์ฉํ relevant document ๊ฒ์ฆํ์๊ณ , ์๋ฒ๋ฆฌ์ค ์ํคํ ์ฒ๋ฅผ ์ด์ฉํ์ฌ ์ ์ง๋ณด์ ๋ฐ ๋ณํํ๋ ํธ๋ํฝ์ ๋ํด ์ ์ฐํ๊ฒ ์ด์ํ ์ ์๋๋ก ํ์์ต๋๋ค. ๋ํ, AWS CDK๋ฅผ ์ด์ฉํ์ฌ ํธ๋ฆฌํ๊ฒ AWS Cloud์ ๋ฐฐํฌํ ์ ์์ต๋๋ค.
๋์ด์ ์ธํ๋ผ๋ฅผ ์ฌ์ฉํ์ง ์๋ ๊ฒฝ์ฐ์ ์๋์ฒ๋ผ ๋ชจ๋ ๋ฆฌ์์ค๋ฅผ ์ญ์ ํ ์ ์์ต๋๋ค.
-
API Gateway Console๋ก ์ ์ํ์ฌ "rest-api-for-rag-with-reflection", "ws-api-for-rag-with-reflection"์ ์ญ์ ํฉ๋๋ค.
-
Cloud9 Console์ ์ ์ํ์ฌ ์๋์ ๋ช ๋ น์ด๋ก ์ ์ฒด ์ญ์ ๋ฅผ ํฉ๋๋ค.
cd ~/environment/rag-with-reflection/cdk-rag-with-reflection/ && cdk destroy --all