from langgraph.graph import StateGraph, STARTfrom langgraph.checkpoint.memory import InMemorySaverfrom typing_extensions import TypedDict, NotRequiredfrom langchain_core.utils.uuid import uuid7class State(TypedDict): topic: NotRequired[str] joke: NotRequired[str]def generate_topic(state: State): return {"topic": "socks in the dryer"}def write_joke(state: State): return {"joke": f"Why do {state['topic']} disappear? They elope!"}checkpointer = InMemorySaver()graph = ( StateGraph(State) .add_node("generate_topic", generate_topic) .add_node("write_joke", write_joke) .add_edge(START, "generate_topic") .add_edge("generate_topic", "write_joke") .compile(checkpointer=checkpointer))# Step 1: Run the graphconfig = {"configurable": {"thread_id": str(uuid7())}}result = graph.invoke({}, config)# Step 2: Find a checkpoint to replay fromhistory = list(graph.get_state_history(config))# History is in reverse chronological orderfor state in history: print(f"next={state.next}, checkpoint_id={state.config['configurable']['checkpoint_id']}")# Step 3: Replay from a specific checkpoint# Find the checkpoint before write_jokebefore_joke = next(s for s in history if s.next == ("write_joke",))replay_result = graph.invoke(None, before_joke.config)# write_joke re-executes (runs again), generate_topic does not
# Find checkpoint before write_jokehistory = list(graph.get_state_history(config))before_joke = next(s for s in history if s.next == ("write_joke",))# Fork: update state to change the topicfork_config = graph.update_state( before_joke.config, values={"topic": "chickens"},)# Resume from the fork — write_joke re-executes with the new topicfork_result = graph.invoke(None, fork_config)print(fork_result["joke"]) # A joke about chickens, not socks
# graph: generate_topic -> write_joke# Treat this update as if generate_topic produced it.# Execution resumes at write_joke (the successor of generate_topic).fork_config = graph.update_state( before_joke.config, values={"topic": "chickens"}, as_node="generate_topic",)
from langgraph.types import interrupt, Commandclass State(TypedDict): value: list[str]def ask_human(state: State): answer = interrupt("What is your name?") return {"value": [f"Hello, {answer}!"]}def final_step(state: State): return {"value": ["Done"]}graph = ( StateGraph(State) .add_node("ask_human", ask_human) .add_node("final_step", final_step) .add_edge(START, "ask_human") .add_edge("ask_human", "final_step") .compile(checkpointer=InMemorySaver()))config = {"configurable": {"thread_id": "1"}}# First run: hits interruptgraph.invoke({"value": []}, config)# Resume with answergraph.invoke(Command(resume="Alice"), config)# Replay from before ask_humanhistory = list(graph.get_state_history(config))before_ask = [s for s in history if s.next == ("ask_human",)][-1]replay_result = graph.invoke(None, before_ask.config)# Pauses at interrupt — waiting for new Command(resume=...)# Fork from before ask_humanfork_config = graph.update_state(before_ask.config, {"value": ["forked"]})fork_result = graph.invoke(None, fork_config)# Pauses at interrupt — waiting for new Command(resume=...)# Resume the forked interrupt with a different answergraph.invoke(Command(resume="Bob"), fork_config)# Result: {"value": ["forked", "Hello, Bob!", "Done"]}
def ask_name(state): name = interrupt("What is your name?") return {"value": [f"name:{name}"]}def ask_age(state): age = interrupt("How old are you?") return {"value": [f"age:{age}"]}# Graph: ask_name -> ask_age -> final# After completing both interrupts:# Fork from BETWEEN the two interrupts (after ask_name, before ask_age)history = list(graph.get_state_history(config))between = [s for s in history if s.next == ("ask_age",)][-1]fork_config = graph.update_state(between.config, {"value": ["modified"]})result = graph.invoke(None, fork_config)# ask_name result preserved ("name:Alice")# ask_age pauses at interrupt — waiting for new answer
# Subgraph without its own checkpointer (default)subgraph = ( StateGraph(State) .add_node("step_a", step_a) # Has interrupt() .add_node("step_b", step_b) # Has interrupt() .add_edge(START, "step_a") .add_edge("step_a", "step_b") .compile() # No checkpointer — inherits from parent)graph = ( StateGraph(State) .add_node("subgraph_node", subgraph) .add_edge(START, "subgraph_node") .compile(checkpointer=InMemorySaver()))config = {"configurable": {"thread_id": "1"}}# Complete both interruptsgraph.invoke({"value": []}, config) # Hits step_a interruptgraph.invoke(Command(resume="Alice"), config) # Hits step_b interruptgraph.invoke(Command(resume="30"), config) # Completes# Time travel from before the subgraphhistory = list(graph.get_state_history(config))before_sub = [s for s in history if s.next == ("subgraph_node",)][-1]fork_config = graph.update_state(before_sub.config, {"value": ["forked"]})result = graph.invoke(None, fork_config)# The entire subgraph re-executes from scratch# You cannot time travel to a point between step_a and step_b