Fragmenty sygnatur czasowych

Jeśli kolekcja zawiera dokumenty z sekwencyjnie posortowanymi wartościami indeksowanymi, Cloud Firestore ogranicza szybkość zapisu do 500 zapiszów na sekundę. Na tej stronie opisano, jak podzielić pole dokumentu, aby przekroczyć ten limit. Najpierw wyjaśnijmy, co rozumiemy przez „sekwencyjne indeksowanie pól” i kiedy ten limit ma zastosowanie.

Sekwencyjne indeksowane pola

„Uporządkowane indeksowane pola” to dowolna kolekcja dokumentów zawierająca monotonicznie rosnące lub malejące indeksowane pole. W wielu przypadkach oznacza to pole timestamp, ale dowolna wartość monotonicznie rosnąca lub malejąca może spowodować przekroczenie limitu zapisu wynoszącego 500 zapiszów na sekundę.

Na przykład limit dotyczy kolekcji dokumentów user z indekowanym polem userid, jeśli aplikacja przypisuje wartości userid w ten sposób:

  • 1281, 1282, 1283, 1284, 1285, ...

Z drugiej strony nie wszystkie pola timestamp powodują przekroczenie limitu. Jeśli pole timestamp śledzi losowo rozłożone wartości, limit zapisu nie ma zastosowania. Również rzeczywista wartość pola nie ma znaczenia, ważne jest tylko to, czy rośnie ona monotonicznie czy maleje. Na przykład oba te zestawy monotonicznie rosnących wartości pól powodują przekroczenie limitu zapisu:

  • 100000, 100001, 100002, 100003, ...
  • 0, 1, 2, 3, ...

Dzielenie na fragmenty pola sygnatury czasowej

Załóżmy, że Twoja aplikacja używa monotonicznie rosnącego pola timestamp. Jeśli Twoja aplikacja nie używa pola timestamp w żadnych zapytaniach, możesz usunąć limit 500 wpisów na sekundę, nie indeksując pola z datą i godziną. Jeśli w przypadku zapytań potrzebujesz pola timestamp, możesz obejść ten limit, używając shardowanych sygnatur czasowych:

  1. Obok pola timestamp dodaj pole shard. W polu shard użyj 1..n niepowtarzalnych wartości. Spowoduje to zwiększenie limitu zapisu dla kolekcji do 500*n, ale musisz zagregować zapytania n.
  2. Zaktualizuj logikę zapisu, aby losowo przypisywać wartość shard do każdego dokumentu.
  3. Zaktualizuj swoje zapytania, aby zsumować podzielone zbiory wyników.
  4. Wyłącz indeksy pojedynczych pól zarówno w przypadku pola shard, jak i pola timestamp. Usuń istniejące indeksy złożone, które zawierają pole timestamp
  5. Utwórz nowe indeksy złożone, aby obsługiwać zaktualizowane zapytania. Kolejność pól w indeksie ma znaczenie. Pole shard musi znajdować się przed polem timestamp. Wszystkie indeksy, które zawierają pole timestamp, muszą zawierać też pole shard.

Spartycjonowane sygnatury czasowe należy stosować tylko w przypadkach, gdy utrzymywane są szybkości zapisu powyżej 500 napisu na sekundę. W przeciwnym razie jest to przedwczesna optymalizacja. Dzielenie na fragmenty pola timestamp powoduje usunięcie ograniczenia dotyczącego 500 zapisów na sekundę, ale wymaga agregacji zapytań po stronie klienta.

W poniższych przykładach pokazujemy, jak podzielić pole timestamp i jak wysłać zapytanie dotyczące podzielonego zbioru wyników.

Przykładowy model danych i zapytania

Wyobraź sobie na przykład aplikację do analizy instrumentów finansowych (np. walut, akcji i funduszy ETF) w czasie zbliżonym do rzeczywistego. Ta aplikacja zapisuje dokumenty do kolekcji instruments w ten sposób:

