...
First of all, you need to create a SparkSession object, which is the entry point to Spark functionality. You can simply create a SparkSession with the default configuration as below:
Start a SparkSession
In[3]: from pyspark.sql import SparkSession
       spark = SparkSession.builder.appName("intake-sql").getOrCreate()
Alternatively, you can set up a SparkSession with fine-tuned configurations as below:
Start Spark cluster
import sys
import os
from pyspark.sql import SparkSession

def get_spark_session():
    # Create the SparkSession once and cache it in sys.modules so repeated calls reuse it
    if '_spark_instance' not in sys.modules:
        # Read a setting from the environment, falling back to a default value
        def get_env(var_name, default_val):
            return os.environ[var_name] if var_name in os.environ else default_val
        spark_master = get_env('spark_master', 'local[8]')
        executor_memory = get_env('spark_executor_memory', '4g')
        driver_memory = get_env('spark_driver_memory', '6g')
        local_tmp_dir = get_env('spark_local_dir', os.path.abspath(os.path.expanduser('~/tmp/spark-tmp')))
        log_level = get_env('spark_log_level', 'WARN')
        spark = SparkSession.builder.master(spark_master).appName('intake-sql').\
            config('spark.executor.memory', executor_memory).\
            config('spark.driver.memory', driver_memory).\
            config('spark.local.dir', local_tmp_dir).\
            config('spark.sql.caseSensitive', 'true').\
            getOrCreate()
        spark.sparkContext.setLogLevel(log_level)
        sys.modules['_spark_instance'] = spark
    return sys.modules['_spark_instance']

spark = get_spark_session()
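The helper above reads its settings from environment variables, so you can adjust the Spark cluster without editing the function. The sketch below shows how you might override some of the defaults; the values are illustrative only, and they must be set before the first call to get_spark_session() because the session is cached afterwards.

import os

# Illustrative overrides for the environment variables read by get_spark_session()
os.environ['spark_master'] = 'local[16]'
os.environ['spark_executor_memory'] = '8g'
os.environ['spark_driver_memory'] = '8g'

spark = get_spark_session()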
Loading Catalog Data
You can load a catalog data source via the read method of the SparkSession object, which supports various file formats such as CSV, JSON, and Parquet. For example, you can load the catalog data source from a Parquet file via the read.parquet method as below:
Read DataSource file
In[3]: ds_file = spark.read.parquet('/g/data/dk92/catalog/v2/data/cmip6-oi10')
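The same read interface can be used for the other supported formats mentioned above; the paths in this sketch are hypothetical placeholders rather than actual catalogue files.

# Hypothetical file paths, shown only to illustrate the other supported formats
ds_csv = spark.read.csv('/path/to/catalog.csv', header=True, inferSchema=True)
ds_json = spark.read.json('/path/to/catalog.json')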
Once you have loaded your data into a Spark DataFrame, you can register it as a temporary table using the createOrReplaceTempView method. This method creates a table name that you can use in your SQL queries to refer to the DataFrame. For example, you can use the following code to create a temporary table:
Create a temporary table
In[3]: ds_file.createOrReplaceTempView('tbl')
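If you want to confirm that the view has been registered, an optional check such as the following lists the tables and views known to the current session:

spark.sql('SHOW TABLES').show()
# or, equivalently, via the catalog API
spark.catalog.listTables()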
Processing Catalog Data
Conduct SQL queries
Now you can execute SQL queries on your DataFrame using the spark.sql method. This method takes a SQL query string and returns a DataFrame with the results. The Spark SQL module supports many SQL features and functions. Keep in mind that SQL queries executed with Spark SQL are transformed into a Spark execution plan, which is then optimised and executed on a distributed cluster, making it a powerful way to work with large-scale data.
You can easily convert the SQL query results to a Pandas DataFrame and extract the file path list with the Pandas to_list() method.
Query the SparkDataset
In[3]: df = spark.sql('''
           select file_uri
           from tbl where attributes.institution_id = 'AWI'
           and attributes.source_id = 'AWI-CM-1-1-MR'
           and attributes.experiment_id = 'historical'
           and attributes.variant_label = 'r1i1p1f1'
           and attributes.table_id = '3hr'
           and attributes.variable_id = 'pr'
           and attributes.grid_label = "gn"
           ''').toPandas()
       flist = df.file_uri.to_list()
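Because spark.sql accepts arbitrary Spark SQL, you can also run aggregations over the catalogue rather than only selecting file paths. The query below is a hedged example that assumes the same schema as above (the attributes.* columns); it simply counts files per variable for the historical experiment.

# Illustrative aggregation over the same temporary view
counts = spark.sql('''
    select attributes.variable_id as variable_id, count(*) as n_files
    from tbl
    where attributes.experiment_id = 'historical'
    group by attributes.variable_id
    order by n_files desc
''')
counts.show(10)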
Load the Dataset
To accelerate the data loading process, you can set up a Dask cluster prior to loading the data via Xarray.
Start Dask cluster
In[3]: import xarray as xr
       import dask.array as da
       from dask.distributed import Client, LocalCluster
       cluster = LocalCluster()
       client = Client(cluster)
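By default, LocalCluster chooses the number of workers from the available cores. If you need finer control, you can pass explicit worker settings; the values below are illustrative only and should be matched to the resources of your own session.

# Illustrative explicit sizing of the local Dask cluster
cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit='8GB')
client = Client(cluster)
print(client.dashboard_link)  # URL of the Dask dashboard for monitoring the workers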
Now you can load the data from the list of files as below:
Loading the dataset
In[3]: ds = xr.open_mfdataset(flist, chunks={'time': 720}, combine='by_coords', parallel=True)
       print(ds)

<xarray.Dataset>
Dimensions:    (time: 482120, bnds: 2, lat: 192, lon: 384)
Coordinates:
  * time       (time) datetime64[ns] 1850-01-01T01:30:00 ... 2014-12-31T22:30:00
  * lat        (lat) float64 -89.28 -88.36 -87.42 -86.49 ... 87.42 88.36 89.28
  * lon        (lon) float64 0.0 0.9375 1.875 2.812 ... 356.2 357.2 358.1 359.1
Dimensions without coordinates: bnds
Data variables:
    time_bnds  (time, bnds) datetime64[ns] dask.array<chunksize=(720, 2), meta=np.ndarray>
    lat_bnds   (time, lat, bnds) float64 dask.array<chunksize=(2920, 192, 2), meta=np.ndarray>
    lon_bnds   (time, lon, bnds) float64 dask.array<chunksize=(2920, 384, 2), meta=np.ndarray>
    pr         (time, lat, lon) float32 dask.array<chunksize=(720, 192, 384), meta=np.ndarray>
Attributes: (12/44)
    Conventions:            CF-1.7 CMIP-6.2
    activity_id:            CMIP
    branch_method:          standard
    branch_time_in_child:   0.0
    branch_time_in_parent:  54421.0
    creation_date:          2019-05-02T11:50:26Z
    ...                     ...
    title:                  AWI-CM-1-1-MR output prepared for CMIP6
    variable_id:            pr
    variant_label:          r1i1p1f1
    license:                CMIP6 model data produced by Alfred Wegener Insti...
    cmor_version:           3.4.0
    tracking_id:            hdl:21.14100/90ae6418-719b-4485-a424-a5682862cb2c
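The data variables above are backed by Dask arrays, so any computation on them stays lazy until you explicitly trigger it. As an illustrative follow-up (the reduction itself is arbitrary and not part of the original workflow), you could compute a time mean of the pr variable in parallel on the Dask cluster:

# Time-mean precipitation; compute() triggers parallel execution on the Dask cluster
pr_mean = ds.pr.mean(dim='time').compute()
print(pr_mean)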