Os fluxos são funções com algumas outras características: são fortemente tipos, são de streaming, locais e remotamente chamáveis e totalmente observáveis. O Firebase Genkit oferece CLI e ferramentas de interface do desenvolvedor para trabalhar com fluxos (em execução, depuração etc.).
Como definir fluxos
import { defineFlow } from '@genkit-ai/flow';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
},
async (restaurantTheme) => {
const suggestion = makeMenuItemSuggestion(restaurantTheme);
return suggestion;
}
);
Esquemas de entrada e saída para fluxos podem ser definidos usando zod
.
import { defineFlow } from '@genkit-ai/flow';
import * as z from 'zod';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
inputSchema: z.string(),
outputSchema: z.string(),
},
async (restaurantTheme) => {
const suggestion = makeMenuItemSuggestion(input.restaurantTheme);
return suggestion;
}
);
Quando o esquema é especificado, o Genkit o valida para entradas e saídas.
Fluxos em execução
Use a função runFlow
para executar o fluxo:
const response = await runFlow(menuSuggestionFlow, 'French');
Também é possível usar a CLI para executar fluxos:
genkit flow:run menuSuggestionFlow '"French"'
Transmitido
Veja um exemplo simples de um fluxo que pode transmitir valores de um fluxo:
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
streamSchema: z.string(),
},
async (restaurantTheme, streamingCallback) => {
if (streamingCallback) {
makeMenuItemSuggestionsAsync(restaurantTheme).subscribe((suggestion) => {
streamingCallback(suggestion);
});
}
}
);
Observe que streamingCallback
pode ser indefinido. Ele só é definido se o
cliente de invocação estiver solicitando resposta de streaming.
Para invocar um fluxo no modo de streaming, use a função streamFlow
:
const response = streamFlow(menuSuggestionFlow, 'French');
for await (const suggestion of response.stream()) {
console.log('suggestion', suggestion);
}
Se o fluxo não implementar o streaming, o streamFlow
se comportará de maneira idêntica a runFlow
.
Também é possível usar a CLI para transmitir fluxos:
genkit flow:run menuSuggestionFlow '"French"' -s
Como implantar fluxos
Se você quiser acessar o fluxo por HTTP, precisará implantá-lo primeiro. O Genkit oferece integrações para hosts do Cloud Functions para Firebase e Express.js, como o Cloud Run.
Os fluxos implantados são compatíveis com os mesmos recursos que os fluxos locais, como streaming e observabilidade.
Cloud Functions para Firebase
Para usar fluxos com o Cloud Functions para Firebase, use o plug-in firebase
, substitua defineFlow
por onFlow
e inclua um authPolicy
.
import { onFlow } from '@genkit-ai/firebase/functions';
import { firebaseAuth } from '@genkit-ai/firebase/auth';
export const menuSuggestionFlow = onFlow(
{
name: 'menuSuggestionFlow',
authPolicy: firebaseAuth((user) => {
if (!user.email_verified) {
throw new Error("Verified email required to run flow");
}
}
},
async (restaurantTheme) => {
// ....
}
);
Express.js
Para implantar fluxos usando o Cloud Run e serviços semelhantes, defina seus fluxos usando defineFlow
e, em seguida, chame startFlowsServer()
:
import { defineFlow, startFlowsServer } from '@genkit-ai/flow';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
},
async (restaurantTheme) => {
// ....
}
);
startFlowsServer();
Por padrão, startFlowsServer
atenderá a todos os fluxos definidos na base de código como endpoints HTTP (por exemplo, http://localhost:3400/menuSuggestionFlow
).
É possível escolher quais fluxos são expostos pelo servidor de fluxos. Especifique uma porta personalizada. Ela usará a variável de ambiente PORT
, se definida. Também é possível definir as configurações de CORS.
import { defineFlow, startFlowsServer } from '@genkit-ai/flow';
export const flowA = defineFlow({ name: 'flowA' }, async (subject) => {
// ....
});
export const flowB = defineFlow({ name: 'flowB' }, async (subject) => {
// ....
});
startFlowsServer({
flows: [flowB],
port: 4567,
cors: {
origin: '*',
},
});
Observabilidade do fluxo
Às vezes, ao usar SDKs de terceiros que não estão instrumentados para observabilidade, pode ser que você queira vê-los como uma etapa de rastreamento separada na interface do desenvolvedor. Tudo o que você precisa fazer é unir o código na função run
.
import { defineFlow, run } from '@genkit-ai/flow';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
outputSchema: z.array(s.string()),
},
async (restaurantTheme) => {
const themes = await run('find-similar-themes', async () => {
return await findSimilarRestaurantThemes(restaurantTheme);
});
const suggestions = makeMenuItemSuggestions(themes);
return suggestions;
}
);