{
  "$type": "com.whtwnd.blog.entry",
  "theme": "github-light",
  "title": "Tapで遊んでみた",
  "content": "この記事は [Bluesky / ATProtocol Advent Calendar 2025](https://adventar.org/calendars/12255) の14日目です。\n\n<h1>Tapとは</h1>\n\n最近、公式から [Tap](https://docs.bsky.app/blog/introducing-tap)  という仕組みがリリースされました。どういうときに使うの?ということに対して、下記のようなことが示唆されています。\n\n- Automatic historical backfill when tracking new repositories\n- Native webhook support for serverless architectures\n- Full network backfill and mirroring capabilities for research and analysis\n- Guaranteed delivery with acknowledgement mode\n- For tracking specific subsets of repositories without needing to process the full firehose\n\nユースケースとしてパッと思いつくのは **Full network backfill**でしょうか\n\n<h2>Full network backfill</h2>\n\nATProtoの公式リレーは特定のタイミングまで通過したデータを全て保持していましたが、特定のタイミングからそれを停止しました。\n\nこれにより、ATProtoの世界で過去データを正しく拾うには、PDSに問い合わせして取る必要が出てきました。総量が見えている公式PDSだけでも結構面倒なのに、セルフホストPDSを含めるとさらにハードルが上がります。\n\nここらへんをいい感じに処理してくれるのがTapということのようです。ですので、新しいコレクションを作ってこれから運営するケースなどの過去のデータを気にしなくていい場合や、そこまでの厳密性を求められない場合においては特に出番はないと思います。\n\nNative webhook support for serverless architectures(サーバーレスアプリケーション向けのWebhook)も若干気になりますが、Tapを動かすサーバーがいずれにせよ必要になるとこから限定的なのではという気もしますが有識者の判断に任せたいと思います。\n\n<h1>早速動かしてみた</h1>\n\n`注意` Tap自体が最近リリースされたものなので、大いに間違っていると思います。この記事はあくまでも取っ掛かりとして活用していただければ幸いです。\n\n公式でも警告されている**TAP_FULL_NETWORK=true**で動かしてみました。SQLiteだと即厳しい状態に陥るので、ポスグレにご登場いただきながら**docker-compose.yml**を作りました。なお、**TAP_COLLECTION_FILTERS**で特定のコレクションに絞った場合は未検証です。\n\nなお、**TAP_FULL_NETWORK=true**で動作させる場合に限るとは思いますが、Tapのプロセスは最低でも6GBはあった方が良さそうです。yamlはSQLiteで四苦八苦していた名残で16GBを最大値にしています\n\nそれっぽいパラメータチューニングしてそうなコメントがありますが、ChatGPTさんが記載したものなので、このまま使うかどうかは皆さまにて判断いただければと思います。\n\n```\nservices:\n  postgres:\n    image: postgres:16\n    container_name: tap-postgres\n    restart: unless-stopped\n    environment:\n      POSTGRES_DB: tap\n      POSTGRES_USER: tap\n      POSTGRES_PASSWORD: tap\n    volumes:\n      - ./pg-data:/var/lib/postgresql/data\n    shm_size: 1gb\n    healthcheck:\n      test: [\"CMD-SHELL\", \"pg_isready -U tap\"]\n      interval: 10s\n      timeout: 5s\n      retries: 5\n\n  tap:\n    image: ghcr.io/bluesky-social/indigo/tap:latest\n    container_name: tap\n    restart: unless-stopped\n    depends_on:\n      postgres:\n        condition: service_healthy\n    ports:\n      - \"2480:2480\"\n    environment:\n      # DB\n      TAP_DATABASE_URL: postgres://tap:tap@postgres:5432/tap?sslmode=disable\n\n      # ネットワーク全体を追う(重い)\n      TAP_FULL_NETWORK: \"true\"\n\n      # 並列はデフォルトに戻す(=指定しない)\n      # TAP_RESYNC_PARALLELISM: 5 (指定しない)\n\n      # 安定性寄り設定(推奨)\n      TAP_CURSOR_SAVE_INTERVAL: 2s\n      TAP_OUTBOX_CAPACITY: 200000\n      TAP_LOG_LEVEL: info\n\n      # メモリ節約系(地味に効く)\n      TAP_IDENT_CACHE_SIZE: 500000\n    mem_limit: 16g\n\n```\nこれで全力でデータを取りに行きます。コマンドで下記を実行します\n```\ndocker compose up -d\n```\nログを見るとデフォルトで **https://relay1.us-east.bsky.network** に繋ぎに行っているので、これからアカウントを割り出しながらPDSにデータをとりに行っているのではないかなという気がします。\n\n<h1>データを取得してみた</h1>\n\n起動しただけでは面白くないので、これを使って拙宅 [ATProto Dashboard](https://atpdashboard.usounds.work/) のバックフィルを試してみます。このダッシュボード、運営を開始した2025年1月17日以前のデータを取得していないので、ちょうどいいです。\n\n公式が [@atproto/tap](https://github.com/bluesky-social/atproto/blob/main/packages/tap/README.md) というライブラリを出しているので、これに全力で甘えることとします。\n\nなお、ここでのポスグレは、ATProto Dashboardのデータを溜めているポスグレを指し示します。要するに、Tapのためのポスグレとは直結する必要はなく、WebSocket経由でやり取りをします。\n\n[RecordEventの型定義](https://github.com/bluesky-social/atproto/blob/main/packages/tap/README.md#identityevent)を見るに、時間を示すものはありません。PDSのレコードを漁るために、データを作った時間はそのrecordがcreatedAtを持っていない限りは判定できないし、それも詐称される可能性を考慮し、バックフィルについては一律1900/1/1に発生したことにしました。\n```\nimport { Tap, SimpleIndexer } from '@atproto/tap'\nimport pg from 'pg';\nimport * as dotenv from 'dotenv';\nimport logger from './logger';\nimport PQueue from 'p-queue';\n\nconst queue = new PQueue({ concurrency: 1 });\n\ndotenv.config();\n\nconst { Pool } = pg;\n\nconst pool = new Pool({\n    user: process.env.PG_USER,\n    host: process.env.PG_HOST,\n    database: process.env.PG_DATABASE,\n    password: process.env.PG_PASSWORD,\n    port: process.env.PG_PORT,\n});\n\nconst client = await pool.connect();\n\nasync function main() {\n    const tap = new Tap('http://localhost:2480')\n    const indexer = new SimpleIndexer()\n\n    const BACKFILL_TIMEOUT = 30 * 60 * 1000 // 30分\n    const LOG_INTERVAL = 60 * 1000 // 1分\n\n    let backfillTimer: NodeJS.Timeout | null = null\n    let logInterval: NodeJS.Timeout | null = null\n    let lastBackfillEventTime = Date.now()\n\n    // タイマーリセット関数(live: false イベントでのみリセット)\n    const resetBackfillTimer = () => {\n        lastBackfillEventTime = Date.now()\n        if (backfillTimer) clearTimeout(backfillTimer)\n        backfillTimer = setTimeout(async () => {\n            logger.info('No live=false events for 30 minutes. Exiting...')\n            await queue.onIdle()\n            process.exit(0)\n        }, BACKFILL_TIMEOUT)\n    }\n\n    // 1分おきにバックフィル待機ログ(live: false イベントが対象)\n    logInterval = setInterval(() => {\n        const elapsed = Date.now() - lastBackfillEventTime\n        if (elapsed >= LOG_INTERVAL) {\n            logger.info('Waiting for live=false events...')\n        }\n    }, LOG_INTERVAL)\n\n    indexer.record(async (evt) => {\n        const uri = `at://${evt.did}/${evt.collection}/${evt.rkey}`\n\n        // live: false イベントならタイマーリセット\n        if (!evt.live) {\n            resetBackfillTimer()\n        }\n\n        // バックフィル対象は 3rd party コレクションのみ\n        if (\n            evt.action === 'create' &&\n            !evt.live &&\n            !(evt.collection.startsWith('app.bsky') || evt.collection.startsWith('chat.bsky'))\n        ) {\n            logger.info(`[BACKFILL] Detect Event ${evt.action} ${uri}`)\n\n            const createdAt = new Date('1900-01-01T00:00:00Z');\n\n            queue.add(async () => {\n                const checkQuery = `\n                    SELECT 1 \n                    FROM public.collection \n                    WHERE did = $1 AND collection = $2 AND rkey = $3\n                    LIMIT 1;\n                `;\n                const res = await client.query(checkQuery, [\n                    evt.did,\n                    evt.collection,\n                    evt.rkey,\n                ]);\n\n                if (res.rowCount === 0) {\n                    const insertQuery = `\n                        INSERT INTO public.collection (did, collection, rkey, \"createdAt\")\n                        VALUES ($1, $2, $3, $4);\n                    `;\n                    await client.query(insertQuery, [\n                        evt.did,\n                        evt.collection,\n                        evt.rkey,\n                        createdAt,\n                    ]);\n                    logger.info(`Insert successed: ${uri}`);\n                } else {\n                    logger.info(`Skipped existing: ${uri}`);\n                }\n            });\n        }\n    })\n\n    indexer.error((err) => {\n        logger.error(err)\n    })\n\n    const channel = tap.channel(indexer)\n    logger.info('Connecting to Tap...')\n    await channel.start()\n}\n\n\n\nmain().catch(console.error)\n\n```\n\nなお、現在、DID単位で3,540,150件のバックフィルを終わらせましたが、Tapのポスグレは11GBを突破しました。\n\nおそらくはTap自体にはrecord自体を溜め込む機能はないでしょうから、recordを溜め込む場合はその分のデータ容量が別途必要になると思います。\n\n<h1>終わりに</h1>\n\nここまでお読みいただきありがとうございました。明日は [marilさん](https://bsky.app/profile/maril445.bsky.social)  がご担当となります。アニメ視聴履歴を管理できるAniBlueやカスタム絵文字が使えるStellarを運営されていました。明日の記事もどうぞご覧ください!",
  "createdAt": "2025-12-13T15:00:20.437Z",
  "visibility": "public"
}