박정기
박정기
🗡️ AI 레전드
🎖️ 마스터 파트너

[1DAY 1랭그래프] streaming-interrupation 에 대하여 #8일차

배경 및 목적

이번 13기 AI 스터디에 앞서, 1DAY 1랭그래프, 즉 하루에 1개의 랭그래프 개념을 설명하는 부분을 기획하게 됐습니다.

-> 랭그래프 스터디에 관심이 있다면 신청해주세요!

주황색 배경의 한국 광고

스터디 신청링크

이 프로젝트의 목표는 랭그래프를 쉽게 입문하실 수 있게 상세한 개념부터 코드까지 함께 공부해보실 수 있게 정리해서 드릴 예정입니다.

저는 랭그래프를 처음들어봐요!, 1일차 글을 보고 싶다면!?

-> 링크로 접속하기!

참고 자료

코랩 실습 링크

공식 랭그래프 강의 링크

활용 툴

파이썬, 코랩, 랭그래프, 랭체인

핵심 내용 정리: LangGraph를 활용한 스트리밍 기능과 휴먼 인 더 루프 챗봇 구축

1. 소개 및 검토

- 모듈 2 복습:

- 그래프 상태와 메모리를 사용자 지정하는 다양한 방법을 학습.

- 외부 메모리를 갖춘 챗봇을 구축하여 장기적이고 복잡한 대화를 지속 가능하게 함.

2. 목표

- 휴먼 인 더 루프 (Human-in-the-Loop) 도입:

- 메모리를 기반으로 사용자가 다양한 방식으로 그래프와 직접 상호작용할 수 있도록 설계.

- 스트리밍 기능을 활용하여 그래프 실행 중 실시간으로 출력(노드 상태 또는 채팅 모델 토큰)을 시각화.

- 스트리밍의 중요성:

- 실행 과정에서 그래프의 상태 변화와 모델의 응답을 실시간으로 모니터링하고 분석할 수 있어 디버깅과 최적화에 유용.

3. 스트리밍 설정 및 구현

- 환경 설정:

```python

%%capture --no-stderr

%pip install --quiet -U langgraph langchain_openai langgraph_sdk

```

- LangGraph와 관련된 필수 라이브러리 설치.

- API 키 설정:

```python

import os, getpass

def setenv(var: str):

if not os.environ.get(var):

os.environ[var] = getpass.getpass(f"{var}: ")

setenv("OPENAI_API_KEY")

```

- OpenAI API 키를 설정하여 모델과의 연동 준비.

4. 챗봇 로직 정의

- 상태 스키마 정의:

```python

from langgraph.graph import MessagesState

class State(MessagesState):

summary: str

```

- MessagesStatesummary 키를 추가하여 대화 요약을 저장할 수 있도록 확장.

- LLM 호출 노드 (`call_model`) 정의:

```python

from langchain_core.messages import SystemMessage, HumanMessage, RemoveMessage

from langchain_openai import ChatOpenAI

from langchain_core.runnables import RunnableConfig

model = ChatOpenAI(model="gpt-4o", temperature=0)

def call_model(state: State, config: RunnableConfig):

summary = state.get("summary", "")

if summary:

system_message = f"Summary of conversation earlier: {summary}"

messages = [SystemMessage(content=system_message)] + state["messages"]

else:

messages = state["messages"]

response = model.invoke(messages, config)

return {"messages": response}

```

- 현재 상태에 요약이 있으면 시스템 메시지에 추가하여 모델에 전달.

- RunnableConfig를 사용하여 토큰 단위 스트리밍 활성화.

- 요약 생성 노드 (`summarize_conversation`) 정의:

```python

def summarize_conversation(state: State):

summary = state.get("summary", "")

if summary:

summary_message = (

f"This is summary of the conversation to date: {summary}\n\n"

"Extend the summary by taking into account the new messages above:"

)

else:

summary_message = "Create a summary of the conversation above:"

messages = state["messages"] + [HumanMessage(content=summary_message)]

response = model.invoke(messages)

delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]

return {"summary": response.content, "messages": delete_messages}

```

