זרימות

תהליכי עבודה הם פונקציות עטופות במכסה עם כמה מאפיינים נוספים על פני קריאות ישירות: הן מוקלדות, ניתנות לסטרימינג, באופן מקומי וניתן לקריאה מרחוק, וניתנות למדידה מלאה. ב-Firebase Genkit יש כלים ל-CLI ולממשק המשתמש למפתחים לצורך הרצה וניפוי באגים.

הגדרת תהליכים

בצורתו הפשוטה ביותר, זרימה כוללת פונקציה:

menuSuggestionFlow := genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (string, error) {
		suggestion := makeMenuItemSuggestion(restaurantTheme)
		return suggestion, nil
	})

כך תוכלו להריץ את הפונקציה מ-CLI של Genkit ומממשק המשתמש למפתחים. זו גם דרישה לשימוש בחלק גדול מהתכונות של Genkit, כולל פריסה וניטור.

יתרון חשוב של תהליכים ב-Genkit על פני קריאה ישירה ל-API של מודל הוא בטיחות הסוג של הקלט והפלט. סוגי הארגומנט והתוצאה של תהליך יכולים להיות ערכים פשוטים או מובְנים. Genkit ייצור סכימות JSON לערכים האלה באמצעות invopop/jsonschema.

התהליך הבא מקבל string כקלט ומפיק struct:

type MenuSuggestion struct {
	ItemName    string `json:"item_name"`
	Description string `json:"description"`
	Calories    int    `json:"calories"`
}

menuSuggestionFlow := genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
		suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
		return suggestion, nil
	},
)

תהליכים פעילים

כדי להריץ תהליך בקוד:

suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")

אפשר להשתמש ב-CLI גם כדי להריץ תהליכים:

genkit flow:run menuSuggestionFlow '"French"'

שודר

דוגמה פשוטה לתהליך שיכול להעביר ערכים בסטרימינג:

// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string

menuSuggestionFlow := genkit.DefineStreamingFlow(
	"menuSuggestionFlow",
	func(
		ctx context.Context,
		restaurantTheme InputType,
		callback func(context.Context, StreamType) error,
	) (OutputType, error) {
		var menu strings.Builder
		menuChunks := make(chan StreamType)
		go makeFullMenuSuggestion(restaurantTheme, menuChunks)
		for {
			chunk, ok := <-menuChunks
			if !ok {
				break
			}
			if callback != nil {
				callback(context.Background(), chunk)
			}
			menu.WriteString(string(chunk))
		}
		return OutputType(menu.String()), nil
	},
)

שימו לב שאפשר לא להגדיר את הפונקציה הזו. הוא מוגדר רק אם הלקוח שמפעיל את הפונקציה מבקש תשובה בסטרימינג.

כדי להפעיל זרימה במצב סטרימינג:

menuSuggestionFlow.Stream(
	context.Background(),
	"French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
	if err != nil {
		// handle err
		return false
	}
	if !sfv.Done {
		fmt.Print(sfv.Stream)
		return true
	} else {
		fmt.Print(sfv.Output)
		return false
	}
})

אם התהליך לא מטמיע סטרימינג, StreamFlow() מתנהג באופן זהה ל-RunFlow().

תוכלו להשתמש ב-CLI כדי לשדר תהליכים גם:

genkit flow:run menuSuggestionFlow '"French"' -s

פריסת תהליכים

כדי שתוכלו לגשת לתהליך דרך HTTP, תצטרכו לפרוס אותו קודם. כדי לפרוס תהליכים באמצעות Cloud Run ושירותים דומים, מגדירים את התהליכים ומפעילים את Init():

func main() {
	genkit.DefineFlow(
		"menuSuggestionFlow",
		func(ctx context.Context, restaurantTheme string) (string, error) {
			// ...
			return "", nil
		},
	)
	if err := genkit.Init(context.Background(), nil); err != nil {
		log.Fatal(err)
	}
}

Init מפעיל שרת net/http שחושפ את הזרמים שלכם כנקודות קצה מסוג HTTP (לדוגמה, http://localhost:3400/menuSuggestionFlow).

הפרמטר השני הוא Options אופציונלי שמציין את הפרטים הבאים:

  • FlowAddr: הכתובת והיציאה שבהן מתבצע האזנה. אם לא מציינים יציאה, השרת מקשיב ביציאה שצוינה במשתנה הסביבה PORT. אם המשתנה הזה ריק, המערכת משתמשת ביציאת ברירת המחדל 3400.
  • Flows: אילו תהליכים להציג. אם לא מציינים זאת, Init ימלא את כל תהליכי העבודה המוגדרים.

אם רוצים להציג תהליכים באותו מארח ובאותה יציאה כמו נקודות קצה אחרות, אפשר להגדיר את FlowAddr כ-- ובמקום זאת לבצע קריאה ל-NewFlowServeMux() כדי לקבל טיפול לתהליכי Genkit, שאפשר לבצע בו מוליפסק עם טיפולי הנתיבים האחרים:

mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))

ניראות של Flow

לפעמים, כשמשתמשים ב-SDK של צד שלישי שלא עבר הטמעה למעקב, כדאי להציג אותו כשלב מעקב נפרד בממשק המשתמש למפתחים. כל מה שצריך לעשות הוא לעטוף את הקוד בפונקציה run.

genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (string, error) {
		themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
			// ...
			return "", nil
		})

		// ...
		return themes, err
	})