Przepływy

Przepływy to funkcje opakowane z dodatkowymi cechami w stosunku do bezpośrednich połączenia: są dobrze zdefiniowane, można strumieniować, można je wywoływać lokalnie i zdalnie. w pełni dostrzegalny. Firebase Genkit udostępnia interfejs wiersza poleceń i narzędzia interfejsu dla programistów do uruchamiania i debugowania przepływów.

Definiowanie przepływów

W najprostszej postaci przepływ obejmuje tylko funkcję:

Go

menuSuggestionFlow := genkit.DefineFlow(
  "menuSuggestionFlow",
  func(ctx context.Context, restaurantTheme string) (string, error) {
      suggestion := makeMenuItemSuggestion(restaurantTheme)
      return suggestion, nil
  })

Dzięki temu możesz uruchomić funkcję z interfejsu wiersza poleceń Genkit i interfejsu programistycznego. są wymagane do działania wielu funkcji Genkit, w tym wdrożenia dostrzegalność.

Ważną zaletą przepływów Genkit w stosunku do bezpośredniego wywoływania interfejsu API modelu jest bezpieczeństwo typu danych wejściowych i wyjściowych:

Go

Argumenty i typy wyników przepływu mogą być wartościami prostymi lub uporządkowanymi. Genkit wygeneruje schematy JSON dla tych wartości za pomocą invopop/jsonschema.

Ten proces pobiera string jako dane wejściowe i wyprowadza struct:

type MenuSuggestion struct {
  ItemName    string `json:"item_name"`
  Description string `json:"description"`
  Calories    int    `json:"calories"`
}
menuSuggestionFlow := genkit.DefineFlow(
  "menuSuggestionFlow",
  func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
      suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
      return suggestion, nil
  },
)

Uruchomione przepływy

Aby uruchomić przepływ w kodzie:

Go

suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")

Za pomocą interfejsu wiersza poleceń możesz też uruchamiać przepływy:

genkit flow:run menuSuggestionFlow '"French"'

Transmisja zakończona

Oto prosty przykład przepływu, który może przesyłać strumieniowo wartości:

Go

// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string
menuSuggestionFlow := genkit.DefineStreamingFlow(
  "menuSuggestionFlow",
  func(
      ctx context.Context,
      restaurantTheme InputType,
      callback func(context.Context, StreamType) error,
  ) (OutputType, error) {
      var menu strings.Builder
      menuChunks := make(chan StreamType)
      go makeFullMenuSuggestion(restaurantTheme, menuChunks)
      for {
          chunk, ok := <-menuChunks
          if !ok {
              break
          }
          if callback != nil {
              callback(context.Background(), chunk)
          }
          menu.WriteString(string(chunk))
      }
      return OutputType(menu.String()), nil
  },
)

Pamiętaj, że wywołanie zwrotne strumieniowania może być niezdefiniowane. Jest on zdefiniowany tylko wtedy, gdy Klient wysyłający żądanie oczekuje odpowiedzi przesyłanej strumieniowo.

Aby wywołać przepływ w trybie strumieniowania:

Go

menuSuggestionFlow.Stream(
  context.Background(),
  "French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
  if !sfv.Done {
      fmt.Print(sfv.Output)
      return true
  } else {
      return false
  }
})

Jeśli ten proces nie obejmuje implementacji strumieniowego przesyłania danych, funkcja StreamFlow() działa identycznie jak RunFlow()

Za pomocą interfejsu wiersza poleceń możesz też strumieniować przepływy:

genkit flow:run menuSuggestionFlow '"French"' -s

Wdrażanie przepływów

Jeśli chcesz mieć dostęp do przepływu przez HTTP, musisz go wdrożyć .

Go

Aby wdrożyć przepływy za pomocą Cloud Run i podobnych usług, zdefiniuj przepływy. potem wywołaj Init():

func main() {
  genkit.DefineFlow(
      "menuSuggestionFlow",
      func(ctx context.Context, restaurantTheme string) (string, error) {
          // ...
          return "", nil
      },
  )
  if err := genkit.Init(context.Background(), nil); err != nil {
      log.Fatal(err)
  }
}

Init uruchamia serwer net/http, który ujawnia przepływy jako HTTP punktów końcowych (na przykład http://localhost:3400/menuSuggestionFlow).

Drugi parametr to opcjonalny Options, który określa:

  • FlowAddr: adres i port nasłuchiwania. Jeśli nie podasz żadnej wartości, serwer nasłuchuje na porcie określonym przez zmienną środowiskową PORT; jeśli ta wartość jest pusta, używany jest domyślny port 3400.
  • Flows: sposób, w jaki mają się wyświetlać reklamy. Jeśli nie podasz żadnej wartości, Init udostępnia wszystkie i zdefiniowane przepływy pracy.

Jeśli chcesz obsługiwać przepływy na tym samym hoście i na tym samym porcie co inne punkty końcowe, może ustawić FlowAddr na - i wywołać NewFlowServeMux() w celu uzyskania modułu obsługi dla przepływów Genkit, które można multipleksować za pomocą innych modułów obsługi tras:

mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))

Dostrzegalność przepływu

Czasami, gdy używasz zewnętrznych pakietów SDK, które nie mają narzędzi do obserwowania, możesz zobaczyć je jako osobny krok śledzenia w interfejsie programisty. Ty należy zawijać kod do funkcji run.

genkit.DefineFlow(
    "menuSuggestionFlow",
    func(ctx context.Context, restaurantTheme string) (string, error) {
        themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
            // ...
            return "", nil
        })

        // ...
        return themes, err
    })