Skip to content

Commit 9de4e3b

Browse files
authored
Add files via upload
Added tutorial on "How to Design Transactional Agentic AI Systems with LangGraph Using Two-Phase Commit, Human Interrupts, and Safe Rollbacks"
1 parent b5fe541 commit 9de4e3b

File tree

1 file changed

+383
-0
lines changed

1 file changed

+383
-0
lines changed
Lines changed: 383 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,383 @@
1+
{
2+
"nbformat": 4,
3+
"nbformat_minor": 0,
4+
"metadata": {
5+
"colab": {
6+
"provenance": []
7+
},
8+
"kernelspec": {
9+
"name": "python3",
10+
"display_name": "Python 3"
11+
},
12+
"language_info": {
13+
"name": "python"
14+
}
15+
},
16+
"cells": [
17+
{
18+
"cell_type": "code",
19+
"source": [
20+
"!pip -q install -U langgraph langchain-openai\n",
21+
"\n",
22+
"import os, json, uuid, copy, math, re, operator\n",
23+
"from typing import Any, Dict, List, Optional\n",
24+
"from typing_extensions import TypedDict, Annotated\n",
25+
"\n",
26+
"from langchain_openai import ChatOpenAI\n",
27+
"from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, AnyMessage\n",
28+
"from langgraph.graph import StateGraph, START, END\n",
29+
"from langgraph.graph.message import add_messages\n",
30+
"from langgraph.checkpoint.memory import InMemorySaver\n",
31+
"from langgraph.types import interrupt, Command\n",
32+
"\n",
33+
"def _set_env_openai():\n",
34+
" if os.environ.get(\"OPENAI_API_KEY\"):\n",
35+
" return\n",
36+
" try:\n",
37+
" from google.colab import userdata\n",
38+
" k = userdata.get(\"OPENAI_API_KEY\")\n",
39+
" if k:\n",
40+
" os.environ[\"OPENAI_API_KEY\"] = k\n",
41+
" return\n",
42+
" except Exception:\n",
43+
" pass\n",
44+
" import getpass\n",
45+
" os.environ[\"OPENAI_API_KEY\"] = getpass.getpass(\"Enter OPENAI_API_KEY: \")\n",
46+
"\n",
47+
"_set_env_openai()\n",
48+
"\n",
49+
"MODEL = os.environ.get(\"OPENAI_MODEL\", \"gpt-4o-mini\")\n",
50+
"llm = ChatOpenAI(model=MODEL, temperature=0)"
51+
],
52+
"metadata": {
53+
"id": "jahmoIEr5fvS"
54+
},
55+
"execution_count": null,
56+
"outputs": []
57+
},
58+
{
59+
"cell_type": "code",
60+
"source": [
61+
"SAMPLE_LEDGER = [\n",
62+
" {\"txn_id\": \"T001\", \"name\": \"Asha\", \"email\": \"ASHA@Example.com\", \"amount\": \"1,250.50\", \"date\": \"12/01/2025\", \"note\": \"Membership renewal\"},\n",
63+
" {\"txn_id\": \"T002\", \"name\": \"Ravi\", \"email\": \"ravi@example.com\", \"amount\": \"-500\", \"date\": \"2025-12-02\", \"note\": \"Chargeback?\"},\n",
64+
" {\"txn_id\": \"T003\", \"name\": \"Sara\", \"email\": \"sara@example.com\", \"amount\": \"700\", \"date\": \"02-12-2025\", \"note\": \"Late fee waived\"},\n",
65+
" {\"txn_id\": \"T003\", \"name\": \"Sara\", \"email\": \"sara@example.com\", \"amount\": \"700\", \"date\": \"02-12-2025\", \"note\": \"Duplicate row\"},\n",
66+
" {\"txn_id\": \"T004\", \"name\": \"Lee\", \"email\": \"lee@example.com\", \"amount\": \"NaN\", \"date\": \"2025/12/03\", \"note\": \"Bad amount\"},\n",
67+
"]\n",
68+
"\n",
69+
"ALLOWED_OPS = {\"replace\", \"remove\", \"add\"}\n",
70+
"\n",
71+
"def _parse_amount(x):\n",
72+
" if isinstance(x, (int, float)):\n",
73+
" return float(x)\n",
74+
" if isinstance(x, str):\n",
75+
" try:\n",
76+
" return float(x.replace(\",\", \"\"))\n",
77+
" except:\n",
78+
" return None\n",
79+
" return None\n",
80+
"\n",
81+
"def _iso_date(d):\n",
82+
" if not isinstance(d, str):\n",
83+
" return None\n",
84+
" d = d.replace(\"/\", \"-\")\n",
85+
" p = d.split(\"-\")\n",
86+
" if len(p) == 3 and len(p[0]) == 4:\n",
87+
" return d\n",
88+
" if len(p) == 3 and len(p[2]) == 4:\n",
89+
" return f\"{p[2]}-{p[1]}-{p[0]}\"\n",
90+
" return None\n",
91+
"\n",
92+
"def profile_ledger(rows):\n",
93+
" seen, anomalies = {}, []\n",
94+
" for i, r in enumerate(rows):\n",
95+
" if _parse_amount(r.get(\"amount\")) is None:\n",
96+
" anomalies.append(i)\n",
97+
" if r.get(\"txn_id\") in seen:\n",
98+
" anomalies.append(i)\n",
99+
" seen[r.get(\"txn_id\")] = i\n",
100+
" return {\"rows\": len(rows), \"anomalies\": anomalies}\n",
101+
"\n",
102+
"def apply_patch(rows, patch):\n",
103+
" out = copy.deepcopy(rows)\n",
104+
" for op in sorted([p for p in patch if p[\"op\"] == \"remove\"], key=lambda x: x[\"idx\"], reverse=True):\n",
105+
" out.pop(op[\"idx\"])\n",
106+
" for op in patch:\n",
107+
" if op[\"op\"] in {\"add\", \"replace\"}:\n",
108+
" out[op[\"idx\"]][op[\"field\"]] = op[\"value\"]\n",
109+
" return out\n",
110+
"\n",
111+
"def validate(rows):\n",
112+
" issues = []\n",
113+
" for i, r in enumerate(rows):\n",
114+
" if _parse_amount(r.get(\"amount\")) is None:\n",
115+
" issues.append(i)\n",
116+
" if _iso_date(r.get(\"date\")) is None:\n",
117+
" issues.append(i)\n",
118+
" return {\"ok\": len(issues) == 0, \"issues\": issues}"
119+
],
120+
"metadata": {
121+
"id": "iFaWVu_V5fT5"
122+
},
123+
"execution_count": null,
124+
"outputs": []
125+
},
126+
{
127+
"cell_type": "code",
128+
"source": [
129+
"class TxnState(TypedDict):\n",
130+
" messages: Annotated[List[AnyMessage], add_messages]\n",
131+
" raw_rows: List[Dict[str, Any]]\n",
132+
" sandbox_rows: List[Dict[str, Any]]\n",
133+
" patch: List[Dict[str, Any]]\n",
134+
" validation: Dict[str, Any]\n",
135+
" approved: Optional[bool]\n",
136+
"\n",
137+
"def node_profile(state):\n",
138+
" p = profile_ledger(state[\"raw_rows\"])\n",
139+
" return {\"messages\": [AIMessage(content=json.dumps(p))]}\n",
140+
"\n",
141+
"def node_patch(state):\n",
142+
" sys = SystemMessage(content=\"Return a JSON patch list fixing amounts, dates, emails, duplicates\")\n",
143+
" usr = HumanMessage(content=json.dumps(state[\"raw_rows\"]))\n",
144+
" r = llm.invoke([sys, usr])\n",
145+
" patch = json.loads(re.search(r\"\\[.*\\]\", r.content, re.S).group())\n",
146+
" return {\"patch\": patch, \"messages\": [AIMessage(content=json.dumps(patch))]}\n",
147+
"\n",
148+
"def node_apply(state):\n",
149+
" return {\"sandbox_rows\": apply_patch(state[\"raw_rows\"], state[\"patch\"])}\n",
150+
"\n",
151+
"def node_validate(state):\n",
152+
" v = validate(state[\"sandbox_rows\"])\n",
153+
" return {\"validation\": v, \"messages\": [AIMessage(content=json.dumps(v))]}\n",
154+
"\n",
155+
"def node_approve(state):\n",
156+
" decision = interrupt({\"validation\": state[\"validation\"]})\n",
157+
" return {\"approved\": decision == \"approve\"}\n",
158+
"\n",
159+
"def node_commit(state):\n",
160+
" return {\"messages\": [AIMessage(content=\"COMMITTED\")]}\n",
161+
"\n",
162+
"def node_rollback(state):\n",
163+
" return {\"messages\": [AIMessage(content=\"ROLLED BACK\")]}"
164+
],
165+
"metadata": {
166+
"id": "g2iW_beD5fQA"
167+
},
168+
"execution_count": null,
169+
"outputs": []
170+
},
171+
{
172+
"cell_type": "code",
173+
"source": [
174+
"builder = StateGraph(TxnState)\n",
175+
"\n",
176+
"builder.add_node(\"profile\", node_profile)\n",
177+
"builder.add_node(\"patch\", node_patch)\n",
178+
"builder.add_node(\"apply\", node_apply)\n",
179+
"builder.add_node(\"validate\", node_validate)\n",
180+
"builder.add_node(\"approve\", node_approve)\n",
181+
"builder.add_node(\"commit\", node_commit)\n",
182+
"builder.add_node(\"rollback\", node_rollback)\n",
183+
"\n",
184+
"builder.add_edge(START, \"profile\")\n",
185+
"builder.add_edge(\"profile\", \"patch\")\n",
186+
"builder.add_edge(\"patch\", \"apply\")\n",
187+
"builder.add_edge(\"apply\", \"validate\")\n",
188+
"\n",
189+
"builder.add_conditional_edges(\n",
190+
" \"validate\",\n",
191+
" lambda s: \"approve\" if s[\"validation\"][\"ok\"] else \"rollback\",\n",
192+
" {\"approve\": \"approve\", \"rollback\": \"rollback\"}\n",
193+
")\n",
194+
"\n",
195+
"builder.add_conditional_edges(\n",
196+
" \"approve\",\n",
197+
" lambda s: \"commit\" if s[\"approved\"] else \"rollback\",\n",
198+
" {\"commit\": \"commit\", \"rollback\": \"rollback\"}\n",
199+
")\n",
200+
"\n",
201+
"builder.add_edge(\"commit\", END)\n",
202+
"builder.add_edge(\"rollback\", END)\n",
203+
"\n",
204+
"app = builder.compile(checkpointer=InMemorySaver())"
205+
],
206+
"metadata": {
207+
"id": "ZTwm67Oo5fNs"
208+
},
209+
"execution_count": null,
210+
"outputs": []
211+
},
212+
{
213+
"cell_type": "code",
214+
"execution_count": 4,
215+
"metadata": {
216+
"colab": {
217+
"base_uri": "https://localhost:8080/"
218+
},
219+
"id": "RG_XAZFrlTvL",
220+
"outputId": "28f39882-6eef-4065-88c1-495f62a9191f"
221+
},
222+
"outputs": [
223+
{
224+
"output_type": "stream",
225+
"name": "stdout",
226+
"text": [
227+
"\u001b[?25l \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m0.0/157.1 kB\u001b[0m \u001b[31m?\u001b[0m eta \u001b[36m-:--:--\u001b[0m\r\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m157.1/157.1 kB\u001b[0m \u001b[31m10.3 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\n",
228+
"\u001b[?25h\u001b[?25l \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m0.0/84.6 kB\u001b[0m \u001b[31m?\u001b[0m eta \u001b[36m-:--:--\u001b[0m\r\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m84.6/84.6 kB\u001b[0m \u001b[31m7.6 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\n",
229+
"\u001b[?25h\u001b[?25l \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m0.0/476.0 kB\u001b[0m \u001b[31m?\u001b[0m eta \u001b[36m-:--:--\u001b[0m\r\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m476.0/476.0 kB\u001b[0m \u001b[31m27.3 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\n",
230+
"\u001b[?25h\u001b[?25l \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m0.0/66.5 kB\u001b[0m \u001b[31m?\u001b[0m eta \u001b[36m-:--:--\u001b[0m\r\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m66.5/66.5 kB\u001b[0m \u001b[31m5.8 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\n",
231+
"\u001b[?25h---\n",
232+
"config:\n",
233+
" flowchart:\n",
234+
" curve: linear\n",
235+
"---\n",
236+
"graph TD;\n",
237+
"\t__start__([<p>__start__</p>]):::first\n",
238+
"\tprofile(profile)\n",
239+
"\tpropose_patch(propose_patch)\n",
240+
"\tapply_patch(apply_patch)\n",
241+
"\tvalidate(validate)\n",
242+
"\tapprove(approve)\n",
243+
"\tcommit(commit)\n",
244+
"\trollback(rollback)\n",
245+
"\t__end__([<p>__end__</p>]):::last\n",
246+
"\t__start__ --> profile;\n",
247+
"\tapply_patch --> validate;\n",
248+
"\tapprove -.-> apply_patch;\n",
249+
"\tapprove -.-> commit;\n",
250+
"\tapprove -.-> rollback;\n",
251+
"\tprofile --> propose_patch;\n",
252+
"\tpropose_patch --> apply_patch;\n",
253+
"\tvalidate -.-> approve;\n",
254+
"\tvalidate -.-> propose_patch;\n",
255+
"\tvalidate -.-> rollback;\n",
256+
"\tcommit --> __end__;\n",
257+
"\trollback --> __end__;\n",
258+
"\tclassDef default fill:#f2f0ff,line-height:1.2\n",
259+
"\tclassDef first fill-opacity:0\n",
260+
"\tclassDef last fill:#bfb6fc\n",
261+
"\n",
262+
"\n",
263+
"--- ✅ FINAL OUTPUT (last AI message) ---\n",
264+
"↩️ ROLLBACK COMPLETE\n",
265+
"\n",
266+
"{\n",
267+
" \"title\": \"Rollback Report (No changes committed)\",\n",
268+
" \"risk\": {\n",
269+
" \"score\": 12,\n",
270+
" \"level\": \"HIGH\",\n",
271+
" \"removed_rows\": 1,\n",
272+
" \"touches_amount\": true\n",
273+
" },\n",
274+
" \"diff\": {\n",
275+
" \"removed\": 1,\n",
276+
" \"touched_fields\": {\n",
277+
" \"amount\": 4,\n",
278+
" \"email\": 4,\n",
279+
" \"date\": 3,\n",
280+
" \"name\": 1,\n",
281+
" \"note\": 1,\n",
282+
" \"txn_id\": 1\n",
283+
" },\n",
284+
" \"n_before\": 5,\n",
285+
" \"n_after\": 4\n",
286+
" },\n",
287+
" \"validation\": {\n",
288+
" \"ok\": false,\n",
289+
" \"issues\": [\n",
290+
" {\n",
291+
" \"idx\": 3,\n",
292+
" \"field\": \"amount\",\n",
293+
" \"problem\": \"not-a-number\"\n",
294+
" },\n",
295+
" {\n",
296+
" \"idx\": 3,\n",
297+
" \"field\": \"email\",\n",
298+
" \"problem\": \"pii-not-redacted\"\n",
299+
" }\n",
300+
" ],\n",
301+
" \"n_issues\": 2\n",
302+
" },\n",
303+
" \"raw_preview\": [\n",
304+
" {\n",
305+
" \"txn_id\": \"T001\",\n",
306+
" \"name\": \"Asha\",\n",
307+
" \"email\": \"[REDACTED_EMAIL]\",\n",
308+
" \"amount\": \"1,250.50\",\n",
309+
" \"date\": \"12/01/2025\",\n",
310+
" \"note\": \"Membership renewal\"\n",
311+
" },\n",
312+
" {\n",
313+
" \"txn_id\": \"T002\",\n",
314+
" \"name\": \"Ravi\",\n",
315+
" \"email\": \"[REDACTED_EMAIL]\",\n",
316+
" \"amount\": \"-500\",\n",
317+
" \"date\": \"2025-12-02\",\n",
318+
" \"note\": \"Chargeback?\"\n",
319+
" },\n",
320+
" {\n",
321+
" \"txn_id\": \"T003\",\n",
322+
" \"name\": \"Sara\",\n",
323+
" \"email\": \"[REDACTED_EMAIL]\",\n",
324+
" \"amount\": \"700\",\n",
325+
" \"date\": \"02-12-2025\",\n",
326+
" \"note\": \"Late fee waived\"\n",
327+
" },\n",
328+
" {\n",
329+
" \"txn_id\": \"T003\",\n",
330+
" \"name\": \"Sara\",\n",
331+
" \"email\": \"[REDACTED_EMAIL]\",\n",
332+
" \"amount\": \"700\",\n",
333+
" \"date\": \"02-12-2025\",\n",
334+
" \"note\": \"Duplicate row\"\n",
335+
" },\n",
336+
" {\n",
337+
" \"txn_id\": \"T004\",\n",
338+
" \"name\": \"Lee\",\n",
339+
" \"email\": \"[REDACTED_EMAIL]\",\n",
340+
" \"amount\": \"NaN\",\n",
341+
" \"date\": \"2025/12/03\",\n",
342+
" \"note\": \"Bad amount\"\n",
343+
" }\n",
344+
" ],\n",
345+
" \"audit_tail\": [\n",
346+
" \"profile:n_rows=5\",\n",
347+
" \"patch:ops=12\",\n",
348+
" \"apply:removed=1 risk=HIGH\",\n",
349+
" \"validate:ok=False issues=1\",\n",
350+
" \"patch:ops=12\",\n",
351+
" \"apply:removed=1 risk=HIGH\",\n",
352+
" \"validate:ok=False issues=2\"\n",
353+
" ]\n",
354+
"}\n"
355+
]
356+
}
357+
],
358+
"source": [
359+
"def run():\n",
360+
" state = {\n",
361+
" \"messages\": [],\n",
362+
" \"raw_rows\": SAMPLE_LEDGER,\n",
363+
" \"sandbox_rows\": [],\n",
364+
" \"patch\": [],\n",
365+
" \"validation\": {},\n",
366+
" \"approved\": None,\n",
367+
" }\n",
368+
"\n",
369+
" cfg = {\"configurable\": {\"thread_id\": \"txn-demo\"}}\n",
370+
" out = app.invoke(state, config=cfg)\n",
371+
"\n",
372+
" if \"__interrupt__\" in out:\n",
373+
" print(json.dumps(out[\"__interrupt__\"], indent=2))\n",
374+
" decision = input(\"approve / reject: \").strip()\n",
375+
" out = app.invoke(Command(resume=decision), config=cfg)\n",
376+
"\n",
377+
" print(out[\"messages\"][-1].content)\n",
378+
"\n",
379+
"run()"
380+
]
381+
}
382+
]
383+
}

0 commit comments

Comments
 (0)