Node.js
async function insertData() {
  const instruments = [
    {
      symbol: 'AAA',
      price: {
        currency: 'USD',
        micros: 34790000
      },
      exchange: 'EXCHG1',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.010Z'))
    },
    {
      symbol: 'BBB',
      price: {
        currency: 'JPY',
        micros: 64272000000
      },
      exchange: 'EXCHG2',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.101Z'))
    },
    {
      symbol: 'Index1 ETF',
      price: {
        currency: 'USD',
        micros: 473000000
      },
      exchange: 'EXCHG1',
      instrumentType: 'etf',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.001Z'))
    }
  ];

  const batch = fs.batch();
  for (const inst of instruments) {
    const ref = fs.collection('instruments').doc();
    batch.set(ref, inst);
  }

  await batch.commit();
}

Ta aplikacja wykonuje te zapytania i porządkuje wyniki według pola timestamp:

Node.js
function createQuery(fieldName, fieldOperator, fieldValue, limit = 5) {
  return fs.collection('instruments')
      .where(fieldName, fieldOperator, fieldValue)
      .orderBy('timestamp', 'desc')
      .limit(limit)
      .get();
}

function queryCommonStock() {
  return createQuery('instrumentType', '==', 'commonstock');
}

function queryExchange1Instruments() {
  return createQuery('exchange', '==', 'EXCHG1');
}