- 메시지 수가 일정 기준을 초과하면 대화를 요약하고 불필요한 메시지를 제거하여 상태 관리 최적화.

- 조건부 로직 정의 (`should_continue`):

```python

from langgraph.graph import END

def should_continue(state: State):

messages = state["messages"]

if len(messages) > 6:

return "summarize_conversation"

return END

```

- 메시지 수가 6개를 초과하면 요약 노드로 라우팅, 그렇지 않으면 대화 종료.

5. 그래프 빌드 및 메모리 통합

- 메모리 체크포인터 설정:

```python

from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()

```

- MemorySaver를 사용하여 그래프 상태를 SQLite와 같은 외부 데이터베이스에 영구 저장.

- 그래프 빌드 및 컴파일:

```python

from langgraph.graph import StateGraph, START

from IPython.display import Image, display

builder = StateGraph(State)

builder.add_node("conversation", call_model)

builder.add_node("summarize_conversation", summarize_conversation)

builder.add_edge(START, "conversation")

builder.add_conditional_edges("conversation", should_continue)

builder.add_edge("summarize_conversation", END)

graph = builder.compile(checkpointer=memory)

display(Image(graph.get_graph().draw_mermaid_png()))

```

- 정의한 노드와 조건부 에지를 추가하여 그래프를 구성하고, 체크포인터를 통해 메모리 통합.

- 머메이드 다이어그램으로 그래프 시각화.

### 6. 스트리밍 기능 활용

- 그래프 상태 스트리밍:

- 업데이트 모드 (`stream_mode="updates"`): 각 노드 실행 후 상태 업데이트만 스트리밍.

```python

config = {"configurable": {"thread_id": "1"}}

for chunk in graph.stream({"messages": [HumanMessage(content="hi! I'm Lance")]}, config, stream_mode="updates"):

print(chunk)

```

- 전체 상태 모드 (`stream_mode="values"`): 노드 실행 후 전체 상태를 스트리밍.

```python

config = {"configurable": {"thread_id": "2"}}

input_message = HumanMessage(content="hi! I'm Lance")

for event in graph.stream({"messages": [input_message]}, config, stream_mode="values"):

for m in event['messages']:

m.pretty_print()

print("---"*25)

```

- 토큰 스트리밍:

- .astream_events 메서드를 사용하여 채팅 모델의 토큰을 실시간으로 스트리밍.

```python

config = {"configurable": {"thread_id": "4"}}

input_message = HumanMessage(content="Tell me about the 49ers NFL team")

async for event in graph.astream_events({"messages": [input_message]}, config, version="v2"):

if event["event"] == "on_chat_model_stream" and event['metadata'].get('langgraph_node','') == 'conversation':

print(event["data"]["chunk"].content, end="|")

```

- 이벤트 기반 스트리밍으로 실시간으로 생성되는 토큰을 처리.

7. 결론

- 효율적인 대화 관리: LangGraph의 스트리밍 기능과 외부 DB 메모리 통합을 통해 챗봇의 대화 관리 능력이 크게 향상됩니다.

- 실시간 시각화 및 디버깅: 스트리밍을 통해 그래프 실행 중 상태 변화와 모델 응답을 실시간으로 모니터링할 수 있어 디버깅과 최적화가 용이합니다.

- 유연한 대화 흐름 제어: 조건부 로직과 스레드 ID를 활용하여 다양한 대화 시나리오를 유연하게 처리하고, 지속적인 대화를 효율적으로 관리할 수 있습니다.


랭그래프, AI 에이전트 구현에 관심이 생겼나요?

->지금바로 랭그래프 스터디에 관심이 있다면 신청해주세요!

주황색 배경의 한국 광고

지피터스 13기 스터디 신청링크

👉 이 게시글도 읽어보세요