Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents
maxLevel4

File Format

In NCI Intake-spark scheme, the catalog file collects all attributes from a single variable, dimension and configurations of all dataset files. Thus the dataset indexes require supporting the heterogeneous schema among datasets, such as: variable number of columns; tree-structured metadata; and so on. To address this, we  use the Apache Parquet file format for the data source indexes. As a columnar data storage format, parquet provides many benefits including: improved storage efficiency; increased query performance; and reduced data loading times. Additionally, parquet files are often used in conjunction with analytical frameworks such as Apache Spark, making it easier to perform powerful "big data" analytics. 

Data layout

The Apache Spark DataFrame is used to organise the NCI indexing data layout. A Spark DataFrame is a distributed collection of (meta)data organized into named columns. It is built on top of the Apache Spark core and provides a flexible programming interface similar to SQL or data frames in R or Python, enables both high performance access and multi-node scalability to exploit both in-memory and disk-based data processing.

Spark DataFrame can be used smoothly with the intake framework. Intake makes it easy to create data catalogs from different data sources. Once the data source is defined in the catalog, it can be loaded into a Spark DataFrame using the intake API. The Spark DataFrame can then be used for data analysis and manipulation with the powerful Spark cluster. 

Tools

Two packages are widely used in the Intake-spark scheme, i.e. intake-spark and Spark SQL.

Intake-spark

Intake-spark is an Intake plugin that provides a unified interface for loading and accessing data in Apache Spark using the Intake data catalog system. Spark is a powerful distributed computing framework for processing large-scale data, but working with it can be challenging because it requires specific knowledge of Spark's API and data sources. Intake-spark simplifies this process by providing a consistent and intuitive interface for loading data into Spark DataFrame. Intake-spark supports several file formats, including Apache Parquet, Avro, CSV, and JSON, and can read data from various storage systems such as HDFS, Amazon S3, and Azure Blob Storage. Intake-spark also allows users to configure advanced settings such as partitioning and caching for improved performance.

Spark SQL 

Using SQL in accessing Apache Spark DataFrames provides a convenient and familiar way for users to query and manipulate data, particularly for those who are already familiar with SQL. The Spark SQL module provides an interface for executing SQL queries against Spark DataFrames, making it easier for users to work with structured data and perform complex data analysis.

To use SQL in accessing Spark DataFrames, you first need to create a SparkSession object, which is the entry point to using Spark functionality. Then, you can load your data into a DataFrame, register the DataFrame as a temporary table, and execute SQL queries on the DataFrame using the Spark SQL module.

Using SQL in accessing Spark DataFrames can provide several benefits, including the ability to leverage the powerful query optimization and distributed processing capabilities of Spark. By executing SQL queries on Spark DataFrames, users can take advantage of Spark's multi-node, distributed processing capabilities to quickly process large volumes of data and gain valuable insights from it.

Workflow

Here we introduce the workflow for using intake-spark and spark SQL. User can adopt other working tools and methods to access the NCI data indexes.

Intake-spark

To use intake-spark, you will need to follow these steps

Creating a SparkSession

SparkHolder is a class in intake-spark that is used to manage a connection to a running Spark session.

When you create an intake-spark data catalog entry for a Spark data source, the driver argument specifies that the data should be loaded using Spark. This means that a Spark session will be created to read the data. The SparkHolder class is responsible for managing this Spark session and ensuring that it is available when needed.

SparkHolder works by creating a singleton instance of a Spark session, which can be shared across multiple data sources. This means you an use the same Spark session to load data from multiple catalog data sources. This can help to reduce the overhead of creating and tearing down Spark sessions.

The SparkHolder class also provides some configuration options for the Spark session, such as the number of executor cores and memory allocation. These options can be customized by passing arguments to the args parameter when defining the data source in the Intake catalog entry.

Code Block
languagepy
from intake_spark.base import SparkHolder
h = SparkHolder(True, [('catalog', )], {})
h.setup()
session = h.session[0]
session.conf.set("spark.sql.caseSensitive", "true")

Loading a Catalog Data Source

You can use one of the two options to load catalog data into Spark DataFrame. 

Option 1: Loading catalog file.

Option 2: Loading data source files directly.

The catalog file for each dataset contains information about a set of data sources, such as their location, format, and metadata. You can use intake.open_catalog() method to open the catalog file and load the data source specified within the catalog file. It returns a Catalog object, which allows users to access and load the data sources described in the catalog. After that, you can convert the data source into Spark DataFrame via its to_spark() method. 

