Catalog data source
NCI regularly crawls its data collections in multiple domains (climate and weather, earth observation and environmental science, and geophysics) and produces catalog data source files under project dk92 in /g/data/dk92/catalog/v2/. These catalogues can then be used with our data analysis software environments and packages.
File Format
One of the important requirements of our dataset indexes is support for heterogeneous schemas among datasets, such as a variable number of columns and tree-structured metadata. To address this, we use the Apache Parquet file format for the data source indexes. As a columnar storage format, Parquet provides many benefits, including improved storage efficiency, increased query performance, and reduced data loading times. Additionally, Parquet files are often used in conjunction with analytical frameworks such as Apache Spark, making it easier to perform powerful "big data" analytics.
Data layout
The Apache Spark DataFrame is used to organise the NCI indexing data layout. A Spark DataFrame is a distributed collection of (meta)data organised into named columns. It is built on top of the Apache Spark core and provides a flexible programming interface similar to SQL or data frames in R and Python, enabling both high-performance access and multi-node scalability that exploits in-memory as well as disk-based data processing.
Spark DataFrames work smoothly with the Intake framework. Intake makes it easy to create data catalogs from different data sources. Once a data source is defined in the catalog, it can be loaded into a Spark DataFrame using the Intake API. The Spark DataFrame can then be used for data analysis and manipulation on a powerful Spark cluster.
Tools
There are many tools that can be used to access the NCI Intake data catalog files and data source files. Here we introduce two of them: intake-spark and Spark SQL.
Intake-spark
Intake-spark is an Intake plugin that provides a unified interface for loading and accessing data in Apache Spark through the Intake data catalog system. Spark is a powerful distributed computing framework for processing large-scale data, but working with it can be challenging because it requires specific knowledge of Spark's API and data sources. Intake-spark simplifies this process by providing a consistent and intuitive interface for loading data into Spark DataFrames. Intake-spark supports several file formats, including Apache Parquet, Avro, CSV, and JSON, and can read data from various storage systems such as HDFS, Amazon S3, and Azure Blob Storage. Intake-spark also allows users to configure advanced settings such as partitioning and caching for improved performance.
Spark SQL
Using SQL to access Apache Spark DataFrames provides a convenient and familiar way to query and manipulate data, particularly for users who already know SQL. The Spark SQL module provides an interface for executing SQL queries against Spark DataFrames, making it easier to work with structured data and perform complex data analysis.
...
Using SQL to access Spark DataFrames also brings several benefits, including the ability to leverage the powerful query optimisation and distributed processing capabilities of Spark. By executing SQL queries on Spark DataFrames, users can take advantage of Spark's multi-node, distributed processing to quickly process large volumes of data and gain valuable insights from it.
NCI platform and software environments
NCI provides multiple platforms for working with our data indexes, such as interactive ARE JupyterLab sessions or Gadi PBS jobs.
...
NCI Data Indexing Operations
Here we introduce the workflow for using intake-spark and Spark SQL. Users can adopt other tools and methods to access the NCI data indexes.
Intake-spark
To use intake-spark, follow these steps.
Creating a SparkSession
SparkHolder is a class in intake-spark that is used to manage a connection to a running Spark session.
...
```python
from intake_spark.base import SparkHolder

# Create and set up a Spark session through intake-spark's SparkHolder.
h = SparkHolder(True, [('catalog', )], {})
h.setup()
session = h.session[0]

# Enable case-sensitive column name matching.
session.conf.set("spark.sql.caseSensitive", "true")
```
Loading a Catalog Data Source
You can use one of the following two options to load catalog data into a Spark DataFrame.
| Option 1: Loading the catalog file | Option 2: Loading the data source files directly |
|---|---|
| The catalog file for each dataset contains information about a set of data sources, such as their location, format, and metadata. You can use this catalog file to load the data sources it describes. | In this option you can use the Parquet data source files directly, without going through the catalog file. |
| Example code: loading the cmip6-oi10 catalog data via the catalog file (see the sketch below). | Example code: loading the cmip6-oi10 catalog data from the data source files (see the sketch below). |
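A minimal sketch of the two options, assuming the Spark session created above; the catalog path, the entry name cmip6_oi10 and the Parquet directory below are hypothetical placeholders rather than verified NCI file names:

```python
import intake

# Option 1: open the dataset catalog file and convert one entry to a Spark
# DataFrame (catalog path and entry name are hypothetical placeholders).
cat = intake.open_catalog('/g/data/dk92/catalog/v2/yaml/cmip6-oi10.yaml')
df = cat['cmip6_oi10'].to_spark()

# Option 2: read the Parquet data source files directly with the Spark session
# created by SparkHolder above (the path is a hypothetical placeholder).
df = session.read.parquet('/g/data/dk92/catalog/v2/data/cmip6-oi10')
```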
Processing Catalog Data
Get columns of a Spark DataFrame
The NCI data indexing DataFrame may contain several columns, as shown below:
...
You can use the above schema to identify the columns you need to search.
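To inspect the columns yourself, a minimal sketch (assuming df is the DataFrame loaded above):

```python
# Print the nested schema of the index (attributes, dimensions, variables, ...).
df.printSchema()

# List only the top-level column names.
print(df.columns)
```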
Get unique values of columns
- Show unique values of a single column, such as "attributes.experiment_id" or "attributes.institution_id".
- Show unique values of the combined columns "attributes.experiment_id" and "attributes.institution_id".

Both cases are shown in the sketch below.
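A minimal sketch of both cases, assuming df is the DataFrame loaded above:

```python
# Unique values of a single column.
df.select('attributes.experiment_id').distinct().show()
df.select('attributes.institution_id').distinct().show()

# Unique values of the combined columns.
df.select('attributes.experiment_id', 'attributes.institution_id').distinct().show()
```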
You can specify a larger number in the show() function and set truncate=False to display more rows at full length.
Searching within a Spark DataFrame
Searching within a Spark DataFrame typically involves filtering the DataFrame based on a particular condition or a set of conditions. You can define a condition using Spark's column operations. You can use any of the available functions in Spark's column API to create conditions and combine them with the logical operators '&' (and), '|' (or) and '~' (not).
...
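A hedged sketch of how a filtered DataFrame such as filter_df below could be built (the attribute values mirror the SQL example later on this page):

```python
from pyspark.sql.functions import col

# Combine several column conditions with '&' (and); '|' is or and '~' is not.
cond = (
    (col('attributes.institution_id') == 'AWI')
    & (col('attributes.source_id') == 'AWI-CM-1-1-MR')
    & (col('attributes.experiment_id') == 'historical')
    & (col('attributes.variant_label') == 'r1i1p1f1')
    & (col('attributes.table_id') == '3hr')
    & (col('attributes.variable_id') == 'pr')
    & (col('attributes.grid_label') == 'gn')
)
filter_df = df.filter(cond)
```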
```
In[3]: filter_df.show()
+--------------------+--------------------+---------+--------------------+--------------------+--------------------+
|          attributes|          dimensions|file_type|            file_uri|              format|           variables|
+--------------------+--------------------+---------+--------------------+--------------------+--------------------+
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
+--------------------+--------------------+---------+--------------------+--------------------+--------------------+
only showing top 20 rows
```
Load the Dataset
From the filtered Spark DataFrame, you can extract a list of file paths from its 'file_uri' column as below:
...
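A minimal sketch of the extraction (the column name file_uri comes from the DataFrame shown above):

```python
# Collect the 'file_uri' column of the filtered DataFrame into a Python list.
flist = [row.file_uri for row in filter_df.select('file_uri').collect()]
```

The block below also assumes xarray has been imported as xr.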
```
In[3]: ds = xr.open_mfdataset(flist, chunks={'time':720}, combine='by_coords', parallel=True)
       print(ds)
<xarray.Dataset>
Dimensions:    (time: 482120, bnds: 2, lat: 192, lon: 384)
Coordinates:
  * time       (time) datetime64[ns] 1850-01-01T01:30:00 ... 2014-12-31T22:30:00
  * lat        (lat) float64 -89.28 -88.36 -87.42 -86.49 ... 87.42 88.36 89.28
  * lon        (lon) float64 0.0 0.9375 1.875 2.812 ... 356.2 357.2 358.1 359.1
Dimensions without coordinates: bnds
Data variables:
    time_bnds  (time, bnds) datetime64[ns] dask.array<chunksize=(720, 2), meta=np.ndarray>
    lat_bnds   (time, lat, bnds) float64 dask.array<chunksize=(2920, 192, 2), meta=np.ndarray>
    lon_bnds   (time, lon, bnds) float64 dask.array<chunksize=(2920, 384, 2), meta=np.ndarray>
    pr         (time, lat, lon) float32 dask.array<chunksize=(720, 192, 384), meta=np.ndarray>
Attributes: (12/44)
    Conventions:            CF-1.7 CMIP-6.2
    activity_id:            CMIP
    branch_method:          standard
    branch_time_in_child:   0.0
    branch_time_in_parent:  54421.0
    creation_date:          2019-05-02T11:50:26Z
    ...                     ...
    title:                  AWI-CM-1-1-MR output prepared for CMIP6
    variable_id:            pr
    variant_label:          r1i1p1f1
    license:                CMIP6 model data produced by Alfred Wegener Insti...
    cmor_version:           3.4.0
    tracking_id:            hdl:21.14100/90ae6418-719b-4485-a424-a5682862cb2c
```
SQL
To use SQL queries with Apache Spark DataFrames, you can leverage the Spark SQL module, which provides a SQL-like interface for working with structured data. Here are the steps to use SQL queries with a Spark DataFrame.
Creating a SparkSession
First of all, you need to create a SparkSession object, which is the entry point to Spark functionality. You can create a SparkSession with a default configuration as below.
...
```python
import sys
import os
from pyspark.sql import SparkSession

def get_spark_session():
    # Create the SparkSession only once and cache it for reuse.
    if '_spark_instance' not in sys.modules:
        def get_env(var_name, default_val):
            # Read a setting from the environment, falling back to a default.
            return os.environ[var_name] if var_name in os.environ else default_val

        spark_master = get_env('spark_master', 'local[8]')
        executor_memory = get_env('spark_executor_memory', '4g')
        driver_memory = get_env('spark_driver_memory', '6g')
        local_tmp_dir = get_env('spark_local_dir', os.path.abspath(os.path.expanduser('~/tmp/spark-tmp')))
        log_level = get_env('spark_log_level', 'WARN')

        spark = SparkSession.builder.master(spark_master).appName('intake-sql').\
            config('spark.executor.memory', executor_memory).\
            config('spark.driver.memory', driver_memory).\
            config('spark.local.dir', local_tmp_dir).\
            config('spark.sql.caseSensitive', 'true').\
            getOrCreate()
        spark.sparkContext.setLogLevel(log_level)
        sys.modules['_spark_instance'] = spark
    return sys.modules['_spark_instance']

spark = get_spark_session()
```
Loading Catalog Data
You can load a catalog data source via the "read" method of the SparkSession object, which supports various file formats such as CSV, JSON, and Parquet.
...
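The ds_file DataFrame registered below can be created by reading the Parquet data source files; a minimal sketch, with a hypothetical placeholder path rather than a verified NCI file name:

```python
# Read the Parquet data source files into a Spark DataFrame
# (the path is a hypothetical placeholder).
ds_file = spark.read.parquet('/g/data/dk92/catalog/v2/data/cmip6-oi10')
```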
```
In[3]: ds_file.createOrReplaceTempView('tbl')
```
Processing Catalog Data
Conduct SQL queries
Now you can execute SQL queries on your DataFrame using the spark.sql method. This method takes a SQL query string and returns a DataFrame with the results. The Spark SQL module supports many SQL features and functions. Keep in mind that SQL queries executed with Spark SQL are transformed into a Spark execution plan, which is then optimised and executed on a distributed cluster, making it a powerful way to work with large-scale data.
...
```
In[3]: df = spark.sql('''
           select file_uri from tbl
           where attributes.institution_id = 'AWI'
           and attributes.source_id = 'AWI-CM-1-1-MR'
           and attributes.experiment_id = 'historical'
           and attributes.variant_label = 'r1i1p1f1'
           and attributes.table_id = '3hr'
           and attributes.variable_id = 'pr'
           and attributes.grid_label = 'gn'
           ''').toPandas()
       flist = df.file_uri.to_list()
```
Load the Dataset
To accelerate the data loading process, you can set up a Dask cluster prior to loading the data via xarray.
...
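As an illustrative sketch only (the cluster size and chunk sizes below are arbitrary choices, not NCI recommendations):

```python
import xarray as xr
from dask.distributed import Client, LocalCluster

# Start a local Dask cluster so the files can be read in parallel.
cluster = LocalCluster(n_workers=4, threads_per_worker=2)
client = Client(cluster)

# Open the files selected by the SQL query above as one lazy dataset.
ds = xr.open_mfdataset(flist, chunks={'time': 720}, combine='by_coords', parallel=True)
print(ds)
```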