How to use streaming parsing in JavaScript

Sergey Zenchenko, February 23, 2023

The page-loading process consists of many steps, each bound by network throughput or by processing speed at the code level. You can reduce the time required to download your payload by applying compression or moving your server closer to the end user. In the previous article, we improved decoding performance.

This article shows an alternative way to download and decode data that can significantly improve performance.

Nobody likes to wait.

During AppSpector development, we faced many performance challenges. A session is an archive of all the data collected by our SDK from the end-user app, which can be an iOS, Android, or Flutter app.

If the app actively produces events, the session can be large: 30-50 MB of uncompressed msgpack data. We compress it using LZ4, but the compressed session can still reach 5-9 MB.

It takes time to download this data from the server, and additional time to decompress it, parse it, and insert it into the UI.

For a long time, we looked for ways to optimize this by trying different compression algorithms or changing the architecture.

We found the solution while optimizing the msgpack-javascript library. The answer was to combine the downloading, decompressing, and decoding steps. Instead of waiting for all the data to arrive, we can start processing it as soon as the first bytes are available.

Fetching data

Modern browsers support the ReadableStream API. It lets you receive the response as a stream of bytes while it is being downloaded from the server, instead of waiting for the whole response body.

const response = await fetch("https://appspector.com/huge_session.msgpack");
const streamReader = response.body.getReader();

function decode(data: Uint8Array): void {
  // ...
}

streamReader.read().then(function processData({ done, value }) {
    // Result objects contain two properties:
    // done - true if the stream has already given you all its data.
    // value - some data. Always undefined when done is true.
    if (done) {
      console.log("Stream complete");      
      return;
    }

    decode(value);

    // Read some more, and call this function again
    return streamReader.read().then(processData);
});

You can read more about ReadableStream here

Decoding

The standard JSON.parse() can't parse a stream of data; it needs the complete document. That is why we use MessagePack for payload encoding: streaming support follows naturally from the structure of the format.

Every MessagePack element starts with a byte that defines its type and, either directly or through the length bytes that follow it, its size. So we always know what comes next and how many bytes we need to construct it.
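A minimal sketch of this idea, covering only arrays and strings. The `describeHead` helper and the `HeadInfo` shape are hypothetical names used for illustration and are not part of msgpack-javascript; the byte ranges are the ones defined by the MessagePack specification.

```typescript
// What the first byte of a MessagePack element tells the decoder (subset of the format).
interface HeadInfo {
  kind: string;
  // Number of bytes after the head byte that hold the length; 0 when the length is inline.
  lengthBytes: number;
  // Length stored in the head byte itself, when the format has one.
  inlineLength?: number;
}

// Hypothetical helper, for illustration only.
function describeHead(head: number): HeadInfo {
  if (head >= 0x90 && head <= 0x9f) {
    return { kind: "fixarray", lengthBytes: 0, inlineLength: head & 0x0f };
  }
  if (head >= 0xa0 && head <= 0xbf) {
    return { kind: "fixstr", lengthBytes: 0, inlineLength: head & 0x1f };
  }
  switch (head) {
    case 0xd9: return { kind: "str 8", lengthBytes: 1 };
    case 0xda: return { kind: "str 16", lengthBytes: 2 };
    case 0xdb: return { kind: "str 32", lengthBytes: 4 };
    case 0xdc: return { kind: "array 16", lengthBytes: 2 };
    case 0xdd: return { kind: "array 32", lengthBytes: 4 };
    default: return { kind: "other (not covered by this sketch)", lengthBytes: 0 };
  }
}

const fixarrayInfo = describeHead(0x93); // fixarray with 3 elements
const fixstrInfo = describeHead(0xa5);   // fixstr with 5 bytes of text
const array16Info = describeHead(0xdc);  // array whose length is in the next 2 bytes
```

Because the size is known before the body is read, a decoder can tell whether the bytes received so far are enough to build the next element.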

In an AppSpector session, the payload is an array of events. Each event is from 300 bytes to 250 kB of data and is processed by our logic one by one without waiting for other events.

We've been using msgpack-javascript for decoding. Initially, it did not support streaming decoding, so we added it ourselves.

Step 1: Create a generator from ReadableStream

The ReadableStream API is not very convenient to use directly, so we wrapped it in a JavaScript async generator.


export async function* asyncIterableFromStream<T>(stream: ReadableStream<T>): AsyncIterable<T> {
  const reader = stream.getReader();

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) {
        return;
      }
      yield value;
    }
  } finally {
    reader.releaseLock();
  }
}

Now we can loop over the incoming byte buffers like this.


async *decodeArrayStream(stream: AsyncIterable<Uint8Array>) {
    for await (const buffer of stream) {
       // ...process individual buffer
    }
}

Step 2: Read the array size

Our payload is one large array, so we need to read its length to start parsing.


readArraySize(): number {
    const headByte = this.readHeadByte();

    switch (headByte) {
      // Array with 16 bits length
      case 0xdc: 
        return this.readU16();
      // Array with 32 bits length
      case 0xdd:
        return this.readU32();
      default: {
        // fixarray (0x90-0x9f): the length is stored in the low 4 bits
        if (headByte >= 0x90 && headByte < 0xa0) {
          return headByte - 0x90;
        } else {
          throw new Error(`Unrecognized array type byte: ${prettyByte(headByte)}`);
        }
      }
    }
}
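To make the three header forms concrete, here is a standalone sketch that works on a plain `Uint8Array` rather than on the decoder's internal state. `readArrayHeader` is a hypothetical helper written for this example; it is not the library method shown above.

```typescript
// Hypothetical standalone helper: returns the element count and the header size in bytes.
function readArrayHeader(bytes: Uint8Array): { size: number; headerBytes: number } {
  const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength);
  const head = view.getUint8(0);

  // fixarray: the element count lives in the low 4 bits of the head byte
  if (head >= 0x90 && head <= 0x9f) {
    return { size: head - 0x90, headerBytes: 1 };
  }
  // array 16: a big-endian 16-bit count follows (DataView reads big-endian by default)
  if (head === 0xdc) {
    return { size: view.getUint16(1), headerBytes: 3 };
  }
  // array 32: a big-endian 32-bit count follows
  if (head === 0xdd) {
    return { size: view.getUint32(1), headerBytes: 5 };
  }
  throw new Error(`Not an array header: 0x${head.toString(16)}`);
}

const small = readArrayHeader(new Uint8Array([0x93, 0x01, 0x02, 0x03]));
const medium = readArrayHeader(new Uint8Array([0xdc, 0x01, 0x00]));
const large = readArrayHeader(new Uint8Array([0xdd, 0x00, 0x01, 0x00, 0x00]));

console.log(small.size, medium.size, large.size); // prints 3 256 65536
```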

Step 3: Start reading individual events

Now we can start reading individual events from incoming byte buffers.

For every incoming byte buffer, we append it to the internal buffer of MessagePack Decoder.

During the first iteration, we read the size of the array. This size is used as an elements counter to identify the end of the array.

Next, we try to decode one element using the Decoder.decodeSync() method. It returns the decoded object if decoding succeeds, or throws an exception if the buffer does not yet contain enough data to decode the current object.

async *decodeArrayStream(stream: AsyncIterable<Uint8Array>) {
    let headerParsed = false;
    let decoded = false;
    let itemsLeft = 0;

    for await (const buffer of stream) {
      if (decoded) {        
        throw this.createNoExtraBytesError(this.totalPos);
      }

      // Append new bytes to internal buffer.
      this.appendBuffer(buffer);

      if (!headerParsed) {
        itemsLeft = this.readArraySize();
        headerParsed = true;
        this.complete();
      }

      try {
        while (true) {
          // Read one object or throw if there is not enough data
          let result = this.decodeSync();

          // Return result from generator
          yield result;

          itemsLeft--;

          if (itemsLeft === 0) {
            decoded = true;
            break;
          }
        }
      } catch (e) {
        if (!(e instanceof DataViewIndexOutOfBoundsError)) {
          throw e; // rethrow
        }
        // fallthrough
      }
    }
  }
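The same "decode or wait for more bytes" loop can be tried without the library. The sketch below is a simplified stand-in, not the msgpack-javascript implementation: it is synchronous, handles only a fixarray of unsigned integers, and relies on the fact that `DataView` throws a `RangeError` on out-of-bounds reads. `concatBytes`, `decodeOne`, and `decodeNumberArray` are hypothetical names.

```typescript
// Join the undecoded tail with a newly arrived chunk.
function concatBytes(a: Uint8Array, b: Uint8Array): Uint8Array {
  const out = new Uint8Array(a.length + b.length);
  out.set(a, 0);
  out.set(b, a.length);
  return out;
}

// Decode one element at `pos`. DataView throws RangeError when bytes are missing.
function decodeOne(view: DataView, pos: number): { value: number; next: number } {
  const head = view.getUint8(pos);
  if (head <= 0x7f) return { value: head, next: pos + 1 };                     // positive fixint
  if (head === 0xcd) return { value: view.getUint16(pos + 1), next: pos + 3 }; // uint 16
  if (head === 0xce) return { value: view.getUint32(pos + 1), next: pos + 5 }; // uint 32
  throw new Error(`Unsupported head byte in this sketch: 0x${head.toString(16)}`);
}

function* decodeNumberArray(chunks: Iterable<Uint8Array>): Generator<number> {
  let pending: Uint8Array = new Uint8Array(0); // received but not yet decoded
  let itemsLeft = -1;                          // -1: array header not read yet

  for (const chunk of chunks) {
    pending = concatBytes(pending, chunk);
    const view = new DataView(pending.buffer, pending.byteOffset, pending.byteLength);
    let pos = 0;
    try {
      if (itemsLeft < 0) {
        const head = view.getUint8(pos);
        if (head < 0x90 || head > 0x9f) throw new Error("This sketch expects a fixarray header");
        itemsLeft = head - 0x90;
        pos += 1;
      }
      while (itemsLeft > 0) {
        const { value, next } = decodeOne(view, pos);
        pos = next; // advance only after a complete element was decoded
        itemsLeft--;
        yield value;
      }
    } catch (e) {
      if (!(e instanceof RangeError)) throw e;
      // Not enough bytes yet: keep the tail and wait for the next chunk.
    }
    pending = pending.subarray(pos);
  }
}

// The array [1, 300, 70000], split at awkward positions.
const demoChunks = [
  new Uint8Array([0x93, 0x01, 0xcd]),
  new Uint8Array([0x01]),
  new Uint8Array([0x2c, 0xce, 0x00, 0x01]),
  new Uint8Array([0x11, 0x70]),
];
const decodedNumbers = Array.from(decodeNumberArray(demoChunks));
console.log(decodedNumbers); // logs the numbers 1, 300, 70000
```

The first value is yielded as soon as the first chunk arrives, even though the second value is still incomplete at that point.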

Usage

The steps above rely on internal methods of the MessagePack Decoder. Don't worry if you didn't follow every detail; what matters is how we can use the result in an actual app.

In this sample, you receive individual events while the data is still being downloaded from the server.

const response = await fetch("https://appspector.com/huge_session.msgpack");

// asyncIterableFromStream() acquires the reader itself, so pass the stream, not a reader
const streamIterator = asyncIterableFromStream(response.body);

for await(const event of decodeArrayStream(streamIterator)) {
   // Here we have decoded event that we can start processing, inserting into Redux and updating UI. 
   processEvent(event);
}

Instead of waiting for 2 seconds of downloading, 2.5 seconds of decoding, and 1.5 seconds of processing and inserting into Redux (6 seconds in total), all three now run in parallel and finish in 3.5 seconds, a 1.7x improvement.
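The speedup can be checked with a few lines of arithmetic. The variable names are made up for this example; the timings are the ones quoted above.

```typescript
// Timings quoted in the article, in seconds.
const downloadSec = 2;
const decodeSec = 2.5;
const processSec = 1.5;
const streamedTotalSec = 3.5; // total reported with streaming

// Without streaming, each step waits for the previous one to finish.
const sequentialTotalSec = downloadSec + decodeSec + processSec; // 6

const speedup = sequentialTotalSec / streamedTotalSec;
console.log(speedup.toFixed(2)); // prints 1.71
```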

Summary

Sometimes the best way to solve a problem is to look at it from a different perspective. We had been improving the performance of individual steps, which is not always possible. The best solution was to rethink the overall data processing flow. As always, our changes are contributed back to open source (you can find the pull request here).