Flow 是具有一些其他特性的函数:它们是强类型、可流式传输、本地和远程调用且完全可观察的函数。Firebase Genkit 提供 CLI 和开发者界面工具,用于处理流(运行、调试等)。

定义数据流

import { defineFlow } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    const suggestion = makeMenuItemSuggestion(restaurantTheme);

    return suggestion;
  }
);

可以使用 zod 定义流的输入和输出架构。

import { defineFlow } from '@genkit-ai/flow';
import * as z from 'zod';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    inputSchema: z.string(),
    outputSchema: z.string(),
  },
  async (restaurantTheme) => {
    const suggestion = makeMenuItemSuggestion(input.restaurantTheme);

    return suggestion;
  }
);

指定架构后,Genkit 会验证输入和输出的架构。

正在运行的流

使用 runFlow 函数运行数据流:

const response = await runFlow(menuSuggestionFlow, 'French');

您也可以使用 CLI 运行流:

genkit flow:run menuSuggestionFlow '"French"'

直播

下面是一个简单的数据流示例,它可以流式传输来自数据流的值:

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    streamSchema: z.string(),
  },
  async (restaurantTheme, streamingCallback) => {
    if (streamingCallback) {
      makeMenuItemSuggestionsAsync(restaurantTheme).subscribe((suggestion) => {
        streamingCallback(suggestion);
      });
    }
  }
);

请注意,streamingCallback 可以未定义。只有当调用的客户端请求流式响应时,系统才会定义它。

如需在流处理模式下调用流,请使用 streamFlow 函数:

const response = streamFlow(menuSuggestionFlow, 'French');

for await (const suggestion of response.stream()) {
  console.log('suggestion', suggestion);
}

如果流程未实现流式传输,streamFlow 的行为将与 runFlow 相同。

您还可以使用 CLI 流式传输流:

genkit flow:run menuSuggestionFlow '"French"' -s

部署流

如果您希望能够通过 HTTP 访问流程,则需要先部署该流程。Genkit 支持 Cloud Functions for Firebase 和 Express.js 主机(例如 Cloud Run)。

部署的流支持与本地流相同的所有功能(例如流处理和可观测性)。

Cloud Functions for Firebase

如需将流与 Cloud Functions for Firebase 搭配使用,请使用 firebase 插件,请将 defineFlow 替换为 onFlow 并添加 authPolicy

import { onFlow } from '@genkit-ai/firebase/functions';
import { firebaseAuth } from '@genkit-ai/firebase/auth';

export const menuSuggestionFlow = onFlow(
  {
    name: 'menuSuggestionFlow',
    authPolicy: firebaseAuth((user) => {
      if (!user.email_verified) {
        throw new Error("Verified email required to run flow");
      }
    }
  },
  async (restaurantTheme) => {
    // ....
  }
);

Express.js

如需使用 Cloud Run 和类似服务部署流,请使用 defineFlow 定义流,然后调用 startFlowsServer()

import { defineFlow, startFlowsServer } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    // ....
  }
);

startFlowsServer();

默认情况下,startFlowsServer 将处理您在代码库中定义为 HTTP 端点(例如 http://localhost:3400/menuSuggestionFlow)的所有流。

您可以选择通过流服务器公开哪些流。您可以指定自定义端口(它会使用 PORT 环境变量(如果已设置)。您还可以设置 CORS 设置。

import { defineFlow, startFlowsServer } from '@genkit-ai/flow';

export const flowA = defineFlow({ name: 'flowA' }, async (subject) => {
  // ....
});

export const flowB = defineFlow({ name: 'flowB' }, async (subject) => {
  // ....
});

startFlowsServer({
  flows: [flowB],
  port: 4567,
  cors: {
    origin: '*',
  },
});

数据流可观测性

有时,当使用未针对可观测性进行插桩的第三方 SDK 时,您可能希望在开发者界面中将这些 SDK 视为单独的跟踪步骤。您只需将代码封装在 run 函数中即可。

import { defineFlow, run } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    outputSchema: z.array(s.string()),
  },
  async (restaurantTheme) => {
    const themes = await run('find-similar-themes', async () => {
      return await findSimilarRestaurantThemes(restaurantTheme);
    });

    const suggestions = makeMenuItemSuggestions(themes);

    return suggestions;
  }
);