Introducing parallelism to subgraphs, in developer preview

Also as promised, here is the start of discussions around the parallelisation of subgraphs.

This repository contains explanations about how one can reason about parallelism in the context of The Graph.

Now, instead of providing only a written proposal, you will also get to try something here: a sample implementation of the PancakeSwap Exchange subgraph, infused with parallelism, capable of running at extremely high speeds:

It requires a synced Firehose on BSC to run (search “Firehose” in the forum for the previous post).

It also relies on sparkle as a codegen tool, pulled into the sparkle-pancakeswap binary as a library.

The README.md in sparkle will serve as our introduction to the matter. It is (again) a rather deep technical document, but I hope we can have some of the best brains meditate on it, provide suggestions and comments. I’d ask you to concentrate all feedback and comments in this thread.

If you have technical questions while you try out all of this, please join us on Discord!

5 Likes

Great stuff @abourget.

Digging in I am interested in the developer / indexer experience, as it’s quite a shift in mental model from the existing sequential subgraph definition. Did you experiment with different approaches to the definition / directives and do you think it will be easy for newcomers to grok?

I see there are @parallel declarations in the .graphql files, and if s.stepBelow checks in the mappings - are there any other key magic words? And I wonder where the size of the segments is defined (is that an indexer configuration?)

If there is somewhere to look for the implementation of “Richer data for your indexing needs” referenced in the features, it would be great to see - the more specific state information is one of the killer features of the Firehose (vs. block-level eth-calls).

Finally - are these parallel steps also run for ongoing indexing, as well as the original subgraph sync to catch up with the chain head?

Woah, I had forgotten to push some updates to the README.md! So perhaps you want to check again! :face_with_hand_over_mouth: I also just added two new sections, giving more concrete examples of data dependencies:

(Adam, answering your questions in a moment)

Digging in I am interested in the developer / indexer experience, as it’s quite a shift in mental model from the existing sequential subgraph definition. Did you experiment with different approaches to the definition / directives and do you think it will be easy for newcomers to grok?

Our goal was to keep the same paradigm, and only sprinkle in what was strictly necessary to enable parallelization. At first, we weren’t even sure it would work; along the way we landed on the data-dependency paradigm and the annotation approach to keep all of this as simple as possible.

Regarding data dependencies, the approach would have failed if we had found cyclic dependencies (although I’m not sure how those would even arise, but anyway).

I see there are @parallel declarations in the .graphql files, and if s.stepBelow checks in the mappings - are there any other key magic words? And I wonder where the size of the segments is defined (is that an indexer configuration?)

I just added another section called Staging a subgraph with more details about the StepBelow annotation if you’re curious.
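
For readers who haven’t opened the README, here is a minimal, self-contained sketch of the idea behind such a step guard: a handler only writes the entities its current parallel step is responsible for, and leaves order-dependent aggregates to a later step (or the linear run). All names below (Subgraph, StepBelow, HandlePairCreated, etc.) are illustrative stand-ins, not the actual sparkle-generated API.

```go
package mapping

// Illustrative sketch only: these types and methods do not reflect the
// real sparkle-generated code, they just show the "step below" pattern.

type Pair struct {
	ID, Token0, Token1 string
}

type Factory struct {
	ID        string
	PairCount int
}

type Subgraph struct {
	step    int             // which parallel step this process is running
	factory Factory         // a global aggregate entity
	pairs   map[string]Pair // in-memory stand-in for the entity store
}

// StepBelow reports whether we are currently running in a parallel step
// lower than n.
func (s *Subgraph) StepBelow(n int) bool { return s.step < n }

func (s *Subgraph) savePair(p Pair) { s.pairs[p.ID] = p }

// HandlePairCreated records a newly created pair. In step 1 it only writes
// the Pair entity, which has no dependency on other data; the Factory
// aggregate, which depends on every pair seen so far, is only touched from
// step 2 onward (and during the linear run).
func (s *Subgraph) HandlePairCreated(addr, token0, token1 string) {
	s.savePair(Pair{ID: addr, Token0: token0, Token1: token1})

	if s.StepBelow(2) {
		return // aggregates are left to a later step
	}
	s.factory.PairCount++
}
```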

To answer your question, the segments are defined when running the different steps. I just wrapped up example commands showing how to run the parallel steps and which parameters are required. If you have any feedback there, it’d be appreciated.

If there is somewhere to look for the implementation of “Richer data for your indexing needs” referenced in the features, it would be great to see - the more specific state information is one of the killer features of the Firehose (vs. block-level eth-calls).

This piece of code is what is generated to support the different handlers that were available in the PancakeSwap subgraph. Since this code can be customized, everything available in that pbcodec.Block object could be sifted through, filtered, searched, mapped or whatever. The full protobuf definition is here, and you can expect a similar function specific to each blockchain, with either full flexibility (and customizability of how to dig into the block) or higher-level abstractions (like the eventHandlers that were codegen’d into the HandleBlock function I just linked).
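
To give a flavour of what “sifting through the block” can mean, here is a hedged sketch that scans a block for logs matching a given event topic. The Block/Log types below are simplified stand-ins, not the real pbcodec protobuf definitions.

```go
package sift

import "bytes"

// Simplified stand-ins for the Firehose block model; the real pbcodec.Block
// protobuf is far richer (call traces, state changes, balance changes, ...).
// Field names here are illustrative only.
type Log struct {
	Address []byte
	Topics  [][]byte
	Data    []byte
}

type Transaction struct {
	Hash []byte
	Logs []Log
}

type Block struct {
	Number       uint64
	Transactions []Transaction
}

// CollectLogsByTopic is the kind of custom filter a handler could apply:
// walk every transaction in the block and keep only the logs whose first
// topic matches a given event signature hash (e.g. the keccak256 of the
// Swap event signature).
func CollectLogsByTopic(blk *Block, topic0 []byte) []Log {
	var out []Log
	for _, trx := range blk.Transactions {
		for _, log := range trx.Logs {
			if len(log.Topics) > 0 && bytes.Equal(log.Topics[0], topic0) {
				out = append(out, log)
			}
		}
	}
	return out
}
```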

Finally - are these parallel steps also run for ongoing indexing, as well as the original subgraph sync to catch up with the chain head?

The parallel operations would be run to prep and kickstart indexing. Once all parallel processes are complete, the operator can launch the linear process (the index command). The live indexer then picks up where the parallel operations stopped. That linear process is also very fast, which is useful from that point on :slight_smile:

I just added a Benchmark section to the PancakeSwap example, for the curious :slight_smile: It gives numbers on timings and on the volume of data produced at the different parallelized stages and steps.

Parallelism is notoriously difficult to get right, so I want to take a moment to establish what the bar is for determinism in the protocol.

This statement would not be sufficient: “The system enables a subgraph author to index deterministically”

Instead, what needs to be shown is this: “It is impossible for an adversarial subgraph author to index non-deterministically, regardless of batch sizes, where the parallel / linear switchover happened, reorgs, blockchain data, or any other consideration of the external environment. No matter the block or the path taken, the result is the same.”

Any proposal that does not meet this high standard is a non-starter. Absolute determinism is table stakes. My hunch is that this is impossible to achieve while also giving the subgraph author a Turing-complete language - especially when there are separate linear/parallel code paths.

Do you believe that this proposal meets this standard?

1 Like

Further considering the determinism requirement.

There may be one (only one?) way to accomplish this, which would be to write a parallelizing compiler. Parallelizing compilers do exist, but have never reached mainstream applicability (for what reason I’m not sure).

Then the subgraph author would just write the serial version as they do today and the Indexer would run the latest parallelizing compiler on that to produce a parallel subgraph locally that they trust is a faithful representation of the serial subgraph.

This approach would also have the nice property of not splitting the ecosystem.

1 Like

Wow great comment :slight_smile:

Definitely, our implementation doesn’t give you that safety: you need to work quite a bit to make sure linear and parallel align perfectly.

It makes me think about safeguards that could be added, so that pieces of data simply cannot be written except in their defined stage, with everything running in a VM with defined boundaries. Some sort of constraints that would apply equally to the linear and parallel paths, and that would safeguard determinism well enough.

This definitely needs more thought. I’m even wondering whether it is possible to have two different execution mechanisms and still force them to be deterministic as stated. Curious to chat with you about that!

2 Likes

It’s worth making explicit why this is important - determinism is a requirement for decentralization.

I don’t think that, for example, running in a VM with defined boundaries solves the problem of ensuring that two different paths give the same result. If there is more than one program that could run in either parallel or serial mode without crossing the boundaries, then we are still left with the problem of ensuring they produce the same result.

Ensuring that two programs are equivalent in their output is the same as solving the halting problem. (E.g., you could write a program that says: if my program is equivalent to a program which returns 1, then return 0, otherwise return 1.) Therefore it’s impossible in the general case to accept a user-specified serial and parallel implementation and prove they are the same. There are caveats here… for some subset of programs the halting problem is solvable, etc… but the point is that it’s ultimately the wrong approach when selecting which problems we as core developers should be trying to solve.

A parallelizing compiler seems a more useful application of the prior art and fruitful path to explore.

2 Likes

I may be wrong but intuitively, I would think that if certain measures were applied, parallel subgraphs could be made deterministic:

  • Parallelization using fixed-size batches.
  • Restrictions on what subgraph authors can do in parallel handlers (e.g. no store.get), to ensure that any processing order yields the same result.

… and probably a few more.
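
To make the second restriction concrete, here is a minimal sketch of what a restricted parallel store could look like. These interfaces are purely hypothetical - nothing like them exists in graph-node or sparkle today - they only illustrate the idea of making order-dependent reads inexpressible in parallel handlers.

```go
package store

// Hypothetical sketch: the linear engine hands handlers the full store,
// while the parallel engine only hands out a write-only view, so a handler
// cannot observe state whose value depends on how batches were scheduled.

type Entity map[string]interface{}

// FullStore is what linear handlers would see.
type FullStore interface {
	Get(entityType, id string) (Entity, error)
	Set(entityType, id string, data Entity) error
}

// ParallelStore is the restricted view handed to parallel handlers:
// writes only, no Get, so batch ordering cannot leak into the output.
type ParallelStore interface {
	Set(entityType, id string, data Entity) error
}
```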

1 Like

The problem is that at some point the execution has to switch from “parallel mode” to “serial mode”. Depending on where the chain head is at, this affects when the transition takes place. When the transition takes place affects the output of the function.

It’s possible to stay in the parallel mode forever, but then you incur latency when you reach the chain head as you wait for a full batch to be available.

Locking down the API to restrict what can be done in parallel handlers doesn’t fix the problem. It would require a bottom-up approach in a language that is not Turing-complete (e.g., no Go or WASM allowed). Even then, I’ve not heard of any language that meets this requirement, but I would not be surprised if interesting work has been done here. I think @Brandon may have started evaluating some options.

The problem is that at some point the execution has to switch from “parallel mode” to “serial mode”. Depending on where the chain head is at, this affects when the transition takes place. When the transition takes place affects the output of the function.

True, I didn’t consider that the switch can happen at a pretty arbitrary time - unless the subgraph manifest were to state the block at which it should happen. Granted, for such a subgraph to remain performant as the chain grows, it would have to be upgraded over time to push the “switch block” closer to the latest chain head with each upgrade. Not ideal, but maybe a workable compromise?

It’s possible to stay in the parallel mode forever, but then you incur latency when you reach the chain head as you wait for a full batch to be available.

Yes, definitely. That may be ok for some use cases but usually, we’ll want to leave it to the client to decide how fresh data needs to be for the subgraph to be useful for whatever the client tries to achieve.

I did consider having the manifest specify a switchover block but quickly discarded the idea because for our motivating use-case (PancakeSwap) the story is that near chain head the subgraph syncs slower than blocks are produced. So there is no acceptable point to place this transition because the subgraph would never sync to the chain head no matter what block is selected or where the chain head is at.

It’s not clear whether this situation holds after other optimizations are taken into account. Which brings me back to my central thesis on adding parallelism to graph-node: it’s the least bang for the most buck of any available optimization. Sure, educating developers to avoid making calls to the chain isn’t very sexy - but even for the PancakeSwap case it’s the most impactful change. Another idea here may be to have graph-node detect if a call is pure and compile it to WASM to run locally. The particulars are a tangent, but I’m sure we can get amazing gains without parallelism. Here’s why -

Let’s imagine that parallelism brings us from 100x to 800x. That’s “700 more x!” Today, if block processing times are (for simple math) 800ms, and there is a constant overhead of 7ms per block for some operation that can be optimized, you may look at that today and say it is tiny (indeed, it is less than 1%). But if we have already implemented the 100x, then the block processing time is 8ms. Now, that 7ms optimization has the same impact as going from 100x to 800x - “700 more x!”
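
Spelling out that arithmetic with the same made-up figures:

```latex
\frac{800\,\mathrm{ms}}{100} = 8\,\mathrm{ms}
\qquad
8\,\mathrm{ms} - 7\,\mathrm{ms} = 1\,\mathrm{ms}
\qquad
\frac{800\,\mathrm{ms}}{1\,\mathrm{ms}} = 800\times
```

In other words, the “tiny” 7ms fix buys the same extra 8x factor as the jump from 100x to 800x does.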

It is deeply weird that the order of optimizations appears to affect how impactful they are. But this is the nature of measuring throughput, where gains compound multiplicatively. So, through a certain lens, the parallelism change is similar to a 7ms optimization today. (Yes, I know this is not a direct comparison. Saying parallelism is responsible for the additional 700x gain is making the same error. Please do not miss the broader point.) graph-node has a TON of available optimizations that appear tiny today but would be huge in the future if the parts that are currently terribly slow were fixed.

The first step to a fast graph-node is some form of the Firehose. The second is taking a data-oriented approach.

You can calculate how fast it should be to index a subgraph. I’ll do a basic version of this for illustration. A typical HDD may read data at about 200MB per second. PancakeSwap has been around for 1 year, and in March was doing 2M transactions per day. If a transaction is Sender/Receiver/Amt/Pair at 32 bytes each, that’s 128 bytes per transaction. (Note that this would be an information-theoretic upper bound; the actual entropy is much lower in practice because the data set is highly redundant and compressible.) So this comes to (waves hands in the air a bit) 94GB total. If the read speed is the bottleneck (and it should be), this means you “should” be able to index PancakeSwap without going parallel in ~8 minutes. This estimate can be refined, but you get the point.
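
Written out (assuming the 2M transactions/day rate held for the whole year, which overestimates the data set):

```latex
2\times 10^{6}\,\tfrac{\mathrm{tx}}{\mathrm{day}} \times 365\,\mathrm{days} \times 128\,\tfrac{\mathrm{B}}{\mathrm{tx}} \approx 93\,\mathrm{GB}
\qquad
\frac{93\,\mathrm{GB}}{200\,\mathrm{MB/s}} \approx 470\,\mathrm{s} \approx 8\,\mathrm{min}
```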

If you are not indexing PancakeSwap serially in less than 30 minutes it is because of things that data-oriented programming solves like:

  • We are not storing data in a cache-coherent manner that avoids unnecessary reads
  • We are regularly stalling the data processing pipeline with things like RPC calls
  • We introduced unnecessary and costly abstractions that are not part of transforming the data
  • etc

We should get to that level before even thinking of adding user-level parallelism.

1 Like

I agree, eliminating the need for (most) Ethereum contract calls would be the most impactful optimization. We’ve discussed this among the core devs several times. The Firehose extractor could give us state change deltas, which graph-node could translate with the help of contract ABIs, so that subgraphs could process changes to any (or specific) state variables in contracts. I bet that would eliminate most Ethereum calls, which are just “look up this state variable” and not “run this contract function that executes EVM code across multiple contracts” (the latter is impossible to avoid in a smart way).

1 Like

I just had a productive call with @abourget and want to summarize some highlights here.

First, let me limit the scope of what will be addressed. This comment does not talk about adding user-level parallelism in any way, which I still hold to be unworkable. Instead, this post will be limited to the problem of data extraction and specifically whether or not it is beneficial to “pre-aggregate” data as a part of the implementation of Firehose.

TL;DR: There are arguments for both, but it is possible to achieve great speed without pre-aggregation. Since avoiding pre-aggregation has some advantages in flexibility, it is the approach preferred by both of us.

The key insight of @abourget is that by using pessimistic filtering we can enable parallelism in the extraction phase without involving the user - turning the problem into essentially a map-reduce.

To understand this, we will consider a subgraph with a dynamic data source that is listening to some event with the signature event() but for which the contract addresses are not yet known.

What we would do to index this subgraph is create a filter selecting for a superset of the required data (all events matching the signature event(), regardless of their contract) and go wide, executing this filter against large ranges of blocks in parallel. The output of these jobs is piped into the indexing stage. The indexing stage would now have a linear sequence of events supplied to it nearly as fast as though it had been pre-aggregated.
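
A rough sketch of that shape, stripped to its essence: parallel workers run the pessimistic filter over fixed block segments (the map), and the segments are re-assembled in block order into one linear stream for the indexing stage (the reduce). Everything below is illustrative, not actual Firehose or graph-node code.

```go
package extract

import "sync"

// Event is a placeholder for whatever a matching log decodes into.
type Event struct {
	Block    uint64
	Contract string
}

// filterRange stands in for running the pessimistic filter over one block
// range against Firehose data; here it is a stub. It matches on topic0
// only, regardless of the emitting contract (the "superset" filter).
func filterRange(from, to uint64, topic0 string) []Event {
	return nil // would return every matching event in [from, to)
}

// ExtractLinear fans out the filter over fixed-size segments, then stitches
// the results back together in block order so the consumer sees one linear
// sequence of events.
func ExtractLinear(start, end, segment uint64, topic0 string) []Event {
	n := int((end - start + segment - 1) / segment)
	results := make([][]Event, n)

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) { // map: each segment is filtered independently
			defer wg.Done()
			from := start + uint64(i)*segment
			to := from + segment
			if to > end {
				to = end
			}
			results[i] = filterRange(from, to, topic0)
		}(i)
	}
	wg.Wait()

	// reduce: re-assemble segments in block order so the indexing stage
	// receives one linear stream, as if the data had been pre-aggregated.
	var out []Event
	for _, seg := range results {
		out = append(out, seg...)
	}
	return out
}
```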

It is worth addressing the fact that the filtered data pipe carries more data than is strictly necessary in this solution. There is a limit to how much filtering can be done in practice (for the pre-aggregation method, more filters create more redundancy in the “indexed” data set). On-the-fly filtering as shown here is likely to create an even smaller data set for the processing stage than pre-aggregation would by itself.

What method is used by the indexing stage or what parallelism is possible there is now a separate and unanswered question. All that we have established is that pre-aggregation is not necessary to create a filtered pipe of data for indexing quickly. The two approaches can actually be combined for even greater efficiency (but not necessarily much greater speed).

The final point made by @abourget which solidifies the parallel, on-the-fly filtering in my mind is this:

It is much easier to get ecosystem adoption and make chains “The Graph Firehose Ready” by having a simple standard for what files to output - one that is not a moving target as graph-node comes to require different kinds of pre-aggregation. This concern far outweighs the limited difference in speed between the two approaches.

2 Likes