Source code for suncasa.suncasatasks.private.task_calibeovsa.test

from ...casa_compat import check_dependencies

check_dependencies()

import platform
import matplotlib

if platform.system() == 'Linux':
    matplotlib.use('Agg')
import os
import shutil
import numpy as np

from eovsapy.util import extract as eoextract
from eovsapy.util import Time
from eovsapy import cal_header as ch
from eovsapy import dbutil as db
from eovsapy import pipeline_cal as pc
from eovsapy.sqlutil import sql2refcalX, sql2phacalX
from .. import concateovsa
from suncasa.eovsa.update_log import EOVSA15_UPGRADE_DATE, DCM_IF_FILTER_UPGRADE_DATE

from ...casa_compat import import_casatools, import_casatasks

[docs] tasks = import_casatasks('split', 'tclean', 'gencal', 'clearcal', 'applycal', 'flagdata', 'casalog', 'bandpass')
[docs] split = tasks.get('split')
[docs] tclean = tasks.get('tclean')
[docs] gencal = tasks.get('gencal')
[docs] clearcal = tasks.get('clearcal')
[docs] applycal = tasks.get('applycal')
[docs] flagdata = tasks.get('flagdata')
[docs] casalog = tasks.get('casalog')
[docs] bandpass = tasks.get('bandpass')
[docs] tools = import_casatools(['tbtool', 'mstool', 'qatool', 'iatool'])
[docs] tbtool = tools['tbtool']
[docs] mstool = tools['mstool']
[docs] qatool = tools['qatool']
[docs] iatool = tools['iatool']
[docs] tb = tbtool()
[docs] ms = mstool()
[docs] qa = qatool()
[docs] ia = iatool()
[docs] def load_calwidget_v2_npz(npz_path): '''Load refcal + phacals from a calwidget v2 calibeovsa-ready NPZ. Returns a tuple ``(refcal, phacals)`` whose dict layout matches what :func:`eovsapy.sqlutil.sql2refcalX` and :func:`sql2phacalX` produce, so the rest of :func:`calibeovsa` can consume them unchanged. Promoted-antenna metadata (the per-antenna source phacal timestamp written by ``Promote to Refcal``) is attached to ``refcal['promoted_antennas']`` for audit purposes; the scalar ``refcal['timestamp']`` still drives the phacal-to-refcal time filter, so promoted antennas inherit the refcal's time for now (downstream is unchanged). ''' import json as _json with np.load(npz_path, allow_pickle=False) as data: kind = str(data['kind']) if kind != 'calibeovsa_v1': raise ValueError( 'Unrecognized calwidget v2 npz kind {0!r} in {1}'.format(kind, npz_path) ) vis = data['refcal__vis_real'] + 1j * data['refcal__vis_imag'] flag = np.asarray(data['refcal__flag']) # Prefer the widget's smooth analytic-fit phase (Chebyshev for Ant 1, # polynomial for Ant 2+) as the calibration phase — that IS the # calibration the widget intends to apply. Fall back to the raw # band-averaged vis phase for older NPZs without refcal__model_pha. # NaN entries in the model mark (ant, pol, band) combinations the # widget did not fit (missing bands, masked antennas); promote those # to flag=1 so downstream zeroing (pha[flag==1]=0) handles them. if 'refcal__model_pha' in data.files and data['refcal__model_pha'].size > 0: model_pha = np.asarray(data['refcal__model_pha'], dtype=np.float64) fitted = np.isfinite(model_pha) pha = np.where(fitted, model_pha, 0.0) flag = np.where(fitted, flag, 1).astype(flag.dtype) else: pha = np.angle(vis) refcal = { 'pha': pha, 'amp': np.abs(vis), 'flag': flag, 'sigma': np.asarray(data['refcal__sigma']), 'fghz': np.asarray(data['refcal__fghz']), 'timestamp': Time(float(data['refcal__timestamp_lv']), format='lv'), 't_bg': Time(float(data['refcal__t_bg_lv']), format='lv'), 't_ed': Time(float(data['refcal__t_ed_lv']), format='lv'), 'promoted_antennas': _json.loads(str(data['refcal__promoted_antennas_json'])), } phacal_ids = [int(v) for v in np.asarray(data['phacal_scan_ids']).tolist()] phacals = [] for scan_id in phacal_ids: prefix = 'phacal_{0}'.format(scan_id) inner = { 'pha': np.asarray(data[prefix + '__phacal_pha']), 'amp': np.asarray(data[prefix + '__phacal_amp']), 'flag': np.asarray(data[prefix + '__phacal_flag']), 'sigma': np.asarray(data[prefix + '__phacal_sigma']), 'fghz': np.asarray(data[prefix + '__phacal_fghz']), 'timestamp': Time(float(data[prefix + '__t_pha_lv']), format='lv'), 't_bg': Time(float(data[prefix + '__phacal_t_bg_lv']), format='lv'), 't_ed': Time(float(data[prefix + '__phacal_t_ed_lv']), format='lv'), } phacals.append({ 'pslope': np.asarray(data[prefix + '__pslope']), 't_pha': Time(float(data[prefix + '__t_pha_lv']), format='lv'), 'flag': np.asarray(data[prefix + '__flag']), 'poff': np.asarray(data[prefix + '__poff']), 't_ref': Time(float(data[prefix + '__t_ref_lv']), format='lv'), 't_bg': Time(float(data[prefix + '__t_bg_lv']), format='lv'), 't_ed': Time(float(data[prefix + '__t_ed_lv']), format='lv'), 'phacal': inner, }) return refcal, phacals
[docs] def flag_phambd_by_spw(caltb, flagspw='0~1'): sp_st, sp_ed = flagspw.split('~') tb.open(caltb, nomodify=False) phambd_spw = tb.getcol('SPECTRAL_WINDOW_ID') spwindx = np.where(np.logical_and(phambd_spw >= int(sp_st), phambd_spw <= int(sp_ed)))[0] # print(spwindx) if 'CPARAM' in tb.colnames(): data = tb.getcol('CPARAM') data[:, :, spwindx[0]:spwindx[-1] + 1][:] = complex(1) datakey = 'CPARAM' elif 'FPARAM' in tb.colnames(): data = tb.getcol('FPARAM') data[:, :, spwindx[0]:spwindx[-1] + 1][:] = 0.0 datakey = 'FPARAM' else: print(f'No calibration data found in {caltb}') tb.close() return False tb.putcol(datakey, data) tb.close() return True
[docs] def calibeovsa(vis=None, caltype=None, caltbdir='', interp=None, docalib=True, doflag=True, flagant='', flagspw='', doimage=False, imagedir=None, antenna='', timerange=None, spw=None, stokes=None, dosplit=False, outputvis=None, doconcat=False, concatvis=None, keep_orig_ms=True, cal_npz=None): ''' :param vis: EOVSA visibility dataset(s) to be calibrated :param caltype: :param interp: :param docalib: :param qlookimage: :param flagant: :param stokes: :param doconcat: :return: ''' interp0 = interp cal_npz_refcal = None cal_npz_phacals = None if cal_npz: cal_npz_refcal, cal_npz_phacals = load_calwidget_v2_npz(cal_npz) print('Loaded refcal + {0} phacal(s) from calwidget v2 NPZ {1}'.format( len(cal_npz_phacals), cal_npz)) if type(vis) == str: vis = [vis] for idx, f in enumerate(vis): if f[-1] == '/': vis[idx] = f[:-1] vis[idx] = str(vis[idx]) # check if the calibration table directory is defined # pipeline should always use "caltbdir = /data1/eovsa/caltable/" if not caltbdir: print('Task calibeovsa') print('Path for generating calibration tables not defined') print('Use current path') caltbdir = './' failed_vis = [] for msfile in vis: casalog.origin('calibeovsa') if not caltype: casalog.post("Caltype not provided. Perform reference phase calibration and daily phase calibration.") caltype = ['refpha', 'phacal'] if not os.path.exists(msfile): casalog.post("Input visibility does not exist. Skipping...") failed_vis.append(msfile) continue if msfile.endswith('/'): msfile = msfile[:-1] if not msfile[-3:] in ['.ms', '.MS']: casalog.post("Invalid visibility. Please provide a proper visibility file ending with .ms") # if not caltable: # caltable=[os.path.basename(vis).replace('.ms','.'+c) for c in caltype] try: # --- begin per-file try block --- # get band information tb.open(msfile + '/SPECTRAL_WINDOW') nspw = tb.nrows() bdname = tb.getcol('NAME') bd_nchan = tb.getcol('NUM_CHAN') bd = [int(b[4:]) - 1 for b in bdname] reffreqs = tb.getcol('REF_FREQUENCY') bandwidths = tb.getcol('TOTAL_BANDWIDTH') chan_freqs_spw0 = tb.getcol('CHAN_FREQ', startrow=0, nrow=1) cfreq_spw0 = np.mean(chan_freqs_spw0) tb.close() tb.open(msfile + '/ANTENNA') nant = tb.nrows() antname = tb.getcol('NAME') antlist = [str(ll) for ll in range(len(antname) - 1)] antennas = ','.join(antlist) tb.close() # get time stamp, use the beginning of the file tb.open(msfile + '/OBSERVATION') trs = {'BegTime': [], 'EndTime': []} for ll in range(tb.nrows()): tim0, tim1 = Time(tb.getcell('TIME_RANGE', ll) / 24 / 3600, format='mjd') trs['BegTime'].append(tim0) trs['EndTime'].append(tim1) tb.close() trs['BegTime'] = Time(trs['BegTime']) trs['EndTime'] = Time(trs['EndTime']) btime = np.min(trs['BegTime']) etime = np.max(trs['EndTime']) # ms.open(vis) # summary = ms.summary() # ms.close() # btime = Time(summary['BeginTime'], format='mjd') # etime = Time(summary['EndTime'], format='mjd') ## stop using ms.summary to avoid conflicts with importeovsa t_mid = Time((btime.mjd + etime.mjd) / 2., format='mjd') print("This scan observed from {} to {} UTC".format(btime.iso, etime.iso)) gaintables = [] spwmaps = [] if not antenna.strip(): antenna = '0~12' if not flagant.strip(): flagant = '13~15' if t_mid.mjd >= EOVSA15_UPGRADE_DATE.mjd: antenna = '0~14' flagant = '15' if ('refpha' in caltype) or ('refamp' in caltype) or ('refcal' in caltype): if cal_npz_refcal is not None: refcal = cal_npz_refcal else: refcal = sql2refcalX(btime) # shape is 15 (nant) x 2 (npol) x 34 (nband) # EOVSA15 upgrade-related Note: # the number of antennas in refcal['pha'] is changed to 16 after EOVSA15 upgrade # The last 15-ant record is 2025-05-23 and the first 16-ant record is on 2025-06-07. # But because the pha is added to para_pha in a loop of nant-1, it should be fine. # No change is needed in the code below. pha = refcal['pha'] pha[np.where(refcal['flag'] == 1)] = 0. amp = refcal['amp'] amp[np.where(refcal['flag'] == 1)] = 1. t_ref = refcal['timestamp'] # find the start and end time of the local day when refcal is registered try: dhr = t_ref.LocalTime.utcoffset().total_seconds() / 60. / 60. except: dhr = -7. bt = Time(np.fix(t_ref.mjd + dhr / 24.) - dhr / 24., format='mjd') et = Time(bt.mjd + 1., format='mjd') (yr, mon, day) = (bt.datetime.year, bt.datetime.month, bt.datetime.day) dirname = caltbdir + str(yr) + str(mon).zfill(2) + '/' if not os.path.exists(dirname): os.mkdir(dirname) # check if there is any ROACH reboot between the reference calibration found and the current data t_rbts = db.get_reboot(Time([t_ref, btime])) if not t_rbts: casalog.post("Reference calibration is derived from observation at " + t_ref.iso) print("Reference calibration is derived from observation at " + t_ref.iso) else: casalog.post( "Oh crap! Roach reboot detected between the reference calibration time " + t_ref.iso + ' and the current observation at ' + btime.iso) casalog.post("Aborting...") print( "Oh crap! Roach reboot detected between the reference calibration time " + t_ref.iso + ' and the current observation at ' + btime.iso) print("Aborting...") para_pha = [] para_amp = [] calpha = np.zeros((nspw, nant - 1, 2)) calamp = np.zeros((nspw, nant - 1, 2)) for s in range(nspw): for n in range(nant - 1): for p in range(2): calpha[s, n, p] = pha[n, p, bd[s]] calamp[s, n, p] = amp[n, p, bd[s]] para_pha.append(np.degrees(pha[n, p, bd[s]])) para_amp.append(amp[n, p, bd[s]]) if 'fluxcal' in caltype: calfac = pc.get_calfac(Time(t_mid.iso.split(' ')[0] + 'T23:59:59')) t_bp = Time(calfac['timestamp'], format='lv') if int(t_mid.mjd) == int(t_bp.mjd): accalfac = calfac['accalfac'] # (ant x pol x freq) # tpcalfac = calfac['tpcalfac'] # (ant x pol x freq) caltb_autoamp = dirname + t_bp.isot[:-4].replace(':', '').replace('-', '') + '.bandpass' if not os.path.exists(caltb_autoamp): bandpass(vis=msfile, caltable=caltb_autoamp, solint='inf', refant='eo01', minblperant=0, minsnr=0, bandtype='B', docallib=False) tb.open(caltb_autoamp, nomodify=False) # (ant x spw) bd_chanidx = np.hstack([[0], bd_nchan.cumsum()]) for ll in range(nspw): antfac = np.sqrt(accalfac[:, :, bd_chanidx[ll]:bd_chanidx[ll + 1]]) # # antfac *= tpcalfac[:, :,bd_chanidx[ll]:bd_chanidx[ll + 1]] antfac = np.moveaxis(antfac, 0, 2) cparam = np.zeros((2, bd_nchan[ll], nant)) cparam[:, :, :-3] = 1.0 / antfac tb.putcol('CPARAM', cparam + 0j, ll * nant, nant) paramerr = tb.getcol('PARAMERR', ll * nant, nant) paramerr = paramerr * 0 tb.putcol('PARAMERR', paramerr, ll * nant, nant) bpflag = tb.getcol('FLAG', ll * nant, nant) bpant1 = tb.getcol('ANTENNA1', ll * nant, nant) bpflagidx, = np.where(bpant1 >= 13) bpflag[:] = False bpflag[:, :, bpflagidx] = True tb.putcol('FLAG', bpflag, ll * nant, nant) bpsnr = tb.getcol('SNR', ll * nant, nant) bpsnr[:] = 100.0 bpsnr[:, :, bpflagidx] = 0.0 tb.putcol('SNR', bpsnr, ll * nant, nant) tb.close() msg_prompt = "Scaling calibration is derived for {}.".format(msfile) casalog.post(msg_prompt) print(msg_prompt) gaintables.append(caltb_autoamp) spwmaps.append([]) else: msg_prompt = "Caution: No TPCAL is available on {}. No scaling calibration is derived for {}.".format( t_mid.datetime.strftime('%b %d, %Y'), msfile) casalog.post(msg_prompt) print(msg_prompt) if ('refpha' in caltype) or ('refcal' in caltype): # caltb_pha = os.path.basename(vis).replace('.ms', '.refpha') # check if the calibration table already exists caltb_pha = dirname + t_ref.isot[:-4].replace(':', '').replace('-', '') + '.refpha' if not os.path.exists(caltb_pha): gencal(vis=msfile, caltable=caltb_pha, caltype='ph', antenna=antennas, pol='X,Y', spw='0~' + str(nspw - 1), parameter=para_pha) tb.open(caltb_pha, nomodify=False) phaflag_ = refcal['flag'][:, :, np.array(bd)] phaflag_new = np.full((nant, 2, nspw), True, dtype=np.bool_) if t_mid.mjd >= EOVSA15_UPGRADE_DATE.mjd: phaflag_new[...] = phaflag_ else: phaflag_new[:-1, ...] = phaflag_ phaflag_new = np.moveaxis(phaflag_new, 0, 2).reshape(2, 1, nant * nspw) tb.putcol('FLAG', phaflag_new) tb.close() # tb.open(caltb_pha, nomodify=False) # phaparam = np.angle(tb.getcol('CPARAM'),deg=True) # phaparam_ = np.degrees(refcal['pha'][:,:,np.array(bd)]) # phaparam2 = np.zeros((nant, 2, nspw)) # phaparam2[:-1,...] = phaparam_ # # phaparam2 = phaparam2.swapaxes(0,1).reshape(2,1,nant*nspw) # phaparam2 = np.moveaxis(phaparam2,0,2).reshape(2,1,nant*nspw) # tb.close() gaintables.append(caltb_pha) spwmaps.append([]) if ('refamp' in caltype) or ('refcal' in caltype): # caltb_amp = os.path.basename(vis).replace('.ms', '.refamp') caltb_amp = dirname + t_ref.isot[:-4].replace(':', '').replace('-', '') + '.refamp' if not os.path.exists(caltb_amp): gencal(vis=msfile, caltable=caltb_amp, caltype='amp', antenna=antennas, pol='X,Y', spw='0~' + str(nspw - 1), parameter=para_amp) tb.open(caltb_amp, nomodify=False) ampflag_ = refcal['flag'][:, :, np.array(bd)] ampflag_new = np.full((nant, 2, nspw), True, dtype=np.bool_) if t_mid.mjd >= EOVSA15_UPGRADE_DATE.mjd: ampflag_new[...] = ampflag_ else: ampflag_new[:-1, ...] = ampflag_ ampflag_new = np.moveaxis(ampflag_new, 0, 2).reshape(2, 1, nant * nspw) tb.putcol('FLAG', ampflag_new) tb.close() gaintables.append(caltb_amp) spwmaps.append([]) # calibration for the change of delay center between refcal time and beginning of scan -- hopefully none! xml, buf = ch.read_calX(4, t=[t_ref, btime], verbose=False) if buf is not None: dly_t2 = Time(eoextract(buf[0], xml['Timestamp']), format='lv') dlycen_ns2 = eoextract(buf[0], xml['Delaycen_ns'])[:nant - 1] xml, buf = ch.read_calX(4, t=t_ref) dly_t1 = Time(eoextract(buf, xml['Timestamp']), format='lv') dlycen_ns1 = eoextract(buf, xml['Delaycen_ns'])[:nant - 1] dlycen_ns_diff = dlycen_ns2 - dlycen_ns1 for n in range(2): dlycen_ns_diff[:, n] -= dlycen_ns_diff[0, n] print('Multi-band delay is derived from delay center difference at {} & {}'.format(dly_t1.iso, dly_t2.iso)) dlycen_pha0 = np.degrees(dlycen_ns_diff * 1e-9 * cfreq_spw0 * 2. * np.pi) # print('=====Delays relative to Ant 14=====') # for i, dl in enumerate(dlacen_ns_diff[:, 0] - dlacen_ns_diff[13, 0]): # ant = antlist[i] # print 'Ant eo{0:02d}: x {1:.2f} ns & y {2:.2f} ns'.format(int(ant) + 1, dl # dlacen_ns_diff[i, 1] - dlacen_ns_diff[13, 1]) # caltb_mbd0 = os.path.basename(vis).replace('.ms', '.mbd0') caltb_dlycen = dirname + dly_t2.isot[:-4].replace(':', '').replace('-', '') + '.dlycen' caltb_dlycen_pha0 = dirname + dly_t2.isot[:-4].replace(':', '').replace('-', '') + '.dlycen_pha0' if not os.path.exists(caltb_dlycen): gencal(vis=msfile, caltable=caltb_dlycen, caltype='mbd', pol='X,Y', antenna=antennas, parameter=dlycen_ns_diff.flatten().tolist()) if not os.path.exists(caltb_dlycen_pha0): gencal(vis=msfile, caltable=caltb_dlycen_pha0, caltype='ph', pol='X,Y', antenna=antennas, parameter=dlycen_pha0.flatten().tolist()) gaintables.append(caltb_dlycen) spwmaps.append(nspw * [0]) gaintables.append(caltb_dlycen_pha0) spwmaps.append(nspw * [0]) if 'phacal' in caltype: if cal_npz_phacals is not None: phacals = np.array(cal_npz_phacals) else: phacals = np.array(sql2phacalX([bt, et], nrecords=0, neat=True, verbose=False)) if not phacals.any() or len(phacals) == 0: print("Found no phacal records in SQL database, will skip phase calibration") else: # first generate all phacal calibration tables if not already exist t_phas = Time([phacal['t_pha'] for phacal in phacals]) # sort the array in ascending order by t_pha sinds = t_phas.mjd.argsort() t_phas = t_phas[sinds] phacals = phacals[sinds] caltbs_phambd = [] caltbs_phambd_pha0 = [] for i, phacal in enumerate(phacals): # filter out phase cals with reference time stamp >30 min away from the provided refcal time if (phacal['t_ref'].jd - refcal['timestamp'].jd) > 30. / 1440.: del phacals[i] del t_phas[i] continue else: t_pha = phacal['t_pha'] phambd_ns = phacal['pslope'] for n in range(2): phambd_ns[:, n] -= phambd_ns[0, n] # set all flagged values to be zero phambd_ns[np.where(phacal['flag'] == 1)] = 0. caltb_phambd = dirname + t_pha.isot[:-4].replace(':', '').replace('-', '') + '.phambd' caltbs_phambd.append(caltb_phambd) if os.path.exists(caltb_phambd): os.system('rm -rf ' + caltb_phambd) # if not os.path.exists(caltb_phambd): gencal(vis=msfile, caltable=caltb_phambd, caltype='mbd', pol='X,Y', antenna=antennas, parameter=phambd_ns[:nant-1,:].flatten().tolist()) if flagspw != '': flag_phambd_by_spw(caltb_phambd, flagspw=flagspw) # When applying the multi-band delays, they are referenced to the center of spw 0 # Make a corresponding calibration table for the reference phase at the center of spw 0 pha0 = np.degrees(phambd_ns * 1e-9 * cfreq_spw0 * 2. * np.pi) caltb_phambd_pha0 = dirname + t_pha.isot[:-4].replace(':', '').replace('-', '') + '.phambd_pha0' caltbs_phambd_pha0.append(caltb_phambd_pha0) if os.path.exists(caltb_phambd_pha0): os.system('rm -rf ' + caltb_phambd_pha0) # if not os.path.exists(caltb_phambd_pha0): gencal(vis=msfile, caltable=caltb_phambd_pha0, caltype='ph', pol='X,Y', antenna=antennas, parameter=pha0[:nant-1,:].flatten().tolist()) if flagspw != '': flag_phambd_by_spw(caltb_phambd_pha0, flagspw=flagspw) # now decides which table to apply depending on the interpolation method ("nearest" or "linear") dt = np.min(np.abs(t_phas.mjd - t_mid.mjd)) * 24. if interp0 == 'auto': print( f'interp method is set to auto. The interpolation method will be determined based on the time difference between the mid time of the scan and the nearest phase calibration table.') print(f'The time difference threshold is set to 1 hour') if dt < 1.: interp = 'nearest' print(f'The time difference is {dt:.1f} hours. Using nearest interp method.') else: interp = 'linear' print(f'The time difference is {dt:.1f} hours. Using linear interp method.') if interp == 'nearest': tbind = np.argmin(np.abs(t_phas.mjd - t_mid.mjd)) print("Selected nearest phase calibration table at " + t_phas[tbind].iso) gaintables.append(caltbs_phambd[tbind]) ## Note: gencal generates the same solution for all spws, so no need to specify spwmap # spwmaps.append(nspw * [0]) spwmaps.append([]) gaintables.append(caltbs_phambd_pha0[tbind]) # spwmaps.append(nspw * [0]) spwmaps.append([]) if interp == 'linear': # bphacal = sql2phacalX(btime) # ephacal = sql2phacalX(etime,reverse=True) bt_ind, = np.where(t_phas.mjd < btime.mjd) et_ind, = np.where(t_phas.mjd > etime.mjd) if len(bt_ind) == 0 and len(et_ind) == 0: print("No phacal found before or after the ms data within the day of observation") print("Skipping daily phase calibration") elif len(bt_ind) > 0 and len(et_ind) == 0: gaintables.append(caltbs_phambd[bt_ind[-1]]) # spwmaps.append(nspw * [0]) spwmaps.append([]) gaintables.append(caltbs_phambd_pha0[bt_ind[-1]]) # spwmaps.append(nspw * [0]) spwmaps.append([]) print("Using phase calibration table at " + t_phas[bt_ind[-1]].iso) elif len(bt_ind) == 0 and len(et_ind) > 0: gaintables.append(caltbs_phambd[et_ind[0]]) # spwmaps.append(nspw * [0]) spwmaps.append([]) gaintables.append(caltbs_phambd_pha0[et_ind[0]]) # spwmaps.append(nspw * [0]) spwmaps.append([]) print("Using phase calibration table at " + t_phas[et_ind[0]].iso) elif len(bt_ind) > 0 and len(et_ind) > 0: bphacal = phacals[bt_ind[-1]] ephacal = phacals[et_ind[0]] # generate a new table interpolating between two daily phase calibrations dt_obs = t_mid.mjd - bphacal['t_pha'].mjd dt_pha = ephacal['t_pha'].mjd - bphacal['t_pha'].mjd phambd_diff = ephacal['pslope'] - bphacal['pslope'] phambd_ns = bphacal['pslope'] + dt_obs / dt_pha * phambd_diff for n in range(2): phambd_ns[:, n] -= phambd_ns[0, n] # set all flagged values to be zero phambd_ns[np.where(bphacal['flag'] == 1)] = 0. phambd_ns[np.where(ephacal['flag'] == 1)] = 0. caltb_phambd_interp = dirname + t_mid.isot[:-4].replace(':', '').replace('-', '') + '.phambd' caltb_phambd_interp_pha0 = caltb_phambd_interp + '_pha0' pha0 = np.degrees(phambd_ns * 1e-9 * cfreq_spw0 * 2. * np.pi) if os.path.exists(caltb_phambd_interp): os.system('rm -rf ' + caltb_phambd_interp) # if not os.path.exists(caltb_phambd_interp): gencal(vis=msfile, caltable=caltb_phambd_interp, caltype='mbd', pol='X,Y', antenna=antennas, parameter=phambd_ns.flatten().tolist()) if flagspw != '': flag_phambd_by_spw(caltb_phambd_interp, flagspw=flagspw) if os.path.exists(caltb_phambd_interp_pha0): os.system('rm -rf ' + caltb_phambd_interp_pha0) # if not os.path.exists(caltb_phambd_interp_pha0): gencal(vis=msfile, caltable=caltb_phambd_interp_pha0, caltype='ph', pol='X,Y', antenna=antennas, parameter=pha0.flatten().tolist()) if flagspw != '': flag_phambd_by_spw(caltb_phambd_interp_pha0, flagspw=flagspw) print("Using phase calibration table interpolated between records at " + bphacal[ 't_pha'].iso + ' and ' + ephacal['t_pha'].iso) gaintables.append(caltb_phambd_interp) # spwmaps.append(nspw * [0]) spwmaps.append([]) gaintables.append(caltb_phambd_interp_pha0) # spwmaps.append(nspw * [0]) spwmaps.append([]) if docalib: clearcal(msfile) applycal(vis=msfile, gaintable=gaintables, spwmap=spwmaps, applymode='calflag', calwt=False) if doflag: # flag zeros and NaNs flagdata(vis=msfile, mode='clip', clipzeros=True) if flagant: try: flagdata(vis=msfile, antenna=flagant) except: print("Something wrong with flagant. Abort...") if doimage: from matplotlib import pyplot as plt from suncasa.utils import helioimage2fits as hf from sunpy import map as smap if not stokes: stokes = 'XX' if not timerange: timerange = '' if not spw: spw = '1~3' if not imagedir: imagedir = '.' # (yr, mon, day) = (bt.datetime.year, bt.datetime.month, bt.datetime.day) # dirname = imagedir + str(yr) + '/' + str(mon).zfill(2) + '/' + str(day).zfill(2) + '/' # if not os.path.exists(dirname): # os.makedirs(dirname) bds = [spw] nbd = len(bds) imgs = [] for bd in bds: if '~' in bd: bdstr = bd.replace('~', '-') else: bdstr = str(bd).zfill(2) imname = imagedir + '/' + os.path.basename(msfile).replace('.ms', '.bd' + bdstr) print('Cleaning image: ' + imname) try: tclean(vis=msfile, imagename=imname, antenna=antenna, spw=bd, timerange=timerange, imsize=[512], cell=['5.0arcsec'], stokes=stokes, niter=500) except: print('clean not successfull for band ' + str(bd)) else: imgs.append(imname + '.image') junks = ['.flux', '.mask', '.model', '.psf', '.residual'] for junk in junks: if os.path.exists(imname + junk): shutil.rmtree(imname + junk) tranges = [btime.iso + '~' + etime.iso] * nbd fitsfiles = [img.replace('.image', '.fits') for img in imgs] hf.imreg(vis=msfile, timerange=tranges, imagefile=imgs, fitsfile=fitsfiles, usephacenter=False) plt.figure(figsize=(6, 6)) for i, fitsfile in enumerate(fitsfiles): plt.subplot(1, nbd, i + 1) eomap = smap.Map(fitsfile) sz = eomap.data.shape if len(sz) == 4: eomap.data = eomap.data.reshape((sz[2], sz[3])) eomap.plot_settings['cmap'] = plt.get_cmap('jet') eomap.plot() eomap.draw_limb() # the next line would cause trouble in higher versions of SunPy, as it requires WCS # eomap.draw_grid() plt.show() except Exception as e: import traceback as _tb casalog.post(f"ERROR processing {msfile}: {e}. Skipping this file.") print(f"ERROR processing {msfile}: {e}") print(_tb.format_exc()) failed_vis.append(msfile) continue # Remove failed files from vis before dosplit/doconcat for fv in failed_vis: if fv in vis: vis.remove(fv) if not vis: casalog.post("All input files failed calibration. No output produced.") print("All input files failed calibration. No output produced.") return None if dosplit: if not doconcat: if not outputvis: outputvis = [vis[n].split('.')[0] + '.corrected.ms' for n in range(len(vis))] for n in range(len(vis)): split(vis=vis[n], outputvis=outputvis[n], datacolumn='corrected') if not keep_orig_ms: os.system('rm -rf {}'.format(vis[n])) else: outputvis = vis if doconcat: if not concatvis: msoutdir = os.path.dirname(vis[0]) if len(vis) == 1: vis0 = os.path.basename(vis[0]) concatvis = os.path.join(msoutdir, vis0.split('.')[0] + '.corrected.ms') if len(vis) > 1: visb = os.path.basename(vis[0]) vise = os.path.basename(vis[-1]) concatvis = os.path.join(msoutdir, visb.split('.')[0] + '-' + vise.split('.')[0][3:] + '.corrected.ms') if len(vis) == 1: split(vis=vis[0], outputvis=concatvis, datacolumn='corrected') if len(vis) > 1: concateovsa(vis, concatvis, datacolumn='corrected', keep_orig_ms=keep_orig_ms, cols2rm="model,corrected") return concatvis else: return outputvis