Source code for suncasa.eovsa.eovsa_pipeline

import argparse
import json
from datetime import datetime, timedelta
# from astropy.time import Time
import traceback
from suncasa.suncasatasks import ptclean6 as ptclean
from suncasa.suncasatasks import calibeovsa
from suncasa.suncasatasks import importeovsa

import re
import sys
import numpy as np
from eovsapy.dump_tsys import findfiles
from eovsapy.sqlutil import sql2phacalX, sql2refcalX
from eovsapy.util import Time
import os
from suncasa.eovsa import eovsa_diskmodel as ed
from suncasa.utils import mstools as mstl

from suncasa.casa_compat import import_casatasks, import_casatools
from suncasa.eovsa.update_log import  EOVSA15_UPGRADE_DATE,DCM_IF_FILTER_UPGRADE_DATE

[docs] tasks = import_casatasks('split', 'tclean', 'gencal', 'clearcal', 'applycal', 'gaincal', 'delmod')
[docs] split = tasks.get('split')
[docs] tclean = tasks.get('tclean')
[docs] gencal = tasks.get('gencal')
[docs] clearcal = tasks.get('clearcal')
[docs] applycal = tasks.get('applycal')
[docs] gaincal = tasks.get('gaincal')
[docs] delmod = tasks.get('delmod')
[docs] tools = import_casatools(['qatool', 'iatool', 'mstool', 'tbtool'])
[docs] qatool = tools['qatool']
[docs] iatool = tools['iatool']
[docs] mstool = tools['mstool']
[docs] tbtool = tools['tbtool']
[docs] ms = mstool()
[docs] tb = tbtool()
[docs] def get_tdate_from_basename(vis): # Define the regular expression pattern pattern = r'UDB(\d{8})\.ms(\.tar\.gz)?' # Extract the basename from the vis path basename = os.path.basename(vis) # Search for the pattern in the basename match = re.search(pattern, basename) if match: # Extract the date string date_str = match.group(1) # Convert the date string to a datetime object tdate = datetime.strptime(date_str, '%Y%m%d').replace(hour=20, minute=0, second=0) return tdate else: raise ValueError("The basename does not match the expected format.")
import socket import os
[docs] hostname = socket.gethostname()
[docs] is_on_server = hostname in ['pipeline', 'inti.hpcnet.campus.njit.edu']
[docs] is_on_inti = hostname == 'inti.hpcnet.campus.njit.edu'
if is_on_server:
[docs] base_dir = "/inti/data/pipeline_mirror" if is_on_inti else ""
else: base_dir = './'
[docs] class Path_config: def __init__(self, base_dir=base_dir):
[docs] self.paths = {}
[docs] self.base_dir = base_dir
# Setting paths
[docs] self.udbmsdir = self._get_env_var('EOVSAUDBMS', f'{base_dir}/data1/eovsa/fits/UDBms/')
[docs] self.udbmsscldir = self._get_env_var('EOVSAUDBMSSCL', f'{base_dir}/data1/eovsa/fits/UDBms_scl/')
[docs] self.udbmsslfcaleddir = self._get_env_var('EOVSAUDBMSSLFCALED', f'{base_dir}/data1/eovsa/fits/UDBms_slfcaled/')
[docs] self.udbdir = self._get_env_var('EOVSAUDB', f'{base_dir}/data1/eovsa/fits/UDB/')
[docs] self.caltbdir = self._get_env_var('EOVSACAL', f'{base_dir}/data1/eovsa/caltable/')
[docs] self.slfcaltbdir = self._get_env_var('EOVSASLFCAL', f'{base_dir}/data1/eovsa/slfcaltable/')
[docs] self.qlookfitsdir = self._get_env_var('EOVSAQLOOKFITS', f'{base_dir}/data1/eovsa/fits/synoptic/')
[docs] self.qlookfigdir = self._get_env_var('EOVSAQLOOKFIG', f'{base_dir}/common/webplots/qlookimg_10m/')
[docs] self.synopticfigdir = self._get_env_var('EOVSASYNOPTICFIG', f'{base_dir}/common/webplots/SynopticImg/')
[docs] self.workdir_default = self._get_env_var('EOVSAWORKDIR', f'{base_dir}/data1/workdir/')
# Print a summary of paths self._print_summary()
[docs] def _get_env_var(self, env_var, default_path): path = os.getenv(env_var) or default_path if not os.path.exists(path): # if not is_on_server: # path = os.path.basename(default_path.rstrip('/')) + '/' # if not os.path.exists(path): # os.makedirs(path) # else: os.makedirs(path) self.paths[env_var] = path return path
[docs] def _print_summary(self): print("Paths Configuration Summary:") for env_var, path in self.paths.items(): print(f" {env_var}: {path}")
# Usage
[docs] pathconfig = Path_config()
print(pathconfig.udbmsdir) # Accessing the directory path
[docs] udbmsdir = pathconfig.udbmsdir
[docs] udbmsscldir = pathconfig.udbmsscldir
[docs] udbmsslfcaleddir = pathconfig.udbmsslfcaleddir
[docs] udbdir = pathconfig.udbdir
[docs] caltbdir = pathconfig.caltbdir
[docs] slfcaltbdir = pathconfig.slfcaltbdir
[docs] qlookfitsdir = pathconfig.qlookfitsdir
[docs] qlookfigdir = pathconfig.qlookfigdir
[docs] synopticfigdir = pathconfig.synopticfigdir
[docs] workdir_default = pathconfig.workdir_default
[docs] def get_local_day_bounds(tim): """Return the local-day bounds used by the daily EOVSA pipeline.""" tim = Time(tim) if tim.mjd == np.fix(tim.mjd): tim = Time(tim.mjd + 0.5, format='mjd') dhr = tim.LocalTime.utcoffset().total_seconds() / 60 / 60 / 24 btime = Time(np.fix(tim.mjd + dhr) - dhr, format='mjd') etime = Time(btime.mjd + 1, format='mjd') return btime, etime
[docs] def get_synoptic_output_info(tim): """Return the expected synoptic daily FITS products and status file for one day.""" from suncasa.eovsa.eovsa_synoptic_imaging_pipeline_wsclean import FrequencySetup tim = Time(tim) date_str = tim.datetime.strftime('%Y%m%d') imgoutdir = os.path.join(qlookfitsdir, tim.datetime.strftime("%Y/%m/%d/")) freq_setup = FrequencySetup(tim) fitsfiles = [] for spw in freq_setup.spws: spwstr = spw.replace('~', '-') fitsfiles.append(os.path.join(imgoutdir, f'eovsa.synoptic_daily.{date_str}T200000Z.s{spwstr}.tb.fits')) statusfile = os.path.join(imgoutdir, f'eovsa.synoptic_pipeline_status.{date_str}.json') return {'date_str': date_str, 'imgoutdir': imgoutdir, 'fitsfiles': fitsfiles, 'statusfile': statusfile}
[docs] def summarize_synoptic_outputs(tim): """Summarize synoptic daily FITS availability for one pipeline day.""" info = get_synoptic_output_info(tim) existing = [f for f in info['fitsfiles'] if os.path.exists(f)] return { **info, 'existing_fitsfiles': existing, 'fits_complete': len(existing) == len(info['fitsfiles']), 'fits_count': len(existing), 'fits_expected_count': len(info['fitsfiles']), }
[docs] def read_pipeline_status(statusfile): """Read the per-day pipeline status JSON if it exists.""" if not os.path.exists(statusfile): return {} try: with open(statusfile, 'r') as infile: return json.load(infile) except Exception as exc: print(f'WARNING: Failed to read status file {statusfile}: {exc}') return {}
[docs] def write_pipeline_status(statusfile, state, **extra): """Persist a compact per-day pipeline status JSON.""" os.makedirs(os.path.dirname(statusfile), exist_ok=True) payload = { 'state': state, 'updated_utc': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'), } payload.update(extra) with open(statusfile, 'w') as outfile: json.dump(payload, outfile, indent=2, sort_keys=True) outfile.write('\n') return payload
[docs] def get_calibration_deadline_utc(tim, hour=4, day_offset=2): """Return the UTC hard deadline for one observing day label.""" tim = Time(tim) return datetime( tim.datetime.year, tim.datetime.month, tim.datetime.day, hour, 0, 0, ) + timedelta(days=day_offset)
[docs] def get_calibration_readiness(tim): """Check whether the observer-written daily calibrations are ready for one day.""" tim = Time(tim) btime, etime = get_local_day_bounds(tim) deadline_utc = get_calibration_deadline_utc(tim) now_utc = datetime.utcnow() readiness = { 'ready': False, 'reason': '', 'day_start_utc': btime.iso, 'day_end_utc': etime.iso, 'hard_deadline_utc': deadline_utc.strftime('%Y-%m-%dT%H:%M:%SZ'), 'deadline_expired': now_utc >= deadline_utc, 'refcal_timestamp_utc': None, 'phacal_count': 0, 'latest_phacal_timestamp_utc': None, } try: # Match calibeovsa: the reference calibration only needs to be available # by the start of the observing day and does not need to be written # inside the same local-day window as the phacal products. refcal = sql2refcalX(btime) except Exception as exc: readiness['reason'] = f'refcal_query_failed: {exc}' return readiness if not refcal: readiness['reason'] = 'missing_refcal' return readiness ref_ts = refcal['timestamp'] readiness['refcal_timestamp_utc'] = ref_ts.iso try: phacals = sql2phacalX([btime, etime], nrecords=0, neat=True, verbose=False) or [] except Exception as exc: readiness['reason'] = f'phacal_query_failed: {exc}' return readiness valid_phacals = [] for phacal in phacals: if abs(phacal['t_ref'].jd - ref_ts.jd) <= 30. / 1440.: valid_phacals.append(phacal) readiness['phacal_count'] = len(valid_phacals) if valid_phacals: readiness['latest_phacal_timestamp_utc'] = max(ph['t_pha'] for ph in valid_phacals).iso if not valid_phacals: readiness['reason'] = 'missing_phacal' return readiness readiness['ready'] = True readiness['reason'] = 'ready' return readiness
[docs] def should_enable_smart_cal_check(enable_flag=None): """Resolve the cron-oriented calibration gate behavior.""" if enable_flag is not None: return enable_flag return os.getenv('EOVSA_PIPELINE_CRON') == '1'
[docs] def getspwfreq(vis): ''' :param vis: :return: mid frequencies in GHz of each spw in the vis ''' tb.open(vis + '/SPECTRAL_WINDOW') reffreqs = tb.getcol('REF_FREQUENCY') bdwds = tb.getcol('TOTAL_BANDWIDTH') cfreqs = reffreqs + bdwds / 2. tb.close() cfreqs = cfreqs / 1.0e9 return cfreqs
[docs] def trange2ms(trange=None, doimport=False, verbose=False, doscaling=False, overwrite=True): '''This finds all solar UDBms files within a timerange; If the UDBms file does not exist in EOVSAUDBMSSCL, create one by calling importeovsa Required inputs: trange - can be 1) a single string or Time() object in UTC: use the entire day, e.g., '2017-08-01' or Time('2017-08-01') if just a date, find all scans withing the same date in local time. if a complete time stamp, find the local date first (which may be different from that provided, and return all scans within that day 2) a range of Time(), e.g., Time(['2017-08-01 00:00','2017-08-01 23:00']) 3) None -- use current date Time.now() doimport - Boolean. If true, call importeovsa to import UDB files that are missing from those found in the directory specified in EOVSAUDBMSSCL. Otherwise, return a list of ms files it has found. doscaling - Boolean. If true, scale cross-correlation amplitudes by using auto-correlations verbose - Boolean. If true, return more information ''' import glob if trange is None: trange = Time.now() if type(trange) == list or type(trange) == str: try: trange = Time(trange) except: print('trange format not recognised. Abort....') return None # # in case of a single Time object was passed, adjusting the time range to start at the local beginning of the day. # # Initially, 'trange' is set to start at 20:00:00 UTC of the given day. # # After adjustment, 'trange' spans from 08:00:00 UTC of the same day (start of the local day) # # to 08:00:00 UTC of the following day, covering the entire local day. # # Example of change: # # Before: trange = "yyyy-mm-dd 20:00:00.000" (single starting point) # # After: trange = ["yyyy-mm-dd 08:00:00.000", "yyyy-mm-dd+1 08:00:00.000"] (full day range) try: # if single Time object, the following line would report an error nt = len(trange) if len(trange) > 1: # more than one value trange = Time([trange[0], trange[-1]]) tdatetime = trange[0].to_datetime() else: # single value in a list if trange[0].mjd == np.fix(trange[0].mjd): # if only date is given, move the time from 00 to 12 UT trange[0] = Time(trange[0].mjd + 0.5, format='mjd') tdatetime = trange[0].to_datetime() dhr = trange[0].LocalTime.utcoffset().total_seconds() / 60 / 60 / 24 btime = Time(np.fix(trange[0].mjd + dhr) - dhr, format='mjd') etime = Time(btime.mjd + 1, format='mjd') trange = Time([btime, etime]) except: # the case of a single Time object if trange.mjd == np.fix(trange.mjd): # if only date is given, move the time from 00 to 12 UT trange = Time(trange.mjd + 0.5, format='mjd') tdatetime = trange.to_datetime() dhr = trange.LocalTime.utcoffset().total_seconds() / 60 / 60 / 24 btime = Time(np.fix(trange.mjd + dhr) - dhr, format='mjd') etime = Time(btime.mjd + 1, format='mjd') trange = Time([btime, etime]) print('Selected idb files in the time range (UTC): ', trange.iso) if doscaling: udbmspath = udbmsscldir else: udbmspath = udbmsdir inpath = '{}{}/'.format(udbdir, tdatetime.strftime("%Y")) outpath = '{}{}/'.format(udbmspath, tdatetime.strftime("%Y%m")) if not os.path.exists(outpath): if verbose: print(outpath + ' does not exist. Making a new directory.') os.makedirs(outpath) msfiles = [] else: msfiles = [os.path.basename(ll).split('.')[0] for ll in glob.glob('{}UDB*.ms*'.format(outpath)) if ll.endswith('.ms') or ll.endswith('.ms.tar.gz')] msfile_synoptic = os.path.join(outpath, 'UDB' + tdatetime.strftime("%Y%m%d") + '.ms') if os.path.exists(msfile_synoptic): if overwrite and doimport: os.system(f'rm -rf {msfile_synoptic}*') # sclist = ra.findfiles(trange, projid='NormalObserving', srcid='Sun') sclist = findfiles(trange, projid='NormalObserving', srcid='Sun') udbfilelist = sclist['scanlist'] udbfilelist = [os.path.basename(ll) for ll in udbfilelist] if os.path.exists(msfile_synoptic): return {'mspath': outpath, 'udbpath': inpath, 'udbfile': sorted(udbfilelist), 'udb2ms': [], 'ms': [msfile_synoptic], 'tstlist': sclist['tstlist'], 'tedlist': sclist['tedlist']} else: udbfilelist_set = set(udbfilelist) msfiles = udbfilelist_set.intersection(msfiles) filelist = udbfilelist_set - msfiles filelist = sorted(list(filelist)) if filelist and doimport: # import multiprocessing as mprocs # ncpu = mprocs.cpu_count() # if ncpu > 10: # ncpu = 10 # if ncpu > len(filelist): # ncpu = len(filelist) ncpu = 1 importeovsa(idbfiles=[inpath + ll for ll in filelist], ncpu=ncpu, timebin="0s", width=1, visprefix=outpath, nocreatms=False, doconcat=False, modelms="", doscaling=doscaling, keep_nsclms=False, udb_corr=True) msfiles = [os.path.basename(ll).split('.')[0] for ll in glob.glob('{}UDB*.ms*'.format(outpath)) if ll.endswith('.ms') or ll.endswith('.ms.tar.gz')] udbfilelist_set = set(udbfilelist) msfiles = udbfilelist_set.intersection(msfiles) filelist = udbfilelist_set - msfiles filelist = sorted(list(filelist)) return {'mspath': outpath, 'udbpath': inpath, 'udbfile': sorted(udbfilelist), 'udb2ms': filelist, 'ms': [outpath + ll + '.ms' for ll in sorted(list(msfiles))], 'tstlist': sclist['tstlist'], 'tedlist': sclist['tedlist']}
[docs] def calib_pipeline(trange, workdir=None, doimport=False, overwrite=False, clearcache=False, verbose=False, pols='XX', version='v3.0', ncpu='auto', caltype=['refpha', 'phacal'], interp='nearest', force_imaging_rerun=False): ''' trange: can be 1) a single Time() object: use the entire day 2) a range of Time(), e.g., Time(['2017-08-01 00:00','2017-08-01 23:00']) 3) a single or a list of UDBms file(s) 4) None -- use current date Time.now() ''' if workdir is None: workdir = workdir_default os.chdir(workdir) if isinstance(trange, Time): mslist = trange2ms(trange=trange, doimport=False) invis = mslist['ms'] if isinstance(trange, str): try: mslist = trange2ms(trange=trange, doimport=False) invis = mslist['ms'] except: invis = [trange] for idx, f in enumerate(invis): invis[idx] = os.path.normpath(f) fileexist = False tdate = trange.datetime vispath = os.path.join(udbmsdir, tdate.strftime('%Y%m')) vis = os.path.join(vispath, tdate.strftime('UDB%Y%m%d') + '.ms') print(f'Trying to use visibility file: {vis}') if os.path.exists(vis): print(f'Visibility file {vis} exists.') fileexist = True else: if os.path.exists(f'{vis}.tar.gz'): print(f'Visibility file {vis}.tar.gz exists. Extracting...') fileexist = True vis= f'{vis}.tar.gz' # os.system(f'tar -xzf {vis}.tar.gz -C {vispath}') print(f'Visibility file exists: {fileexist}') if doimport: print(f'doimport: {doimport}') if overwrite: print('Overwriting existing visibility file...') fileexist = False elif fileexist: print('Visibility file already exists; reusing it instead of overwriting.') if not fileexist: if not doimport: print('WARNING: No visibility file exists and doimport=False. Aborting without import.') print(f'DEBUG calib_pipeline: trange={trange}, doimport={doimport}') return None print('Visibility file does not exist. Running calibration pipeline...') if isinstance(trange, Time): mslist = trange2ms(trange=trange, doimport=doimport, overwrite=overwrite) invis = mslist['ms'] if isinstance(trange, str): try: mslist = trange2ms(trange=trange, doimport=doimport, overwrite=overwrite) invis = mslist['ms'] except: invis = [trange] for idx, f in enumerate(invis): invis[idx] = f.rstrip('/') print("DEBUG calib_pipeline: trange2ms result") print(f" trange={trange}") print(f" doimport={doimport}") print(f" fileexist={fileexist}") print(f" mslist.keys()={list(mslist.keys())}") print(f" n_invis={len(invis)}") print(f" invis={invis}") print(f" mspath={mslist.get('mspath')}") print(f" udbpath={mslist.get('udbpath')}") print(f" udbfile={mslist.get('udbfile')}") print(f" udb2ms={mslist.get('udb2ms')}") if not invis: print('WARNING: Import was requested but no MS files were returned. Aborting.') print(f'DEBUG calib_pipeline: mslist={mslist}') return None outputvis = os.path.join(os.path.dirname(invis[0]), os.path.basename(invis[0])[:11] + '.ms') tdate = get_tdate_from_basename(outputvis) flagant = '13~15' if Time(tdate).mjd >= EOVSA15_UPGRADE_DATE.mjd else '15' vis = calibeovsa(invis, caltype=caltype, caltbdir=caltbdir, interp=interp, doflag=True, flagant=flagant, doimage=False, doconcat=True, concatvis=outputvis, keep_orig_ms=False) else: if verbose: print(f'Using existing visibility file: {vis}') # tdate = mstl.get_trange(vis)[0] tdate = get_tdate_from_basename(vis) udbmspath = udbmsslfcaleddir outpath = os.path.join(udbmspath, tdate.strftime('%Y%m')) + '/' if not os.path.exists(outpath): os.makedirs(outpath) imgoutdir = os.path.join(qlookfitsdir, tdate.strftime("%Y/%m/%d/")) if not os.path.exists(imgoutdir): os.makedirs(imgoutdir) figoutdir = os.path.join(synopticfigdir, tdate.strftime("%Y/")) if not os.path.exists(figoutdir): os.makedirs(figoutdir) if version == 'v1.0': output_file_path = os.path.join(outpath, tdate.strftime('UDB%Y%m%d') + '.ms') else: output_file_path = os.path.join(outpath, tdate.strftime('UDB%Y%m%d') + f'.{version}.ms') slfcaltbdir_path = os.path.join(slfcaltbdir, tdate.strftime('%Y%m')) + '/' if verbose: print('input of pipeline_run:') print({'vis': vis, 'outputvis': output_file_path, 'workdir': workdir, 'slfcaltbdir': slfcaltbdir_path, 'imgoutdir': imgoutdir, 'figoutdir': figoutdir, 'overwrite': overwrite, 'clearcache': clearcache, 'pols': pols, 'ncpu': ncpu}) overwrite_pipeline = overwrite or force_imaging_rerun if force_imaging_rerun and version == 'v3.0': print(f'Cron recovery mode enabled for {tdate.strftime("%Y-%m-%d")}: rerunning imaging despite existing outputvis.') if version == 'v1.0': vis = ed.pipeline_run(vis, outputvis=output_file_path, workdir=workdir, slfcaltbdir=slfcaltbdir_path, imgoutdir=imgoutdir, figoutdir=figoutdir, clearcache=clearcache, pols=pols) elif version == 'v2.0': from suncasa.eovsa import eovsa_synoptic_imaging_pipeline as esip vis = esip.pipeline_run(vis, outputvis=output_file_path, workdir=workdir, slfcaltbdir=slfcaltbdir_path, imgoutdir=imgoutdir, figoutdir=figoutdir, clearcache=clearcache, pols=pols, ncpu=ncpu, overwrite=overwrite) elif version == 'v3.0': from suncasa.eovsa import eovsa_synoptic_imaging_pipeline_wsclean as esip vis = esip.pipeline_run(vis, outputvis=output_file_path, workdir=workdir, slfcaltbdir=slfcaltbdir_path, imgoutdir=imgoutdir, pols=pols, overwrite=overwrite_pipeline) if clearcache: os.system(f'rm -rf {workdir}/*') else: print(f'Version {version} is not supported. Valid versions are v1.0, v2.0, and v3.0. Use the default version 1.0.') vis = ed.pipeline_run(vis, outputvis=output_file_path, workdir=workdir, slfcaltbdir=slfcaltbdir_path, imgoutdir=imgoutdir, figoutdir=figoutdir, clearcache=clearcache, pols=pols) return vis
[docs] def mk_qlook_image(trange, doimport=False, docalib=False, ncpu=10, twidth=12, stokes=None, antenna='0~12', lowcutoff_freq=3.7, imagedir=None, spws=['1~5', '6~10', '11~15', '16~25'], toTb=True, overwrite=True, doslfcal=False, verbose=False): ''' trange: can be 1) a single Time() object: use the entire day 2) a range of Time(), e.g., Time(['2017-08-01 00:00','2017-08-01 23:00']) 3) a single or a list of UDBms file(s) 4) None -- use current date Time.now() ''' antenna0 = antenna if isinstance(trange, Time): mslist = trange2ms(trange=trange, doimport=doimport) vis = mslist['ms'] tsts = [l.to_datetime() for l in mslist['tstlist']] if isinstance(trange, str): try: date = Time(trange) mslist = trange2ms(trange=trange, doimport=doimport) vis = mslist['ms'] tsts = [l.to_datetime() for l in mslist['tstlist']] except: vis = [trange] tsts = [] for v in vis: tb.open(v + '/OBSERVATION') tsts.append(Time(tb.getcell('TIME_RANGE')[0] / 24 / 3600, format='mjd').datetime) tb.close() subdir = [tst.strftime("%Y/%m/%d/") for tst in tsts] for idx, f in enumerate(vis): if f[-1] == '/': vis[idx] = f[:-1] if not stokes: stokes = 'XX' if not imagedir: imagedir = './' imres = {'Succeeded': [], 'BeginTime': [], 'EndTime': [], 'ImageName': [], 'Spw': [], 'Vis': [], 'Synoptic': {'Succeeded': [], 'BeginTime': [], 'EndTime': [], 'ImageName': [], 'Spw': [], 'Vis': []}} for n, msfile in enumerate(vis): msfilebs = os.path.basename(msfile) imdir = imagedir + subdir[n] if not os.path.exists(imdir): os.makedirs(imdir) if doslfcal: slfcalms = './' + msfilebs + '.xx' split(msfile, outputvis=slfcalms, datacolumn='corrected', correlation='XX') cfreqs = getspwfreq(msfile) for spw in spws: antenna = antenna0 if spw == '': continue spwran = [s.zfill(2) for s in spw.split('~')] freqran = [cfreqs[int(s)] for s in spw.split('~')] cfreq = np.mean(freqran) bmsz = max(150. / cfreq, 20.) uvrange = '<10klambda' if doslfcal: slfcal_img = './' + msfilebs + '.slf.spw' + spw.replace('~', '-') + '.slfimg' slfcal_tb = './' + msfilebs + '.slf.spw' + spw.replace('~', '-') + '.slftb' try: tclean(vis=slfcalms, antenna=antenna, imagename=slfcal_img, spw=spw, mode='mfs', timerange='', deconvolver='hogbom', imsize=[512, 512], cell=['5arcsec'], niter=100, gain=0.05, stokes='I', weighting='natural', restoringbeam=[str(bmsz) + 'arcsec'], pbcor=False, interactive=False, usescratch=True) except: print('error in cleaning spw: ' + spw) break gaincal(vis=slfcalms, refant='0', antenna=antenna, caltable=slfcal_tb, spw=spw, uvrange='', gaintable=[], selectdata=True, timerange='', solint='600s', gaintype='G', calmode='p', combine='', minblperant=3, minsnr=2, append=False) if not os.path.exists(slfcal_tb): print('No solution found in spw: ' + spw) break else: clearcal(slfcalms) delmod(slfcalms) applycal(vis=slfcalms, gaintable=[slfcal_tb], spw=spw, selectdata=True, antenna=antenna, interp='nearest', flagbackup=False, applymode='calonly', calwt=False) msfile = slfcalms imsize = 512 cell = ['5arcsec'] if len(spwran) == 2: spwstr = spwran[0] + '~' + spwran[1] else: spwstr = spwran[0] restoringbeam = ['{0:.1f}arcsec'.format(bmsz)] imagesuffix = '.spw' + spwstr.replace('~', '-') if cfreq > 10.: antenna = antenna + ';!0&1;!0&2' # deselect the shortest baselines # else: # antenna = antenna + ';!0&1' # deselect the shortest baselines res = ptclean(vis=msfile, imageprefix=imdir, imagesuffix=imagesuffix, twidth=twidth, uvrange=uvrange, spw=spw, ncpu=ncpu, niter=1000, gain=0.05, antenna=antenna, imsize=imsize, cell=cell, stokes=stokes, doreg=True, usephacenter=False, overwrite=overwrite, toTb=toTb, restoringbeam=restoringbeam, specmode="mfs", deconvolver="hogbom", datacolumn='data', pbcor=True) if res: imres['Succeeded'] += res['Succeeded'] imres['BeginTime'] += res['BeginTime'] imres['EndTime'] += res['EndTime'] imres['ImageName'] += res['ImageName'] imres['Spw'] += [spwstr] * len(res['ImageName']) imres['Vis'] += [msfile] * len(res['ImageName']) else: continue if len(vis) == 1: # produce the band-by-band whole-day images ms.open(msfile) ms.selectinit() timfreq = ms.getdata(['time', 'axis_info'], ifraxis=True) tim = timfreq['time'] ms.done() cfreqs = getspwfreq(msfile) imdir = imagedir + subdir[0] if not os.path.exists(imdir): os.makedirs(imdir) for spw in spws: antenna = antenna0 if spw == '': spw = '{:d}~{:d}'.format(next(x[0] for x in enumerate(cfreqs) if x[1] > lowcutoff_freq), len(cfreqs) - 1) spwran = [s.zfill(2) for s in spw.split('~')] freqran = [cfreqs[int(s)] for s in spw.split('~')] cfreq = np.mean(freqran) bmsz = max(150. / cfreq, 20.) uvrange = '' imsize = 512 cell = ['5arcsec'] if len(spwran) == 2: spwstr = spwran[0] + '~' + spwran[1] else: spwstr = spwran[0] restoringbeam = ['{0:.1f}arcsec'.format(bmsz)] imagesuffix = '.synoptic.spw' + spwstr.replace('~', '-') antenna = antenna + ';!0&1' # deselect the shortest baselines res = ptclean(vis=msfile, imageprefix=imdir, imagesuffix=imagesuffix, twidth=len(tim), uvrange=uvrange, spw=spw, ncpu=1, niter=0, gain=0.05, antenna=antenna, imsize=imsize, cell=cell, stokes=stokes, doreg=True, usephacenter=False, overwrite=overwrite, toTb=toTb, restoringbeam=restoringbeam, specmode="mfs", deconvolver="hogbom", datacolumn='data', pbcor=True) if res: imres['Synoptic']['Succeeded'] += res['Succeeded'] imres['Synoptic']['BeginTime'] += res['BeginTime'] imres['Synoptic']['EndTime'] += res['EndTime'] imres['Synoptic']['ImageName'] += res['ImageName'] imres['Synoptic']['Spw'] += [spwstr] * len(res['ImageName']) imres['Synoptic']['Vis'] += [msfile] * len(res['ImageName']) else: continue # save it for debugging purposes np.savez('imres.npz', imres=imres) return imres
[docs] def plt_qlook_image(imres, figdir=None, verbose=True, synoptic=False): from matplotlib import pyplot as plt from sunpy import map as smap from sunpy import sun from matplotlib import colors import astropy.units as u from suncasa.utils import plot_mapX as pmX # from matplotlib import gridspec as gridspec if not figdir: figdir = './' nspw = len(set(imres['Spw'])) plttimes = list(set(imres['BeginTime'])) ntime = len(plttimes) # sort the imres according to time images = np.array(imres['ImageName']) btimes = Time(imres['BeginTime']) etimes = Time(imres['EndTime']) spws = np.array(imres['Spw']) suc = np.array(imres['Succeeded']) inds = btimes.argsort() images_sort = images[inds].reshape(ntime, nspw) btimes_sort = btimes[inds].reshape(ntime, nspw) suc_sort = suc[inds].reshape(ntime, nspw) if verbose: print('{0:d} figures to plot'.format(ntime)) plt.ioff() fig = plt.figure(figsize=(8, 8)) plt.subplots_adjust(left=0, bottom=0, right=1, top=1, wspace=0, hspace=0) axs = [] ims = [] pltst = 0 for i in range(ntime): plt.ioff() plttime = btimes_sort[i, 0] tofd = plttime.mjd - np.fix(plttime.mjd) suci = suc_sort[i] if not synoptic: if tofd < 16. / 24. or sum( suci) < nspw - 2: # if time of the day is before 16 UT (and 24 UT), skip plotting (because the old antennas are not tracking) continue else: if pltst == 0: i0 = i pltst = 1 else: if pltst == 0: i0 = i pltst = 1 if i == i0: if synoptic: timetext = fig.text(0.01, 0.98, plttime.iso[:10], color='w', fontweight='bold', fontsize=12, ha='left') else: timetext = fig.text(0.01, 0.98, plttime.iso[:19], color='w', fontweight='bold', fontsize=12, ha='left') else: if synoptic: timetext.set_text(plttime.iso[:10]) else: timetext.set_text(plttime.iso[:19]) if verbose: print('Plotting image at: ', plttime.iso) for n in range(nspw): plt.ioff() if i == i0: if nspw == 1: ax = fig.add_subplot(111) else: ax = fig.add_subplot(nspw / 2, 2, n + 1) axs.append(ax) else: ax = axs[n] image = images_sort[i, n] if suci[n] or os.path.exists(image): try: eomap = smap.Map(image) except: continue data = eomap.data sz = data.shape if len(sz) == 4: data = data.reshape((sz[2], sz[3])) data[np.isnan(data)] = 0.0 # add a basin flux to the image to avoid negative values data = data + 0.8e5 data[data < 0] = 0.0 data = np.sqrt(data) eomap = smap.Map(data, eomap.meta) # resample the image for plotting dim = u.Quantity([256, 256], u.pixel) eomap = eomap.resample(dim) else: # make an empty map data = np.zeros((256, 256)) header = {"DATE-OBS": plttime.isot, "EXPTIME": 0., "CDELT1": 10., "NAXIS1": 256, "CRVAL1": 0., "CRPIX1": 128.5, "CUNIT1": "arcsec", "CTYPE1": "HPLN-TAN", "CDELT2": 10., "NAXIS2": 256, "CRVAL2": 0., "CRPIX2": 128.5, "CUNIT2": "arcsec", "CTYPE2": "HPLT-TAN", "HGLT_OBS": sun.heliographic_solar_center(plttime)[1].value, "HGLN_OBS": 0., "RSUN_OBS": sun.solar_semidiameter_angular_size(plttime).value, "RSUN_REF": sun.constants.radius.value, "DSUN_OBS": sun.sunearth_distance(plttime).to(u.meter).value, } eomap = smap.Map(data, header) if i == i0: eomap_ = pmX.Sunmap(eomap) # im = eomap_.imshow(axes=ax, cmap='jet', norm=colors.LogNorm(vmin=0.1, vmax=1e8)) im = eomap_.imshow(axes=ax, cmap='jet', norm=colors.Normalize(vmin=150, vmax=700)) ims.append(im) if not synoptic: eomap_.draw_limb(axes=ax) eomap_.draw_grid(axes=ax) ax.set_xlim([-1080, 1080]) ax.set_ylim([-1080, 1080]) try: cfreq = eomap.meta['crval3'] / 1.0e9 bdwid = eomap.meta['cdelt3'] / 1.0e9 ax.text(0.98, 0.01, '{0:.1f} - {1:.1f} GHz'.format(cfreq - bdwid / 2.0, cfreq + bdwid / 2.0), color='w', transform=ax.transAxes, fontweight='bold', ha='right') except: pass ax.set_title(' ') ax.set_xlabel('') ax.set_ylabel('') ax.set_xticklabels(['']) ax.set_yticklabels(['']) else: ims[n].set_data(eomap.data) fig_tdt = plttime.to_datetime() if synoptic: fig_subdir = fig_tdt.strftime("%Y/") figname = 'eovsa_qlimg_' + plttime.iso[:10].replace('-', '') + '.png' else: fig_subdir = fig_tdt.strftime("%Y/%m/%d/") figname = 'eovsa_qlimg_' + plttime.isot.replace(':', '').replace('-', '')[:15] + '.png' figdir_ = figdir + fig_subdir if not os.path.exists(figdir_): os.makedirs(figdir_) if verbose: print('Saving plot to :' + figdir_ + figname) plt.savefig(figdir_ + figname) plt.close(fig)
[docs] def qlook_image_pipeline(date, twidth=10, ncpu=15, doimport=False, docalib=False, synoptic=False, overwrite=True): ''' date: date string or Time object. e.g., '2017-07-15' or Time('2017-07-15') ''' import pytz from datetime import datetime if date is None: date = Time.now() try: date = Time(date) except: print('date format not recognised. Abort....') return None if date.mjd >= DCM_IF_FILTER_UPGRADE_DATE.mjd: ## the last '' window is for fullBD synthesis image. Now obsolete. # spws = ['6~10', '11~20', '21~30', '31~43', ''] # spws = ['6~10', '11~20', '21~30', '31~43'] spws = ['0~1', '2~5', '6~10', '11~20', '21~30', '31~49'] else: ## the last '' window is for fullBD synthesis image. Now obsolete. # spws = ['1~5', '6~10', '11~15', '16~25', ''] spws = ['1~3', '4~6', '7~10', '10~14', '15~20', '21~30'] if docalib: vis = calib_pipeline(date, doimport=doimport, synoptic=synoptic) imagedir = qlookfitsdir if synoptic: vis_synoptic = os.path.join(udbmsdir, date.datetime.strftime("%Y%m"), 'UDB' + date.datetime.strftime("%Y%m%d") + '.ms') if os.path.exists(vis_synoptic): date = vis_synoptic else: print('Whole-day ms file {} not existed. About..... Use pipeline to make one.'.format(vis_synoptic)) return None imres = mk_qlook_image(date, twidth=twidth, ncpu=ncpu, doimport=doimport, docalib=docalib, imagedir=imagedir, spws=spws, verbose=True, overwrite=overwrite) figdir = qlookfigdir plt_qlook_image(imres, figdir=figdir, verbose=True)
# if imres['Synoptic']['Succeeded']: # figdir = synopticfigdir # imres_bds = {} # # imres_allbd = {} # for k, v in imres['Synoptic'].items(): # imres_bds[k] = v # [:4] # # imres_allbd[k] = v[4:] # # plt_qlook_image(imres_bds, figdir=figdir, verbose=True, synoptic=True) # # plt_qlook_image(imres_allbd, figdir=figdir + 'FullBD/', verbose=True, synoptic=True)
[docs] def pipeline(year=None, month=None, day=None, ndays=1, clearcache=True, overwrite=False, doimport=True, pols='XX', version='v1.0', ncpu='auto', debugging=False, caltype=['refpha', 'phacal'], interp='nearest', smart_cal_check=None): """ Main pipeline for importing and calibrating EOVSA visibility data. Name: eovsa_pipeline --- main pipeline for importing and calibrating EOVSA visibility data. Synopsis: eovsa_pipeline.py [options]... [DATE_IN_YY_MM_DD] Description: Import and calibrate EOVSA visibility data of the date specified by DATE_IN_YY_MM_DD (or from ndays before the DATE_IN_YY_MM_DD if option --ndays/-n is provided). If DATE_IN_YY_MM_DD is omitted, it will be set to 2 days before now by default. There are no mandatory arguments in this command. :param year: The year for which data should be processed, defaults to None. :type year: int, optional :param month: The month for which data should be processed, defaults to None. :type month: int, optional :param day: The day for which data should be processed, defaults to None. :type day: int, optional :param ndays: Number of days before the specified date to include in the processing, defaults to 1. :type ndays: int, optional :param clearcache: Whether to clear cache after processing, defaults to True. :type clearcache: bool, optional :param overwrite: Whether to overwrite existing files, defaults to True. :type overwrite: bool, optional :param doimport: Whether to perform the import step, defaults to True. :type doimport: bool, optional :param pols: Polarizations to process, can be 'XX' or 'XXYY', defaults to 'XX'. :type pols: str, optional :param version: Version of the pipeline to use, choices are 'v1.0' or 'v2.0', defaults to 'v1.0'. :type version: str, optional :param ncpu: Number of CPUs to use for processing, defaults to 'auto'. :type ncpu: str, optional :param debugging: Whether to run the pipeline in debugging mode, defaults to False. :type debugging: bool, optional :param caltype: Calibration types to use, defaults to ['refpha','phacal']. :type caltype: list, optional :param interp: Interpolation method to use for calibration tables, defaults to 'nearest'. Options are 'nearest', 'linear' :type interp: str, optional :param smart_cal_check: When True, perform cron-oriented calibration readiness checks and track per-day status before processing, defaults to ``None``. If ``None``, the behavior is enabled automatically when ``EOVSA_PIPELINE_CRON=1``. The gate waits for same observing-day calibration records until the hard deadline at ``04:00 UTC`` two days after the observing-day label, then allows the run to proceed with the latest calibration records available in MySQL. :type smart_cal_check: bool, optional :raises ValueError: Raises an exception if the date parameters are out of the valid Gregorian calendar range. Example: -------- To process data for November 24th, 2021 using version 2.0 of the pipeline, with all options enabled: >>> python eovsa_pipeline.py --date 2021-11-24T20:00 --clearcache --overwrite --doimport --pols XX --version v2.0 --ndays 2 If you want to see the help message, you can run: >>> python eovsa_pipeline.py -h """ smart_cal_check = should_enable_smart_cal_check(smart_cal_check) workdir = workdir_default os.chdir(workdir) if year is None: # Default behavior: Process data from one day prior to the current date. # Calculate the Modified Julian Date (MJD) for yesterday. mjdnow = Time.now().mjd - 1 # Convert MJD to a datetime object and format it to start processing at 20:00 UT. t = Time(Time(mjdnow, format='mjd').to_datetime().strftime('%Y-%m-%dT20:00')) else: t = Time('{}-{:02d}-{:02d} 20:00'.format(year, month, day)) for d in range(ndays): t1 = Time(t.mjd - d, format='mjd') datestr = t1.iso[:10] synoptic_info = summarize_synoptic_outputs(t1) statusfile = synoptic_info['statusfile'] if smart_cal_check and version == 'v3.0': readiness = {} run_state = 'running' if synoptic_info['fits_complete']: write_pipeline_status( statusfile, 'success', date=datestr, fits_count=synoptic_info['fits_count'], fits_expected_count=synoptic_info['fits_expected_count'], fitsfiles=synoptic_info['existing_fitsfiles'], outputvis_exists=os.path.exists( os.path.join(udbmsslfcaleddir, t1.datetime.strftime('%Y%m'), t1.datetime.strftime('UDB%Y%m%d') + f'.{version}.ms') ) or os.path.exists( os.path.join(udbmsslfcaleddir, t1.datetime.strftime('%Y%m'), t1.datetime.strftime('UDB%Y%m%d') + f'.{version}.ms.tar.gz') ), ) print(f'Synoptic FITS already complete for {datestr}. Skipping cron run.') continue readiness = get_calibration_readiness(t1) if not readiness['ready']: if readiness.get('deadline_expired'): run_state = 'running_after_deadline' run_message = ( f'Calibration still not ready for {datestr} after hard deadline ' f'{readiness["hard_deadline_utc"]}; running with the most recent ' f'calibration records currently available in MySQL.' ) else: write_pipeline_status( statusfile, 'waiting_for_calibration', date=datestr, fits_count=synoptic_info['fits_count'], fits_expected_count=synoptic_info['fits_expected_count'], fitsfiles=synoptic_info['existing_fitsfiles'], outputvis_exists=os.path.exists( os.path.join(udbmsslfcaleddir, t1.datetime.strftime('%Y%m'), t1.datetime.strftime('UDB%Y%m%d') + f'.{version}.ms') ) or os.path.exists( os.path.join(udbmsslfcaleddir, t1.datetime.strftime('%Y%m'), t1.datetime.strftime('UDB%Y%m%d') + f'.{version}.ms.tar.gz') ), **readiness, ) print(f'Skipping {datestr}: calibration not ready ({readiness["reason"]}).') continue print(run_message) write_pipeline_status( statusfile, run_state, date=datestr, fits_count=synoptic_info['fits_count'], fits_expected_count=synoptic_info['fits_expected_count'], fitsfiles=synoptic_info['existing_fitsfiles'], **readiness, ) subdir = os.path.join(workdir, t1.datetime.strftime('%Y%m%d/')) if not os.path.exists(subdir): os.makedirs(subdir) else: if overwrite: os.system('rm -rf {}/*'.format(subdir)) # ##debug # vis_corrected = calib_pipeline(datestr, overwrite=overwrite, doimport=doimport, # workdir=subdir, clearcache=False, pols=pols) if debugging: vis_corrected = calib_pipeline(t1, overwrite=overwrite, doimport=doimport, workdir=subdir, clearcache=False, pols=pols, version=version, ncpu=ncpu, caltype=caltype, interp=interp, force_imaging_rerun=smart_cal_check and version == 'v3.0') else: try: vis_corrected = calib_pipeline(t1, overwrite=overwrite, doimport=doimport, workdir=subdir, clearcache=False, pols=pols, version=version, ncpu=ncpu, caltype=caltype, interp=interp, force_imaging_rerun=smart_cal_check and version == 'v3.0') except Exception as e: print(f'error in processing {datestr}. Error message: {e}') print(traceback.format_exc()) if smart_cal_check and version == 'v3.0': synoptic_info = summarize_synoptic_outputs(t1) write_pipeline_status( statusfile, 'failed', date=datestr, fits_count=synoptic_info['fits_count'], fits_expected_count=synoptic_info['fits_expected_count'], fitsfiles=synoptic_info['existing_fitsfiles'], error=str(e), calibration_ready=readiness.get('ready'), calibration_reason=readiness.get('reason'), ran_after_deadline=run_state == 'running_after_deadline', hard_deadline_utc=readiness.get('hard_deadline_utc'), refcal_timestamp_utc=readiness.get('refcal_timestamp_utc'), latest_phacal_timestamp_utc=readiness.get('latest_phacal_timestamp_utc'), ) continue if smart_cal_check and version == 'v3.0': synoptic_info = summarize_synoptic_outputs(t1) outputvis_root = os.path.join( udbmsslfcaleddir, t1.datetime.strftime('%Y%m'), t1.datetime.strftime('UDB%Y%m%d') + f'.{version}.ms' ) outputvis_exists = os.path.exists(outputvis_root) or os.path.exists(f'{outputvis_root}.tar.gz') state = 'success' if synoptic_info['fits_complete'] else 'partial' write_pipeline_status( statusfile, state, date=datestr, fits_count=synoptic_info['fits_count'], fits_expected_count=synoptic_info['fits_expected_count'], fitsfiles=synoptic_info['existing_fitsfiles'], outputvis_exists=outputvis_exists, pipeline_result_type=type(vis_corrected).__name__, calibration_ready=readiness.get('ready'), calibration_reason=readiness.get('reason'), ran_after_deadline=run_state == 'running_after_deadline', hard_deadline_utc=readiness.get('hard_deadline_utc'), refcal_timestamp_utc=readiness.get('refcal_timestamp_utc'), latest_phacal_timestamp_utc=readiness.get('latest_phacal_timestamp_utc'), ) if clearcache: os.chdir(workdir) os.system('rm -rf {}'.format(subdir))
if __name__ == '__main__': # Define the parser
[docs] parser = argparse.ArgumentParser(description='EOVSA Pipeline for importing and calibrating visibility data.')
# Default date is set to one day before the current date, formatted as YYYY-MM-DDT20:00 default_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%dT20:00') parser.add_argument('--date', type=str, default=default_date, help='Date to process in YYYY-MM-DDT20:00 format, defaults to 20:00 UT of one day before the current date.') parser.add_argument('--clearcache', action='store_true', default=False, help='Remove temporary files after processing') parser.add_argument('--ndays', type=int, default=1, help='Process data from DATE_IN_YY_MM_DD-ndays to DATE_IN_YY_MM_DD, default is 1.') parser.add_argument('--overwrite', action='store_true', default=False, help='Overwrite existing processed data') parser.add_argument('--doimport', action='store_true', default=False, help='Perform import step before processing') parser.add_argument('--pols', type=str, default='XX', choices=['XX', 'XXYY'], help='Polarizations to process') parser.add_argument('--ncpu', type=str, default='auto', help='Number of CPUs to use for processing') parser.add_argument('--version', type=str, default='v3.0', choices=['v1.0', 'v2.0', 'v3.0'], help='Version of the EOVSA pipeline to use') parser.add_argument('--debugging', action='store_true', default=False, help='Run the pipeline in debugging mode') parser.add_argument('--caltype', type=str, nargs='+', default=['refpha', 'phacal'], help='Calibration types to use, defaults to refpha and phacal') parser.add_argument('--interp', type=str, default='nearest', choices=['nearest', 'linear', 'auto'], help='Interpolation method for calibration tables. Options: "nearest", "linear", "auto". ' 'If "auto" is selected, calibeovsa automatically chooses the method: ' 'if the time difference between the observation and the phacal calibrations is less than 1 hour, it uses "nearest"; ' 'otherwise, it uses "linear".') parser.add_argument('--smart-cal-check', action='store_true', default=False, help='For cron-style runs, wait for same-day observer-written refcal/phacal records until ' '04:00 UTC two days later, then run with the latest MySQL calibrations; also track ' 'per-day status and allow imaging reruns when outputvis exists but daily FITS are incomplete.') # Parse the arguments args = parser.parse_args() # Convert --date argument to an astropy Time object to ensure consistency in time handling t = Time(args.date) # Extract year, month, day from the --date argument year, month, day = t.datetime.year, t.datetime.month, t.datetime.day # Run the main pipeline function pipeline(year, month, day, args.ndays, args.clearcache, args.overwrite, args.doimport, args.pols, args.version, args.ncpu, args.debugging, args.caltype, args.interp, args.smart_cal_check)