Abläufe

Flows sind verpackte Funktionen mit einigen zusätzlichen Eigenschaften im Vergleich zu direkten Aufrufen: Sie sind stark typisiert, streambar, lokal und aus der Ferne aufrufbar und vollständig beobachtbar. Firebase Genkit bietet eine Befehlszeile und eine Entwickler-UI zum Ausführen und Entwickeln von Abläufen.

Datenflüsse definieren

In seiner einfachsten Form umschließt ein Ablauf nur eine Funktion:

menuSuggestionFlow := genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (string, error) {
		suggestion := makeMenuItemSuggestion(restaurantTheme)
		return suggestion, nil
	})

So können Sie die Funktion über die Genkit-Befehlszeile und die Entwickler-Benutzeroberfläche ausführen. Dies ist eine Voraussetzung für viele Genkit-Funktionen, einschließlich Bereitstellung und Beobachtbarkeit.

Ein wichtiger Vorteil von Genkit-Abläufen gegenüber dem direkten Aufrufen einer Modell-API ist die Typsicherheit von Eingaben und Ausgaben. Die Argument- und Ergebnistypen eines Ablaufs können einfache oder strukturierte Werte sein. Genkit generiert JSON-Schemas für diese Werte mit invopop/jsonschema.

Im folgenden Ablauf wird ein string als Eingabe verwendet und ein struct ausgegeben:

type MenuSuggestion struct {
	ItemName    string `json:"item_name"`
	Description string `json:"description"`
	Calories    int    `json:"calories"`
}

menuSuggestionFlow := genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
		suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
		return suggestion, nil
	},
)

Laufende Datenflüsse

So führen Sie einen Ablauf in Ihrem Code aus:

suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")

Sie können auch die Befehlszeile verwenden, um Abläufe auszuführen:

genkit flow:run menuSuggestionFlow '"French"'

Gestreamt

Hier ein einfaches Beispiel für einen Ablauf, der Werte streamen kann:

// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string

menuSuggestionFlow := genkit.DefineStreamingFlow(
	"menuSuggestionFlow",
	func(
		ctx context.Context,
		restaurantTheme InputType,
		callback func(context.Context, StreamType) error,
	) (OutputType, error) {
		var menu strings.Builder
		menuChunks := make(chan StreamType)
		go makeFullMenuSuggestion(restaurantTheme, menuChunks)
		for {
			chunk, ok := <-menuChunks
			if !ok {
				break
			}
			if callback != nil {
				callback(context.Background(), chunk)
			}
			menu.WriteString(string(chunk))
		}
		return OutputType(menu.String()), nil
	},
)

Der Streaming-Callback kann nicht definiert sein. Sie wird nur definiert, wenn der aufrufende Client eine gestreamte Antwort anfordert.

So rufen Sie einen Ablauf im Streamingmodus auf:

menuSuggestionFlow.Stream(
	context.Background(),
	"French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
	if err != nil {
		// handle err
		return false
	}
	if !sfv.Done {
		fmt.Print(sfv.Stream)
		return true
	} else {
		fmt.Print(sfv.Output)
		return false
	}
})

Wenn für den Ablauf kein Streaming implementiert ist, verhält sich StreamFlow() identisch mit RunFlow().

Sie können die Befehlszeile auch zum Streamen von Abläufen verwenden:

genkit flow:run menuSuggestionFlow '"French"' -s

Abläufe bereitstellen

Wenn Sie über HTTP auf Ihren Stream zugreifen möchten, müssen Sie ihn zuerst bereitstellen. Wenn Sie Abläufe mit Cloud Run und ähnlichen Diensten bereitstellen möchten, definieren Sie die Abläufe und rufen Sie dann Init() auf:

func main() {
	genkit.DefineFlow(
		"menuSuggestionFlow",
		func(ctx context.Context, restaurantTheme string) (string, error) {
			// ...
			return "", nil
		},
	)
	if err := genkit.Init(context.Background(), nil); err != nil {
		log.Fatal(err)
	}
}

Init startet einen net/http-Server, der Ihre Workflows als HTTP-Endpunkte bereitstellt (z. B. http://localhost:3400/menuSuggestionFlow).

Der zweite Parameter ist ein optionaler Options, der Folgendes angibt:

  • FlowAddr: Adresse und Port, die bzw. der überwacht werden soll. Wenn keine Angabe erfolgt, überwacht der Server den durch die Umgebungsvariable PORT angegebenen Port. Wenn dieser leer ist, wird der Standardwert Port 3400 verwendet.
  • Flows: Die Abläufe, die bereitgestellt werden sollen. Wenn nicht angegeben, wird Init für alle Ihre definierten Aufrufabfolgen verwendet.

Wenn du Streams auf demselben Host und Port wie andere Endpunkte bereitstellen möchtest, kannst du FlowAddr auf - setzen und stattdessen NewFlowServeMux() aufrufen, um einen Handler für deine Genkit-Streams abzurufen, den du mit deinen anderen Routen-Handlern multiplexen kannst:

mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))

Beobachtbarkeit von Abläufen

Wenn Sie SDKs von Drittanbietern verwenden, die nicht für die Beobachtbarkeit instrumentiert sind, möchten Sie sie manchmal als separaten Trace-Schritt in der Entwickler-UI sehen. Sie müssen den Code lediglich in die Funktion run einfügen.

genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (string, error) {
		themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
			// ...
			return "", nil
		})

		// ...
		return themes, err
	})