流程

Flow 是具有一些额外特性的函数:它们是强类型、可流式传输、可在本地和远程调用,并且可完全观测。Firebase Genkit 提供用于处理流程的 CLI 和开发者界面工具 (运行、调试等)。

定义 flow

import { defineFlow } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    const suggestion = makeMenuItemSuggestion(restaurantTheme);

    return suggestion;
  }
);

可以使用 zod 定义流的输入和输出架构。

import { defineFlow } from '@genkit-ai/flow';
import * as z from 'zod';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    inputSchema: z.string(),
    outputSchema: z.string(),
  },
  async (restaurantTheme) => {
    const suggestion = makeMenuItemSuggestion(input.restaurantTheme);

    return suggestion;
  }
);

指定架构后,Genkit 会验证输入和输出的架构。

运行 flow

使用 runFlow 函数运行流程:

const response = await runFlow(menuSuggestionFlow, 'French');

您还可以使用 CLI 运行 flow:

genkit flow:run menuSuggestionFlow '"French"'

流式

下面是一个简单的数据流示例,该数据流可从数据流中流式传输值:

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    streamSchema: z.string(),
  },
  async (restaurantTheme, streamingCallback) => {
    if (streamingCallback) {
      makeMenuItemSuggestionsAsync(restaurantTheme).subscribe((suggestion) => {
        streamingCallback(suggestion);
      });
    }
  }
);

请注意,可以未定义 streamingCallback。只有在调用客户端请求流式回答时才会定义。

如需在流处理模式下调用数据流,请使用 streamFlow 函数:

const response = streamFlow(menuSuggestionFlow, 'French');

for await (const suggestion of response.stream()) {
  console.log('suggestion', suggestion);
}

如果流未实现流式处理,则 streamFlow 的行为与 runFlow 相同。

您还可以使用 CLI 对 flow 进行流式处理:

genkit flow:run menuSuggestionFlow '"French"' -s

部署 flow

如果您希望能够通过 HTTP 访问 flow,则需要先部署 flow。Genkit 与 Cloud Functions for Firebase 和 Express.js 主机(例如 Cloud Run)。

已部署的流支持与本地流相同的所有功能(例如流式传输和 可观测性)。

Cloud Functions for Firebase

如需将流程与 Cloud Functions for Firebase 搭配使用,请使用 firebase 插件,将 defineFlow 替换为 onFlow,并添加 authPolicy

import { onFlow } from '@genkit-ai/firebase/functions';
import { firebaseAuth } from '@genkit-ai/firebase/auth';

export const menuSuggestionFlow = onFlow(
  {
    name: 'menuSuggestionFlow',
    authPolicy: firebaseAuth((user) => {
      if (!user.email_verified) {
        throw new Error("Verified email required to run flow");
      }
    }
  },
  async (restaurantTheme) => {
    // ....
  }
);

Express.js

如需使用 Cloud Run 和类似服务部署 flow,请使用 defineFlow 定义 flow,然后调用 startFlowsServer()

import { defineFlow, startFlowsServer } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    // ....
  }
);

startFlowsServer();

默认情况下,startFlowsServer 会处理您在代码库中定义为 HTTP 端点(例如 http://localhost:3400/menuSuggestionFlow)的所有流程。您可以通过 POST 请求调用流程,如下所示:

curl -X POST "http://localhost:3400/menuSuggestionFlow" -H "Content-Type: application/json"  -d '{"data": "banana"}'

如果需要,您可以自定义流服务器以提供特定的流列表,如下所示。您还可以指定自定义端口(如果设置了 PORT 环境变量,则会使用该变量)或指定 CORS 设置。

import { defineFlow, startFlowsServer } from '@genkit-ai/flow';

export const flowA = defineFlow({ name: 'flowA' }, async (subject) => {
  // ....
});

export const flowB = defineFlow({ name: 'flowB' }, async (subject) => {
  // ....
});

startFlowsServer({
  flows: [flowB],
  port: 4567,
  cors: {
    origin: '*',
  },
});

Flow 可观测性

有时,在使用未针对可观测性进行插桩的第三方 SDK 时,您可能希望在开发者界面中将它们视为单独的跟踪步骤。您只需将代码封装在 run 函数中即可。

import { defineFlow, run } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    outputSchema: z.array(s.string()),
  },
  async (restaurantTheme) => {
    const themes = await run('find-similar-themes', async () => {
      return await findSimilarRestaurantThemes(restaurantTheme);
    });

    const suggestions = makeMenuItemSuggestions(themes);

    return suggestions;
  }
);