In this option you can use open_spark_dataframe() method to load a Spark DataFrame from data source files ( in parquet format ) directly. This method takes in a set of parameters that specify the data source and any necessary options, and returns a Spark DataFrame that can be used for data analysis and processing.

Example code: Loading cmip6-oi10 catalog data via catalog file

Code Block
languagepy
import intake
catalog = intake.open_catalog('/g/data/dk92/catalog/yml/cmip6-oi10.yml')
df = catalog.mydata.to_spark()


Example code: Loading cmip6-oi10 catalog data from data source file

Code Block
languagepy
import intake
cat_path="/g/data/dk92/catalog/v2/data/cmip6-oi10"
source = intake.open_spark_dataframe([
    ['read', ],
    ['option', ["mergeSchema", "true"]],
    ['parquet', [cat_path,]]
    ])
df = source.to_spark()


Processing Catalog Data

Get columns of a Spark DataFrame

The NCI data indexing DataFrame may contain several columns as below:

Code Block
languagepy
In[1]: print(df.columns) 
['attributes', 'dimensions', 'file_type', 'file_uri', 'format', 'variables']

Some of these columns are defined as Spark StructType as they are organised in the nested manner.

StructType is essentially a list of StructField objects, where each StructField object represents a single column in the schema. In a nested structure, a StructField can have a data type of another StructType object, which can represent a sub-structure or a nested structure.

You can print the Schema of the whole DataFrame as below. In the example outputs, those columns named "attributes", "dimensions" and "variables" are in the type of StructType which contains multiple layers of other StructType columns. On the other hand, those columns named "file_uri","file_type" and "format" are all single StructField column in "string" type.

Code Block
languagepy
In[2]: df.printSchema()

root
 |-- attributes: struct (nullable = true)
 |    |-- CCCma_model_hash: string (nullable = true)
 |    |-- CCCma_parent_runid: string (nullable = true)
 |    |-- CCCma_pycmor_hash: string (nullable = true)
 |    |-- CCCma_runid: string (nullable = true)
 |    |-- CDI: string (nullable = true)
 |    |-- CDI_grid_type: string (nullable = true)
...
 |-- file_type: string (nullable = true)
 |-- file_uri: string (nullable = true)
 |-- format: string (nullable = true)
 |-- variables: struct (nullable = true)
 |    |-- 3basin: struct (nullable = true)
 |    |    |-- attributes: struct (nullable = true)
 |    |    |    |-- long_name: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- units: string (nullable = true)
 |    |    |-- chunking: struct (nullable = true)
 |    |    |    |-- sizes: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- storage: string (nullable = true)
 |    |    |-- deflate: struct (nullable = true)
 |    |    |    |-- level: long (nullable = true)
 |    |    |    |-- shuffle: boolean (nullable = true)
 |    |    |    |-- state: boolean (nullable = true)
 |    |    |-- dimensions: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- endianness: string (nullable = true)
 |    |    |-- type: string (nullable = true)
...
 |-- dimensions: struct (nullable = true)
 |    |-- 3basin: struct (nullable = true)
 |    |    |-- length: long (nullable = true)
 |    |    |-- unlimited: boolean (nullable = true)
 |    |-- axis_nbounds: struct (nullable = true)
 |    |    |-- length: long (nullable = true)
 |    |    |-- unlimited: boolean (nullable = true) root
...


Printing out the whole schema of a DataFrame may produce an overwhelmingly long list. An alternative approach can be to select a specify column via the ".select" API and only print that schema.

For example, the following command prints out the schema of a variable named "GEOLAT". 

Code Block
languagepy
In[3]: df.select("variables.GEOLAT").printSchema()
root 
 |-- GEOLAT: struct (nullable = true)
 |    |-- attributes: struct (nullable = true)
 |    |    |-- _FillValue: double (nullable = true)
 |    |    |-- cell_methods: string (nullable = true)
 |    |    |-- interp_method: string (nullable = true)
 |    |    |-- long_name: string (nullable = true)
 |    |    |-- missing_value: double (nullable = true)
 |    |    |-- units: string (nullable = true)
 |    |-- chunking: struct (nullable = true)
 |    |    |-- sizes: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- storage: string (nullable = true)
 |    |-- deflate: struct (nullable = true)
 |    |    |-- level: long (nullable = true)
 |    |    |-- shuffle: boolean (nullable = true)
 |    |    |-- state: boolean (nullable = true)
 |    |-- dimensions: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- endianness: string (nullable = true)
 |    |-- type: string (nullable = true)

