AIアプリケーションにおけるストリーミング処理の実装

はじめに

生成AIを使用したアプリケーションでは、長文のレスポンスを処理する際にメモリ消費やタイムアウトの問題が発生することがあります。この記事では、ストリーミング処理を実装することで、これらの問題を解決する方法を解説します。

ストリーミング処理とは

ストリーミング処理は、大量のデータを小さな単位(チャンク)に分けて順次処理する方法です。従来の一括処理と比較して、以下のような利点があります:

  • メモリ使用量の削減
  • リアルタイムな処理が可能
  • 大規模データの効率的な処理

実装例

基本的なストリーミング処理

// 従来の一括処理
const response = await model.generateContent(prompt);
const fullText = response.text();

// ストリーミング処理
const stream = await model.generateContentStream(prompt);
for await (const chunk of stream.stream) {
    const partialText = chunk.text();
    // チャンク単位で処理
}
従来の処理 vs ストリーミング処理 従来の処理 Server Client Complete Response ストリーミング処理 Server Client

主要な利点

  • メモリ効率:データを小さな単位で処理
  • リアルタイム性:データを受信次第処理可能
  • スケーラビリティ:大量データの効率的な処理
ストリーミング処理の流れ 1 リクエスト 2 ストリーム開始 3 チャンク処理 4 完了 データストリーム

エラーハンドリングの実装

async function generateWithRetry(prompt, maxRetries = 3) {
    for (let i = 0; i < maxRetries; i++) {
        try {
            const stream = await model.generateContentStream(prompt);
            let response = '';
            
            for await (const chunk of stream.stream) {
                response += chunk.text();
            }
            
            return response;
        } catch (error) {
            if (i === maxRetries - 1) throw error;
            await new Promise(resolve => 
                setTimeout(resolve, 1000 * (i + 1))
            );
        }
    }
}
エラーハンドリングフロー チャンク1 チャンク2 エラー 再試行 エラー発生時の処理: 1. エラーを検出 2. 再試行メカニズムを起動

フロントエンドとの連携

// バックエンド(Express)
app.post("/format", async (req, res) => {
    const stream = await model.generateContentStream(prompt);
    res.setHeader('Content-Type', 'text/plain');
    res.setHeader('Transfer-Encoding', 'chunked');
    
    for await (const chunk of stream.stream) {
        res.write(chunk.text());
    }
    res.end();
});

// フロントエンド
const response = await fetch('/format', {
    method: 'POST',
    body: JSON.stringify({ prompt: userPrompt })
});

const reader = response.body.getReader();
while (true) {
    const {done, value} = await reader.read();
    if (done) break;
    updateUI(new TextDecoder().decode(value));
}

実装のポイント

チャンクサイズの最適化

const CHUNK_SIZE = 1024; // 1KB単位
let processedSize = 0;

for await (const chunk of stream.stream) {
    processedSize += chunk.text().length;
    if (processedSize >= CHUNK_SIZE) {
        await processChunk();
        processedSize = 0;
    }
}
// バックエンド(Express)
app.post("/format", async (req, res) => {
    const stream = await model.generateContentStream(prompt);
    
    res.setHeader('Content-Type', 'text/plain');
    res.setHeader('Transfer-Encoding', 'chunked');
    
    for await (const chunk of stream.stream) {
        res.write(chunk.text());
    }
    res.end();
});

// フロントエンド
const response = await fetch('/format', {
    method: 'POST',
    body: JSON.stringify({ prompt: userPrompt })
});

const reader = response.body.getReader();
let result = '';

while (true) {
    const {done, value} = await reader.read();
    if (done) break;
    result += new TextDecoder().decode(value);
    updateUI(result); // UIをリアルタイムで更新
}