The following example shows how to process magnetotelluric (MT) time series data stored on the National Computational Infrastructure (NCI). As such it makes certain assumptions about the storage structure, mainly that the time series data are stored in separate folders for each site and separate folders for each day. The tutorial uses the Bounded Influence, Remote Reference Processing (BIRRP) code.
For this example we will be processing the Renmark 2009 time series data located on NCI's /g/data file system on Gadi. The Jupyter notebook version of this example can be downloaded here, and is expected to be run under Gadi's batch queuing system; we describe this in more detail later in this document.
The first step in processing is to work out the starting and finishing dates for each site in the survey, in order to find the sites with overlapping time-series which can be used for remote-reference.
import pickle
import os
import datetime
from glob import glob


def _linecount(filename):
    """
    Return number of lines in a file
    """
    num_lines = sum(1 for line in open(filename))
    return num_lines


def get_metadata(base_dir, frequency, tmp_dir):
    """
    Return information about each site
    """
    fn = os.path.join(tmp_dir, 'metadata.json')
    if os.path.exists(fn):
        with open(fn) as f:
            sites = pickle.load(f)
        return sites
    sites = [os.path.basename(i) for i in glob(os.path.join(base_dir, '*'))]
    sites = dict([(i, {}) for i in sites])
    for site in sites.keys():
        sites[site]['name'] = site
        sites[site]['files'] = []
        days = sorted([os.path.basename(i) for i in glob(os.path.join(base_dir, site, '*'))])
        for idx, day in enumerate(days):
            files = glob(os.path.join(base_dir, site, day, '*'))
            if not files:
                continue
            last_files = files
            sites[site]['files'].append(files)
            if idx == 0:
                start_time = os.path.basename(files[0]).split('_')[1].split('.')[0]
                start_time = datetime.datetime.strptime(start_time, '%y%m%d%H%M%S')
                sites[site]['start_time'] = start_time
        if not [j for k in sites[site]['files'] for j in k]:
            del sites[site]
            continue
        sites[site]['files'] = [j for k in sites[site]['files'] for j in k]
        print(site, sites[site]['files'])
        files = last_files
        end_date = os.path.basename(files[0]).split('_')[1].split('.')[0]
        end_date = datetime.datetime.strptime(end_date, '%y%m%d%H%M%S')
        length = _linecount(files[0])
        end_time = end_date + datetime.timedelta(seconds=length/frequency)
        sites[site]['end_time'] = end_time
        sites[site]['samples'] = (sites[site]['end_time'] - sites[site]['start_time']).seconds * frequency
    with open(fn, 'w') as f:
        pickle.dump(sites, f)
    return sites
This function reads in the metadata required for processing a survey. After this first pass it writes the metadata out to a file (via pickle), which is read back in the next time the metadata is required, so the time series directories do not need to be rescanned.
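For example, a minimal call for this survey might look like the following sketch (the scratch directory shown here is the processing directory used later in this tutorial; any writable directory will do):

base_dir = '/g/data/my80/States_and_Territories/SA/Broadband/Renmark_2009/TS'
tmp_dir = '/g/data/up99/sandbox/bulk_birrp_processing_test/processing_directory/'

sites = get_metadata(base_dir, 500, tmp_dir)  # the Renmark data were recorded at 500 Hz
for name in sorted(sites):
    print(name, sites[name]['start_time'], sites[name]['end_time'])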
We will also make a function to calculate the overlap between two sites:
def calc_intersection(local_site, remote_site):
    """
    Calculate time overlap of a local site and remote site
    """
    int_start = max(local_site['start_time'], remote_site['start_time'])
    int_end = min(local_site['end_time'], remote_site['end_time'])
    if (int_end - int_start).total_seconds()/60/60 < 5:
        return
    else:
        return int_start, int_end
If the overlap is less than 5 hours the function returns None, indicating no substantial overlap; otherwise it returns the start and end times of the intersection.
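As a quick illustration with two hypothetical site dictionaries (only the start and end times matter to this function):

import datetime

site_a = {'start_time': datetime.datetime(2009, 5, 1, 0, 0, 0),
          'end_time': datetime.datetime(2009, 5, 3, 0, 0, 0)}
site_b = {'start_time': datetime.datetime(2009, 5, 2, 20, 0, 0),
          'end_time': datetime.datetime(2009, 5, 4, 0, 0, 0)}

print(calc_intersection(site_a, site_b))   # only 4 hours of overlap -> None
site_b['start_time'] = datetime.datetime(2009, 5, 2, 0, 0, 0)
print(calc_intersection(site_a, site_b))   # 24 hours of overlap -> (start, end) tuple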
The next stage is to make functions for writing out the overlap between the time series from two different sites as ASCII files for use as BIRRP inputs. Here one of the sites is designated as the remote reference.
import numpy as np
import scipy.signal


def decimate_file(fn, decimation, new_fn):
    """
    Read in a time series, apply a decimation and write out
    """
    print(fn, decimation, new_fn)
    print(os.path.exists(new_fn))
    if os.path.exists(new_fn):
        return
    print('decimating {}'.format(fn))
    f = np.genfromtxt(fn)
    f = scipy.signal.decimate(f, decimation, n=8)
    np.savetxt(new_fn, f)


def write_files(files, num_skip, num_samples, channel, out_dir, tstart, remote=False):
    """
    Write files
    """
    fn = 'local.' + channel if not remote else 'remote.' + channel
    fn = tstart.strftime('%y%m%d%H%M%S') + '_' + fn
    if os.path.exists(os.path.join(out_dir, fn)):
        return
    print('writing to {}'.format(os.path.join(out_dir, fn)))
    ofile = open(os.path.join(out_dir, fn), 'w')
    ifile = open(files.pop(0))
    print(files, num_skip, num_samples)
    for _ in range(num_skip):
        try:
            next(ifile)
        except StopIteration:
            ifile = open(files.pop(0))
    for _ in range(num_samples):
        try:
            line = next(ifile)
        except StopIteration:
            try:
                ifile = open(files.pop(0))
                line = next(ifile)
            except IndexError:
                pass
                # print('did not extract from {}'.format(files))
        ofile.write(line)


def write_birrp_inputs(local_site, remote_site, out_dir):
    """
    Write out the intersection of two files
    """
    if calc_intersection(local_site, remote_site):
        int_start, int_end = calc_intersection(local_site, remote_site)
    local_skip = (int_start - local_site['start_time']).total_seconds()*500
    remote_skip = (int_start - remote_site['start_time']).total_seconds()*500
    local_skip = int(local_skip)
    remote_skip = int(remote_skip)
    num_samples = int((int_end - int_start).total_seconds()*500)
    if local_site['name'] == remote_site['name']:
        remote_skip += 1
        num_samples -= 1
    dec_dir = os.path.join(out_dir, 'undecimated')
    dec10_dir = os.path.join(out_dir, 'decimated10')
    dec100_dir = os.path.join(out_dir, 'decimated100')
    try:
        os.makedirs(dec_dir)
    except OSError:
        pass
    try:
        os.makedirs(dec10_dir)
    except OSError:
        pass
    try:
        os.makedirs(dec100_dir)
    except OSError:
        pass
    channels = ['BX', 'BY', 'EX', 'EY']
    for channel in channels:
        files = sorted([i for i in local_site['files'] if channel in i])
        write_files(files, local_skip, num_samples, channel, dec_dir, int_start)
    for channel in [i for i in channels if 'B' in i]:
        files = sorted([i for i in remote_site['files'] if channel in i])
        write_files(files, remote_skip, num_samples, channel, dec_dir, int_start, remote=True)
    for fn in glob(os.path.join(dec_dir, '*')):
        if fn.endswith('X') or fn.endswith('Y'):
            bn = os.path.basename(fn)
            new_fn = os.path.join(dec10_dir, bn)
            decimate_file(fn, 10, new_fn)
            fn = new_fn
            new_fn = os.path.join(dec100_dir, bn)
            decimate_file(fn, 10, new_fn)
    return num_samples, int_start
Note that decimations are also performed, with the decimated time series placed into new folders: the data are decimated by a factor of 10, and then by a further factor of 10 (100 overall). By decimating the data we can recover responses at longer periods than would otherwise be possible. Decimation is performed using scipy.signal.decimate, which applies an anti-aliasing filter before downsampling.
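As a standalone sketch of this cascaded decimation (synthetic data only, not part of the workflow), decimating by 10 twice gives an effective factor of 100, taking a 500 Hz series down to 5 Hz:

import numpy as np
import scipy.signal

# Ten minutes of a synthetic 500 Hz series: a 1 Hz sinusoid plus noise
t = np.arange(0, 600, 1.0/500)
x = np.sin(2*np.pi*t) + 0.1*np.random.randn(t.size)

# Each call applies an order-8 anti-aliasing filter before downsampling by 10
x10 = scipy.signal.decimate(x, 10, n=8)     # 500 Hz -> 50 Hz
x100 = scipy.signal.decimate(x10, 10, n=8)  # 50 Hz -> 5 Hz
print(x.size, x10.size, x100.size)          # 300000, 30000, 3000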
For each of these three folders we generate a BIRRP script:
def gen_birrp_script(out_dir, samples, sample_rate, site_name, tstart):
    birrp_string = '\n'.join(['1', '2', '2', '2', '1', '3', '{2}', '65536,2,13',
                              '3,1,3', 'y', '2', '0,0.0003,0.9999', '0.7', '0.0',
                              '0.003,10000', '{3}', '0', '0', '1', '10', '0', '0',
                              '{1}', '0', '{4}_local.EY', '0', '0', '{4}_local.EX',
                              '0', '0', '{4}_local.BY', '0', '0', '{4}_local.BX',
                              '0', '0', '{4}_remote.BY', '0', '0', '{4}_remote.BX',
                              '0', '0,90,0', '0,90,0', '0,90,0'])
    birrp_string = birrp_string.format(os.path.join(out_dir, ''), samples, sample_rate,
                                       site_name, tstart.strftime('%y%m%d%H%M%S'))
    return birrp_string
The BIRRP script has many different parameters and the best parameters will depend on the dataset. For more information see the BIRRP manual.
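To see exactly what will be piped to BIRRP for a given run, the generated string can simply be printed. The values below are placeholders for illustration only:

import datetime

script = gen_birrp_script(out_dir='/tmp/RM0105RM0106/undecimated',  # hypothetical output directory
                          samples=5400000,                          # e.g. 3 hours of data at 500 Hz
                          sample_rate=0.002,                        # 0.002, as used for the undecimated data in run_one_site below
                          site_name='RM0105',
                          tstart=datetime.datetime(2009, 5, 1, 10, 30, 0))
print(script)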
We also need a function to run BIRRP and to clean up the output files:
def run_birrp(out_dir, birrp_script, birrp_location):
    script_fn = os.path.join(out_dir, 'script.txt')
    with open(script_fn, 'w') as f:
        f.write(birrp_script)
    if any(fn.endswith('.j') for fn in os.listdir(out_dir)):
        return
    #os.system('mpirun -np 32 {} < {}'.format(birrp_location, script_fn))  ### Use if using the mpi modified version of BIRRP
    os.system('{} < {}'.format(birrp_location, script_fn))
    for f in glob('fft.*'):
        os.remove(f)
    for f in glob('remote.*') + glob('local.*'):  ### comment this for loop if you want to keep the intermediate files
        os.remove(f)
Finally, we would like functions to convert the outputs from our three frequency ranges into EDI files and to merge them into a single file. The merging should take place at the longest overlapping period, as this introduces the minimum amount of error resulting from aliasing during the decimation.
import shutil
import mtpy.core.edi
import mtpy.utils.configfile
from mtpy.uofa.qel_monitoring_j2edi import convert2edi
from mtpy.uofa.simpleplotEDI import plotedi


def process_birrp_results(out_dir, survey_cfg_fn, site_name, instr_resp_fn):
    edi, coh = convert2edi(site_name, out_dir, survey_cfg_fn, instr_resp_fn, None)
    metadata = mtpy.utils.configfile.read_survey_configfile(survey_cfg_fn)[site_name.upper()]
    e = mtpy.core.edi.Edi(edi)
    if 'loc' in metadata:
        e.head['loc'] = metadata['loc']
    else:
        e.head['loc'] = None
    if 'acqdate' in metadata:
        e.head['acqdate'] = metadata['acqdate']
    else:
        e.head['acqdate'] = None
    if 'acquired_by' in metadata:
        e.head['acqby'] = metadata['acquired_by']
    else:
        e.head['acqby'] = ''
    edi = e.writefile(edi, allow_overwrite=True)
    path = edi.split("qel")[0]
    edi_file = os.path.basename(edi)
    new_name = edi_file.split("_")[1]
    new_edi_name = new_name + '.edi'
    new_coh_name = new_name + '.coh'
    new_edi = path + new_edi_name
    coh_orig = path + coh
    new_coh = path + new_coh_name
    shutil.move(edi, new_edi)
    shutil.move(coh_orig, new_coh)
    # edi = glob(os.path.join(out_dir, '*.edi'))[0]
    plotedi(new_edi, saveplot=True)
    return


def merge_birrp_results(out_dir, site_name):
    edi_fns = (glob(os.path.join(out_dir, 'undecimated', '*edi')) +
               glob(os.path.join(out_dir, 'decimated10', '*edi')) +
               glob(os.path.join(out_dir, 'decimated100', '*edi')))
    e1 = mtpy.core.edi.Edi(edi_fns[0])
    e2 = mtpy.core.edi.Edi(edi_fns[1])
    out_small = os.path.join(out_dir, 'small')
    mtpy.core.edi.combine_edifiles(edi_fns[0], edi_fns[1], out_fn=out_small,
                                   merge_freq=e1.freq[-2])
    out_large = os.path.join(out_dir, site_name)
    in_small = out_small + '_merged.edi'
    mtpy.core.edi.combine_edifiles(in_small, edi_fns[2], out_fn=out_large,
                                   merge_freq=e2.freq[-2])
    os.remove(in_small)
    final_fn = out_large + '_merged.edi'
    plotedi(final_fn, saveplot=True)
Each EDI generated is also plotted. This code makes use of the mtpy library (Krieger and Peacock, 2014).
Note that this script requires both an instrument response function file and a survey configuration file. Details on the survey configuration file can be found in the mtpy package, but it will look something like this:
[RM0105]
station_type = mt
lat = -34.05543333333333
lon = 140.62353333333334
elev = 0
sampling = 0.002
loc = Renmark
acqdate = 2009
acquired_by = The_University_of_Adelaide

[RM0106]
station_type = mt
lat = -34.041183333333336
lon = 140.60265
elev = 0
sampling = 0.002
loc = Renmark
acqdate = 2009
acquired_by = The_University_of_Adelaide

#[RM0107]
#station_type = mt
#lat = -34.004266666666666
#lon = 140.5869
#elev = 0
#sampling = 0.002
#loc = Renmark
#acqdate = 2009
#acquired_by = The_University_of_Adelaide
And so on, for each site.
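The configuration file can be sanity-checked by loading it with the same mtpy routine the processing functions use. A short sketch, assuming the Renmark configuration path used throughout this example:

import mtpy.utils.configfile

survey_cfg_fn = '/g/data/up99/sandbox/bulk_birrp_processing_test/survey_config/renmark.cfg'
survey_cfg = mtpy.utils.configfile.read_survey_configfile(survey_cfg_fn)

# Section names become the dictionary keys; commented-out sites are excluded
print(sorted(survey_cfg.keys()))
print(survey_cfg['RM0105'])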
Putting it together
These functions can be wrapped into one function to process a single site with a single remote site:
def run_one_site(local_name, remote_name, tmp_dir, **kwargs):
    dirs = {'undecimated': 0.002, 'decimated10': 0.02, 'decimated100': 0.2}
    sites = get_metadata(kwargs['base_dir'], kwargs['frequency'], tmp_dir)
    print('got metadata')
    local_site = sites[local_name]
    remote_site = sites[remote_name]
    dir_name = local_site['name'] + remote_site['name']
    out_dir = os.path.join(tmp_dir, dir_name)
    try:
        os.makedirs(out_dir)
    except OSError:
        pass
    cwd = os.getcwd()
    samples, tstart = write_birrp_inputs(local_site, remote_site, out_dir)
    print('written inputs')
    for dir_name in dirs.keys():
        sample_rate = dirs[dir_name]
        dir_name = os.path.join(out_dir, dir_name)
        os.chdir(dir_name)
        birrp_script = gen_birrp_script(dir_name, samples, sample_rate, local_name, tstart)
        run_birrp(dir_name, birrp_script, birrp_location)
        print('ran birrp')
        process_birrp_results(dir_name, kwargs['survey_cfg_fn'], local_site['name'], kwargs['instr_resp_fn'])
        print('processed results')
        os.chdir(cwd)
    merge_birrp_results(out_dir, local_name)
If we save all of this to a file called nci_birrp.py, we can create a separate file in the same folder which imports it and runs one site:
#!/usr/bin/env python2.7
import sys
from nci_birrp import *

if __name__ == '__main__':
    kwargs = {}
    base_dir = '/g/data/my80/States_and_Territories/SA/Broadband/Renmark_2009/TS'
    birrp_location = '/g/data/up99/sandbox/bulk_birrp_processing_test/birrp/birrp'  ### This version of BIRRP was compiled specifically for the Renmark 2009 use case
    survey_cfg_fn = '/g/data/up99/sandbox/bulk_birrp_processing_test/survey_config/renmark.cfg'
    instr_resp_fn = '/g/data/up99/sandbox/bulk_birrp_processing_test/instrument_response_function/lemi_coils_instrument_response_freq_real_imag.txt'
    kwargs['instr_resp_fn'] = instr_resp_fn
    kwargs['survey_cfg_fn'] = survey_cfg_fn
    kwargs['birrp_location'] = birrp_location
    kwargs['base_dir'] = base_dir
    kwargs['frequency'] = 500
    run_one_site(sys.argv[1], sys.argv[2], sys.argv[3], **kwargs)
We can save this as proc_site.py.
Here we enter the parameters specific to our example: the Renmark 2009 MT dataset on the NCI, which was recorded at 500 Hz and has a survey configuration file located at /g/data/up99/sandbox/bulk_birrp_processing_test/survey_config/renmark.cfg.
The command-line arguments are site specific: the local site name, the remote site name, and the temporary working directory.
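For instance, processing site RM0105 with RM0106 as the remote reference (site names taken from the configuration file above, and the processing directory used elsewhere in this example) could be launched directly as sketched below; this mirrors the call that the PBS jobs make later on:

import subprocess

# Equivalent to running proc_site.py from the command line with the same three arguments
subprocess.call(['python2.7', 'proc_site.py', 'RM0105', 'RM0106',
                 '/g/data/up99/sandbox/bulk_birrp_processing_test/processing_directory/'])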
Running on the NCI
We are now almost ready to process the entire survey on the NCI. First we will need a function to submit proc_site.py as a PBS job, processing the data from one site with a single remote site.
import os


def add_shell_run(local_site, remote_site, base_dir, tmp_dir):
    python_call = proc_site.format(local_site, remote_site, tmp_dir)
    shell_script = '\n'.join(['#!/bin/bash',
                              '',
                              '#PBS -P up99',
                              '#PBS -q normal',
                              '#PBS -l walltime=3:00:00,mem=64GB,ncpus=24,jobfs=30GB',
                              '#PBS -l storage=gdata/my80+gdata/up99',
                              '#PBS -l wd',
                              '',
                              'module load python2/2.7.17',
                              'module load intel-compiler/2020.2.254',
                              'module use /g/data/up99/modulefiles/',
                              'module load mtpy_nci_processing/2013_0.0.1',
                              'module load openmpi/4.0.2',
                              '',
                              python_call])
    shell_fn = os.path.join(tmp_dir, local_site + '_' + remote_site + '.sh')
    with open(shell_fn, 'w') as f:
        f.write(shell_script)
    os.system('qsub {}'.format(shell_fn))
This function will submit the job to the NCI. Take care to choose appropriate values for the walltime and memory request, as these will change depending on the recording length. Our example PBS script uses project up99 (change this to a compute project code that you are a member of), the normal queue, a walltime of 3 hours, 24 CPUs and 64 GB of memory. Please see PBS directives explained for more information on the PBS flags.
We can now loop over each combination of local and remote sites, check to see if there is sufficient overlap, and if so, submit a job to the NCI to process.
import mtpy.utils.configfile


def loop_sites(base_dir, tmp_dir, survey_cfg_fn=None):
    sites = get_metadata(base_dir, 500, tmp_dir)
    if survey_cfg_fn:
        survey_cfg = mtpy.utils.configfile.read_survey_configfile(survey_cfg_fn)
        site_names = sites.keys()
        for site in site_names:
            if site not in survey_cfg:
                sites.pop(site, None)
    print(sites)
    for local_site in sites:
        for remote_site in sites:
            if calc_intersection(sites[local_site], sites[remote_site]):
                print('add {} and {}'.format(local_site, remote_site))
                add_shell_run(local_site, remote_site, base_dir, tmp_dir)
                #sys.exit()  #### uncomment this line if you are troubleshooting
An option is also included to pass a survey configuration file (as detailed above), where you can comment out any sites which you do not want to process.
Let's write our nci_birrp.py and proc_site.py files to our local working directory on Gadi:
%%writefile nci_birrp.py
import pickle
import os
import datetime
from glob import glob
import numpy as np
import scipy.signal
import mtpy.core.edi
from mtpy.uofa.qel_monitoring_j2edi import convert2edi
from mtpy.uofa.simpleplotEDI import plotedi
import mtpy.utils.configfile
import shutil
import sys

#######################################################################
#                                                                     #
# Note that you will have to change the following variables to match #
# the time series data location, birrp executable location, survey   #
# config file, instrument response function, temporary directory,    #
# recorded frequency and proc_site.py file you are using             #
#                                                                     #
#######################################################################

base_dir = '/g/data/my80/States_and_Territories/SA/Broadband/Renmark_2009/TS'
birrp_location = '/g/data/up99/sandbox/bulk_birrp_processing_test/birrp/birrp'  ### BIRRP compiled for this Renmark 2009 use case
survey_cfg_fn = '/g/data/up99/sandbox/bulk_birrp_processing_test/survey_config/renmark.cfg'
instr_resp_fn = '/g/data/up99/sandbox/bulk_birrp_processing_test/instrument_response_function/lemi_coils_instrument_response_freq_real_imag.txt'
tmp_dir = '/g/data/up99/sandbox/bulk_birrp_processing_test/processing_directory/'
frequency = 500
proc_site = '/g/data/up99/sandbox/bulk_birrp_processing_test/processing_directory/proc_site.py {} {} {}'


def _linecount(filename):
    """
    Return number of lines in a file
    """
    num_lines = sum(1 for line in open(filename))
    return num_lines


def get_metadata(base_dir, frequency, tmp_dir):
    """
    Return information about each site
    """
    fn = os.path.join(tmp_dir, 'metadata.json')
    if os.path.exists(fn):
        with open(fn) as f:
            sites = pickle.load(f)
        return sites
    sites = [os.path.basename(i) for i in glob(os.path.join(base_dir, '*'))]
    sites = dict([(i, {}) for i in sites])
    for site in sites.keys():
        sites[site]['name'] = site
        sites[site]['files'] = []
        days = sorted([os.path.basename(i) for i in glob(os.path.join(base_dir, site, '*'))])
        for idx, day in enumerate(days):
            files = glob(os.path.join(base_dir, site, day, '*'))
            if not files:
                continue
            last_files = files
            sites[site]['files'].append(files)
            if idx == 0:
                start_time = os.path.basename(files[0]).split('_')[1].split('.')[0]
                start_time = datetime.datetime.strptime(start_time, '%y%m%d%H%M%S')
                sites[site]['start_time'] = start_time
        if not [j for k in sites[site]['files'] for j in k]:
            del sites[site]
            continue
        sites[site]['files'] = [j for k in sites[site]['files'] for j in k]
        print(site, sites[site]['files'])
        files = last_files
        end_date = os.path.basename(files[0]).split('_')[1].split('.')[0]
        end_date = datetime.datetime.strptime(end_date, '%y%m%d%H%M%S')
        length = _linecount(files[0])
        end_time = end_date + datetime.timedelta(seconds=length/frequency)
        sites[site]['end_time'] = end_time
        sites[site]['samples'] = (sites[site]['end_time'] - sites[site]['start_time']).seconds * frequency
    with open(fn, 'w') as f:
        pickle.dump(sites, f)
    return sites


def calc_intersection(local_site, remote_site):
    """
    Calculate time overlap of a local site and remote site
    """
    int_start = max(local_site['start_time'], remote_site['start_time'])
    int_end = min(local_site['end_time'], remote_site['end_time'])
    if (int_end - int_start).total_seconds()/60/60 < 5:
        return
    else:
        return int_start, int_end


def decimate_file(fn, decimation, new_fn):
    """
    Read in a time series, apply a decimation and write out
    """
    print(fn, decimation, new_fn)
    print(os.path.exists(new_fn))
    if os.path.exists(new_fn):
        return
    print('decimating {}'.format(fn))
    f = np.genfromtxt(fn)
    f = scipy.signal.decimate(f, decimation, n=8)
    np.savetxt(new_fn, f)


def write_files(files, num_skip, num_samples, channel, out_dir, tstart, remote=False):
    """
    Write files
    """
    fn = 'local.' + channel if not remote else 'remote.' + channel
    fn = tstart.strftime('%y%m%d%H%M%S') + '_' + fn
    if os.path.exists(os.path.join(out_dir, fn)):
        return
    print('writing to {}'.format(os.path.join(out_dir, fn)))
    ofile = open(os.path.join(out_dir, fn), 'w')
    ifile = open(files.pop(0))
    print(files, num_skip, num_samples)
    for _ in range(num_skip):
        try:
            next(ifile)
        except StopIteration:
            ifile = open(files.pop(0))
    for _ in range(num_samples):
        try:
            line = next(ifile)
        except StopIteration:
            try:
                ifile = open(files.pop(0))
                line = next(ifile)
            except IndexError:
                pass
                # print('did not extract from {}'.format(files))
        ofile.write(line)


def write_birrp_inputs(local_site, remote_site, out_dir):
    """
    Write out the intersection of two files
    """
    if calc_intersection(local_site, remote_site):
        int_start, int_end = calc_intersection(local_site, remote_site)
    local_skip = (int_start - local_site['start_time']).total_seconds()*500
    remote_skip = (int_start - remote_site['start_time']).total_seconds()*500
    local_skip = int(local_skip)
    remote_skip = int(remote_skip)
    num_samples = int((int_end - int_start).total_seconds()*500)
    if local_site['name'] == remote_site['name']:
        remote_skip += 1
        num_samples -= 1
    dec_dir = os.path.join(out_dir, 'undecimated')
    dec10_dir = os.path.join(out_dir, 'decimated10')
    dec100_dir = os.path.join(out_dir, 'decimated100')
    try:
        os.makedirs(dec_dir)
    except OSError:
        pass
    try:
        os.makedirs(dec10_dir)
    except OSError:
        pass
    try:
        os.makedirs(dec100_dir)
    except OSError:
        pass
    channels = ['BX', 'BY', 'EX', 'EY']
    for channel in channels:
        files = sorted([i for i in local_site['files'] if channel in i])
        write_files(files, local_skip, num_samples, channel, dec_dir, int_start)
    for channel in [i for i in channels if 'B' in i]:
        files = sorted([i for i in remote_site['files'] if channel in i])
        write_files(files, remote_skip, num_samples, channel, dec_dir, int_start, remote=True)
    for fn in glob(os.path.join(dec_dir, '*')):
        if fn.endswith('X') or fn.endswith('Y'):
            bn = os.path.basename(fn)
            new_fn = os.path.join(dec10_dir, bn)
            decimate_file(fn, 10, new_fn)
            fn = new_fn
            new_fn = os.path.join(dec100_dir, bn)
            decimate_file(fn, 10, new_fn)
    return num_samples, int_start


def gen_birrp_script(out_dir, samples, sample_rate, site_name, tstart):
    birrp_string = '\n'.join(['1', '2', '2', '2', '1', '3', '{2}', '65536,2,13',
                              '3,1,3', 'y', '2', '0,0.0003,0.9999', '0.7', '0.0',
                              '0.003,10000', '{3}', '0', '0', '1', '10', '0', '0',
                              '{1}', '0', '{4}_local.EY', '0', '0', '{4}_local.EX',
                              '0', '0', '{4}_local.BY', '0', '0', '{4}_local.BX',
                              '0', '0', '{4}_remote.BY', '0', '0', '{4}_remote.BX',
                              '0', '0,90,0', '0,90,0', '0,90,0'])
    birrp_string = birrp_string.format(os.path.join(out_dir, ''), samples, sample_rate,
                                       site_name, tstart.strftime('%y%m%d%H%M%S'))
    return birrp_string


def run_birrp(out_dir, birrp_script, birrp_location):
    script_fn = os.path.join(out_dir, 'script.txt')
    with open(script_fn, 'w') as f:
        f.write(birrp_script)
    if any(fn.endswith('.j') for fn in os.listdir(out_dir)):
        return
    #os.system('mpirun -np 32 {} < {}'.format(birrp_location, script_fn))  ### Uncomment if using the mpi modified version of BIRRP
    os.system('{} < {}'.format(birrp_location, script_fn))
    for f in glob('fft.*'):
        os.remove(f)
    #for f in glob('remote.*') + glob('local.*'):  ### uncomment this for loop if you want to remove the intermediate files
    #    os.remove(f)


def process_birrp_results(out_dir, survey_cfg_fn, site_name, instr_resp_fn):
    edi, coh = convert2edi(site_name, out_dir, survey_cfg_fn, instr_resp_fn, None)
    metadata = mtpy.utils.configfile.read_survey_configfile(survey_cfg_fn)[site_name.upper()]
    e = mtpy.core.edi.Edi(edi)
    if 'loc' in metadata:
        e.head['loc'] = metadata['loc']
    else:
        e.head['loc'] = None
    if 'acqdate' in metadata:
        e.head['acqdate'] = metadata['acqdate']
    else:
        e.head['acqdate'] = None
    if 'acquired_by' in metadata:
        e.head['acqby'] = metadata['acquired_by']
    else:
        e.head['acqby'] = ''
    edi = e.writefile(edi, allow_overwrite=True)
    path = edi.split("qel")[0]
    edi_file = os.path.basename(edi)
    new_name = edi_file.split("_")[1]
    new_edi_name = new_name + '.edi'
    new_coh_name = new_name + '.coh'
    new_edi = path + new_edi_name
    coh_orig = path + coh
    new_coh = path + new_coh_name
    shutil.move(edi, new_edi)
    shutil.move(coh_orig, new_coh)
    # edi = glob(os.path.join(out_dir, '*.edi'))[0]
    plotedi(new_edi, saveplot=True)
    return


def merge_birrp_results(out_dir, site_name):
    edi_fns = (glob(os.path.join(out_dir, 'undecimated', '*edi')) +
               glob(os.path.join(out_dir, 'decimated10', '*edi')) +
               glob(os.path.join(out_dir, 'decimated100', '*edi')))
    e1 = mtpy.core.edi.Edi(edi_fns[0])
    e2 = mtpy.core.edi.Edi(edi_fns[1])
    out_small = os.path.join(out_dir, 'small')
    mtpy.core.edi.combine_edifiles(edi_fns[0], edi_fns[1], out_fn=out_small,
                                   merge_freq=e1.freq[-2])
    out_large = os.path.join(out_dir, site_name)
    in_small = out_small + '_merged.edi'
    mtpy.core.edi.combine_edifiles(in_small, edi_fns[2], out_fn=out_large,
                                   merge_freq=e2.freq[-2])
    os.remove(in_small)
    final_fn = out_large + '_merged.edi'
    plotedi(final_fn, saveplot=True)


def run_one_site(local_name, remote_name, tmp_dir, **kwargs):
    dirs = {'undecimated': 0.002, 'decimated10': 0.02, 'decimated100': 0.2}
    sites = get_metadata(kwargs['base_dir'], kwargs['frequency'], tmp_dir)
    print('got metadata')
    local_site = sites[local_name]
    remote_site = sites[remote_name]
    dir_name = local_site['name'] + remote_site['name']
    out_dir = os.path.join(tmp_dir, dir_name)
    try:
        os.makedirs(out_dir)
    except OSError:
        pass
    cwd = os.getcwd()
    samples, tstart = write_birrp_inputs(local_site, remote_site, out_dir)
    print('written inputs')
    for dir_name in dirs.keys():
        sample_rate = dirs[dir_name]
        dir_name = os.path.join(out_dir, dir_name)
        os.chdir(dir_name)
        birrp_script = gen_birrp_script(dir_name, samples, sample_rate, local_name, tstart)
        run_birrp(dir_name, birrp_script, birrp_location)
        print('ran birrp')
        process_birrp_results(dir_name, kwargs['survey_cfg_fn'], local_site['name'], kwargs['instr_resp_fn'])
        print('processed results')
        os.chdir(cwd)
    merge_birrp_results(out_dir, local_name)


def add_shell_run(local_site, remote_site, base_dir, tmp_dir):
    python_call = proc_site.format(local_site, remote_site, tmp_dir)
    shell_script = '\n'.join(['#!/bin/bash',
                              '',
                              '#PBS -P up99',
                              '#PBS -q normal',
                              '#PBS -l walltime=3:00:00,mem=64GB,ncpus=24,jobfs=30GB',
                              '#PBS -l storage=gdata/my80+gdata/up99',
                              '#PBS -l wd',
                              '',
                              'module load python2/2.7.17',
                              'module load intel-compiler/2020.2.254',
                              'module use /g/data/up99/modulefiles/',
                              'module load mtpy_nci_processing/2013_0.0.1',
                              'module load openmpi/4.0.2',
                              '',
                              python_call])
    shell_fn = os.path.join(tmp_dir, local_site + '_' + remote_site + '.sh')
    with open(shell_fn, 'w') as f:
        f.write(shell_script)
    os.system('qsub {}'.format(shell_fn))


def loop_sites(base_dir, tmp_dir, survey_cfg_fn=None):
    sites = get_metadata(base_dir, 500, tmp_dir)
    if survey_cfg_fn:
        survey_cfg = mtpy.utils.configfile.read_survey_configfile(survey_cfg_fn)
        site_names = sites.keys()
        for site in site_names:
            if site not in survey_cfg:
                sites.pop(site, None)
    print(sites)
    for local_site in sites:
        for remote_site in sites:
            if calc_intersection(sites[local_site], sites[remote_site]):
                print('add {} and {}'.format(local_site, remote_site))
                add_shell_run(local_site, remote_site, base_dir, tmp_dir)
                #sys.exit()  #### uncomment line if you are troubleshooting
%%writefile proc_site.py
#!/usr/bin/env python2.7
import sys
from nci_birrp import *

if __name__ == '__main__':
    kwargs = {}
    base_dir = '/g/data/my80/States_and_Territories/SA/Broadband/Renmark_2009/TS'
    birrp_location = '/g/data/up99/sandbox/bulk_birrp_processing_test/birrp/birrp'
    survey_cfg_fn = '/g/data/up99/sandbox/bulk_birrp_processing_test/survey_config/renmark.cfg'
    instr_resp_fn = '/g/data/up99/sandbox/bulk_birrp_processing_test/instrument_response_function/lemi_coils_instrument_response_freq_real_imag.txt'
    kwargs['instr_resp_fn'] = instr_resp_fn
    kwargs['survey_cfg_fn'] = survey_cfg_fn
    kwargs['birrp_location'] = birrp_location
    kwargs['base_dir'] = base_dir
    kwargs['frequency'] = 500
    run_one_site(sys.argv[1], sys.argv[2], sys.argv[3], **kwargs)
Note that for our example we are using a BIRRP version compiled specifically for the Renmark 2009 use case (located in /g/data/up99/sandbox/bulk_birrp_processing_test/birrp/birrp). There is also an MPI-parallelised version of BIRRP (located in /g/data/up99/sandbox/bulk_birrp_processing_test/birrp_mpi); this version runs much faster on the undecimated data, but was not compiled to work with the decimated100 dataset.
Now let's log onto Gadi and change into the working directory where the nci_birrp.py and proc_site.py files are located. Next we need to run the following commands to set up our local environment:
$ module purge
$ module load pbs
$ module load python2/2.7.17
$ module use /g/data/up99/modulefiles
$ module load mtpy_nci_processing/2013_0.0.1
$ module load openmpi/4.0.2
To process the entire Renmark MT survey we can, for example, open an IPython terminal and execute the following commands:
%run nci_birrp.py
base_dir = '/g/data/my80/States_and_Territories/SA/Broadband/Renmark_2009/TS/'
birrp_location = '/g/data/up99/sandbox/bulk_birrp_processing_test/birrp/birrp'
survey_cfg_fn = '/g/data/up99/sandbox/bulk_birrp_processing_test/survey_config/renmark.cfg'
loop_sites(base_dir, tmp_dir, survey_cfg_fn)
We can view the progress of the jobs in the queue by running qstat -x in a bash terminal:
%%bash
qstat -x
After the jobs have executed on the NCI, the processed EDI files and plots will be found in your tmp_dir, which for this example was /g/data/up99/sandbox/bulk_birrp_processing_test/processing_directory/.
If the processing fails, the cause can be investigated using the error files created when running each job. These are located in the same directory from which the jobs were submitted (in our case, the directory in which this script is run).
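A quick way to scan these from Python is sketched below; it assumes the standard PBS naming convention of <jobname>.e<jobid> for error files and that you are in the submission directory:

import glob

# Print the last few lines of each PBS error file in the current directory
for err_fn in sorted(glob.glob('*.e*')):
    with open(err_fn) as f:
        lines = f.readlines()
    print(err_fn)
    print(''.join(lines[-20:]))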