You can use the above schema to find out the columns you need to search.

Get unique values of columns

Show unique values of a single column of "attributes.experiment_id" and "attributes.institution_id"

Show unique values of combined columns of "attributes.experiment_id" and "attributes.institution_id"


Code Block
languagepy
In[3]: df.select("attributes.experiment_id")\
.distinct().show() 
+--------------+
| experiment_id|
+--------------+
|   ssp534-over|
|piClim-histall|
| esm-piControl|
|        ssp585|
|     piControl|
|        ssp460|
|piClim-control|
|        ssp370|
|        ssp126|
|        ssp119|
|        ssp245|
|       1pctCO2|
|  abrupt-4xCO2|
|          amip|
|    historical|
|piClim-histghg|
|        ssp434|
|      esm-hist|
|piClim-histaer|
|      hist-aer|
+--------------+
only showing top 20 rows 
 
In[3]: df.select("attributes.institution_id")\
.distinct().show() 
+-----------------+
|institution_id   |
+-----------------+
|UA               |
|IPSL             |
|E3SM-Project     |
|CCCma            |
|CAS              |
|MIROC            |
|NASA-GISS        |
|MRI              |
|NCC              |
|HAMMOZ-Consortium|
|NCAR             |
|NUIST            |
|NOAA-GFDL        |
|NIMS-KMA         |
|BCC              |
|CNRM-CERFACS     |
|AWI              |
|MPI-M            |
|KIOST            |
|MOHC             |
+-----------------+
only showing top 20 rows 



Code Block
languagepy
In[3]: df.select("attributes.institution_id",\
          "attributes.experiment_id")\ .distinct().show(40,truncate=False)

+--------------+--------------+
|institution_id|experiment_id |
+--------------+--------------+
|NCC           |ssp585        |
|NCAR          |esm-piControl |
|MRI           |esm-hist      |
|NOAA-GFDL     |ssp370        |
|E3SM-Project  |ssp585        |
|UA            |ssp245        |
|NUIST         |ssp245        |
|UA            |ssp370        |
|CNRM-CERFACS  |ssp245        |
|NOAA-GFDL     |ssp585        |
|CMCC          |abrupt-4xCO2  |
|MIROC         |ssp534-over   |
|CAMS          |1pctCO2       |
|CNRM-CERFACS  |ssp126        |
|NASA-GISS     |piClim-histall|
|CMCC          |piControl     |
|MRI           |historical    |
|BCC           |piControl     |
|MPI-M         |esm-hist      |
|NCAR          |historical    |
|MIROC         |ssp126        |
|MOHC          |piClim-histall|
|CAMS          |piControl     |
|MIROC         |ssp370        |
|MOHC          |ssp534-over   |
|MPI-M         |amip          |
|NOAA-GFDL     |ssp126        |
|CCCma         |piClim-histall|
|CAMS          |abrupt-4xCO2  |
|CAMS          |amip          |
|CNRM-CERFACS  |ssp370        |
|KIOST         |ssp126        |
|CNRM-CERFACS  |ssp585        |
|NCAR          |piClim-control|
|UA            |ssp126        |
|MOHC          |ssp119        |
|CMCC          |amip          |
|MRI           |abrupt-4xCO2  |
|NUIST         |ssp126        |
|NASA-GISS     |piClim-control|
+--------------+--------------+
only showing top 40 rows


You can specify a larger number in show() function and set "truncate=False" to display more rows in full lengths.

Searching within a Spark DataFrame 

Searching within a Spark DataFrame typically involves filtering the DataFrame based on a particular condition or a set of conditions. You can define a condition using Spark's column operations. You can use any of the available functions in Spark's column API to create the condition and combine them using logical operators like 'and', 'or' and 'not'.

For example, you can define multiple conditions as below

Code Block
languagepy
In[3]: condition=(df.attributes.institution_id == 'AWI') \
	   &(df.attributes.source_id == 'AWI-CM-1-1-MR')  \ 
       &(df.attributes.experiment_id == 'historical')  \
       &(df.attributes.variant_label=='r1i1p1f1') \
       &(df.attributes.table_id == '3hr') \
       &(df.attributes.variable_id == 'pr') \
       &(df.attributes.grid_label == "gn") 

Then you can apply 'filter()' method of the DataFrame to apply the condition and get the filtered DataFrame. The total number of rows in the filtered DataFrame could be obtained via "count()" method as below:

