Luồng

Flow là các hàm được bao bọc với một số đặc điểm bổ sung so với trực tiếp cuộc gọi: chúng được nhập rõ ràng, có thể phát trực tuyến, có thể gọi cục bộ và từ xa, và hoàn toàn có thể ghi nhận được. Firebase Genkit cung cấp công cụ CLI và giao diện người dùng dành cho nhà phát triển để chạy và gỡ lỗi quy trình.

Xác định luồng

Ở dạng đơn giản nhất, luồng chỉ bao bọc một hàm:

Tiến hành

menuSuggestionFlow := genkit.DefineFlow(
  "menuSuggestionFlow",
  func(ctx context.Context, restaurantTheme string) (string, error) {
      suggestion := makeMenuItemSuggestion(restaurantTheme)
      return suggestion, nil
  })

Thao tác như vậy sẽ cho phép bạn chạy hàm từ Genkit CLI cũng như giao diện người dùng của nhà phát triển, đồng thời một yêu cầu đối với nhiều tính năng của Genkit, bao gồm cả việc triển khai và khả năng ghi nhận.

Một lợi thế quan trọng mà luồng Genkit có so với việc gọi trực tiếp API mô hình là an toàn về kiểu của cả đầu vào và đầu ra:

Tiến hành

Loại đối số và kết quả của luồng có thể là giá trị đơn giản hoặc giá trị có cấu trúc. Genkit sẽ tạo giản đồ JSON cho các giá trị này bằng cách sử dụng invopop/jsonschema.

Luồng sau đây sẽ lấy string làm đầu vào và xuất ra struct:

type MenuSuggestion struct {
  ItemName    string `json:"item_name"`
  Description string `json:"description"`
  Calories    int    `json:"calories"`
}
menuSuggestionFlow := genkit.DefineFlow(
  "menuSuggestionFlow",
  func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
      suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
      return suggestion, nil
  },
)

Luồng đang chạy

Cách chạy một luồng trong mã:

Tiến hành

suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")

Bạn cũng có thể sử dụng CLI để chạy luồng:

genkit flow:run menuSuggestionFlow '"French"'

Đã phát trực tiếp

Dưới đây là ví dụ đơn giản về quy trình có thể truyền trực tuyến giá trị:

Tiến hành

// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string
menuSuggestionFlow := genkit.DefineStreamingFlow(
  "menuSuggestionFlow",
  func(
      ctx context.Context,
      restaurantTheme InputType,
      callback func(context.Context, StreamType) error,
  ) (OutputType, error) {
      var menu strings.Builder
      menuChunks := make(chan StreamType)
      go makeFullMenuSuggestion(restaurantTheme, menuChunks)
      for {
          chunk, ok := <-menuChunks
          if !ok {
              break
          }
          if callback != nil {
              callback(context.Background(), chunk)
          }
          menu.WriteString(string(chunk))
      }
      return OutputType(menu.String()), nil
  },
)

Lưu ý rằng lệnh gọi lại phát trực tuyến có thể chưa được xác định. Giá trị này chỉ được xác định nếu ứng dụng gọi đang yêu cầu phản hồi được truyền trực tuyến.

Cách gọi một luồng ở chế độ phát trực tuyến:

Tiến hành

menuSuggestionFlow.Stream(
  context.Background(),
  "French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
  if !sfv.Done {
      fmt.Print(sfv.Output)
      return true
  } else {
      return false
  }
})

Nếu luồng không triển khai truyền trực tuyến, StreamFlow() sẽ hoạt động giống hệt với RunFlow().

Bạn cũng có thể dùng CLI để truyền trực tuyến luồng:

genkit flow:run menuSuggestionFlow '"French"' -s

Triển khai luồng

Nếu muốn truy cập vào flow của mình qua HTTP, bạn cần phải triển khai flow đầu tiên.

Tiến hành

Để triển khai luồng bằng Cloud Run và các dịch vụ tương tự, hãy xác định luồng của bạn và sau đó gọi Init():

func main() {
  genkit.DefineFlow(
      "menuSuggestionFlow",
      func(ctx context.Context, restaurantTheme string) (string, error) {
          // ...
          return "", nil
      },
  )
  if err := genkit.Init(context.Background(), nil); err != nil {
      log.Fatal(err)
  }
}

Init khởi động một máy chủ net/http để hiển thị các luồng của bạn dưới dạng HTTP điểm cuối (ví dụ: http://localhost:3400/menuSuggestionFlow).

Tham số thứ hai là Options không bắt buộc chỉ định những tham số sau:

  • FlowAddr: Địa chỉ và cổng để nghe. Nếu không được chỉ định, máy chủ nghe trên cổng được chỉ định bởi biến môi trường PORT; nếu trống thì cổng này sẽ sử dụng giá trị mặc định là cổng 3400.
  • Flows: Luồng nào cần phân phát. Nếu không được chỉ định, Init sẽ phân phối tất cả các quy trình đã xác định của bạn.

Nếu muốn phân phát các luồng trên cùng một máy chủ và cổng với các điểm cuối khác, bạn có thể đặt FlowAddr thành - và thay vào đó, hãy gọi NewFlowServeMux() để nhận một trình xử lý cho các luồng Genkit của bạn. Bạn có thể ghép kênh với các trình xử lý định tuyến khác của mình:

mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))

Khả năng quan sát dòng chảy

Đôi khi, khi bạn sử dụng SDK của bên thứ ba không được đo lường để quan sát, bạn có thể muốn xem chúng như một bước theo dõi riêng biệt trong Giao diện người dùng dành cho nhà phát triển. Bạn việc cần làm là gói mã vào hàm run.

genkit.DefineFlow(
    "menuSuggestionFlow",
    func(ctx context.Context, restaurantTheme string) (string, error) {
        themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
            // ...
            return "", nil
        })

        // ...
        return themes, err
    })