流量數

資料流是具有某些額外特性的函式:其具有強類型、可串流、本機和遠端呼叫,且可完整觀測。Firebase Genkit 提供 CLI 和開發人員 UI 工具,以處理流程 (執行、偵錯等)。

定義流程

import { defineFlow } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    const suggestion = makeMenuItemSuggestion(restaurantTheme);

    return suggestion;
  }
);

使用 zod 可以定義流程的輸入和輸出結構定義。

import { defineFlow } from '@genkit-ai/flow';
import * as z from 'zod';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    inputSchema: z.string(),
    outputSchema: z.string(),
  },
  async (restaurantTheme) => {
    const suggestion = makeMenuItemSuggestion(input.restaurantTheme);

    return suggestion;
  }
);

指定結構定義後,Genkit 就會驗證輸入和輸出的結構定義。

執行中的流程

使用 runFlow 函式執行資料流:

const response = await runFlow(menuSuggestionFlow, 'French');

此外,您也可以使用 CLI 執行流程:

genkit flow:run menuSuggestionFlow '"French"'

直播結束

以下的簡單範例可從資料流串流值:

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    streamSchema: z.string(),
  },
  async (restaurantTheme, streamingCallback) => {
    if (streamingCallback) {
      makeMenuItemSuggestionsAsync(restaurantTheme).subscribe((suggestion) => {
        streamingCallback(suggestion);
      });
    }
  }
);

請注意,streamingCallback 可以未定義。只有在叫用用戶端要求串流回應時,才需定義此項目。

如要在串流模式下叫用資料流,請使用 streamFlow 函式:

const response = streamFlow(menuSuggestionFlow, 'French');

for await (const suggestion of response.stream()) {
  console.log('suggestion', suggestion);
}

如果資料流未實作串流,streamFlow 的行為會與 runFlow 相同。

此外,您也可以使用 CLI 來串流資料流:

genkit flow:run menuSuggestionFlow '"French"' -s

部署流程

如果您想透過 HTTP 存取流程,請先部署流程。Genkit 可整合 Cloud Functions for Firebase 和 Express.js 主機 (例如 Cloud Run)。

已部署的流程支援與本機流程相同的所有功能 (例如串流和觀測能力)。

Firebase 專用 Cloud 函式

如要將流程與 Cloud Functions for Firebase 搭配使用,請使用 firebase 外掛程式,請將 defineFlow 替換為 onFlow 並加入 authPolicy

import { onFlow } from '@genkit-ai/firebase/functions';
import { firebaseAuth } from '@genkit-ai/firebase/auth';

export const menuSuggestionFlow = onFlow(
  {
    name: 'menuSuggestionFlow',
    authPolicy: firebaseAuth((user) => {
      if (!user.email_verified) {
        throw new Error("Verified email required to run flow");
      }
    }
  },
  async (restaurantTheme) => {
    // ....
  }
);

Express.js

如要使用 Cloud Run 和類似服務部署流程,請使用 defineFlow 定義資料流,然後呼叫 startFlowsServer()

import { defineFlow, startFlowsServer } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    // ....
  }
);

startFlowsServer();

根據預設,startFlowsServer 會提供您在程式碼集定義的所有資料流,做為 HTTP 端點 (例如 http://localhost:3400/menuSuggestionFlow)。

您可以選擇要透過流程伺服器公開的流程。您可以指定自訂通訊埠 (如果設定,系統會使用 PORT 環境變數)。你也可以配置 CORS 設定。

import { defineFlow, startFlowsServer } from '@genkit-ai/flow';

export const flowA = defineFlow({ name: 'flowA' }, async (subject) => {
  // ....
});

export const flowB = defineFlow({ name: 'flowB' }, async (subject) => {
  // ....
});

startFlowsServer({
  flows: [flowB],
  port: 4567,
  cors: {
    origin: '*',
  },
});

流程觀測能力

有時候,使用沒有檢測功能檢測的第三方 SDK 時,您可能會想在開發人員 UI 中將這些 SDK 顯示為獨立的追蹤步驟。只需將程式碼納入 run 函式即可。

import { defineFlow, run } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    outputSchema: z.array(s.string()),
  },
  async (restaurantTheme) => {
    const themes = await run('find-similar-themes', async () => {
      return await findSimilarRestaurantThemes(restaurantTheme);
    });

    const suggestions = makeMenuItemSuggestions(themes);

    return suggestions;
  }
);