{
  "$type": "site.standard.document",
  "bskyPostRef": {
    "cid": "bafyreicrwl5b5ilmiakqah7c7objgg76rrhue6sbxzkih7n2ycl3tfq4lq",
    "uri": "at://did:plc:25rdn5elo5izoxrmtis34zuk/app.bsky.feed.post/3mpcuhpysamy2"
  },
  "coverImage": {
    "$type": "blob",
    "ref": {
      "$link": "bafkreicd7ocqjag5cdvv5i2swuavrg3uodvlbk6j2w5xc3c2pxglriy3am"
    },
    "mimeType": "image/webp",
    "size": 496698
  },
  "path": "/jubinsoni/building-an-ai-agent-that-responds-to-real-time-events-with-aws-bedrock-kinesis-dynamodb-and-s3-20d4",
  "publishedAt": "2026-06-28T01:17:32.000Z",
  "site": "https://dev.to",
  "tags": [
    "aws",
    "bedrock",
    "kinesis",
    "dynamodb",
    "Amazon Kinesis Data Streams Developer Guide",
    "Amazon Kinesis Data Firehose Developer Guide",
    "Amazon Bedrock Agent Runtime — Invoke Agent API",
    "AWS Lambda — Using AWS Lambda with Amazon Kinesis",
    "Amazon DynamoDB — Time to Live (TTL)",
    "Amazon S3 — Best practices for event-driven architectures",
    "Building Agents with Amazon Bedrock",
    "Event-Driven Architecture on AWS — Whitepaper"
  ],
  "textContent": "Most recommendation systems are batch jobs. They crunch last night's data, write a recommendations table, and serve it all day. That works fine until your user watches three thriller movies in a row at 9pm and your system is still recommending rom-coms because the batch hasn't run yet.\n\nIn this post I'll walk through building an agent system that reacts to streaming user behavior in real time using:\n\n  * **Amazon Kinesis** to ingest and route events\n  * **AWS Lambda** to process, enrich, and trigger reasoning\n  * **Amazon Bedrock** as the reasoning and recommendation layer\n  * **DynamoDB** to store user profiles and recommendation cache\n  * **S3** for raw event archiving and model artifacts\n\n\n\nBy the end you'll have an architecture where a user's recommendation set updates within seconds of their behavior changing.\n\n##  Architecture Overview\n\nThe system has three layers:\n\nLayer | Services | Responsibility\n---|---|---\n**Ingest** | Kinesis Data Streams, Kinesis Firehose | Capture and fan-out user events\n**Process & Reason** | Lambda, Amazon Bedrock Agent | Enrich events, build context, invoke LLM\n**Store & Serve** | DynamoDB, S3 | Persist profiles, cache recs, store artifacts\n\nThe key design decision is keeping the hot path (Kinesis → Lambda → Bedrock → DynamoDB) fully async and the serving path (API → DynamoDB cache) completely decoupled. The user never waits for Bedrock to respond; they get the last cached recommendation set while a fresh one is already being computed in the background.\n\n##  Event Flow\n\nHere's what happens end to end when a user clicks on a product:\n\n  1. The app publishes a `user.interaction` event to **Kinesis Data Streams**\n  2. Kinesis fans the event out to two consumers: **Lambda Processor** and **Kinesis Firehose**\n  3. Firehose archives the raw event to **S3** (cheap, durable, great for retraining later)\n  4. Lambda enriches the event with user history from **DynamoDB User Profiles** , then invokes the **Bedrock Agent**\n  5. The Bedrock Agent reasons over the enriched context (recent events + profile + item catalog embeddings from S3) and writes a fresh recommendation set to **DynamoDB Rec Cache**\n  6. The client app reads recommendations from the cache via a lightweight **Lambda API** — no Bedrock latency in the hot path\n\n\n\n##  Code: Publishing Events to Kinesis\n\nThis is your app-side producer. Keep it thin — just serialize and publish. Do all enrichment downstream.\n\n\n\n    import boto3\n    import json\n    import uuid\n    from datetime import datetime, timezone\n\n    kinesis = boto3.client('kinesis', region_name='us-east-1')\n\n    def publish_interaction(user_id: str, item_id: str, event_type: str, metadata: dict = {}):\n        \"\"\"\n        Publish a user interaction event to Kinesis Data Streams.\n        Partition key is user_id so all events for a user land on the same shard.\n        \"\"\"\n        event = {\n            'event_id':   str(uuid.uuid4()),\n            'user_id':    user_id,\n            'item_id':    item_id,\n            'event_type': event_type,          # 'click', 'purchase', 'dwell', 'skip'\n            'timestamp':  datetime.now(timezone.utc).isoformat(),\n            'metadata':   metadata,\n        }\n\n        response = kinesis.put_record(\n            StreamName='user-interactions',\n            Data=json.dumps(event).encode('utf-8'),\n            PartitionKey=user_id,              # consistent routing per user\n        )\n\n        return response['SequenceNumber']\n\n\n    # Example call\n    publish_interaction(\n        user_id='u_8821',\n        item_id='prod_thriller_042',\n        event_type='purchase',\n        metadata={'price': 14.99, 'category': 'thriller', 'session_id': 'sess_xyz'}\n    )\n\n\n> **Tip:** Use `user_id` as the partition key so all events for a given user land on the same shard and arrive in order. This matters when Lambda is building a recency-ordered event window.\n\n##  Code: Lambda Processor — Enrich and Invoke Bedrock\n\nThis is the core of the pipeline. The Lambda reads from the Kinesis stream, pulls user context from DynamoDB, and invokes the Bedrock Agent with a structured prompt.\n\n\n\n    import boto3\n    import json\n    import os\n    from datetime import datetime, timezone\n\n    dynamodb  = boto3.resource('dynamodb')\n    bedrock   = boto3.client('bedrock-agent-runtime', region_name='us-east-1')\n\n    profiles_table = dynamodb.Table(os.environ['PROFILES_TABLE'])   # DynamoDB User Profiles\n    rec_table      = dynamodb.Table(os.environ['REC_CACHE_TABLE'])  # DynamoDB Rec Cache\n\n    AGENT_ID      = os.environ['BEDROCK_AGENT_ID']\n    AGENT_ALIAS   = os.environ['BEDROCK_AGENT_ALIAS']\n    MAX_HISTORY   = 20  # last N events to include in context\n\n\n    def handler(event, context):\n        for record in event['Records']:\n            # Kinesis payload is base64-encoded\n            payload = json.loads(record['kinesis']['data'])\n            process_event(payload)\n\n\n    def process_event(payload: dict):\n        user_id  = payload['user_id']\n        item_id  = payload['item_id']\n        evt_type = payload['event_type']\n\n        # 1. Fetch user profile + recent history from DynamoDB\n        response = profiles_table.get_item(Key={'user_id': user_id})\n        profile  = response.get('Item', {'user_id': user_id, 'history': [], 'preferences': {}})\n\n        # 2. Append current event and trim to window\n        profile['history'].append({\n            'item_id':    item_id,\n            'event_type': evt_type,\n            'timestamp':  payload['timestamp'],\n            'metadata':   payload.get('metadata', {}),\n        })\n        profile['history'] = profile['history'][-MAX_HISTORY:]\n\n        # 3. Write enriched profile back\n        profiles_table.put_item(Item=profile)\n\n        # 4. Build prompt for Bedrock Agent\n        prompt = build_personalization_prompt(profile)\n\n        # 5. Invoke Bedrock Agent\n        agent_response = bedrock.invoke_agent(\n            agentId=AGENT_ID,\n            agentAliasId=AGENT_ALIAS,\n            sessionId=user_id,           # session per user keeps conversational context\n            inputText=prompt,\n        )\n\n        # 6. Parse streaming response chunks\n        recommendations = parse_agent_response(agent_response)\n\n        # 7. Write to recommendation cache\n        rec_table.put_item(Item={\n            'user_id':         user_id,\n            'recommendations': recommendations,\n            'generated_at':    datetime.now(timezone.utc).isoformat(),\n            'ttl':             int(datetime.now(timezone.utc).timestamp()) + 3600,  # 1hr TTL\n        })\n\n\n    def build_personalization_prompt(profile: dict) -> str:\n        history_summary = '\\n'.join([\n            f\"- [{e['event_type'].upper()}] item={e['item_id']} category={e['metadata'].get('category','unknown')}\"\n            for e in profile['history'][-10:]\n        ])\n        return f\"\"\"You are a real-time personalization agent.\n\n    User profile: {json.dumps(profile.get('preferences', {}))}\n\n    Recent interactions (most recent last):\n    {history_summary}\n\n    Based on this behavior, return exactly 5 personalized item recommendations as a JSON array.\n    Each item must include: item_id, category, reasoning (1 sentence), confidence_score (0-1).\n    Return only valid JSON. No explanation outside the JSON block.\"\"\"\n\n\n    def parse_agent_response(agent_response) -> list:\n        full_text = ''\n        for event in agent_response['completion']:\n            if 'chunk' in event:\n                full_text += event['chunk']['bytes'].decode('utf-8')\n        try:\n            # Extract JSON from response\n            start = full_text.index('[')\n            end   = full_text.rindex(']') + 1\n            return json.loads(full_text[start:end])\n        except (ValueError, json.JSONDecodeError):\n            return []\n\n\n##  Code: Serving Recommendations via Lambda API\n\nThe serving layer never touches Bedrock. It reads purely from the DynamoDB cache, keeping p99 latency well under 10ms.\n\n\n\n    import boto3\n    import json\n    import os\n    from datetime import datetime, timezone\n\n    dynamodb  = boto3.resource('dynamodb')\n    rec_table = dynamodb.Table(os.environ['REC_CACHE_TABLE'])\n\n    FALLBACK_RECS = ['popular_001', 'popular_002', 'popular_003']  # cold-start fallback\n\n\n    def handler(event, context):\n        user_id = event['pathParameters']['userId']\n\n        response = rec_table.get_item(Key={'user_id': user_id})\n        item     = response.get('Item')\n\n        if not item:\n            # Cold start: user has no history yet\n            return api_response(200, {\n                'user_id':         user_id,\n                'recommendations': FALLBACK_RECS,\n                'source':          'fallback',\n                'generated_at':    None,\n            })\n\n        age_seconds = (\n            datetime.now(timezone.utc) -\n            datetime.fromisoformat(item['generated_at'])\n        ).total_seconds()\n\n        return api_response(200, {\n            'user_id':         user_id,\n            'recommendations': item['recommendations'],\n            'source':          'cache',\n            'generated_at':    item['generated_at'],\n            'cache_age_sec':   int(age_seconds),\n        })\n\n\n    def api_response(status: int, body: dict) -> dict:\n        return {\n            'statusCode': status,\n            'headers': {\n                'Content-Type':                'application/json',\n                'Access-Control-Allow-Origin': '*',\n            },\n            'body': json.dumps(body),\n        }\n\n\n##  Service Comparison: Why Each AWS Service?\n\nService | Why it's here | Alternative considered\n---|---|---\n**Kinesis Data Streams** | Ordered, replayable, millisecond latency fan-out | SQS (no ordering guarantee per user), EventBridge (higher latency)\n**Kinesis Firehose** | Zero-code delivery to S3 for archiving | Writing to S3 directly in Lambda (adds failure surface)\n**Lambda** | Event-driven, scales to 0, tight Kinesis integration | ECS Fargate (overkill for stateless enrichment)\n**Amazon Bedrock** | Managed LLM with agent runtime, no infra to maintain | Self-hosted model on SageMaker (more control, much more ops)\n**DynamoDB** | Single-digit ms reads, TTL support, scales automatically | RDS (too slow for hot path), ElastiCache (extra cost for separate store)\n**S3** | Cheap durable archive + model artifact store | DynamoDB for raw events (expensive and unnecessary)\n\n##  Things to Watch in Production\n\n**Bedrock latency is variable.** Claude Sonnet typically responds in 1-4 seconds but can spike. Since recs are written async to cache, this doesn't affect user-facing latency, but it does affect freshness. Monitor `bedrock:InvokeAgent` duration in CloudWatch.\n\n**Kinesis shard scaling.** One shard handles 1MB/s write or 1000 records/s. At 10k active users you'll need to plan shard count carefully. Use Enhanced Fan-Out if you have multiple Lambda consumers reading the same stream.\n\n**DynamoDB TTL for cache eviction.** The serving Lambda sets a 1-hour TTL on each rec entry. If Bedrock hasn't updated the cache in over an hour (e.g. Lambda errors), users fall back to the popular items list. Adjust TTL based on how stale you can tolerate.\n\n**Cold start users.** New users have no history so the Bedrock prompt has nothing useful to reason over. I recommend a popularity-based fallback as shown in the serving Lambda, and switching to personalized recs after the user's first 3-5 interactions.\n\n##  Wrapping Up\n\nThe pattern here is worth generalizing: keep the reasoning layer (Bedrock) fully off the hot serving path. Write results to a fast cache (DynamoDB), serve from the cache, and let the agent pipeline update it continuously in the background. This gives you the intelligence of an LLM-powered agent without the latency of one.\n\nThe same pattern applies to fraud scoring, content moderation queues, ops alerting — anywhere you need a reasoning system that reacts to real-time streams without blocking the user experience.\n\n##  References\n\n  * Amazon Kinesis Data Streams Developer Guide\n  * Amazon Kinesis Data Firehose Developer Guide\n  * Amazon Bedrock Agent Runtime — Invoke Agent API\n  * AWS Lambda — Using AWS Lambda with Amazon Kinesis\n  * Amazon DynamoDB — Time to Live (TTL)\n  * Amazon S3 — Best practices for event-driven architectures\n  * Building Agents with Amazon Bedrock\n  * Event-Driven Architecture on AWS — Whitepaper\n\n",
  "title": "Building an AI Agent That Responds to Real-Time Events with AWS Bedrock, Kinesis, DynamoDB, and S3"
}