I flussi sono funzioni aggregate con alcune caratteristiche aggiuntive rispetto alle chiamate: sono altamente tipiche, riproducibili in streaming, richiamabili localmente e da remoto completamente osservabile. Firebase Genkit fornisce strumenti di interfaccia a riga di comando e UI degli sviluppatori per l'esecuzione e il debug dei flussi.
Definizione dei flussi
Nella sua forma più semplice, un flusso aggrega una funzione:
menuSuggestionFlow := genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
suggestion := makeMenuItemSuggestion(restaurantTheme)
return suggestion, nil
})
In questo modo puoi eseguire la funzione dall'interfaccia a riga di comando di Genkit e dall'interfaccia utente per gli sviluppatori ed è un requisito per molte funzionalità di Genkit, tra cui il deployment e l'osservabilità.
Un importante vantaggio dei flussi Genkit rispetto alla chiamata diretta di un'API del modello è
la sicurezza dei tipi di ingressi e uscite.
I tipi di argomenti e risultati di un flusso possono essere valori semplici o strutturati.
Genkit genererà schemi JSON per questi valori utilizzando
invopop/jsonschema
Il seguente flusso prende un string
come input e restituisce un struct
:
type MenuSuggestion struct {
ItemName string `json:"item_name"`
Description string `json:"description"`
Calories int `json:"calories"`
}
menuSuggestionFlow := genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
return suggestion, nil
},
)
Flussi in esecuzione
Per eseguire un flusso nel codice:
suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")
Puoi utilizzare l'interfaccia a riga di comando anche per eseguire i flussi:
genkit flow:run menuSuggestionFlow '"French"'
Trasmesso in streaming
Ecco un semplice esempio di flusso che può trasmettere valori:
// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string
menuSuggestionFlow := genkit.DefineStreamingFlow(
"menuSuggestionFlow",
func(
ctx context.Context,
restaurantTheme InputType,
callback func(context.Context, StreamType) error,
) (OutputType, error) {
var menu strings.Builder
menuChunks := make(chan StreamType)
go makeFullMenuSuggestion(restaurantTheme, menuChunks)
for {
chunk, ok := <-menuChunks
if !ok {
break
}
if callback != nil {
callback(context.Background(), chunk)
}
menu.WriteString(string(chunk))
}
return OutputType(menu.String()), nil
},
)
Tieni presente che il callback di flusso può essere indefinito. Viene definito solo se il client invoking richiede una risposta in modalità flusso.
Per richiamare un flusso in modalità flusso di dati:
menuSuggestionFlow.Stream(
context.Background(),
"French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
if err != nil {
// handle err
return false
}
if !sfv.Done {
fmt.Print(sfv.Stream)
return true
} else {
fmt.Print(sfv.Output)
return false
}
})
Se il flusso non implementa i flussi di dati, StreamFlow()
si comporta in modo identico a
RunFlow()
.
Puoi utilizzare l'interfaccia a riga di comando anche per trasmettere flussi di flussi:
genkit flow:run menuSuggestionFlow '"French"' -s
Deployment dei flussi
Se vuoi poter accedere al flusso tramite HTTP, devi prima eseguirlo.
Per eseguire il deployment di flussi utilizzando Cloud Run e servizi simili, definisci i flussi.
quindi chiama Init()
:
func main() {
genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
// ...
return "", nil
},
)
if err := genkit.Init(context.Background(), nil); err != nil {
log.Fatal(err)
}
}
Init
avvia un server net/http
che espone i tuoi flussi come endpoint HTTP (ad esempio http://localhost:3400/menuSuggestionFlow
).
Il secondo parametro è un Options
facoltativo che specifica quanto segue:
FlowAddr
: indirizzo e porta da ascoltare. Se non specificato, il server rimane in ascolto sulla porta specificata dalla variabile di ambiente PORT; Se è vuoto, utilizza il valore predefinito della porta 3400.Flows
: quali flussi pubblicare. Se non specificato,Init
pubblica tutti i tuoi flussi definiti.
Se vuoi gestire flussi sullo stesso host e sulla stessa porta di altri endpoint,
puoi impostare FlowAddr
su -
e chiamare invece NewFlowServeMux()
per ottenere un gestore
per i flussi Genkit, che puoi multiplexare con gli altri gestori di route:
mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))
Osservabilità del flusso
A volte, quando si utilizzano SDK di terze parti che non sono strumenti per l'osservabilità,
ti consigliamo di vederli come passaggio di traccia separato nell'interfaccia utente per sviluppatori. Devi solo inserire il codice nella funzione run
.
genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
// ...
return "", nil
})
// ...
return themes, err
})