Fluxos

Os fluxos são funções com algumas outras características: são fortemente tipos, são de streaming, locais e remotamente chamáveis e totalmente observáveis. O Firebase Genkit oferece CLI e ferramentas de interface do desenvolvedor para trabalhar com fluxos (em execução, depuração etc.).

Como definir fluxos

import { defineFlow } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    const suggestion = makeMenuItemSuggestion(restaurantTheme);

    return suggestion;
  }
);

Esquemas de entrada e saída para fluxos podem ser definidos usando zod.

import { defineFlow } from '@genkit-ai/flow';
import * as z from 'zod';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    inputSchema: z.string(),
    outputSchema: z.string(),
  },
  async (restaurantTheme) => {
    const suggestion = makeMenuItemSuggestion(input.restaurantTheme);

    return suggestion;
  }
);

Quando o esquema é especificado, o Genkit o valida para entradas e saídas.

Fluxos em execução

Use a função runFlow para executar o fluxo:

const response = await runFlow(menuSuggestionFlow, 'French');

Também é possível usar a CLI para executar fluxos:

genkit flow:run menuSuggestionFlow '"French"'

Transmitido

Veja um exemplo simples de um fluxo que pode transmitir valores de um fluxo:

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    streamSchema: z.string(),
  },
  async (restaurantTheme, streamingCallback) => {
    if (streamingCallback) {
      makeMenuItemSuggestionsAsync(restaurantTheme).subscribe((suggestion) => {
        streamingCallback(suggestion);
      });
    }
  }
);

Observe que streamingCallback pode ser indefinido. Ele só é definido se o cliente de invocação estiver solicitando resposta de streaming.

Para invocar um fluxo no modo de streaming, use a função streamFlow:

const response = streamFlow(menuSuggestionFlow, 'French');

for await (const suggestion of response.stream()) {
  console.log('suggestion', suggestion);
}

Se o fluxo não implementar o streaming, o streamFlow se comportará de maneira idêntica a runFlow.

Também é possível usar a CLI para transmitir fluxos:

genkit flow:run menuSuggestionFlow '"French"' -s

Como implantar fluxos

Se você quiser acessar o fluxo por HTTP, precisará implantá-lo primeiro. O Genkit oferece integrações para hosts do Cloud Functions para Firebase e Express.js, como o Cloud Run.

Os fluxos implantados são compatíveis com os mesmos recursos que os fluxos locais, como streaming e observabilidade.

Cloud Functions para Firebase

Para usar fluxos com o Cloud Functions para Firebase, use o plug-in firebase, substitua defineFlow por onFlow e inclua um authPolicy.

import { onFlow } from '@genkit-ai/firebase/functions';
import { firebaseAuth } from '@genkit-ai/firebase/auth';

export const menuSuggestionFlow = onFlow(
  {
    name: 'menuSuggestionFlow',
    authPolicy: firebaseAuth((user) => {
      if (!user.email_verified) {
        throw new Error("Verified email required to run flow");
      }
    }
  },
  async (restaurantTheme) => {
    // ....
  }
);

Express.js

Para implantar fluxos usando o Cloud Run e serviços semelhantes, defina seus fluxos usando defineFlow e, em seguida, chame startFlowsServer():

import { defineFlow, startFlowsServer } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    // ....
  }
);

startFlowsServer();

Por padrão, startFlowsServer atenderá a todos os fluxos definidos na base de código como endpoints HTTP (por exemplo, http://localhost:3400/menuSuggestionFlow).

É possível escolher quais fluxos são expostos pelo servidor de fluxos. Especifique uma porta personalizada. Ela usará a variável de ambiente PORT, se definida. Também é possível definir as configurações de CORS.

import { defineFlow, startFlowsServer } from '@genkit-ai/flow';

export const flowA = defineFlow({ name: 'flowA' }, async (subject) => {
  // ....
});

export const flowB = defineFlow({ name: 'flowB' }, async (subject) => {
  // ....
});

startFlowsServer({
  flows: [flowB],
  port: 4567,
  cors: {
    origin: '*',
  },
});

Observabilidade do fluxo

Às vezes, ao usar SDKs de terceiros que não estão instrumentados para observabilidade, pode ser que você queira vê-los como uma etapa de rastreamento separada na interface do desenvolvedor. Tudo o que você precisa fazer é unir o código na função run.

import { defineFlow, run } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    outputSchema: z.array(s.string()),
  },
  async (restaurantTheme) => {
    const themes = await run('find-similar-themes', async () => {
      return await findSimilarRestaurantThemes(restaurantTheme);
    });

    const suggestions = makeMenuItemSuggestions(themes);

    return suggestions;
  }
);