from typing import NotRequiredfrom typing_extensions import TypedDictfrom langchain_core.utils.uuid import uuid7from langgraph.checkpoint.memory import InMemorySaverfrom langgraph.graph import StateGraph, START, ENDimport requests# Define a TypedDict to represent the stateclass State(TypedDict): url: str result: NotRequired[str]def call_api(state: State): """Example node that makes an API request.""" result = requests.get(state['url']).text[:100] # Side-effect # return { "result": result }# Create a StateGraph builder and add a node for the call_api functionbuilder = StateGraph(State)builder.add_node("call_api", call_api)# Connect the start and end nodes to the call_api nodebuilder.add_edge(START, "call_api")builder.add_edge("call_api", END)# Specify a checkpointercheckpointer = InMemorySaver()# Compile the graph with the checkpointergraph = builder.compile(checkpointer=checkpointer)# Define a config with a thread ID.thread_id = str(uuid7())config = {"configurable": {"thread_id": thread_id}}# Invoke the graphgraph.invoke({"url": "https://www.example.com"}, config)
from typing import NotRequiredfrom typing_extensions import TypedDictfrom langchain_core.utils.uuid import uuid7from langgraph.checkpoint.memory import InMemorySaverfrom langgraph.func import taskfrom langgraph.graph import StateGraph, START, ENDimport requests# Define a TypedDict to represent the stateclass State(TypedDict): urls: list[str] result: NotRequired[list[str]]@taskdef _make_request(url: str): """Make a request.""" return requests.get(url).text[:100]def call_api(state: State): """Example node that makes an API request.""" requests = [_make_request(url) for url in state['urls']] results = [request.result() for request in requests] return { "results": results }# Create a StateGraph builder and add a node for the call_api functionbuilder = StateGraph(State)builder.add_node("call_api", call_api)# Connect the start and end nodes to the call_api nodebuilder.add_edge(START, "call_api")builder.add_edge("call_api", END)# Specify a checkpointercheckpointer = InMemorySaver()# Compile the graph with the checkpointergraph = builder.compile(checkpointer=checkpointer)# Define a config with a thread ID.thread_id = str(uuid7())config = {"configurable": {"thread_id": thread_id}}# Invoke the graphgraph.invoke({"urls": ["https://www.example.com"]}, config)
from langgraph.runtime import RunControlfrom langgraph.errors import GraphDrainedcontrol = RunControl()# In a signal handler or supervisor:# control.request_drain("sigterm")try: result = graph.invoke(inputs, config, control=control)except GraphDrained as e: # The graph stopped early and saved a checkpoint. # Resume later with the same config. print(f"Drained: {e.reason}")
import signalfrom langgraph.runtime import RunControlfrom langgraph.errors import GraphDrainedcontrol = RunControl()signal.signal(signal.SIGTERM, lambda *_: control.request_drain("sigterm"))try: result = graph.invoke(inputs, config, control=control)except GraphDrained as e: log.info("graph drained: %s", e.reason) # Resume on next startup with the same config