function queryUSDInstruments() {
  return createQuery('price.currency', '==', 'USD');
}
insertData()
    .then(() => {
      const commonStock = queryCommonStock()
          .then(
              (docs) => {
                console.log('--- queryCommonStock: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const exchange1Instruments = queryExchange1Instruments()
          .then(
              (docs) => {
                console.log('--- queryExchange1Instruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const usdInstruments = queryUSDInstruments()
          .then(
              (docs) => {
                console.log('--- queryUSDInstruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      return Promise.all([commonStock, exchange1Instruments, usdInstruments]);
    });

Po przeprowadzeniu badań okazuje się, że aplikacja będzie otrzymywać od 1000 do 1500 aktualizacji instrumentu na sekundę. Liczba ta przekracza 500 zapisów na sekundę dozwolonych w przypadku kolekcji zawierających dokumenty z indeksowanymi polami z datą i godziną. Aby zwiększyć przepustowość zapisu, musisz użyć 3 wartości fragmentów: MAX_INSTRUMENT_UPDATES/500 = 3. W tym przykładzie użyto wartości fragmentów x, yz. Do wartości fragmentów możesz też użyć liczb lub innych znaków.

Dodawanie pola fragmentu

Dodaj pole shard do dokumentów. Ustaw pole shard na wartości x, y lub z, aby zwiększyć limit zapisu w kolekcji do 1500 zapiszów na sekundę.

Node.js
// Define our 'K' shard values
const shards = ['x', 'y', 'z'];
// Define a function to help 'chunk' our shards for use in queries.
// When using the 'in' query filter there is a max number of values that can be
// included in the value. If our number of shards is higher than that limit
// break down the shards into the fewest possible number of chunks.
function shardChunks() {
  const chunks = [];
  let start = 0;
  while (start < shards.length) {
    const elements = Math.min(MAX_IN_VALUES, shards.length - start);
    const end = start + elements;
    chunks.push(shards.slice(start, end));
    start = end;
  }
  return chunks;
}

// Add a convenience function to select a random shard
function randomShard() {
  return shards[Math.floor(Math.random() * Math.floor(shards.length))];
}
async function insertData() {
  const instruments = [
    {
      shard: randomShard(),  // add the new shard field to the document
      symbol: 'AAA',
      price: {
        currency: 'USD',
        micros: 34790000
      },
      exchange: 'EXCHG1',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.010Z'))
    },
    {
      shard: randomShard(),  // add the new shard field to the document
      symbol: 'BBB',
      price: {
        currency: 'JPY',
        micros: 64272000000
      },
      exchange: 'EXCHG2',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.101Z'))
    },
    {
      shard: randomShard(),  // add the new shard field to the document
      symbol: 'Index1 ETF',
      price: {
        currency: 'USD',
        micros: 473000000
      },
      exchange: 'EXCHG1',
      instrumentType: 'etf',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.001Z'))
    }
  ];

  const batch = fs.batch();
  for (const inst of instruments) {
    const ref = fs.collection('instruments').doc();
    batch.set(ref, inst);
  }

  await batch.commit();
}

Wykonywanie zapytań dotyczących podzielonego na fragmenty sygnatury czasowej

Dodanie pola shard wymaga zaktualizowania zapytań w celu zgrupowania wyników w podziale na segmenty:

Node.js
function createQuery(fieldName, fieldOperator, fieldValue, limit = 5) {
  // For each shard value, map it to a new query which adds an additional
  // where clause specifying the shard value.
  return Promise.all(shardChunks().map(shardChunk => {
        return fs.collection('instruments')
            .where('shard', 'in', shardChunk)  // new shard condition
            .where(fieldName, fieldOperator, fieldValue)
            .orderBy('timestamp', 'desc')
            .limit(limit)
            .get();
      }))
      // Now that we have a promise of multiple possible query results, we need
      // to merge the results from all of the queries into a single result set.
      .then((snapshots) => {
        // Create a new container for 'all' results
        const docs = [];
        snapshots.forEach((querySnapshot) => {
          querySnapshot.forEach((doc) => {
            // append each document to the new all container
            docs.push(doc);
          });
        });
        if (snapshots.length === 1) {
          // if only a single query was returned skip manual sorting as it is
          // taken care of by the backend.
          return docs;
        } else {
          // When multiple query results are returned we need to sort the
          // results after they have been concatenated.
          // 
          // since we're wanting the `limit` newest values, sort the array
          // descending and take the first `limit` values. By returning negated
          // values we can easily get a descending value.
          docs.sort((a, b) => {
            const aT = a.data().timestamp;
            const bT = b.data().timestamp;
            const secondsDiff = aT.seconds - bT.seconds;
            if (secondsDiff === 0) {
              return -(aT.nanoseconds - bT.nanoseconds);
            } else {
              return -secondsDiff;
            }
          });
          return docs.slice(0, limit);
        }
      });
}

function queryCommonStock() {
  return createQuery('instrumentType', '==', 'commonstock');
}

function queryExchange1Instruments() {
  return createQuery('exchange', '==', 'EXCHG1');
}

function queryUSDInstruments() {
  return createQuery('price.currency', '==', 'USD');
}
insertData()
    .then(() => {
      const commonStock = queryCommonStock()
          .then(
              (docs) => {
                console.log('--- queryCommonStock: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const exchange1Instruments = queryExchange1Instruments()
          .then(
              (docs) => {
                console.log('--- queryExchange1Instruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const usdInstruments = queryUSDInstruments()
          .then(
              (docs) => {
                console.log('--- queryUSDInstruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      return Promise.all([commonStock, exchange1Instruments, usdInstruments]);
    });

Aktualizowanie definicji indeksów

Aby usunąć ograniczenie 500 wpisów na sekundę, usuń istniejące indeksy jednopolowe i kompleksowe, które używają pola timestamp.

Usuwanie definicji indeksów złożonych

Konsola Firebase

  1. Otwórz w konsoli Firebase stronę Cloud Firestore Indeksy złożone.

    Otwórz stronę Indeksy złożone

  2. W przypadku każdego indeksu zawierającego pole timestamp kliknij kolejno przyciski Usuń.

konsola GCP

  1. W konsoli Google Cloud otwórz stronę Bazy danych.

    Otwórz Bazy danych

  2. Wybierz wymaganą bazę danych z listy baz danych.

  3. W menu nawigacyjnym kliknij Indeksy, a następnie kartę Kompozycja.

  4. Użyj pola Filtr, aby wyszukać definicje indeksu, które zawierają pole timestamp.

  5. W przypadku każdego z tych indeksów kliknij przycisk , a następnie Usuń.

wiersz poleceń Firebase

  1. Jeśli nie masz skonfigurowanego wiersza poleceń Firebase, postępuj zgodnie z tymi instrukcjami, aby zainstalować wiersz poleceń i uruchomić polecenie firebase init. Podczas wykonywania polecenia init pamiętaj, aby wybrać Firestore: Deploy rules and create indexes for Firestore.
  2. Podczas konfiguracji wiersz poleceń Firebase pobiera istniejące definicje indeksów do pliku o nazwie firestore.indexes.json.
  3. Usuń definicje indeksów, które zawierają pole timestamp, na przykład:

    {
    "indexes": [
      // Delete composite index definition that contain the timestamp field
      {
        "collectionGroup": "instruments",
        "queryScope": "COLLECTION",
        "fields": [
          {
            "fieldPath": "exchange",
            "order": "ASCENDING"
          },
          {
            "fieldPath": "timestamp",
            "order": "DESCENDING"
          }
        ]
      },
      {
        "collectionGroup": "instruments",
        "queryScope": "COLLECTION",
        "fields": [
          {
            "fieldPath": "instrumentType",
            "order": "ASCENDING"
          },
          {
            "fieldPath": "timestamp",
            "order": "DESCENDING"
          }
        ]
      },
      {
        "collectionGroup": "instruments",
        "queryScope": "COLLECTION",
        "fields": [
          {
            "fieldPath": "price.currency",
            "order": "ASCENDING"
          },
          {
            "fieldPath": "timestamp",
            "order": "DESCENDING"
          }
        ]
      },
     ]
    }
    
  4. Wdróż zaktualizowane definicje indeksów:

    firebase deploy --only firestore:indexes
    

Aktualizowanie definicji indeksu pojedynczego pola

Konsola Firebase

  1. Otwórz w konsoli Firebase stronę Cloud Firestore Indeksy pojedynczych pól.

    Otwórz indeksy pojedynczych pól

  2. Kliknij Dodaj wyjątek.

  3. W polu Identyfikator kolekcji wpisz instruments. W polu Ścieżka pola wpisz timestamp.

  4. W sekcji Zakres zapytania zaznacz Kolekcja i Grupa kolekcji.

  5. Kliknij Dalej.

  6. Przełącz wszystkie ustawienia indeksu na Wyłączone. Kliknij Zapisz.

  7. Powtórz te same czynności w przypadku pola shard.

konsola GCP

  1. W konsoli Google Cloud otwórz stronę Bazy danych.

    Otwórz Bazy danych

  2. Wybierz wymaganą bazę danych z listy baz danych.

  3. W menu nawigacyjnym kliknij kolejno Indeksy i Pojedyncze pole.

  4. Kliknij kartę Pojedyncze pole.

  5. Kliknij Dodaj wyjątek.

  6. W polu Identyfikator kolekcji wpisz instruments. W polu Ścieżka pola wpisz timestamp.

  7. W sekcji Zakres zapytania zaznacz Kolekcja i Grupa kolekcji.

  8. Kliknij Dalej.

  9. Przełącz wszystkie ustawienia indeksu na Wyłączone. Kliknij Zapisz.

  10. Powtórz te same czynności w przypadku pola shard.

wiersz poleceń Firebase

  1. Dodaj do sekcji fieldOverrides w pliku definicji indeksów następujące informacje:

    {
     "fieldOverrides": [
       // Disable single-field indexing for the timestamp field
       {
         "collectionGroup": "instruments",
         "fieldPath": "timestamp",
         "indexes": []
       },
     ]
    }
    
  2. Wdróż zaktualizowane definicje indeksów:

    firebase deploy --only firestore:indexes
    

Tworzenie nowych indeksów złożonych

Po usunięciu wszystkich poprzednich indeksów zawierających timestamp zdefiniuj nowe indeksy wymagane przez aplikację. Każdy indeks zawierający pole timestamp musi też zawierać pole shard. Na przykład aby obsługiwać powyższe zapytania, dodaj te indeksy:

Kolekcja Zindeksowane pola Zakres zapytania
instrumenty shard, price.currency, timestamp Kolekcja
instrumenty  fragment,  wymiana,  sygnatura czasowa Kolekcja
instrumenty shard, instrumentType, timestamp Kolekcja

Komunikaty o błędach

Możesz utworzyć te indeksy, uruchamiając zaktualizowane zapytania.

Każde zapytanie zwraca komunikat o błędzie z linkiem do utworzenia wymaganego indeksu w konsoli Firebase.

wiersz poleceń Firebase

  1. Dodaj do pliku definicji indeksu te indeksy:

     {
       "indexes": [
       // New indexes for sharded timestamps
         {
           "collectionGroup": "instruments",
           "queryScope": "COLLECTION",
           "fields": [
             {
               "fieldPath": "shard",
               "order": "DESCENDING"
             },
             {
               "fieldPath": "exchange",
               "order": "ASCENDING"
             },
             {
               "fieldPath": "timestamp",
               "order": "DESCENDING"
             }
           ]
         },
         {
           "collectionGroup": "instruments",
           "queryScope": "COLLECTION",
           "fields": [
             {
               "fieldPath": "shard",
               "order": "DESCENDING"
             },
             {
               "fieldPath": "instrumentType",
               "order": "ASCENDING"
             },
             {
               "fieldPath": "timestamp",
               "order": "DESCENDING"
             }
           ]
         },
         {
           "collectionGroup": "instruments",
           "queryScope": "COLLECTION",
           "fields": [
             {
               "fieldPath": "shard",
               "order": "DESCENDING"
             },
             {
               "fieldPath": "price.currency",
               "order": "ASCENDING"
             },
             {
               "fieldPath": "timestamp",
               "order": "DESCENDING"
             }
           ]
         },
       ]
     }
    
  2. Wdróż zaktualizowane definicje indeksów:

    firebase deploy --only firestore:indexes
    

Limit zapisu w przypadku sekwencyjnych zindeksowanych pól

Limit szybkości zapisu w przypadku sekwencyjnie indeksowanych pól wynika ze sposobu, w jaki Cloud Firestore przechowuje wartości indeksu i skaluje zapisy indeksu. W przypadku każdego zapisu indeksu Cloud Firestore definiuje wpis klucz-wartość, który łączy nazwę dokumentu i wartość każdego z indeksowanych pól. Cloud Firestore porządkuje te wpisy indeksu w grupy danych zwane tabletami. Każdy serwer Cloud Firestore zawiera co najmniej 1 tablet. Gdy obciążenie związane z zapisywaniem na konkretnym tablecie staje się zbyt duże, Cloud Firestore skaluje go poziomo, dzieląc go na mniejsze tablety i rozmieszczając je na różnych serwerach Cloud Firestore.

Cloud Firestore miejsca na podręcznej liście Jeśli wartości indeksu w tablecie są zbyt zbliżone do siebie, np. w przypadku pól sygnatury czasowej, Cloud Firestore nie może skutecznie podzielić tabletu na mniejsze tablety. Powoduje to, że jeden tablet otrzymuje zbyt dużo ruchu, a operacje odczytu i zapisu w tym punkcie stają się wolniejsze.

Dzięki dzieleniu na fragmenty pola z datą i godziną Cloud Firestore może efektywnie dzielić zbiory zadań na wiele tabletek. Chociaż wartości pola sygnatury czasowej mogą być zbliżone, złączanie wartości fragmentu i indeksów daje Cloud Firestorewystarczająco dużo miejsca pomiędzy wpisami indeksu, aby można było podzielić je na kilka tabletów.

Co dalej?