
First of all, you need to create a SparkSession object, which is the entry point to Spark functionality. You can simply create a SparkSession with the default configuration as below

Start a SparkSession

Code Block
In[3]: from pyspark.sql import SparkSession
       spark = SparkSession.builder.appName("intake-sql").getOrCreate()

Or you can set up a SparkSession with fine-tuned configurations as below

Start Spark cluster

Code Block
import sys
import os
from pyspark.sql import SparkSession

def get_spark_session():
  # Create the SparkSession only once and cache it so repeated calls reuse it
  if '_spark_instance' not in sys.modules:
    # Read a setting from the environment, falling back to a default value
    def get_env(var_name, default_val):
      return os.environ[var_name] if var_name in os.environ else default_val
    spark_master = get_env('spark_master', 'local[8]')
    executor_memory = get_env('spark_executor_memory', '4g')
    driver_memory = get_env('spark_driver_memory', '6g')
    local_tmp_dir = get_env('spark_local_dir', os.path.abspath(os.path.expanduser('~/tmp/spark-tmp')))
    log_level = get_env('spark_log_level', 'WARN')
    spark = SparkSession.builder.master(spark_master).appName('intake-sql').\
      config('spark.executor.memory', executor_memory).\
      config('spark.driver.memory', driver_memory).\
      config('spark.local.dir', local_tmp_dir).\
      config('spark.sql.caseSensitive', 'true').\
      getOrCreate()
    spark.sparkContext.setLogLevel(log_level)
    sys.modules['_spark_instance'] = spark
  return sys.modules['_spark_instance']

spark = get_spark_session()
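
The defaults above can be overridden through the environment variables that get_env() looks up. As a minimal sketch (the values here are placeholders, not recommendations), you could set them in Python before the first call to get_spark_session(), since the session is cached afterwards

Override the default settings

Code Block
import os
# Hypothetical override values; set them before get_spark_session() is first called,
# because the cached session in sys.modules is reused on later calls
os.environ['spark_master'] = 'local[4]'
os.environ['spark_executor_memory'] = '8g'
os.environ['spark_driver_memory'] = '8g'
spark = get_spark_session()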

Loading Catalog Data

You can load a catalog data source via the "read" method of the SparkSession object, which supports various file formats such as CSV, JSON, and Parquet.

For example, you can load the catalog data source from a Parquet file via the "read.parquet" method as below

Read DataSource file

Code Block
In[3]: ds_file = spark.read.parquet('/g/data/dk92/catalog/v2/data/cmip6-oi10')
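
If your catalog source is stored as CSV or JSON instead, the corresponding readers follow the same pattern. The paths below are placeholders for illustration, not actual dk92 catalog files

Read other file formats

Code Block
# Hypothetical paths, shown only to illustrate the equivalent CSV/JSON readers
csv_df = spark.read.csv('/path/to/catalog.csv', header=True, inferSchema=True)
json_df = spark.read.json('/path/to/catalog.json')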

Once you have loaded your data into a Spark DataFrame, you can register it as a temporary table using the createOrReplaceTempView method. This method assigns a table name that you can use in your SQL queries to refer to the DataFrame. For example, you can use the following code to create a temporary table

Create a temporary table

Code Block
In[3]: ds_file.createOrReplaceTempView('tbl')
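
Before writing queries, it can help to check which columns are available, for example the nested "attributes" struct used in the queries below. This is an optional check, not part of the original workflow

Inspect the table schema

Code Block
# Optional: list the registered tables and show the DataFrame schema
spark.sql('show tables').show()
ds_file.printSchema()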

Processing Catalog Data

Conduct SQL queries

Now you can execute SQL queries on your DataFrame using the spark.sql method. This method takes a SQL query string and returns a DataFrame with the results. The Spark SQL module supports many SQL features and functions. Keep in mind that SQL queries executed with Spark SQL are transformed into a Spark execution plan, which is then optimised and executed on a distributed cluster, making it a powerful way to work with large-scale data.
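
For instance, a simple aggregation over the temporary table might look like the sketch below; it assumes only the nested attributes fields that also appear in the full query further down.

A simple example query

Code Block
# Count catalogue entries per model, using the attributes.source_id field shown below
spark.sql('''
    select attributes.source_id, count(*) as n_files
    from tbl
    group by attributes.source_id
    order by n_files desc
''').show(10)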

You can easily convert the SQL query results to a Pandas DataFrame and extract the file path list with the Pandas to_list() method.

Query the SparkDataset

Code Block
In[3]: df = spark.sql('''
       select file_uri
       from tbl
       where attributes.institution_id = 'AWI'
       and attributes.source_id = 'AWI-CM-1-1-MR'
       and attributes.experiment_id = 'historical'
       and attributes.variant_label = 'r1i1p1f1'
       and attributes.table_id = '3hr'
       and attributes.variable_id = 'pr'
       and attributes.grid_label = 'gn'
       ''').toPandas()
       flist = df.file_uri.to_list()

Load the Dataset

To accelerate the data loading process, you could set up a Dask cluster before loading the data via Xarray.

Start Dask cluster

Code Block
In[3]: import xarray as xr
       import dask.array as da
       from dask.distributed import Client, LocalCluster
       cluster = LocalCluster()
       client = Client(cluster)
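
If the default LocalCluster does not match your compute allocation, it can be sized explicitly. The worker count and memory limit below are placeholder values to adapt, not recommendations from this page

Configure the Dask cluster size

Code Block
# Placeholder sizing: adjust workers and memory to your job's resources
cluster = LocalCluster(n_workers=8, threads_per_worker=1, memory_limit='4GB')
client = Client(cluster)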

Now you can load the data from a list of files as below

Loading the dataset

Code Block
In[3]: ds = xr.open_mfdataset(flist, chunks={'time': 720}, combine='by_coords', parallel=True)
       print(ds)

<xarray.Dataset>
Dimensions:    (time: 482120, bnds: 2, lat: 192, lon: 384)
Coordinates:
  * time       (time) datetime64[ns] 1850-01-01T01:30:00 ... 2014-12-31T22:30:00
  * lat        (lat) float64 -89.28 -88.36 -87.42 -86.49 ... 87.42 88.36 89.28
  * lon        (lon) float64 0.0 0.9375 1.875 2.812 ... 356.2 357.2 358.1 359.1
Dimensions without coordinates: bnds
Data variables:
    time_bnds  (time, bnds) datetime64[ns] dask.array<chunksize=(720, 2), meta=np.ndarray>
    lat_bnds   (time, lat, bnds) float64 dask.array<chunksize=(2920, 192, 2), meta=np.ndarray>
    lon_bnds   (time, lon, bnds) float64 dask.array<chunksize=(2920, 384, 2), meta=np.ndarray>
    pr         (time, lat, lon) float32 dask.array<chunksize=(720, 192, 384), meta=np.ndarray>
Attributes: (12/44)
    Conventions:            CF-1.7 CMIP-6.2
    activity_id:            CMIP
    branch_method:          standard
    branch_time_in_child:   0.0
    branch_time_in_parent:  54421.0
    creation_date:          2019-05-02T11:50:26Z
    ...                     ...
    title:                  AWI-CM-1-1-MR output prepared for CMIP6
    variable_id:            pr
    variant_label:          r1i1p1f1
    license:                CMIP6 model data produced by Alfred Wegener Insti...
    cmor_version:           3.4.0
    tracking_id:            hdl:21.14100/90ae6418-719b-4485-a424-a5682862cb2c
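
As a quick check that the Dask-backed dataset is usable, you could trigger a small computation. The area-mean below is only an illustrative follow-up, not part of the original workflow

Compute with the loaded dataset

Code Block
# Illustrative: global-mean precipitation time series, evaluated lazily by the Dask workers
pr_mean = ds.pr.mean(dim=('lat', 'lon')).compute()
print(pr_mean)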