Save subgraph query result to a CSV file

ROH: 2022-12-30

Introduction


In computing, extract, transform, load (ETL) is a three-phase process where data is extracted, transformed (cleaned, sanitized, scrubbed) and loaded into an output data container.

-Wikipedia



In this lesson we’re going to use Rust to build an ETL pipeline that submits a GraphQL query to the Hosted Service graph-network-mainnet subgraph, performs some minimal data cleaning on the response, then saves the results to a comma-separated values (CSV) file.


query GnosisSubgraphs {
  subgraphDeployments(where: {network: "gnosis"}) {
    id
    activeSubgraphCount
    createdAt
    ipfsHash
    subgraphCount
    stakedTokens
    signalledTokens
    originalName
  }
}


The above query requests data about Gnosis Chain subgraph deployments in The Graph protocol. Here’s some more information about subgraphDeployments and the returned fields:

  • subgraphDeployments: The SubgraphDeployment is represented by the immutable subgraph code that is uploaded, and posted to IPFS. A SubgraphDeployment has a manifest which gives the instructions to the Graph Network on what to index. The entity stores relevant data for the SubgraphDeployment on how much it is being staked on and signaled on in the contracts, as well as how it is performing in query fees. It is related to a SubgraphVersion.
  • id: Subgraph Deployment ID. The IPFS hash with Qm removed to fit into 32 bytes
  • activeSubgraphCount: Amount of active Subgraph entities that are currently using this deployment. Deprecated subgraph entities are not counted
  • createdAt: Creation timestamp
  • ipfsHash: IPFS hash of the subgraph manifest
  • subgraphCount: Total amount of Subgraph entities that used this deployment at some point. subgraphCount >= activeSubgraphCount + deprecatedSubgraphCount
  • stakedTokens: CURRENT total stake of all indexers on this Subgraph Deployment
  • signalledTokens: CURRENT signalled tokens in the bonding curve
  • originalName: The original Subgraph that was deployed through GNS. Can be null if never created through GNS. Used for filtering in the Explorer
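
The stakedTokens and signalledTokens values are handled as strings in this lesson because they are large integers. As a minimal sketch of one possible cleaning step, the snippet below converts such a string into a decimal GRT amount. It assumes the values are expressed in the token's smallest unit with 18 decimal places; the function name wei_to_grt and the sample value are hypothetical, and the pipeline built in this lesson writes the raw strings unchanged.

```rust
/// Decimal places assumed for GRT (assumption: 18, as for most ERC-20 tokens).
const GRT_DECIMALS: u32 = 18;

/// Hypothetical helper: converts a base-unit amount such as
/// "1500000000000000000" into a decimal string such as "1.5".
/// Returns None if the input does not parse as a u128.
fn wei_to_grt(raw: &str) -> Option<String> {
    let value: u128 = raw.parse().ok()?;
    let scale = 10u128.pow(GRT_DECIMALS);
    let whole = value / scale;
    let frac = value % scale;
    if frac == 0 {
        return Some(whole.to_string());
    }
    // Zero-pad the fractional part to 18 digits, then drop trailing zeros.
    let frac = format!("{:0width$}", frac, width = GRT_DECIMALS as usize);
    Some(format!("{}.{}", whole, frac.trim_end_matches('0')))
}

fn main() {
    // Made-up sample value, not taken from a real query result.
    let raw = "1500000000000000000";
    match wei_to_grt(raw) {
        Some(grt) => println!("{} base units = {} GRT", raw, grt),
        None => eprintln!("could not parse {:?}", raw),
    }
}
```

Integer arithmetic on u128 avoids the rounding that a conversion through f64 would introduce for amounts of this size.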


Gnosis Chain is the first non-ETH chain announced during The Graph’s Migration Infrastructure Providers (MIPs) program.


This lesson builds on the Rust concepts covered in the other guides and introduces a few new ones along the way.


Let’s get started!


Code


From your terminal/command line, create a new cargo project and open it with VSCode

cargo new gnosis_subgraphs
cd gnosis_subgraphs
code .



With VSCode now open, click Cargo.toml in the sidebar then add the following dependencies (below [dependencies]). Make sure to save your changes.

serde = { version = "1.0.149", features = ["derive"] }
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1.23.0", features = ["full"] }
csv = "1.1.6"


  • serde is a “framework for serializing and deserializing Rust data structures efficiently and generically”
  • reqwest “provides a convenient, higher-level HTTP Client”
  • tokio is an “event-driven, non-blocking I/O platform for writing asynchronous applications with the Rust programming language”
  • csv provides a fast and flexible CSV reader and writer, with support for Serde.



Open src/main.rs in VSCode, delete the main function, and add the following use statements at the top of the file

use std::collections::HashMap;
use std::string::String;
use std::error::Error;
use serde::{Serialize, Deserialize};



Still in src/main.rs, add the following const statements below the use statements

const NETWORK_SUBGRAPH_URL: &str = "https://api.thegraph.com/subgraphs/name/graphprotocol/graph-network-mainnet";
const NETWORK_SUBGRAPH_QUERY: &str = "{subgraphDeployments(where: {network: \"gnosis\"}) {id activeSubgraphCount createdAt ipfsHash subgraphCount stakedTokens signalledTokens originalName}}";
const CSV_FILE_PATH: &str = "./gnosis_subgraphs.csv";


  • NETWORK_SUBGRAPH_URL: URL where POST request will be sent
  • NETWORK_SUBGRAPH_QUERY: GraphQL query to be sent in POST request
  • CSV_FILE_PATH: Filepath to save results as a CSV



Next, add the following struct definitions to src/main.rs. They are used to deserialize the query response and to serialize each CSV row

#[allow(non_snake_case)]
#[derive(Debug, Serialize)]
struct CsvRecord<'a> {
    id: &'a String,
    activeSubgraphCount: &'a i32,
    createdAt: &'a i32,
    ipfsHash: &'a String,
    subgraphCount: &'a i32,
    stakedTokens: &'a String,
    signalledTokens: &'a String,
    originalName: &'a Option<String>,
}


#[derive(Debug, Deserialize, PartialEq)]
struct SubgraphDeploymentsResponse {
    data: HashMap<String, Vec<SubgraphDeployment>>
}

#[allow(non_snake_case)]
#[derive(Debug, Deserialize, PartialEq)]
struct SubgraphDeployment {
    id: String,
    activeSubgraphCount: i32,
    createdAt: i32,
    ipfsHash: String,
    subgraphCount: i32,
    stakedTokens: String,
    signalledTokens: String,
    originalName: Option<String>,
}


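  • CsvRecord: One row of the CSV file. Each field borrows the matching value from a SubgraphDeployment, and the field names become the column headers
  • SubgraphDeploymentsResponse: The top-level JSON response. Its data field maps the queried entity name (subgraphDeployments) to the list of returned deployments
  • SubgraphDeployment: A single deployment returned by the query, with one field per requested GraphQL field. originalName is an Option because it can be null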
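
To make the relationship between the two record types concrete, here is a minimal sketch that builds a CsvRecord from a borrowed SubgraphDeployment through a From implementation. The structs are trimmed to three fields and the serde derives are left out so the snippet compiles with the standard library alone. The From implementation is an addition that is not part of this lesson's code, and the sample values are made up.

```rust
// Trimmed-down copies of the lesson's structs (three fields, no serde derives).
#[allow(non_snake_case)]
#[derive(Debug)]
struct SubgraphDeployment {
    id: String,
    createdAt: i32,
    originalName: Option<String>,
}

#[allow(non_snake_case)]
#[derive(Debug)]
struct CsvRecord<'a> {
    id: &'a String,
    createdAt: &'a i32,
    originalName: &'a Option<String>,
}

// Hypothetical conversion: borrow every field instead of cloning it.
// The lifetime 'a ties the record to the deployment it was built from.
impl<'a> From<&'a SubgraphDeployment> for CsvRecord<'a> {
    fn from(d: &'a SubgraphDeployment) -> Self {
        CsvRecord {
            id: &d.id,
            createdAt: &d.createdAt,
            originalName: &d.originalName,
        }
    }
}

fn main() {
    // Made-up sample values.
    let deployment = SubgraphDeployment {
        id: "0xabc".to_string(),
        createdAt: 1_672_358_400,
        originalName: None,
    };
    let record = CsvRecord::from(&deployment);
    println!("{} {} {:?}", record.id, record.createdAt, record.originalName);
}
```

Because the record only holds references, no strings are copied when a row is prepared for writing.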



Finally add a main function to src/main.rs

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Build the JSON request body: {"query": "<GraphQL query>"}
    let mut map = HashMap::new();
    map.insert("query", NETWORK_SUBGRAPH_QUERY);

    // Extract: POST the query and deserialize the JSON response
    let response: SubgraphDeploymentsResponse = reqwest::Client::new()
        .post(NETWORK_SUBGRAPH_URL)
        .json(&map)
        .send()
        .await?
        .json()
        .await?;

    // Load: create (or overwrite) the CSV file
    let mut wtr = csv::Writer::from_path(CSV_FILE_PATH)?;

    // Write one row per deployment; the header row comes from the field names
    for subgraph_deployment in &response.data["subgraphDeployments"] {
        wtr.serialize(CsvRecord {
            id: &subgraph_deployment.id,
            activeSubgraphCount: &subgraph_deployment.activeSubgraphCount,
            createdAt: &subgraph_deployment.createdAt,
            ipfsHash: &subgraph_deployment.ipfsHash,
            subgraphCount: &subgraph_deployment.subgraphCount,
            stakedTokens: &subgraph_deployment.stakedTokens,
            signalledTokens: &subgraph_deployment.signalledTokens,
            originalName: &subgraph_deployment.originalName,
        })?;
    }

    // Make sure all buffered rows are written to disk
    wtr.flush()?;

    Ok(())
}
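
Indexing the map with response.data["subgraphDeployments"] panics if the key is absent. A minimal sketch of a non-panicking alternative, using HashMap::get and turning a missing key into an error, might look like the following. The helper name deployments is hypothetical, and plain strings stand in for SubgraphDeployment so the snippet needs only the standard library.

```rust
use std::collections::HashMap;
use std::error::Error;

// Hypothetical helper: return the list stored under `key`,
// or an error value instead of panicking when the key is missing.
fn deployments<'a, T>(
    data: &'a HashMap<String, Vec<T>>,
    key: &str,
) -> Result<&'a Vec<T>, Box<dyn Error>> {
    data.get(key)
        .ok_or_else(|| Box::<dyn Error>::from(format!("response has no {:?} field", key)))
}

fn main() -> Result<(), Box<dyn Error>> {
    // Stand-in for the deserialized `data` map (made-up contents).
    let mut data: HashMap<String, Vec<String>> = HashMap::new();
    data.insert(
        "subgraphDeployments".to_string(),
        vec!["deployment-1".to_string()],
    );

    // The `?` operator propagates the error to the caller of main.
    for deployment in deployments(&data, "subgraphDeployments")? {
        println!("{}", deployment);
    }

    // A missing key surfaces as an Err rather than a panic.
    if let Err(e) = deployments(&data, "missing") {
        eprintln!("error: {}", e);
    }
    Ok(())
}
```

Since main already returns Result<(), Box<dyn Error>>, a helper like this would slot into the for loop with a single ? and no other changes.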



The completed src/main.rs looks something like this

use std::collections::HashMap;
use std::string::String;
use std::error::Error;
use serde::{Serialize, Deserialize};

const NETWORK_SUBGRAPH_URL: &str = "https://api.thegraph.com/subgraphs/name/graphprotocol/graph-network-mainnet";
const NETWORK_SUBGRAPH_QUERY: &str = "{subgraphDeployments(where: {network: \"gnosis\"}) {id activeSubgraphCount createdAt ipfsHash subgraphCount stakedTokens signalledTokens originalName}}";
const CSV_FILE_PATH: &str = "./gnosis_subgraphs.csv";

#[allow(non_snake_case)]
#[derive(Debug, Serialize)]
struct CsvRecord<'a> {
    id: &'a String,
    activeSubgraphCount: &'a i32,
    createdAt: &'a i32,
    ipfsHash: &'a String,
    subgraphCount: &'a i32,
    stakedTokens: &'a String,
    signalledTokens: &'a String,
    originalName: &'a Option<String>,
}

#[derive(Debug, Deserialize, PartialEq)]
struct SubgraphDeploymentsResponse {
    data: HashMap<String, Vec<SubgraphDeployment>>
}

#[allow(non_snake_case)]
#[derive(Debug, Deserialize, PartialEq)]
struct SubgraphDeployment {
    id: String,
    activeSubgraphCount: i32,
    createdAt: i32,
    ipfsHash: String,
    subgraphCount: i32,
    stakedTokens: String,
    signalledTokens: String,
    originalName: Option<String>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Build the JSON request body: {"query": "<GraphQL query>"}
    let mut map = HashMap::new();
    map.insert("query", NETWORK_SUBGRAPH_QUERY);

    // Extract: POST the query and deserialize the JSON response
    let response: SubgraphDeploymentsResponse = reqwest::Client::new()
        .post(NETWORK_SUBGRAPH_URL)
        .json(&map)
        .send()
        .await?
        .json()
        .await?;

    // Load: create (or overwrite) the CSV file
    let mut wtr = csv::Writer::from_path(CSV_FILE_PATH)?;

    // Write one row per deployment; the header row comes from the field names
    for subgraph_deployment in &response.data["subgraphDeployments"] {
        wtr.serialize(CsvRecord {
            id: &subgraph_deployment.id,
            activeSubgraphCount: &subgraph_deployment.activeSubgraphCount,
            createdAt: &subgraph_deployment.createdAt,
            ipfsHash: &subgraph_deployment.ipfsHash,
            subgraphCount: &subgraph_deployment.subgraphCount,
            stakedTokens: &subgraph_deployment.stakedTokens,
            signalledTokens: &subgraph_deployment.signalledTokens,
            originalName: &subgraph_deployment.originalName,
        })?;
    }

    // Make sure all buffered rows are written to disk
    wtr.flush()?;

    Ok(())
}



Save your changes then run the program from the integrated terminal in VSCode

cargo run



Open gnosis_subgraphs.csv to inspect the results
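
When reading the file, note that the createdAt column holds raw integers. Assuming these are Unix timestamps in seconds (UTC), the sketch below converts one into a calendar date using only the standard library. The function names are hypothetical and the sample timestamp is made up. The day-to-date arithmetic follows the widely used civil-from-days approach; in a real project a date-time crate would be the more usual choice.

```rust
/// Converts days since 1970-01-01 into (year, month, day)
/// in the proleptic Gregorian calendar.
fn civil_from_days(days: i64) -> (i64, i64, i64) {
    let z = days + 719_468; // shift the epoch to 0000-03-01
    let era = z.div_euclid(146_097); // number of 400-year cycles
    let doe = z.rem_euclid(146_097); // day of era, 0..=146_096
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // year of era
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of year, from 1 March
    let mp = (5 * doy + 2) / 153; // month index, 0 = March
    let day = doy - (153 * mp + 2) / 5 + 1;
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    // January and February belong to the following calendar year.
    let year = yoe + era * 400 + if month <= 2 { 1 } else { 0 };
    (year, month, day)
}

/// Formats a Unix timestamp in seconds as YYYY-MM-DD (UTC).
fn format_date(timestamp: i64) -> String {
    let (y, m, d) = civil_from_days(timestamp.div_euclid(86_400));
    format!("{:04}-{:02}-{:02}", y, m, d)
}

fn main() {
    // Made-up sample timestamp, not taken from a real query result.
    let created_at: i64 = 1_672_358_400;
    println!("{} -> {}", created_at, format_date(created_at)); // prints "1672358400 -> 2022-12-30"
}
```

The sketch takes an i64 rather than the i32 used in the structs, since a signed 32-bit count of seconds runs out in January 2038.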


Closing thoughts


Coming soon