Flujos

Los flujos son funciones con algunas características adicionales: son funciones muy se puede escribir, se puede transmitir, se puede llamar de forma local y remota, y es totalmente observable. Firebase Genkit proporciona herramientas de CLI y de IU para desarrolladores para trabajar con flujos (ejecución, depuración, etc.).

Define flujos

import { defineFlow } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    const suggestion = makeMenuItemSuggestion(restaurantTheme);

    return suggestion;
  }
);

Los esquemas de entrada y salida de los flujos se pueden definir con zod.

import { defineFlow } from '@genkit-ai/flow';
import * as z from 'zod';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    inputSchema: z.string(),
    outputSchema: z.string(),
  },
  async (restaurantTheme) => {
    const suggestion = makeMenuItemSuggestion(input.restaurantTheme);

    return suggestion;
  }
);

Cuando se especifica el esquema, Genkit validará el esquema para las entradas y salidas.

Flujos en ejecución

Usa la función runFlow para ejecutar el flujo:

const response = await runFlow(menuSuggestionFlow, 'French');

También puedes usar la CLI para ejecutar flujos:

genkit flow:run menuSuggestionFlow '"French"'

Transmitido

El siguiente es un ejemplo simple de un flujo que puede transmitir valores desde un flujo:

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    streamSchema: z.string(),
  },
  async (restaurantTheme, streamingCallback) => {
    if (streamingCallback) {
      makeMenuItemSuggestionsAsync(restaurantTheme).subscribe((suggestion) => {
        streamingCallback(suggestion);
      });
    }
  }
);

Ten en cuenta que streamingCallback puede no estar definido. Solo se define si el cliente que invoca solicita una respuesta transmitida.

Para invocar un flujo en modo de transmisión, usa la función streamFlow:

const response = streamFlow(menuSuggestionFlow, 'French');

for await (const suggestion of response.stream()) {
  console.log('suggestion', suggestion);
}

Si el flujo no implementa la transmisión, streamFlow se comportará de manera idéntica a runFlow.

También puedes usar la CLI para transmitir flujos:

genkit flow:run menuSuggestionFlow '"French"' -s

Implementa flujos

Si quieres acceder a tu flujo a través de HTTP, deberás implementarlo antes de empezar. Genkit ofrece integraciones para Cloud Functions para Firebase y Hosts de Express.js, como Cloud Run.

Los flujos implementados admiten las mismas funciones que los flujos locales (como la transmisión y la observabilidad).

Función de Cloud Functions para Firebase

Para usar flujos con Cloud Functions para Firebase, usa el complemento firebase, reemplaza defineFlow por onFlow y, luego, incluye un authPolicy.

import { onFlow } from '@genkit-ai/firebase/functions';
import { firebaseAuth } from '@genkit-ai/firebase/auth';

export const menuSuggestionFlow = onFlow(
  {
    name: 'menuSuggestionFlow',
    authPolicy: firebaseAuth((user) => {
      if (!user.email_verified) {
        throw new Error("Verified email required to run flow");
      }
    }
  },
  async (restaurantTheme) => {
    // ....
  }
);

Express.js

Para implementar flujos con Cloud Run y servicios similares, define tus flujos con defineFlow y, luego, llama a startFlowsServer():

import { defineFlow, startFlowsServer } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    // ....
  }
);

startFlowsServer();

De forma predeterminada, startFlowsServer publicará todos los flujos que definiste en tu base de código como extremos HTTP (p. ej., http://localhost:3400/menuSuggestionFlow). Puedes llamar a un flujo a través de una solicitud POST de la siguiente manera:

curl -X POST "http://localhost:3400/menuSuggestionFlow" -H "Content-Type: application/json"  -d '{"data": "banana"}'

Si es necesario, puedes personalizar el servidor de flujos para que entregue una lista específica de flujos, como se muestra a continuación. También puedes especificar un puerto personalizado (usará la variable de entorno PORT si está configurada) o especificar la configuración de CORS.

import { defineFlow, startFlowsServer } from '@genkit-ai/flow';

export const flowA = defineFlow({ name: 'flowA' }, async (subject) => {
  // ....
});

export const flowB = defineFlow({ name: 'flowB' }, async (subject) => {
  // ....
});

startFlowsServer({
  flows: [flowB],
  port: 4567,
  cors: {
    origin: '*',
  },
});

Observabilidad del flujo

A veces, cuando se usan SDKs de terceros que no están instrumentados para la observabilidad, es posible que quieras verlos como un paso de registro separado en la IU para desarrolladores. Todo lo que debes hacer es unir el código en la función run.

import { defineFlow, run } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    outputSchema: z.array(s.string()),
  },
  async (restaurantTheme) => {
    const themes = await run('find-similar-themes', async () => {
      return await findSimilarRestaurantThemes(restaurantTheme);
    });

    const suggestions = makeMenuItemSuggestions(themes);

    return suggestions;
  }
);