Specialised Environments

Creating a SparkSession

SparkHolder is a class in intake-spark that manages the connection to a running Spark session.

When you create an Intake catalog entry for a Spark data source, the driver argument specifies that the data should be loaded using Spark, which means a Spark session will be created to read the data. The SparkHolder class is responsible for managing this session and ensuring that it is available when needed.

SparkHolder works by creating a singleton Spark session that can be shared across multiple data sources, so you can use the same Spark session to load data from multiple catalog data sources. This helps reduce the overhead of creating and tearing down Spark sessions.

The SparkHolder class also provides some configuration options for the Spark session, such as the number of executor cores and memory allocation. These options can be customized by passing arguments to the args parameter when defining the data source in the Intake catalog entry.

from intake_spark.base import SparkHolder

# Create the holder and set up the shared Spark session it manages
h = SparkHolder(True, [('catalog', )], {})
h.setup()

# h.session[0] is the underlying SparkSession; its configuration can be adjusted directly
session = h.session[0]
session.conf.set("spark.sql.caseSensitive", "true")
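
The object held in h.session[0] is a regular pyspark.sql.SparkSession, so the standard Spark APIs are available on it. A quick check (a minimal sketch, assuming the session above has been created):

print(session.version)                               # the Spark version in use
print(session.conf.get("spark.sql.caseSensitive"))   # confirms the option set above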

Loading a Catalog Data Source

You can use one of two options to load catalog data into a Spark DataFrame.

Option 1: Loading the catalog file.

Option 2: Loading the data source files directly.

With Option 1, the catalog file for each dataset contains information about a set of data sources, such as their location, format, and metadata. You can use the intake.open_catalog() method to open the catalog file; it returns a Catalog object, which allows you to access and load the data sources described in the catalog. You can then convert a data source into a Spark DataFrame via its to_spark() method.

With Option 2, you can use the intake.open_spark_dataframe() method to load the data source files (in Parquet format) directly. This method takes a set of parameters that specify the data source and any necessary options, and returns a data source object whose to_spark() method gives a Spark DataFrame for analysis and processing.

Example code: Loading cmip6-oi10 catalog data via the catalog file

import intake

# Open the catalog file and convert the data source named 'mydata' to a Spark DataFrame
catalog = intake.open_catalog('/g/data/dk92/catalog/yml/cmip6-oi10.yml')
df = catalog.mydata.to_spark()

Example code: Loading cmip6-oi10 catalog data from the data source files

import intake

# Path to the Parquet files of the cmip6-oi10 index
cat_path = "/g/data/dk92/catalog/v2/data/cmip6-oi10"

# Chain of Spark reader stages: read -> merge schemas -> load the Parquet files
source = intake.open_spark_dataframe([
    ['read', ],
    ['option', ["mergeSchema", "true"]],
    ['parquet', [cat_path, ]]
    ])
df = source.to_spark()

Processing Catalog Data

Get columns of a Spark DataFrame

The NCI data-indexing DataFrame may contain several columns, as shown below:

In[1]: print(df.columns)
['attributes', 'dimensions', 'file_type', 'file_uri', 'format', 'variables']

Some of these columns are defined as Spark StructType because they are organised in a nested manner.

A StructType is essentially a list of StructField objects, where each StructField represents a single column in the schema. In a nested structure, a StructField can itself have a data type that is another StructType, representing a nested sub-structure.
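
As an illustration of this nesting, here is a minimal sketch of a nested schema built with PySpark's type classes (the field names below are made up for illustration and do not come from the catalog):

from pyspark.sql.types import StructType, StructField, StringType, LongType

# "attributes" is itself a StructType, so its fields appear as nested columns
example_schema = StructType([
    StructField("file_uri", StringType(), True),
    StructField("attributes", StructType([
        StructField("long_name", StringType(), True),
        StructField("length", LongType(), True),
    ]), True),
])
print(example_schema.simpleString())
# struct<file_uri:string,attributes:struct<long_name:string,length:bigint>>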

You can print the schema of the whole DataFrame as below. In the example output, the columns named "attributes", "dimensions" and "variables" are of type StructType and contain multiple layers of nested StructType fields, whereas the columns "file_uri", "file_type" and "format" are single StructField columns of type "string".

In[2]: df.printSchema()
 
root
 |-- attributes: struct (nullable = true)
 |    |-- CCCma_model_hash: string (nullable = true)
 |    |-- CCCma_parent_runid: string (nullable = true)
 |    |-- CCCma_pycmor_hash: string (nullable = true)
 |    |-- CCCma_runid: string (nullable = true)
 |    |-- CDI: string (nullable = true)
 |    |-- CDI_grid_type: string (nullable = true)
...
 |-- file_type: string (nullable = true)
 |-- file_uri: string (nullable = true)
 |-- format: string (nullable = true)
 |-- variables: struct (nullable = true)
 |    |-- 3basin: struct (nullable = true)
 |    |    |-- attributes: struct (nullable = true)
 |    |    |    |-- long_name: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- units: string (nullable = true)
 |    |    |-- chunking: struct (nullable = true)
 |    |    |    |-- sizes: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- storage: string (nullable = true)
 |    |    |-- deflate: struct (nullable = true)
 |    |    |    |-- level: long (nullable = true)
 |    |    |    |-- shuffle: boolean (nullable = true)
 |    |    |    |-- state: boolean (nullable = true)
 |    |    |-- dimensions: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- endianness: string (nullable = true)
 |    |    |-- type: string (nullable = true)
...
 |-- dimensions: struct (nullable = true)
 |    |-- 3basin: struct (nullable = true)
 |    |    |-- length: long (nullable = true)
 |    |    |-- unlimited: boolean (nullable = true)
 |    |-- axis_nbounds: struct (nullable = true)
 |    |    |-- length: long (nullable = true)
 |    |    |-- unlimited: boolean (nullable = true)
...

Printing out the whole schema of a DataFrame may produce an overwhelmingly long list. An alternative is to select a specific column via the .select() API and print only its schema.

For example, the following command prints out the schema of a variable named "GEOLAT".

In[3]: df.select("variables.GEOLAT").printSchema()
root
 |-- GEOLAT: struct (nullable = true)
 |    |-- attributes: struct (nullable = true)
 |    |    |-- _FillValue: double (nullable = true)
 |    |    |-- cell_methods: string (nullable = true)
 |    |    |-- interp_method: string (nullable = true)
 |    |    |-- long_name: string (nullable = true)
 |    |    |-- missing_value: double (nullable = true)
 |    |    |-- units: string (nullable = true)
 |    |-- chunking: struct (nullable = true)
 |    |    |-- sizes: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- storage: string (nullable = true)
 |    |-- deflate: struct (nullable = true)
 |    |    |-- level: long (nullable = true)
 |    |    |-- shuffle: boolean (nullable = true)
 |    |    |-- state: boolean (nullable = true)
 |    |-- dimensions: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- endianness: string (nullable = true)
 |    |-- type: string (nullable = true)

You can use the above schema to identify the columns you need to search on.
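
For example, to list which variables are present without printing the whole schema, you can inspect the DataFrame's schema object directly (a small sketch using standard PySpark schema introspection):

# df.schema is a StructType; the dataType of its "variables" field is the nested
# StructType whose field names are the indexed variable names
variable_names = df.schema["variables"].dataType.names
print(variable_names[:10])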

Get unique values of columns

Show the unique values of single columns, for example "attributes.experiment_id" and "attributes.institution_id":


In[3]: df.select("attributes.experiment_id")\
.distinct().show()
+--------------+
| experiment_id|
+--------------+
|   ssp534-over|
|piClim-histall|
| esm-piControl|
|        ssp585|
|     piControl|
|        ssp460|
|piClim-control|
|        ssp370|
|        ssp126|
|        ssp119|
|        ssp245|
|       1pctCO2|
|  abrupt-4xCO2|
|          amip|
|    historical|
|piClim-histghg|
|        ssp434|
|      esm-hist|
|piClim-histaer|
|      hist-aer|
+--------------+
only showing top 20 rows
  
In[3]: df.select("attributes.institution_id")\
.distinct().show()
+-----------------+
|institution_id   |
+-----------------+
|UA               |
|IPSL             |
|E3SM-Project     |
|CCCma            |
|CAS              |
|MIROC            |
|NASA-GISS        |
|MRI              |
|NCC              |
|HAMMOZ-Consortium|
|NCAR             |
|NUIST            |
|NOAA-GFDL        |
|NIMS-KMA         |
|BCC              |
|CNRM-CERFACS     |
|AWI              |
|MPI-M            |
|KIOST            |
|MOHC             |
+-----------------+
only showing top 20 rows
In[3]: df.select("attributes.institution_id",\
          "attributes.experiment_id")\
.distinct().show(40,truncate=False)
 
+--------------+--------------+
|institution_id|experiment_id |
+--------------+--------------+
|NCC           |ssp585        |
|NCAR          |esm-piControl |
|MRI           |esm-hist      |
|NOAA-GFDL     |ssp370        |
|E3SM-Project  |ssp585        |
|UA            |ssp245        |
|NUIST         |ssp245        |
|UA            |ssp370        |
|CNRM-CERFACS  |ssp245        |
|NOAA-GFDL     |ssp585        |
|CMCC          |abrupt-4xCO2  |
|MIROC         |ssp534-over   |
|CAMS          |1pctCO2       |
|CNRM-CERFACS  |ssp126        |
|NASA-GISS     |piClim-histall|
|CMCC          |piControl     |
|MRI           |historical    |
|BCC           |piControl     |
|MPI-M         |esm-hist      |
|NCAR          |historical    |
|MIROC         |ssp126        |
|MOHC          |piClim-histall|
|CAMS          |piControl     |
|MIROC         |ssp370        |
|MOHC          |ssp534-over   |
|MPI-M         |amip          |
|NOAA-GFDL     |ssp126        |
|CCCma         |piClim-histall|
|CAMS          |abrupt-4xCO2  |
|CAMS          |amip          |
|CNRM-CERFACS  |ssp370        |
|KIOST         |ssp126        |
|CNRM-CERFACS  |ssp585        |
|NCAR          |piClim-control|
|UA            |ssp126        |
|MOHC          |ssp119        |
|CMCC          |amip          |
|MRI           |abrupt-4xCO2  |
|NUIST         |ssp126        |
|NASA-GISS     |piClim-control|
+--------------+--------------+
only showing top 40 rows


You can pass a larger number to the show() method and set truncate=False to display more rows at full length.

Searching within a Spark DataFrame 

Searching within a Spark DataFrame typically involves filtering the DataFrame based on a particular condition or a set of conditions. You can define a condition using Spark's column operations. You can use any of the functions available in Spark's column API to create conditions and combine them with the logical operators '&' (and), '|' (or) and '~' (not).

For example, you can define multiple conditions as below

In[3]: condition=(df.attributes.institution_id == 'AWI') \
       &(df.attributes.source_id == 'AWI-CM-1-1-MR')  \
       &(df.attributes.experiment_id == 'historical')  \
       &(df.attributes.variant_label=='r1i1p1f1') \
       &(df.attributes.table_id == '3hr') \
       &(df.attributes.variable_id == 'pr') \
       &(df.attributes.grid_label == "gn")
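
Conditions can likewise be combined with '|' (or) and '~' (not). A small sketch reusing columns from the schema above (the attribute values here are purely illustrative):

condition_extra = (((df.attributes.experiment_id == 'historical')
                    | (df.attributes.experiment_id == 'ssp585'))
                   & ~(df.attributes.institution_id == 'AWI'))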

Then you can use the DataFrame's filter() method to apply the condition and obtain the filtered DataFrame. The total number of rows in the filtered DataFrame can be obtained via the count() method, as below:

In[3]: filter_df=df.filter(condition)
       print(filter_df.count())
 
165

You can also use the 'where()' method to achieve the same result as below:

In[3]: filter_df=df.where(condition)
       print(filter_df.count())
 
165

Finally, you can use the show() method to display the filtered DataFrame:

In[3]: filter_df.show()
 
+--------------------+--------------------+---------+--------------------+--------------------+--------------------+
|          attributes|          dimensions|file_type|            file_uri|              format|           variables|
+--------------------+--------------------+---------+--------------------+--------------------+--------------------+
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
+--------------------+--------------------+---------+--------------------+--------------------+--------------------+
only showing top 20 rows

Load the Dataset

From the filtered Spark DataFrame, you can extract a list of file paths from its 'file_uri' column as below:

In[3]: # collect the file paths from the 'file_uri' column into a Python list
       from pyspark.sql.functions import col
       flist = [row['file_uri'] for row in filter_df.select(col('file_uri')).collect()]
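
Alternatively, the same list can be obtained by converting the selected column to a pandas DataFrame first (an equivalent sketch):

flist = filter_df.select('file_uri').toPandas()['file_uri'].tolist()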

It is recommended to set up a Dask cluster first to accelerate the data loading process.

Start Dask cluster

from dask.distributed import Client, LocalCluster

# Start a local Dask cluster and connect a client to it
cluster = LocalCluster()
client = Client(cluster)
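
Once the client is connected, you can optionally print the address of its diagnostic dashboard to monitor the workers during loading:

print(client.dashboard_link)   # e.g. http://127.0.0.1:8787/status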

Now you can load the data from the list of files via the 'xarray' package, as below.

Loading dataset

In[3]: import xarray as xr
       # Open all files as a single dataset, chunked along time and read in parallel via Dask
       ds = xr.open_mfdataset(flist, chunks={'time': 720}, combine='by_coords', parallel=True)
       print(ds)
 
<xarray.Dataset>
Dimensions:    (time: 482120, bnds: 2, lat: 192, lon: 384)
Coordinates:
  * time       (time) datetime64[ns] 1850-01-01T01:30:00 ... 2014-12-31T22:30:00
  * lat        (lat) float64 -89.28 -88.36 -87.42 -86.49 ... 87.42 88.36 89.28
  * lon        (lon) float64 0.0 0.9375 1.875 2.812 ... 356.2 357.2 358.1 359.1
Dimensions without coordinates: bnds
Data variables:
    time_bnds  (time, bnds) datetime64[ns] dask.array<chunksize=(720, 2), meta=np.ndarray>
    lat_bnds   (time, lat, bnds) float64 dask.array<chunksize=(2920, 192, 2), meta=np.ndarray>
    lon_bnds   (time, lon, bnds) float64 dask.array<chunksize=(2920, 384, 2), meta=np.ndarray>
    pr         (time, lat, lon) float32 dask.array<chunksize=(720, 192, 384), meta=np.ndarray>
Attributes: (12/44)
    Conventions:            CF-1.7 CMIP-6.2
    activity_id:            CMIP
    branch_method:          standard
    branch_time_in_child:   0.0
    branch_time_in_parent:  54421.0
    creation_date:          2019-05-02T11:50:26Z
    ...                     ...
    title:                  AWI-CM-1-1-MR output prepared for CMIP6
    variable_id:            pr
    variant_label:          r1i1p1f1
    license:                CMIP6 model data produced by Alfred Wegener Insti...
    cmor_version:           3.4.0
    tracking_id:            hdl:21.14100/90ae6418-719b-4485-a424-a5682862cb2c

