Page tree

Creating a SparkSession

First of all, you need to create a SparkSession object, which is the entry point to using Spark functionality. You can simply create a SparkSession with the default config as below

Start a SparkSession

In[3]: from pyspark.sql import SparkSession
       spark = SparkSession.builder.appName("intake-sql").getOrCreate()

Or you can set up a SparkSession with fine-tuning configures as below

Start Spark cluster

import sys
import os
from pyspark.sql import SparkSession
def get_spark_session():
  if '_spark_instance' not in sys.modules:
    def get_env(var_name, default_val):
      return os.environ[var_name] if var_name in os.environ else default_val
    spark_master = get_env('spark_master', 'local[8]')
    executor_memory = get_env('spark_executor_memory', '4g')
    driver_memory = get_env('spark_driver_memory', '6g')
    local_tmp_dir = get_env('spark_local_dir', os.path.abspath(os.path.expanduser('~/tmp/spark-tmp')))
    log_level = get_env('spark_log_level', 'WARN')
    spark = SparkSession.builder.master(spark_master).appName('intake-sql').\
      config('spark.executor.memory', executor_memory).\
      config('spark.driver.memory', driver_memory).\
      config('spark.local.dir', local_tmp_dir).\
      config('spark.sql.caseSensitive', 'true').\
      getOrCreate()
    spark.sparkContext.setLogLevel(log_level)
    sys.modules['_spark_instance'] = spark
  return sys.modules['_spark_instance']
spark = get_spark_session()

Loading Catalog Data

You can load a catalog data source via "read" method of the SparkSession object, which supports various file formats such as CSV, JSON, and Parquet.

For example, you can load the catalog data source from Parquet file via "read.parquet" method as below
Read DataSource file

In[3]: ds_file = spark.read.parquet('/g/data/dk92/catalog/v2/data/cmip6-oi10')

Once you have loaded your data into a Spark DataFrame, you can register it as a temporary table using the createOrReplaceTempView method. This method creates a table name that you can use in your SQL queries to refer to the DataFrame. For example, you can use the following code to create a temporary table

Create a temporary table

In[3]: ds_file.createOrReplaceTempView('tbl')

Processing Catalog Data

Conduct SQL queries

Now you can execute SQL queries on your DataFrame using the spark.sql method. This method takes a SQL query string and returns a DataFrame with the results. The Spark SQL module supports many SQL features and functions. Keep in mind that SQL queries executed with Spark SQL are transformed into a Spark execution plan, which is then optimised and executed on a distributed cluster, making it a powerful way to work with large-scale data.

You can easily convert the SQL queries results to Pandas table and extract the file path list with Pandas to_list() method.

Query the SparkDataset

In[3]: df =spark.sql('''
       select file_uri
       from tbl
       where attributes.institution_id = 'AWI'
       and attributes.source_id = 'AWI-CM-1-1-MR'
       and attributes.experiment_id = 'historical'
       and attributes.variant_label='r1i1p1f1'
       and attributes.table_id = '3hr'
       and attributes.variable_id = 'pr'
       and attributes.grid_label = "gn"
       ''').toPandas()
       flist=df.file_uri.to_list()

Load the Dataset

To accelerate the data loading process, you could set up a Dask cluster prior to the loading operations via Xarray.

Start Dask cluster

In[3]: import xarray as XR
       import dask.array as da
       from dask.distributed import Client, LocalCluster
       cluster = LocalCluster()
       client = Client(cluster)

Now you can load the data from a list of files as below

Loading the dataset

In[3]: ds = xr.open_mfdataset(flist, chunks={'time':720}, combine='by_coords',parallel=True)
       print(ds)
 
<xarray.Dataset>
Dimensions:    (time: 482120, bnds: 2, lat: 192, lon: 384)
Coordinates:
  * time       (time) datetime64[ns] 1850-01-01T01:30:00 ... 2014-12-31T22:30:00
  * lat        (lat) float64 -89.28 -88.36 -87.42 -86.49 ... 87.42 88.36 89.28
  * lon        (lon) float64 0.0 0.9375 1.875 2.812 ... 356.2 357.2 358.1 359.1
Dimensions without coordinates: bnds
Data variables:
    time_bnds  (time, bnds) datetime64[ns] dask.array<chunksize=(720, 2), meta=np.ndarray>
    lat_bnds   (time, lat, bnds) float64 dask.array<chunksize=(2920, 192, 2), meta=np.ndarray>
    lon_bnds   (time, lon, bnds) float64 dask.array<chunksize=(2920, 384, 2), meta=np.ndarray>
    pr         (time, lat, lon) float32 dask.array<chunksize=(720, 192, 384), meta=np.ndarray>
Attributes: (12/44)
    Conventions:            CF-1.7 CMIP-6.2
    activity_id:            CMIP
    branch_method:          standard
    branch_time_in_child:   0.0
    branch_time_in_parent:  54421.0
    creation_date:          2019-05-02T11:50:26Z
    ...                     ...
    title:                  AWI-CM-1-1-MR output prepared for CMIP6
    variable_id:            pr
    variant_label:          r1i1p1f1
    license:                CMIP6 model data produced by Alfred Wegener Insti...
    cmor_version:           3.4.0
    tracking_id:            hdl:21.14100/90ae6418-719b-4485-a424-a5682862cb2c
  • No labels