javascript / 4 min read

How to use streaming parsing in JavaScript

The page loading process consists of many steps. Some of them are bound by network throughput and some by processing speed at the code level.
You can reduce the time required to download your payload by applying compression or moving your server closer to the end user. However, the overall flow stays the same: Download -> Process. In a previous article, we improved decoding performance.

In this article, I am going to show you an alternative approach to downloading and decoding data that can significantly improve performance.

Nobody likes to wait

During AppSpector development, I've faced many performance-related challenges. The most performance-critical part is session loading. A session is an archive of all the data collected by our SDK from the end-user app. It can come from iOS, Android, or Flutter apps. There are a few differences between them, but most of this data consists of application logs, network requests, performance metrics, SQL queries executed by the app, device locations, etc.

If the app is actively producing these events, the size of a session can be pretty significant: up to 30-50 MB of uncompressed msgpack data. We compress it with LZ4, but it can still reach 5-9 MB of compressed data.

It takes time to download this data from the server, and it takes additional time to decompress it, parse it, and insert it into the UI.

For a long time, I've been looking for a way to optimize it by using different compression algorithms or by changing the architecture.

I found the solution while working on optimizing the msgpack-javascript library. The answer was to combine the downloading, decompressing, and decoding steps. Instead of waiting for all the data to arrive, we can start processing it as soon as the first bytes are available.

Sounds cool. Let's see how we can do it.

Fetching data

This part is easy. Modern browsers support the ReadableStream API. It allows you to receive a stream of bytes downloaded from the server instead of waiting for the whole response body.


const response = await fetch("https://appspector.com/huge_session.msgpack");
const streamReader = response.body.getReader();

function decode(data: Uint8Array): void {
  // ... decode the chunk
}

streamReader.read().then(function processData({ done, value }) {
    // Result objects contain two properties:
    // done  - true if the stream has already given you all its data.
    // value - some data. Always undefined when done is true.
    if (done) {
      console.log("Stream complete");      
      return;
    }

    decode(value);

    // Read some more, and call this function again
    return streamReader.read().then(processData);
});

You can check whether it's supported in the browsers you need here

You can read more about ReadableStream here

Decoding

Now it's time to move on to the fun part. The standard JSON.parse() can't parse a stream of data. That's one of the reasons we use MessagePack for payload encoding: streaming support follows naturally from the structure of the format.

When you read MessagePack, every element starts with a head byte that defines its type and how its length is encoded. This means we always know what the next element is and how many bytes we need to construct it.
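
To make this concrete, here is a small illustration (my own sketch, not code from msgpack-javascript) of how the head byte tells us what the next element is and how many bytes it occupies:

// Illustration only: interpret the head byte of a few common MessagePack types.
function describeHead(bytes: Uint8Array): string {
  const head = bytes[0];
  if (head <= 0x7f) {
    // positive fixint: the value is the head byte itself, 1 byte total
    return `positive fixint ${head}, 1 byte total`;
  }
  if (head >= 0xa0 && head <= 0xbf) {
    // fixstr: the low 5 bits of the head byte hold the string length
    const length = head - 0xa0;
    return `string of ${length} bytes, ${1 + length} bytes total`;
  }
  if (head >= 0x90 && head <= 0x9f) {
    // fixarray: the low 4 bits hold the number of elements that follow
    return `array of ${head - 0x90} elements`;
  }
  if (head === 0xdc) {
    // array 16: the next 2 bytes (big-endian) hold the number of elements
    return `array of ${(bytes[1] << 8) | bytes[2]} elements`;
  }
  return "some other MessagePack type";
}

// 0x93 starts a fixarray of 3 elements; 0x01 0x02 0x03 are positive fixints 1, 2, 3
console.log(describeHead(new Uint8Array([0x93, 0x01, 0x02, 0x03]))); // array of 3 elements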

In an AppSpector session, the payload is one large array of events. Each event ranges from 300 bytes to 250 kB of data, and events can be processed by our logic one by one without waiting for the others.

I've been using msgpack-javascript for decoding, and initially it did not support streaming decoding, so I had to add it.

Step 1: Create generator from ReadableStream

The ReadableStream API is not very convenient to use, so I decided to wrap it in an async generator.

export async function* asyncIterableFromStream<T>(stream: ReadableStream<T>): AsyncIterable<T> {
  const reader = stream.getReader();

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) {
        return;
      }
      yield value;
    }
  } finally {
    reader.releaseLock();
  }
}

Now we can loop over incoming byte buffers like this:


async *decodeArrayStream(stream: AsyncIterable<Uint8Array>) {
    for await (const buffer of stream) {
       // ...process the individual buffer
    }
}

Step 2: Read array size

Our payload is one large array. To start parsing it, we need to read its length.

readArraySize(): number {
    const headByte = this.readHeadByte();

    switch (headByte) {
      // array 16: length stored in the next 2 bytes
      case 0xdc: 
        return this.readU16();
      // array 32: length stored in the next 4 bytes
      case 0xdd:
        return this.readU32();
      default: {
        // fixarray: length encoded in the low 4 bits of the head byte
        if (headByte < 0xa0) {
          return headByte - 0x90;
        } else {
          throw new Error(`Unrecognized array type byte: ${prettyByte(headByte)}`);
        }
      }
    }
}

Step 3: Start reading individual events

Now we can start reading individual events from incoming byte buffers.

Every incoming byte buffer is appended to the internal buffer of the MessagePack Decoder.

During the first iteration, I read the size of the array. This size is used as an element counter to identify the end of the array.

Next, I try to decode one element using the Decoder.decodeSync() method. It returns a decoded object if decoding succeeds, or throws an exception if there is not enough data in the buffer to decode the current object.

async *decodeArrayStream(stream: AsyncIterable<Uint8Array>) {
    let headerParsed = false;
    let decoded = false;
    let itemsLeft = 0;

    for await (const buffer of stream) {
      if (decoded) {        
        throw this.createNoExtraBytesError(this.totalPos);
      }

      // Append new bytes to internal buffer.
      this.appendBuffer(buffer);

      if (!headerParsed) {
        itemsLeft = this.readArraySize();
        headerParsed = true;
        this.complete();
      }

      try {
        while (true) {
          // Read one object or throw if there is not enough data
          let result = this.decodeSync();

          // Return result from generator
          yield result;

          itemsLeft--;

          if (itemsLeft === 0) {
            decoded = true;
            break;
          }
        }
      } catch (e) {
        if (!(e instanceof DataViewIndexOutOfBoundsError)) {
          throw e; // rethrow
        }
        // fallthrough
      }
    }
}
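
The code above relies on internal Decoder helpers such as appendBuffer(), complete(), and decodeSync() from msgpack-javascript. Their exact implementations don't matter for the idea, but as a rough sketch (my simplification, not the library's actual code), appending a new chunk to the internal buffer could look like this:

// Simplified sketch of internal buffer handling; assumed, not the library's exact code.
class StreamingBuffer {
  private buffer = new Uint8Array(0);
  private pos = 0; // how many bytes the decoder has already consumed

  appendBuffer(chunk: Uint8Array): void {
    // Keep the not-yet-consumed tail of the old buffer and append the new chunk after it
    const remaining = this.buffer.subarray(this.pos);
    const combined = new Uint8Array(remaining.length + chunk.length);
    combined.set(remaining, 0);
    combined.set(chunk, remaining.length);
    this.buffer = combined;
    this.pos = 0;
  }
}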

Usage

The previous part relied on internal methods of the MessagePack Decoder, so it's okay if you didn't follow every step. This part is about how we can use it in a real app.

In this sample, you receive individual events one by one while the data is still being downloaded from the server.

const response = await fetch("https://appspector.com/huge_session.msgpack");
// Pass the ReadableStream itself; asyncIterableFromStream calls getReader() internally
const streamIterator = asyncIterableFromStream(response.body);

for await (const event of decodeArrayStream(streamIterator)) {
   // Here we have a decoded event that we can start processing,
   // inserting into Redux, and updating the UI.
   processEvent(event);
}

In our case, instead of 2 seconds of downloading, then 2.5 seconds of decoding, then 1.5 seconds of processing and inserting into Redux (6 seconds in total), we get a total time of about 3.5 seconds with all three steps running in parallel.

This is a 1.7x improvement!

Summary

Sometimes the best way to solve a problem is to look at it from a different perspective. I had been trying to improve the performance of individual steps, and that is not always possible. The best solution was to rethink the overall data processing flow.

As always, my changes are contributed back to open source. You can find the pull request here


About Us

AppSpector is a remote debugging and introspection tool for iOS and Android applications. With AppSpector, you can debug your app running in the same room or on another continent. You can measure app performance, view CoreData and SQLite content, logs, network requests, and much more in real time. Just like you, we have been struggling for years trying to find stupid mistakes and dreaming of better native tools, so we finally decided to build them. This is the instrument that you've been looking for.
