Introducing the Firehose

Hello community!

I’m Alex from StreamingFast and you are reading my very first post on these forums.

Following the recent announcement made by the Foundation, our team has been hard at work improving indexing performance on The Graph Network.

Today I want to present one of the ideas that helped us achieve great indexing performance in the past. It is called the Firehose.

Here’s the intro:

Abstract

Firehose: a files-based and streaming-first approach to processing blockchain data.

This proposal is a building block of an even larger vision for performance optimization of the indexing stack, paving the way to parallelization.

Goals & Motivation

The goals of this document are:

  • to contribute knowledge and insights gathered by StreamingFast in the last 3+ years to The Graph’s ecosystem
  • to provide the necessary context to understand StreamingFast’s current contributions to the graph-node implementation

The goal of the Firehose is to provide a way to index blockchain data which:

  • is capable of handling high-throughput chains (network bound)
  • increases linear indexing performance
  • increases backfilling performance & maximizes data agility by enabling parallel processing
  • reduces risks of non-deterministic output
  • improves testability and developer experience when iterating on subgraphs
  • simplifies the operations of an indexer by relying on flat data files instead of live processes like an archive node
  • reduces the need for RPC calls to nodes

Read the full document here

It is a rather deep technical document, but I hope we can have some of the best brains meditate on it and provide suggestions and comments. I’d ask you to concentrate all feedback and comments in this thread.

I believe that, by introducing new ideas and having them collide with those of this community, we together will be able to create the greatest open data platform of all time.

18 Likes

This is the first of a series of proposals that will pave the way for parallelization and massive improvements to performance.

6 Likes

Looking forward to reading this and hope I can provide something useful in return!

2 Likes

+1. Really looking forward to reading through the full document properly in the next few days.

Full history stats:

  • History length: BSC = 331 days, ETH = 2187 days
  • Total size for merged blocks (again, compressed): BSC = 1030 GiB, ETH = 771.27 GiB
  • Avg data produced per day: BSC = 3.1 GiB/day, ETH = 0.35 GiB/day
  • So BSC produces ~8.85x more data per day, if we consider the whole history.
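
For anyone double-checking the averages, they follow directly from the totals above; here is a quick sketch (the small gap with the ~8.85x figure is just rounding of the per-day numbers):

```go
package main

import "fmt"

// Quick sanity check of the per-day figures, using the totals quoted in this post.
func main() {
	bscGiB, bscDays := 1030.0, 331.0  // BSC merged blocks, compressed
	ethGiB, ethDays := 771.27, 2187.0 // ETH merged blocks, compressed

	bscPerDay := bscGiB / bscDays // ≈ 3.1 GiB/day
	ethPerDay := ethGiB / ethDays // ≈ 0.35 GiB/day

	fmt.Printf("BSC: %.2f GiB/day, ETH: %.2f GiB/day, ratio: %.2fx\n",
		bscPerDay, ethPerDay, bscPerDay/ethPerDay) // ratio ≈ 8.8x
}
```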

[It would be useful to have the size of an archive node at around the same time, for quick comparison. If anyone has that data, please contact us.]

Storage stats as of July 30:

  1. Erigon 2021.07.05-alpha with all pruning disabled (--pruning=). Full archive + tracing node. Storage size: 1,342.06 GB.
  2. OpenEthereum v3.3.0-rc.4 with --tracing=on and --pruning=archive. Full archive + tracing node. Storage size: 8,018.82 GB.
3 Likes

BSC archive: closing in on 10 TiB. NVMe is mandatory or you won’t catch chainhead.

3 Likes

This is a really good read and very clearly communicates the rationale behind the design of The Firehose. Thank you!

Questions/Comments that come to mind:

  • Where is the bulk of the time spent in preparing a Firehose for a new protocol, assuming an existing client can be used? It sounds like the design of the protobuf schema is a significant and critical task?

  • On client instrumentation: Indexers are mostly running OpenEthereum 3.3.0-rc4 with tracing enabled. I expect most Indexers will transition gradually to Erigon as we grow more and more confident about using that client in production (plus it brings a huge number of performance and resource-efficiency benefits).

  • On the subject of clients, OE is on the path to retirement. Any thoughts on building instrumentation for Erigon? This is where Indexers are likely going en masse in the near future.

  • Can an instrumented client continue to be used by traditional methods in parallel with serving functions for the firehose process?

I would be keen to help with testing and have the compute and storage to do so; let me know. Obtaining a specific client version and getting it to chainhead would be the main bottleneck: I have many archive nodes at chainhead, but they are running the latest client software.

2 Likes

Developing the Firehose for a given protocol requires a deep understanding of that protocol, of the meaning of each bit of data. So yes, the design of the schema, alongside the instrumentation that extracts the right info at the right time, alongside smart contracts that exercise the different intrinsics/externals/environment functions made available by the transaction execution runtime (we call our testbeds battlefields): all of those are critical and significant tasks. We usually put in a few weeks’ worth of effort, especially when a new protocol is radically different from previous ones. Porting instrumentation to different EVM runtimes obviously takes less time. I’d say around 4-6 weeks with 2-3 engineers.

The battlefield repo for a given protocol includes: scripts to boot a chain from genesis (mining or producing blocks), scripts to deploy one or two test contracts that exercise the features of the chain, scripts to execute transactions on the producing node, and a p2p connection from a Firehose-enabled node to the producing node, outputting the instrumentation. We also found it useful to write regression tests, to be able to diff outputs when we upgrade either the node or our instrumentation. Once the test chain is written, we can also turn off the producer and simply replay the chain to gather the Firehose output. This whole flow makes it efficient to iterate on Firehoses for different protocols, and it has allowed steady progress without much regression.
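
To make the regression-test part concrete, here is a minimal sketch of the idea, assuming the replay step has already written its output next to a previously reviewed golden file (file names and layout are made up, not the actual battlefield repos):

```go
package battlefield_test

import (
	"bytes"
	"os"
	"testing"
)

// TestFirehoseOutputMatchesGolden diffs the Firehose output captured from a
// replay of the test chain against a previously reviewed "golden" file, so any
// change in the node or in the instrumentation shows up as a test failure.
func TestFirehoseOutputMatchesGolden(t *testing.T) {
	golden, err := os.ReadFile("testdata/firehose-output.golden.jsonl")
	if err != nil {
		t.Fatalf("reading golden file: %v", err)
	}

	actual, err := os.ReadFile("testdata/firehose-output.actual.jsonl")
	if err != nil {
		t.Fatalf("reading captured output (did the replay step run?): %v", err)
	}

	if !bytes.Equal(actual, golden) {
		t.Fatal("Firehose output drifted from the golden file; review the diff and re-bless it if the change is expected")
	}
}
```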

For Ethereum in particular, we produce identical protobuf output from both geth and OpenEthereum. The Firehose instrumentation is different (one being in Go, the other in Rust), but the piece of software consuming the output of both produces a single, common format. It makes sense for the data produced by both to be identical, so that we can abstract away the implementation: if it weren’t the case, it would mean the data was either specific to an implementation or non-deterministic (sort of outside the protocol). It took us a day or two to port the instrumentation to each meaningful geth-derived chain (like BSC, etc.), and the same for OE-derived chains. I don’t foresee any issue porting it to Erigon, but (unless Matt V. has done it overnight :slight_smile:) we don’t have it ready as of August 4th, 2021.
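
As a sketch of what “identical output from both clients” means for a consumer, the two extractions can be decoded into the same protobuf message and compared field by field. The pbeth package, its import path, and the Number field below are placeholders, not the actual generated code:

```go
package verify

import (
	"fmt"
	"log"

	"google.golang.org/protobuf/proto"

	pbeth "example.com/firehose/pb/ethereum" // placeholder for the generated block schema
)

// assertSameBlock decodes the bytes extracted via the geth instrumentation and
// via the OpenEthereum instrumentation, then compares them field by field. Any
// divergence means implementation-specific or non-deterministic data leaked in.
func assertSameBlock(fromGeth, fromOpenEthereum []byte) {
	gethBlock, oeBlock := &pbeth.Block{}, &pbeth.Block{}

	if err := proto.Unmarshal(fromGeth, gethBlock); err != nil {
		log.Fatalf("decoding geth block: %v", err)
	}
	if err := proto.Unmarshal(fromOpenEthereum, oeBlock); err != nil {
		log.Fatalf("decoding OpenEthereum block: %v", err)
	}

	if !proto.Equal(gethBlock, oeBlock) {
		log.Fatalf("block %d differs between clients", gethBlock.Number)
	}
	fmt.Printf("block %d is identical across clients\n", gethBlock.Number)
}
```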

Not sure if it was clear from the doc: when the Firehose is enabled and data is consumed this way, the node doesn’t need any flags that make it heavier (like archive mode or tracing enabled). It can be treated, resource-wise, just like the simplest full node that validates transactions (so it executes all transactions and processes the state transitions for the protocol).

If I’m not mistaken, our Ethereum instrumentation does support using the node normally while it is being used for extraction. I’d suggest not to, though. Putting no load on it other than processing transactions and extracting data ensures that the downstream systems get the freshest data as fast as possible. Querying nodes while they do write operations will inevitably cause lock contention and other slowdowns (although it’ll affect some protocols more than others).

Hope this helps, and hope it was clear enough! Thanks for the feedback!

4 Likes

This is great, thanks Alex. I’m still thinking deeply about the implications of the Firehose, so I will definitely be back to discuss more and ask more questions. I am very interested to explore the second-order effects this technology can bring for the protocol.

2 Likes

Whoa, just to clarify (I just re-read myself): instrumenting a brand-new chain takes ~4-6 weeks, not porting something over.

3 Likes

Thanks for these write-ups. I’m hyped for the Firehose, even more so as I gain understanding. :sunglasses:

A few questions, forgive me if some are naive / basic. (In no particular order, numbered for easy reference.)

  1. How long would it take to upgrade an already instrumented client? In particular with hard forks or other critical updates.

  2. How good is the documentation for implementing such instrumented clients? I reckon at some point other teams might be interested in upgrading existing clients and/or supporting new networks.

  3. Would it make any sense for the extractor to work via read access to the on-disk data of an already-synced archive node, either while it keeps writing or exclusively? Many indexers might already have this.

  4. Given that the extracted data could reconstruct the chain, is there a piece in the stack that could serve “legacy” JSON-RPC requests based on the Firehose flat files?

  5. Can non-Firehose-adjusted subgraphs be indexed off a Firehose regardless, even if they lose most/all of the benefits?

  6. I love the fact that high availability was accounted for from the start; is there a reason why the merger and object store aren’t duplicated?

  7. Is the HA setup able to deal with LAN vs WAN links? E.g. the setup is spread across 2 (or more) datacenters/providers/countries, i.e. differences in latency, which probably means it’ll prefer LAN normally and pick WAN in case of downtime.

  8. Anything in place to ensure data integrity? Checksums on the object store? Could different extractors have different output data? (IMHO, for The Graph, data determinism is king, even at a performance hit.)

Questions 3+4+5 are intended to assess the ability to run a unified setup for all needs. As such, I think it’s important to consider backward compatibility. Additionally, in the standalone indexer project I run, the team also has direct RPC access to the archive node because of various (occasional) needs.

3 Likes

Bonus question: are there any built-in Prometheus metrics in the Firehose?

Gathered from elsewhere:

I have a question about this part:

It takes on responsibilities that were initially given to … a full archive node when the subgraph does external calls at different block height (to query state, rounded at the end of the block, see Principles & Approach for details).

This suggests that the Firehose could be used to execute contract calls; can you elaborate on that part?

No, it wouldn’t execute contract calls (like eth_call does) without some additional machinery (which we had planned, but which is out of scope for now). But because it transports state changes, including the previous and new values (think previous and new ERC-20 balance for an address), some calls to an archive node can be avoided.
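
To illustrate the kind of archive call that can be avoided, here is a hedged sketch of a consumer reading native balance changes straight out of a Firehose block instead of calling eth_getBalance at a historical height (the same pattern applies to storage changes for ERC-20 balances). The pbeth types and field names are placeholders, not the actual schema:

```go
package consumer

import (
	"bytes"

	pbeth "example.com/firehose/pb/ethereum" // placeholder for the generated block schema
)

// balanceAfterBlock scans the balance changes carried by a Firehose block for a
// given holder, instead of issuing an eth_getBalance against an archive node.
func balanceAfterBlock(block *pbeth.Block, holder []byte) (oldVal, newVal []byte, found bool) {
	for _, trace := range block.TransactionTraces {
		for _, call := range trace.Calls {
			for _, change := range call.BalanceChanges {
				if bytes.Equal(change.Address, holder) {
					// Keep overwriting: the last change in the block wins,
					// which matches "state rounded at the end of the block".
					oldVal, newVal, found = change.OldValue, change.NewValue, true
				}
			}
		}
	}
	return oldVal, newVal, found
}
```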

All the code we wrote is permeated with Prometheus instrumentation.
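
For a feel of what that looks like, here is a minimal sketch using the standard Go client library; the metric names are made up for illustration, not the actual metrics exposed by the services:

```go
package metrics

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	// blocksRead counts blocks read from the extraction stream.
	blocksRead = promauto.NewCounter(prometheus.CounterOpts{
		Name: "firehose_blocks_read_total",
		Help: "Number of blocks read from the extraction stream.",
	})

	// headBlockNumber tracks the last block number seen by this process.
	headBlockNumber = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "firehose_head_block_number",
		Help: "Last block number seen by this process.",
	})
)

// A process would expose these via promhttp.Handler() on /metrics and update
// them from its processing loop, e.g. blocksRead.Inc().
```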

3 Likes

Sorry for the late response in here. Koen and I had some side-channel chat, but here are my answers for everyone’s benefit:

  1. Instrumentation means sprinkling a few conditional printf statements in the source code of a client (a minimal sketch of what such a statement can look like follows after this list). Once it’s there, it’s a matter of keeping up to date with upstream, or even better, having the printfs merged upstream. In our experience, few client upgrades touch the execution paths that are instrumented (which are mostly around transaction execution, block execution, etc.), so it often means just rebuilding with the patch. Sometimes, when new pieces of data are added or new features that affect existing data structures are enabled, we need a more diligent review. We can think of it (in terms of risks and complexity) as something similar to breaking changes on the RPC interfaces of nodes. Review and testing of more complex upgrades like that can take a few days.

Today, for chains in ETH-land, we have instrumentation for geth, OpenEthereum, bor, and a few other geth derivatives (applying the patch took around half a day for those geth derivatives).

  2. The doc linked above states the design principles, but it is our long-term goal to strictly define what it would mean for a protocol to implement a Firehose output.

  3. That could be envisioned, but there is a lot of data the Firehose produces that is not available on disk. Most nodes roll up state data at the block level, for example, while most Firehose implementations extract state at the EVM-call or transaction level, so they have much more granularity and precision than what an archive node holds.

  4. Yes, of course. Outside the Firehose, our team has built several services (that feed from the Firehose) to offer things like state snapshots, JSON-RPC call emulation, or holding subsets of the state (specialized EVM execution). If you think of the Firehose as a remote sync protocol, or a master-to-slave binlog sort of thing, you can imagine all sorts of services that can be optimized for different operations and can certainly serve the same requests as a standard node. Even better and faster, since those “replica” nodes wouldn’t have the load of verifying or applying the state transitions through transaction execution: they merely apply the changes brought by the Firehose blindly.

  5. As the Firehose is integrated as an optional source of data in graph-node, it will be able to handle blockHandlers, callHandlers and eventHandlers and boost the performance of subgraphs that use those. However, if a subgraph relies heavily on ethereum.eth_call calls to nodes, it will still be slowed down by that operation. Using the Firehose as a data source won’t preclude subgraphs from using those calls; it just won’t make them any more efficient. So basically, subgraphs should see immediate benefits if the backend uses the Firehose, but not all of the benefits of parallelization (I’ll talk about that soon enough :P)

  6. The object store is expected to maintain its own high availability (think S3, Google Storage, your own Minio setup, some replicated GlusterFS or Ceph, a SAN/NAS-based setup, RAID, yada yada). These stores are normally atomic once a CreateObject call has succeeded. The merger can fail and restart without affecting the rest; the system tolerates it being down for a few minutes. It starts putting memory pressure on the Firehose service if it doesn’t come back, but that’s the extent of it, and it is always slightly delayed anyway. The Firehose services always have in-memory blocks to bridge the flat files and the real-time stream, so having redundancy in the merger is not critical.

  7. Yes. The various nodes pushing data out actually race to push to consuming clients, so if your LAN connection provides data earlier, your whole system is faster; if the WAN input receives some data earlier, again, the client receives data at the fastest speed available. It’s possible to have clusters of Firehoses geographically dispersed, and it might happen that some users in some parts of the world see things faster (say an Asian user receiving a block from a mining pool in Asia through a Firehose located in Asia) than others (a US user connecting to a Firehose in Asia, receiving a block produced in the US). Unsure whether I answered the root of the question, but I hope this helps :stuck_out_tongue:

  8. Being flat files, many things could be implemented to ensure the integrity of the contents of those files. Right now, we do not have anything written to systematically check or hash them. By design, most of the data in those files should be deterministic (some protocols include timings or timestamps of execution, which can safely be excluded from hashing), and it could therefore be agreed upon by sharing hashes and validating other people’s data.
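
As referenced in answer 1 above, here is a minimal sketch of what such a conditional print can look like inside a client’s transaction-execution path. The enabling flag, the line prefix, and the fields are made up for illustration; the real per-client patches differ:

```go
package core

import (
	"encoding/hex"
	"fmt"
	"os"
)

// firehoseEnabled guards every instrumentation statement, so the normal
// execution path only pays for a boolean check when extraction is off.
var firehoseEnabled = os.Getenv("FIREHOSE_ENABLED") == "true"

// printBalanceChange emits one line per event on stdout; a reader process (the
// "mindreader") parses these lines and assembles them into protobuf blocks.
func printBalanceChange(addr [20]byte, oldBalance, newBalance string) {
	if !firehoseEnabled {
		return
	}
	fmt.Fprintf(os.Stdout, "FIRE BALANCE_CHANGE %s %s %s\n",
		hex.EncodeToString(addr[:]), oldBalance, newBalance)
}
```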

4 Likes

Thank you very much for this extensive reply; it helps me a lot in understanding the tech and its possibilities. A few follow-up questions:

#3. Would it be possible to use the mindreader process on an instrumented archive node (rather than a fast/full one) and as such replace mind-reader-bootstrap-data-url for launching parallel mindreaders at different sections of the chain? In such a case, would it matter whether the instrumented node did the syncing itself, or can it take the data from a plain node, as long as it’s an instrumented node feeding the mindreader?

#8. Given the importance of data integrity/determinism, I’d love to see systematic checks implemented, not only for comparing installs but also on read operations when data gets accessed. (More a feature request than a question.)