Code Block
languagepy
In[3]: filter_df=df.filter(condition)
       print(filter_df.count())

165

You can also use the 'where()' method to achieve the same result as below:

Code Block
languagepy
In[3]: filter_df=df.where(condition)
       print(filter_df.count())

165

Finally, you can use the show() method to display the filtered DataFrame

Code Block
languagepy
In[3]: filter_df.show()

+--------------------+--------------------+---------+--------------------+--------------------+--------------------+
|          attributes|          dimensions|file_type|            file_uri|              format|           variables|
+--------------------+--------------------+---------+--------------------+--------------------+--------------------+
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
|{null, null, null...|{null, null, null...|        f|/g/data/oi10/repl...|NC_FORMAT_NETCDF4...|{null, null, null...|
+--------------------+--------------------+---------+--------------------+--------------------+--------------------+
only showing top 20 rows

Load the Dataset

From the filtered Spark DataFrame, you can extract a list of file paths from its 'file_uri' column as below:

Code Block
languagepy
In[3]: # import necessary libraries
	  from pyspark.sql.functions import col
	  flist=[row['file_uri'] for row in filter_df.select(col('file_uri')).collect()]

  It is recommended to set up a Dask cluster firstly to accelerate the data loading process.

Code Block
languagepy
titleStart Dask cluster
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)

Now you can load the data from a list of files via 'xarray' package as below

Code Block
languagepy
titleLoading dataset
In[3]: ds = xr.open_mfdataset(flist, chunks={'time':720}, combine='by_coords',parallel=True)
       print(ds)

<xarray.Dataset>
Dimensions:    (time: 482120, bnds: 2, lat: 192, lon: 384)
Coordinates:
  * time       (time) datetime64[ns] 1850-01-01T01:30:00 ... 2014-12-31T22:30:00
  * lat        (lat) float64 -89.28 -88.36 -87.42 -86.49 ... 87.42 88.36 89.28
  * lon        (lon) float64 0.0 0.9375 1.875 2.812 ... 356.2 357.2 358.1 359.1
Dimensions without coordinates: bnds
Data variables:
    time_bnds  (time, bnds) datetime64[ns] dask.array<chunksize=(720, 2), meta=np.ndarray>
    lat_bnds   (time, lat, bnds) float64 dask.array<chunksize=(2920, 192, 2), meta=np.ndarray>
    lon_bnds   (time, lon, bnds) float64 dask.array<chunksize=(2920, 384, 2), meta=np.ndarray>
    pr         (time, lat, lon) float32 dask.array<chunksize=(720, 192, 384), meta=np.ndarray>
Attributes: (12/44)
    Conventions:            CF-1.7 CMIP-6.2
    activity_id:            CMIP
    branch_method:          standard
    branch_time_in_child:   0.0
    branch_time_in_parent:  54421.0
    creation_date:          2019-05-02T11:50:26Z
    ...                     ...
    title:                  AWI-CM-1-1-MR output prepared for CMIP6
    variable_id:            pr
    variant_label:          r1i1p1f1
    license:                CMIP6 model data produced by Alfred Wegener Insti...
    cmor_version:           3.4.0
    tracking_id:            hdl:21.14100/90ae6418-719b-4485-a424-a5682862cb2c

SQL 

To use SQL queries with Apache Spark DataFrames, you can leverage the Spark SQL module, which provides a SQL-like interface for working with structured data. Here are the steps to use SQL queries with a Spark DataFrame.

Creating a SparkSession

First of all, you need to create a SparkSession object, which is the entry point to using Spark functionality. You can simply create a SparkSession with the default config as below

Code Block
languagepy
titleStart a SparkSession
In[3]: from pyspark.sql import SparkSession
       spark = SparkSession.builder.appName("intake-sql").getOrCreate()

Or you can set up a SparkSession with fine-tuning configures as below

Code Block
languagepy
titleStart Spark cluster
import sys
import os
from pyspark.sql import SparkSession
def get_spark_session():
  if '_spark_instance' not in sys.modules:
    def get_env(var_name, default_val):
      return os.environ[var_name] if var_name in os.environ else default_val
    spark_master = get_env('spark_master', 'local[8]')
    executor_memory = get_env('spark_executor_memory', '4g')
    driver_memory = get_env('spark_driver_memory', '6g')
    local_tmp_dir = get_env('spark_local_dir', os.path.abspath(os.path.expanduser('~/tmp/spark-tmp')))
    log_level = get_env('spark_log_level', 'WARN')
    spark = SparkSession.builder.master(spark_master).appName('intake-sql').\
      config('spark.executor.memory', executor_memory).\
      config('spark.driver.memory', driver_memory).\
      config('spark.local.dir', local_tmp_dir).\
      config('spark.sql.caseSensitive', 'true').\
      getOrCreate()
    spark.sparkContext.setLogLevel(log_level)
    sys.modules['_spark_instance'] = spark
  return sys.modules['_spark_instance']
spark = get_spark_session()

Loading Catalog Data

You can load a catalog data source via "read" method of the SparkSession object, which supports various file formats such as CSV, JSON, and Parquet.

For example, you can load the catalog data source from Parquet file via "read.parquet" method as below

Code Block
languagepy
titleRead DataSource file
In[3]: ds_file = spark.read.parquet('/g/data/dk92/catalog/v2/data/cmip6-oi10')

Once you have loaded your data into a Spark DataFrame, you can register it as a temporary table using the createOrReplaceTempView method. This method creates a table name that you can use in your SQL queries to refer to the DataFrame. For example, you can use the following code to create a temporary table

Code Block
languagepy
titleCreate a temporary table
In[3]: ds_file.createOrReplaceTempView('tbl')

Processing Catalog Data

Conduct SQL queries

Now you can execute SQL queries on your DataFrame using the spark.sql method. This method takes a SQL query string and returns a DataFrame with the results. The Spark SQL module supports many SQL features and functions. Keep in mind that SQL queries executed with Spark SQL are transformed into a Spark execution plan, which is then optimised and executed on a distributed cluster, making it a powerful way to work with large-scale data.

You can easily convert the SQL queries results to Pandas table and extract the file path list with Pandas to_list() method.

Code Block
languagepy
titleQuery the SparkDataset
In[3]: df =spark.sql('''
       select file_uri 
       from tbl 
       where attributes.institution_id = 'AWI' 
       and attributes.source_id = 'AWI-CM-1-1-MR'
       and attributes.experiment_id = 'historical'
       and attributes.variant_label='r1i1p1f1'
       and attributes.table_id = '3hr'
       and attributes.variable_id = 'pr'
       and attributes.grid_label = "gn"
       ''').toPandas()
       flist=df.file_uri.to_list()

Load the Dataset

To accelerate the data loading process, you could set up a Dask cluster prior to the loading operations via Xarray. 

Code Block
languagepy
titleStart Dask cluster
In[3]: import xarray as XR
       import dask.array as da
       from dask.distributed import Client, LocalCluster
       cluster = LocalCluster()
       client = Client(cluster)

Now you can load the data from a list of files as below

Code Block
languagepy
titleLoading the dataset
In[3]: ds = xr.open_mfdataset(flist, chunks={'time':720}, combine='by_coords',parallel=True)
       print(ds)

<xarray.Dataset>
Dimensions:    (time: 482120, bnds: 2, lat: 192, lon: 384)
Coordinates:
  * time       (time) datetime64[ns] 1850-01-01T01:30:00 ... 2014-12-31T22:30:00
  * lat        (lat) float64 -89.28 -88.36 -87.42 -86.49 ... 87.42 88.36 89.28
  * lon        (lon) float64 0.0 0.9375 1.875 2.812 ... 356.2 357.2 358.1 359.1
Dimensions without coordinates: bnds
Data variables:
    time_bnds  (time, bnds) datetime64[ns] dask.array<chunksize=(720, 2), meta=np.ndarray>
    lat_bnds   (time, lat, bnds) float64 dask.array<chunksize=(2920, 192, 2), meta=np.ndarray>
    lon_bnds   (time, lon, bnds) float64 dask.array<chunksize=(2920, 384, 2), meta=np.ndarray>
    pr         (time, lat, lon) float32 dask.array<chunksize=(720, 192, 384), meta=np.ndarray>
Attributes: (12/44)
    Conventions:            CF-1.7 CMIP-6.2
    activity_id:            CMIP
    branch_method:          standard
    branch_time_in_child:   0.0
    branch_time_in_parent:  54421.0
    creation_date:          2019-05-02T11:50:26Z
    ...                     ...
    title:                  AWI-CM-1-1-MR output prepared for CMIP6
    variable_id:            pr
    variant_label:          r1i1p1f1
    license:                CMIP6 model data produced by Alfred Wegener Insti...
    cmor_version:           3.4.0
    tracking_id:            hdl:21.14100/90ae6418-719b-4485-a424-a5682862cb2c