Source code for suncasa.eovsa.eovsa_synoptic_imaging_pipeline

import argparse
import inspect
import logging
import os
import shutil
import socket
from datetime import datetime, time, timedelta
from glob import glob

import astropy.units as u
import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
import numpy as np
import numpy.ma as ma
import pandas as pd
from astropy.time import Time
from scipy import ndimage
from sunpy import map as smap
from tqdm import tqdm

from suncasa.casa_compat import import_casatasks
from suncasa.io import ndfits
from suncasa.utils import helioimage2fits as hf
from suncasa.utils import mstools as mstl

[docs] hostname = socket.gethostname()
[docs] is_on_server = hostname in ['pipeline', 'inti.hpcnet.campus.njit.edu']
[docs] is_on_inti = hostname == 'inti.hpcnet.campus.njit.edu'
[docs] script_mode = True
logging.basicConfig(level=logging.INFO, format='EOVSA pipeline: [%(levelname)s] - %(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
[docs] tasks = import_casatasks('gaincal', 'applycal', 'clearcal', 'delmod', 'ft', 'uvsub', 'split', 'concat', 'flagmanager', 'flagdata', 'tclean', 'hanningsmooth', 'imhead')
[docs] gaincal = tasks.get('gaincal')
[docs] applycal = tasks.get('applycal')
[docs] clearcal = tasks.get('clearcal')
[docs] delmod = tasks.get('delmod')
[docs] ft = tasks.get('ft')
[docs] uvsub = tasks.get('uvsub')
[docs] split = tasks.get('split')
[docs] concat = tasks.get('concat')
[docs] flagmanager = tasks.get('flagmanager')
[docs] flagdata = tasks.get('flagdata')
[docs] tclean = tasks.get('tclean')
[docs] hanningsmooth = tasks.get('hanningsmooth')
[docs] imhead = tasks.get('imhead')
from suncasa.casa_compat import import_casatools
[docs] tools = import_casatools(['qatool', 'iatool', 'cltool', 'mstool', 'tbtool'])
[docs] qatool = tools['qatool']
[docs] iatool = tools['iatool']
[docs] cltool = tools['cltool']
[docs] mstool = tools['mstool']
[docs] tbtool = tools['tbtool']
[docs] qa = qatool()
[docs] ia = iatool()
[docs] ms = mstool()
[docs] tb = tbtool()
[docs] def log_print(level, message): # Get the caller's stack frame and extract information frame = inspect.stack()[1] module_name = inspect.getmodulename(frame[1]) function_name = frame[3] # Format the custom context information context = f"{module_name}::{function_name}" # Prepare the full log message full_message = f"{context} - {message}" # Fetch the logger logger = logging.getLogger() # Log the message with the specified level if level.upper() == 'WARNING': logger.warning(full_message) elif level.upper() == 'INFO': logger.info(full_message) elif level.upper() == 'DEBUG': logger.debug(full_message) elif level.upper() == 'ERROR': logger.error(full_message) elif level.upper() == 'CRITICAL': logger.critical(full_message) else: logger.info(full_message) # Default to INFO if an unsupported level is given
[docs] def compute_snr(images, rsun_pix, crpix1, crpix2): """ Compute the SNR for each image. Args: images: 3D array of images (n_images, height, width). rsun_pix: Radius of the circular region in pixels. crpix1: X-coordinate of the circle center. crpix2: Y-coordinate of the circle center. Returns: snrs: Array of SNR values for each image. """ ny, nx = images.shape[1], images.shape[2] y, x = np.ogrid[:ny, :nx] mask = (x - crpix1) ** 2 + (y - crpix2) ** 2 <= rsun_pix ** 2 snrs = [] # signal = np.percentile(images, 99.9995) for img in images: noise_region = img[~mask] # Pixels outside the circle noise = np.sqrt(np.nanmean(noise_region ** 2)) # RMS noise signal = np.percentile(img, 99.9995) snr = signal / noise if noise > 0 else np.nan snrs.append(snr) return np.array(snrs)
[docs] def compute_standard_deviation(images): """Compute the standard deviation for each image.""" std_devs = [np.nanstd(img) for img in images] return np.array(std_devs)
[docs] def compute_mean_residuals(images): """Compute residuals between each image and the mean image.""" mean_image = np.nanmean(images, axis=0) residuals = [np.nansum(np.abs(img - mean_image)) for img in images] return np.array(residuals)
# def analyze_frequency(images): # """Analyze the frequency domain and detect high-frequency noise.""" # high_freq_scores = [] # for img in images: # fft_image = np.log1p(np.abs(fftshift(fft2(img)))) # high_freq_energy = np.sum(fft_image[fft_image > np.percentile(fft_image, 90)]) # high_freq_scores.append(high_freq_energy) # return np.array(high_freq_scores)
[docs] def detect_noisy_images(data_stack, rsun_pix, crpix1, crpix2, std_threshold=2.0, residual_threshold=2.0, snr_threshold=80.0, showplt=False): images = np.transpose(np.squeeze(data_stack), (2, 0, 1)) # Compute scores std_devs = compute_standard_deviation(images) residuals = compute_mean_residuals(images) snrs = compute_snr(images, rsun_pix, crpix1, crpix2) msnrs = ma.masked_less(snrs, snr_threshold) if msnrs.mask.all(): msnrs = ma.masked_less(snrs, snr_threshold / 2) if msnrs.mask.all(): msnrs = ma.masked_less(snrs, snr_threshold / 2) if msnrs.mask.all(): msnrs = ma.masked_less(snrs, snr_threshold / 2) mstd_devs = ma.masked_array(std_devs, mask=msnrs.mask) mresiduals = ma.masked_array(residuals, mask=msnrs.mask) # Normalize scores for comparison std_scores = (std_devs - np.nanmean(mstd_devs)) / np.nanstd(mstd_devs) residual_scores = (residuals - np.nanmean(mresiduals)) / np.nanstd(mresiduals) snr_scores = 1 / snrs noisy_indices = np.vstack( [std_scores > std_threshold, residual_scores > residual_threshold, snr_scores > 1 / snr_threshold]) print(noisy_indices.shape) noisy_indices = np.sum(noisy_indices, axis=0) print(noisy_indices) c = [] for l in noisy_indices: if l > 2: c.append('C3') else: c.append('C0') if showplt: nt = len(images) # Plot images fig_img, axs_img = plt.subplots(1, nt, figsize=(20, 5), sharex=True, sharey=True) img_vmin = 1e4 img_vmax = np.percentile(images, 99.995) for i, ax in enumerate(axs_img): ax.imshow(images[i], cmap='gray', norm=mcolors.LogNorm(vmin=img_vmin, vmax=img_vmax), origin='lower') ax.set_facecolor('black') # Plot scores fig, axs = plt.subplots(3, 1, figsize=(10, 8)) ax = axs[0] ax.scatter(np.arange(nt), std_scores, marker='o', c=c, label='Standard Deviation') ax.axhline(std_threshold, color='r', linestyle='--', label='Threshold') ax.set_xlabel('Image Index') ax.set_ylabel('Standard Deviation Score') ax.legend() ax = axs[1] ax.scatter(np.arange(nt), residual_scores, marker='o', c=c, label='Mean Residuals') ax.axhline(residual_threshold, color='r', linestyle='--', label='Threshold') ax.set_xlabel('Image Index') ax.set_ylabel('Mean Residual Score') ax.legend() ax = axs[2] ax.scatter(np.arange(nt), snr_scores, marker='o', c=c, label='SNR') ax.axhline(1 / snr_threshold, color='r', linestyle='--', label='Threshold') ax.set_xlabel('Image Index') ax.set_ylabel('SNR Score') ax.legend() return noisy_indices, std_scores, residual_scores, snr_scores
[docs] def is_factor_of_60_minutes(tdt): """ Check if tdt is a factor of 60 minutes or a harmonic of 60 minutes. :param tdt: Time duration to check. :type tdt: timedelta :return: True if tdt is a factor or harmonic of 60 minutes, False otherwise. :rtype: bool """ minutes = tdt.total_seconds() / 60 return 60 % minutes == 0 or minutes % 60 == 0
[docs] def generate_trange_series(tbg, ted, tdt, snap_to_full_hour=False): """ Generate a list of time ranges using pandas.date_range, with options to snap the ranges to full hours and adjust the first and last time range based on specific conditions. :param tbg: The start time. :type tbg: str or datetime-like :param ted: The end time. :type ted: str or datetime-like :param tdt: The duration of each time range. :type tdt: timedelta :param snap_to_full_hour: Whether to snap the time series to full hours, defaults to False. :type snap_to_full_hour: bool :return: A list of tuples, each representing the start and end time of a range. :rtype: list of tuple """ if not is_factor_of_60_minutes(tdt) and snap_to_full_hour == True: snap_to_full_hour = False log_print('WARNING', 'snap_to_full_hour is set to False because tdt is not a factor of 60 minutes or a harmonic of 60 minutes.') tbg_datetime = pd.to_datetime(tbg) ted_datetime = pd.to_datetime(ted) time_ranges = [] # Handle the first range differently to potentially snap its end to the next full hour first_end = tbg_datetime + tdt if snap_to_full_hour: if first_end.hour > tbg_datetime.hour or first_end > ted_datetime: first_end = first_end.replace(minute=0, second=0, microsecond=0) if first_end <= tbg_datetime: # In case tdt < 1 hour first_end += pd.Timedelta(hours=1) first_end = min(first_end, ted_datetime) time_ranges.append((tbg_datetime, first_end)) # Generate subsequent ranges, ensuring they start on the full hour when snap_to_full_hour is True current_time = first_end if snap_to_full_hour and current_time != ted_datetime: current_time = current_time.replace(minute=0, second=0, microsecond=0) while current_time < ted_datetime: end_time = min(current_time + tdt, ted_datetime) if snap_to_full_hour and end_time.hour > current_time.hour: end_time = end_time.replace(minute=0, second=0, microsecond=0) time_ranges.append((current_time, end_time)) current_time = end_time # Check for and handle a short first or last range if len(time_ranges) > 1: # If the first range is too short, merge it with the next one if (time_ranges[1][0] - time_ranges[0][0]) < (0.5 * tdt): time_ranges[1] = (time_ranges[0][0], time_ranges[1][1]) time_ranges.pop(0) # If the last range is too short, merge it with the previous one if (time_ranges[-1][1] - time_ranges[-1][0]) < (0.5 * tdt): time_ranges[-2] = (time_ranges[-2][0], time_ranges[-1][1]) time_ranges.pop() return time_ranges
[docs] def trange2timerange(trange): """ Convert a time range tuple in datetime format to a string representation. :param trange: A tuple containing start and end times as datetime objects. :type trange: tuple :return: A string representation of the time range. :rtype: str """ sttime, edtime = trange if isinstance(sttime, str): sttime = pd.to_datetime(sttime) if isinstance(edtime, str): edtime = pd.to_datetime(edtime) timerange = f"{sttime.strftime('%Y/%m/%d/%H:%M:%S')}~{edtime.strftime('%Y/%m/%d/%H:%M:%S')}" return timerange
[docs] def rotateimage(data, xc_centre, yc_centre, p_angle): """ Rotate an image around a specified point (xc_centre, yc_centre) by a given angle. :param data: The image data. :type data: numpy.ndarray :param xc_centre: The x-coordinate of the rotation center. :type xc_centre: int :param yc_centre: The y-coordinate of the rotation center. :type yc_centre: int :param p_angle: The rotation angle in degrees. :type p_angle: float :return: The rotated image. :rtype: numpy.ndarray """ padX = [data.shape[1] - xc_centre, xc_centre] padY = [data.shape[0] - yc_centre, yc_centre] imgP = np.pad(data, [padY, padX], 'constant') imgR = ndimage.rotate(imgP, p_angle, reshape=False, order=0, prefilter=False) return imgR[padY[0]:-padY[1], padX[0]:-padX[1]]
[docs] def sunpymap2helioimage(sunmap, out_image): """ Rotate a SunPy map from helioprojective to RA-DEC coordinates and write it to a CASA image format. :param sunmap: The input SunPy Map object to be rotated. :type sunmap: sunpy.map.Map :param out_image: The filepath for the output CASA image. :type out_image: str :return: The filepath to the output CASA image format. :rtype: str """ p_ang = sunmap.meta['p_angle'] * u.deg ia.open(out_image) data_rot = rotateimage(sunmap.data, int(sunmap.reference_pixel.x.value), int(sunmap.reference_pixel.y.value), -p_ang.to('deg').value) # data = ia.getchunk() data_rot = data_rot[np.newaxis, np.newaxis, :, :].T # indics = data_rot == 0.00 # data_rot[indics] = data[indics] ia.putchunk(data_rot) ia.close() return out_image
[docs] def solar_diff_rot_image(in_map, newtime, out_image, showplt=False): """ Reproject a SunPy map to account for solar differential rotation to a new observation time. :param in_map: The input SunPy Map object to be reprojected. :type in_map: sunpy.map.Map :param newtime: The new time to which the map is reprojected. :type newtime: astropy.time.Time :param out_image: The path for the output image file in CASA format. :type out_image: str :param showplt: Boolean flag to show plots of the original and reprojected maps, defaults to False. :type showplt: bool :return: The path to the output CASA image format. :rtype: str """ import astropy.units as u from astropy.coordinates import SkyCoord from astropy.wcs import WCS from sunpy.coordinates import Helioprojective, propagate_with_solar_surface ## sunpy reproject the map.date as the reference time. so we have to use the diff between the time of the each map and the ref map reftime = in_map.date + in_map.exposure_time / 2 out_time = in_map.date - (reftime - Time(newtime)) # out_frame = Helioprojective(observer='earth', obstime=out_time, # rsun=eomap_ref.coordinate_frame.rsun) out_frame = Helioprojective(observer=in_map.observer_coordinate, obstime=out_time, rsun=in_map.coordinate_frame.rsun) out_center = SkyCoord(0 * u.arcsec, 0 * u.arcsec, frame=out_frame) out_ref_pixel = [in_map.reference_pixel.x.value, in_map.reference_pixel.y.value] * in_map.reference_pixel.x.unit out_header = smap.make_fitswcs_header(in_map.data.shape, out_center, reference_pixel=out_ref_pixel, scale=u.Quantity(in_map.scale)) out_wcs = WCS(out_header) with propagate_with_solar_surface(): out_map, footprint = in_map.reproject_to(out_wcs, return_footprint=True) out_data = out_map.data out_data[footprint == 0] = in_map.data[footprint == 0] out_map = smap.Map(out_data, out_map.meta) out_map.meta['p_angle'] = in_map.meta['p_angle'] if showplt: fig = plt.figure(figsize=(12, 4)) ax1 = fig.add_subplot(121, projection=in_map) in_map.plot(axes=ax1, title='Original map') plt.colorbar() ax2 = fig.add_subplot(122, projection=out_map) out_map.plot(axes=ax2, title=f"Reprojected to an Earth observer {(Time(newtime) - reftime).to('day')} later") plt.colorbar() plt.show() sunpymap2helioimage(out_map, out_image) return out_image
[docs] def get_bmsize(cfreq, refbmsize=70.0, reffreq=1.0, minbmsize=4.0): """ Calculate the beam size at given frequencies based on a reference beam size at a reference frequency. This function supports both single frequency values and lists of frequencies. :param cfreq: Input frequencies in GHz, can be a float or a list of floats. :type cfreq: float or list :param refbmsize: Reference beam size in arcsec, defaults to 70.0. :type refbmsize: float, optional :param reffreq: Reference frequency in GHz, defaults to 1.0. :type reffreq: float, optional :param minbmsize: Minimum beam size in arcsec, defaults to 4.0. :type minbmsize: float, optional :return: Beam size at the given frequencies, same type as input (float or numpy array). :rtype: float or numpy.ndarray """ # Ensure cfreq is an array for uniform processing cfreq = np.array(cfreq, dtype=float) # Calculate beam size bmsize = refbmsize * reffreq / cfreq # Enforce minimum beam size bmsize = np.maximum(bmsize, minbmsize) # If the original input was a single float, return a single float if bmsize.size == 1: return bmsize.item() # Convert numpy scalar to Python float else: return bmsize
[docs] def calc_diskmodel(slashdate, nbands, freq, defaultfreq): from astropy.time import Time # Default disk size measured for 2019/09/03 # todo add monthly fitting procedure for the disk size and flux density defaultsize = np.array([990.6, 989.4, 988.2, 987.1, 986.0, 984.9, 983.8, 982.7, 981.7, 980.7, 979.7, 978.8, 977.8, 976.9, 976.0, 975.2, 974.3, 973.5, 972.7, 972.0, 971.2, 970.5, 969.8, 969.1, 968.5, 967.8, 967.2, 966.7, 966.1, 965.6, 965.1, 964.6, 964.1, 963.7, 963.3, 962.9, 962.5, 962.1, 961.8, 961.5, 961.3, 961.0, 960.8, 960.6, 960.4, 960.2, 960.1, 960.0, 959.9, 959.8]) # Get current solar distance and modify the default size accordingly try: from sunpy.coordinates.sun import earth_distance fac = earth_distance('2019/09/03') / earth_distance(slashdate) except: import sunpy.coordinates.ephemeris as eph fac = eph.get_sunearth_distance('2019/09/03') / eph.get_sunearth_distance(slashdate) newsize = defaultsize * fac.to_value() if nbands == 34: if Time(slashdate.replace('/', '-')).mjd < Time('2018-03-13').mjd: # Interpolate size to 31 spectral windows (bands 4-34 -> spw 0-30) newsize = np.polyval(np.polyfit(defaultfreq, newsize, 5), freq[3:]) else: # Dates between 2018-03-13 have 33 spectral windows newsize = np.polyval(np.polyfit(defaultfreq, newsize, 5), freq[[0] + list(range(2, 34))]) dsize = np.array([str(i)[:5] + 'arcsec' for i in newsize], dtype='U12') # These are nominal flux densities * 2, determined on 2019/09/03 defaultfdens = np.array([891282, 954570, 1173229, 1245433, 1373730, 1506802, 1613253, 1702751, 1800721, 1946756, 2096020, 2243951, 2367362, 2525968, 2699795, 2861604, 3054829, 3220450, 3404182, 3602625, 3794312, 3962926, 4164667, 4360683, 4575677, 4767210, 4972824, 5211717, 5444632, 5648266, 5926634, 6144249, 6339863, 6598018, 6802707, 7016012, 7258929, 7454951, 7742816, 7948976, 8203206, 8411834, 8656720, 8908130, 9087766, 9410760, 9571365, 9827078, 10023598, 8896671]) fdens = defaultfdens if nbands == 34: if Time(slashdate.replace('/', '-')).mjd < Time('2018-03-13').mjd: # Interpolate size to 31 spectal windows (bands 4-34 -> spw 0-30) fdens = np.polyval(np.polyfit(defaultfreq, fdens, 5), freq[3:]) else: # Dates between 2018-03-13 have 33 spectral windows fdens = np.polyval(np.polyfit(defaultfreq, fdens, 5), freq[[0] + list(range(2, 34))]) return dsize, fdens
[docs] def writediskxml(dsize, fdens, freq, xmlfile='SOLDISK.xml'): import xml.etree.ElementTree as ET # create the file structure sdk = ET.Element('SOLDISK') sdk_dsize = ET.SubElement(sdk, 'item') sdk_fdens = ET.SubElement(sdk, 'item') sdk_freqs = ET.SubElement(sdk, 'item') sdk_dsize.set('disk_size', ','.join(dsize)) sdk_fdens.set('flux_dens', ','.join(['{:.1f}Jy'.format(s) for s in fdens])) sdk_freqs.set('freq', ','.join(freq)) # create a new XML file with the results mydata = ET.tostring(sdk) if isinstance(mydata, bytes): mydata = mydata.decode() if os.path.exists(xmlfile): os.system('rm -rf ' + xmlfile) with open(xmlfile, 'w') as sf: sf.write(mydata) return xmlfile
[docs] def readdiskxml(xmlfile): import astropy.units as u import xml.etree.ElementTree as ET tree = ET.parse(xmlfile) root = tree.getroot() diskinfo = {} for elem in root: d = elem.attrib for k, v in d.items(): v_ = v.split(',') v_ = [u.Unit(f).to_string().split(' ') for f in v_] diskinfo[k] = [] for val, uni in v_: diskinfo[k].append(float(val)) diskinfo[k] = np.array(diskinfo[k]) * u.Unit(uni) return diskinfo
[docs] def gaussian2d(x, y, amplitude, x0, y0, sigma_x, sigma_y, theta): x0 = float(x0) y0 = float(y0) a = (np.cos(theta) ** 2) / (2 * sigma_x ** 2) + (np.sin(theta) ** 2) / (2 * sigma_y ** 2) b = -(np.sin(2 * theta)) / (4 * sigma_x ** 2) + (np.sin(2 * theta)) / (4 * sigma_y ** 2) c = (np.sin(theta) ** 2) / (2 * sigma_x ** 2) + (np.cos(theta) ** 2) / (2 * sigma_y ** 2) g = amplitude * np.exp(- (a * ((x - x0) ** 2) + 2 * b * (x - x0) * (y - y0) + c * ((y - y0) ** 2))) return g
[docs] def image_adddisk(eofile, diskinfo, edgeconvmode='frommergeddisk', caltbonly=False, bmfactor=2.0, overwrite=True): ''' :param eofile: input image FITS file :param diskinfo: disk information file :param edgeconvmode: edge convolve mode, 'frommergeddisk' or 'frombeam' :param caltbonly: calculate the Tb of the disk and return the value :param bmfactor: factor to multiply the beam major and minor axes to get the sigma of the Gaussian kernel :param overwrite: whether to overwrite the output fits file if it already exists :return: ''' from sunpy import map as smap from suncasa.utils import plot_mapX as pmX from scipy import constants import astropy.units as u from sunpy import io as sio dsize = diskinfo['disk_size'] fdens = diskinfo['flux_dens'] freqs = diskinfo['freq'] eomap = smap.Map(eofile) data = eomap.data # remember the data order is reversed due to the FITS convension if np.all(data == 0): log_print('WARNING', 'The input image is empty. Skipping...') return None, None, None eomap_ = pmX.Sunmap(eomap) header = eomap.meta bmaj = header['bmaj'] * 3600 * u.arcsec bmin = header['bmin'] * 3600 * u.arcsec cell = (header['cdelt1'] * u.Unit(header['cunit1']) + header['cdelt2'] * u.Unit(header['cunit2'])) / 2.0 bmsize = (bmaj + bmin) / 2.0 keys = list(header.keys()) values = list(header.values()) mapx, mapy = eomap_.map2wcsgrids(cell=False) mapx = mapx[:-1, :-1] mapy = mapy[:-1, :-1] rdisk = np.sqrt(mapx ** 2 + mapy ** 2) k_b = constants.k c_l = constants.c const = 2. * k_b / c_l ** 2 pix_area = (cell.to(u.rad).value) ** 2 jy_to_si = 1e-26 factor2 = 1. try: ## this works for uncompressed FITS faxis = keys[values.index('FREQ')][-1] crval_freq = header['CRVAL' + faxis] cdelt_freq = header['CDELT' + faxis] nu = crval_freq + cdelt_freq * (1 - header['CRPIX' + faxis]) nul = crval_freq + cdelt_freq * (1 - header['CRPIX' + faxis]) nuh = crval_freq + cdelt_freq * (header['NAXIS' + faxis] - header['CRPIX' + faxis]) cunit_freq = header['CUNIT' + faxis] except: ## this works for compressed FITS crval_freq = header['REFFRQ'] cdelt_freq = header['DLTFRQ'] try: cunit_freq = header['FQUNIT'] except: cunit_freq = 'Hz' nu = crval_freq nul = crval_freq - cdelt_freq / 2.0 nuh = crval_freq + cdelt_freq / 2.0 if caltbonly: edgeconvmode = '' if edgeconvmode == 'frommergeddisk': ## get the frequency range of the image nu_bound = (np.array([nul, nuh]) + 0.5 * np.array([-1, 1]) * cdelt_freq) * u.Unit( cunit_freq) nu_bound = nu_bound.to(u.GHz) ## get the frequencies of the disk models fidxs = np.logical_and(freqs > nu_bound[0], freqs < nu_bound[1]) ny, nx = rdisk.shape freqs_ = freqs[fidxs] fdens_ = fdens[fidxs] / 2.0 # divide by 2 because fdens is 2x solar flux density dsize_ = dsize[fidxs] fdisk_ = np.empty((len(freqs_), ny, nx)) fdisk_[:] = np.nan for fidx, freq in enumerate(freqs_): fdisk_[fidx, ...][rdisk <= dsize_[fidx].value] = 1.0 # nu = crval_freq + cdelt_freq * (1 - header['CRPIX' + faxis]) factor = const * freq.to(u.Hz).value ** 2 # SI unit jy2tb = jy_to_si / pix_area / factor * factor2 fdisk_[fidx, ...] = fdisk_[fidx, ...] / np.nansum(fdisk_[fidx, ...]) * fdens_[fidx].value fdisk_[fidx, ...] = fdisk_[fidx, ...] * jy2tb # # fdisk_[np.isnan(fdisk_)] = 0.0 tbdisk = np.nanmean(fdisk_, axis=0) tbdisk[np.isnan(tbdisk)] = 0.0 sig2fwhm = 2.0 * np.sqrt(2 * np.log(2)) * bmfactor x0, y0 = 0, 0 sigx, sigy = bmaj.value / sig2fwhm, bmin.value / sig2fwhm theta = -(90.0 - header['bpa']) * u.deg x = (np.arange(31) - 15) * cell.value y = (np.arange(31) - 15) * cell.value x, y = np.meshgrid(x, y) kernel = gaussian2d(x, y, 1.0, x0, y0, sigx, sigy, theta.to(u.radian).value) kernel = kernel / np.nansum(kernel) from scipy import signal tbdisk = signal.fftconvolve(tbdisk, kernel, mode='same') else: freqghz = nu / 1.0e9 factor = const * nu ** 2 # SI unit jy2tb = jy_to_si / pix_area / factor * factor2 p_dsize = np.poly1d(np.polyfit(freqs.value, dsize.value, 15)) p_fdens = np.poly1d( np.polyfit(freqs.value, fdens.value, 15)) / 2. # divide by 2 because fdens is 2x solar flux density if edgeconvmode == 'frombeam': from scipy.special import erfc factor_erfc = 2.0 ## erfc function ranges from 0 to 2 fdisk = erfc((rdisk - p_dsize(freqghz)) / bmsize.value) / factor_erfc else: fdisk = np.zeros_like(rdisk) fdisk[rdisk <= p_dsize(freqghz)] = 1.0 fdisk = fdisk / np.nansum(fdisk) * p_fdens(freqghz) tbdisk = fdisk * jy2tb tb_disk = np.nanmax(tbdisk) if caltbonly: return tb_disk else: datanew = data + tbdisk # datanew[np.isnan(data)] = 0.0 header['TBDISK'] = tb_disk header['TBUNIT'] = 'K' eomap_disk = smap.Map(datanew, header) nametmp = eofile.split('.') nametmp.insert(-1, 'disk') outfits = '.'.join(nametmp) datanew = datanew.astype(np.float32) if os.path.exists(outfits): if overwrite: os.system('rm -rf ' + outfits) else: eomap_disk, tb_disk, outfits sio.write_file(outfits, datanew, header) return eomap_disk, tb_disk, outfits
[docs] def mk_diskmodel(outname='disk', direction='J2000 10h00m00.0s 20d00m00.0s', reffreq='2.8GHz', flux=660000.0, eqradius='16.166arcmin', polradius='16.166arcmin', pangle='21.1deg', overwrite=True): ''' Create a blank solar disk model image (or optionally a data cube) outname String to use for part of the image and fits file names (default 'disk') direction String specifying the position of the Sun in RA and Dec. Default means use the standard string "J2000 10h00m00.0s 20d00m00.0s" reffreq The reference frequency to use for the disk model (the frequency at which the flux level applies). Default is '2.8GHz'. flux The flux density, in Jy, for the entire disk. Default is 66 sfu. eqradius The equatorial radius of the disk. Default is 16 arcmin + 10" (for typical extension of the radio limb) polradius The polar radius of the disk. Default is 16 arcmin + 10" (for typical extension of the radio limb) pangle The solar P-angle (geographic position of the N-pole of the Sun) in degrees E of N. This only matters if eqradius != polradius index The spectral index to use at other frequencies. Default None means use a constant flux density for all frequencies. cell The cell size (assumed square) to use for the image. The image size is determined from a standard radius of 960" for the Sun, divided by cell size, increased to nearest power of 512 pixels. The default is '2.0arcsec', which results in an image size of 1024 x 1024. Note that the frequency increment used is '325MHz', which is the width of EOVSA bands (not the width of individual science channels) ''' diskcl = outname + reffreq + '.cl' if os.path.exists(diskcl): if overwrite: os.system('rm -rf ' + diskcl) else: return diskcl cl = cltool() try: aspect = 1.01 # Enlarge the equatorial disk by 1% eqradius = qa.quantity(eqradius) diamajor = qa.quantity(2 * aspect * eqradius['value'], eqradius['unit']) polradius = qa.quantity(polradius) diaminor = qa.quantity(2 * polradius['value'], polradius['unit']) except: print('Radius', eqradius, polradius, 'does not have the expected format, number + unit where unit is arcmin or arcsec') return # Add 90 degrees to pangle, due to angle definition in addcomponent() -- it puts the majoraxis vertical pangle = qa.add(qa.quantity(pangle), qa.quantity('90deg')) # Flux density is split between XX and YY cl.addcomponent(dir=direction, flux=flux / 2.0, fluxunit='Jy', freq=reffreq, shape='disk', majoraxis=diamajor, minoraxis=diaminor, positionangle=pangle) cl.setrefdirframe(0, 'J2000') cl.rename(diskcl) cl.done() return diskcl
[docs] def insertdiskmodel(vis, sizescale=1.0, fdens=None, dsize=None, xmlfile='SOLDISK.xml', writediskinfoonly=False, active=False, overwrite=False): # Apply size scale adjustment (default is no adjustment) for i in range(len(dsize)): dsz = dsize[i] if isinstance(dsz, bytes): dsz = dsz.decode() num, unit = dsz.split('arc') dsize[i] = str(float(num) * sizescale)[:6] + 'arc' + unit msfile = vis ms.open(msfile) spwinfo = ms.getspectralwindowinfo() nspw = len(spwinfo.keys()) ms.done() diskcldir = 'diskcl/' if not os.path.exists(diskcldir): os.makedirs(diskcldir) frq = [] spws = range(nspw) for sp in spws: spw = spwinfo[str(sp)] frq.append('{:.4f}GHz'.format((spw['RefFreq'] + spw['TotalWidth'] / 2.0) / 1e9)) frq = np.array(frq) writediskxml(dsize, fdens, frq, xmlfile=xmlfile) if not writediskinfoonly: tb.open(msfile + '/FIELD') phadir = tb.getcol('PHASE_DIR').flatten() tb.close() ra = phadir[0] dec = phadir[1] direction = 'J2000 ' + str(ra) + 'rad ' + str(dec) + 'rad' diskcl = [] for sp in tqdm(spws, desc='Generating {} disk models'.format(nspw), ascii=True): diskcl.append( mk_diskmodel(outname=diskcldir + 'disk{:02d}_'.format(sp), direction=direction, reffreq=frq[sp], flux=fdens[sp], eqradius=dsize[sp], polradius=dsize[sp], overwrite=overwrite)) if not active: delmod(msfile, otf=True, scr=True) for sp in tqdm(spws, desc='Inserting disk model', ascii=True): ft(vis=msfile, spw=str(sp), field='', model="", nterms=1, reffreq="", complist=str(diskcl[sp]), incremental=False, usescratch=True) else: for sp in tqdm(spws, desc='Inserting disk model', ascii=True): model_ft = mstl.getmodel(msfile, spw=str(sp)) ft(vis=msfile, spw=str(sp), field='', model="", nterms=1, reffreq="", complist=str(diskcl[sp]), incremental=False, usescratch=True) model_disk = mstl.getmodel(msfile, spw=str(sp)) mstl.putmodel(msfile, spw=str(sp), model=model_ft + model_disk) return msfile, diskcl
[docs] def uvrange_uplim_from_freq(x, x0, x1, y0, y1): """ Calculate the upper limit of the UV range from frequency using linear interpolation. :param x: The frequency for which the UV range upper limit is calculated. :type x: float :param x0: The start frequency of the interpolation range. :type x0: float :param x1: The end frequency of the interpolation range. :type x1: float :param y0: The UV range upper limit at the start frequency. :type y0: float :param y1: The UV range upper limit at the end frequency. :type y1: float :return: The calculated UV range upper limit for the given frequency. :rtype: float """ return y0 + (x - x0) * ((y1 - y0) / (x1 - x0))
[docs] def disk_slfcal(msfile, tbg, ted, disk_params, workdir='./', overwrite=True, iterbands=False): """ Perform disk self-calibration on measurement set data. :param msfile: The measurement set file to be calibrated. :type msfile: str :param tbg: The beginning time of the calibration range. :type tbg: datetime :param ted: The end time of the calibration range. :type ted: datetime :param disk_params: A dictionary containing disk model parameters. :type disk_params: dict :param iterbands: Boolean flag to run gaincal iterating over frequency bands, defaults to False. iterbands = True is only useful when uvrange_uplim is used, which is currently not implemented. :type iterbands: bool :return: The filepath of the self-calibrated measurement set. :rtype: str """ # Add the reference model back to the visibility data # ft(vis=msfile, model=img_ref_model, usescratch=True) msfile_diskscl = msfile.replace('.ms', '.disk_slfcal.ms') msfile_diskscled = msfile.replace('.ms', '.disk_slfcaled.ms') if overwrite: if os.path.isdir(msfile_diskscled): shutil.rmtree(msfile_diskscled, ignore_errors=True) else: if os.path.isdir(msfile_diskscled): return msfile_diskscled freq = disk_params['freq'] dsize = disk_params['dsize'] fdens = disk_params['fdens'] diskxmlfile = disk_params['diskxmlfile'] msfile, diskcl = insertdiskmodel(msfile, dsize=dsize, fdens=fdens, xmlfile=diskxmlfile, active=True) log_print('INFO', 'First round of phase disk selfcal on the disk using solution interval inf') caltbs_dk = [] caltb = os.path.join(workdir, f"caltb_{tbg.strftime('%H%M')}-{ted.strftime('%H%M')}_dk1.pha") if os.path.isdir(caltb): shutil.rmtree(caltb, ignore_errors=True) if iterbands: appendmode = False for sp, fghz in enumerate(freq): if os.path.isdir(caltb): appendmode = True # uvrange_uplim = uvrange_uplim_from_freq(fghz, x0=2.8875, x1=17.1875, y0=0.2, y1=0.8) * 3 gaincal(vis=msfile, caltable=caltb, selectdata=True, spw=f'{sp}', # uvrange=f"<{uvrange_uplim:.1f}Klambda", uvrange="", antenna="0~12&0~12", # solint=tdtstr, solint='inf', combine="scan", interp="linear", refant="0", refantmode="flex", minsnr=1.0, gaintype="G", calmode="p", append=appendmode) else: gaincal(vis=msfile, caltable=caltb, selectdata=True, uvrange="", antenna="0~12&0~12", # solint=tdtstr, solint='inf', combine="scan", interp="linear", refant="0", refantmode="flex", minsnr=1.0, gaintype="G", calmode="p") if os.path.isdir(caltb): caltbs_dk.append(caltb) # plotms(vis='temp_20211124/caltb_1545-2334_dk1.pha', xaxis='freq', yaxis='phase', # iteraxis='antenna', coloraxis='', # plotrange=[-1, -1, -180, 180]) if len(caltbs_dk) > 0: log_print('INFO', 'Second round of phase disk selfcal on the disk using solution interval 1min') caltb = os.path.join(workdir, f"caltb_{tbg.strftime('%H%M')}-{ted.strftime('%H%M')}_dk2.pha") if os.path.isdir(caltb): shutil.rmtree(caltb, ignore_errors=True) if iterbands: appendmode = False for sp, fghz in enumerate(freq): if os.path.isdir(caltb): appendmode = True # uvrange_uplim = uvrange_uplim_from_freq(fghz, x0=2.8875, x1=17.1875, y0=0.2, y1=0.8) * 3 gaincal(vis=msfile, caltable=caltb, selectdata=True, spw=f'{sp}', # uvrange=f"<{uvrange_uplim:.1f}Klambda", uvrange="", antenna="0~12&0~12", solint='int', gaintable=caltbs_dk, combine="scan", interp="linear", refant="0", refantmode="flex", minsnr=1.0, gaintype="G", calmode="p", append=appendmode) else: gaincal(vis=msfile, caltable=caltb, selectdata=True, uvrange="", antenna="0~12&0~12", solint='int', gaintable=caltbs_dk, combine="scan", interp="linear", refant="0", refantmode="flex", minsnr=1.0, gaintype="G", calmode="p") if os.path.isdir(caltb): caltbs_dk.append(caltb) # plotms(vis='temp_20211124/caltb_1545-2334_dk2.pha', xaxis='time', yaxis='phase', # iteraxis='spw', coloraxis='antenna1', # plotrange=[-1, -1, -180, 180]) if len(caltbs_dk) > 0: log_print('INFO', 'Final round of amplitude disk selfcal using solution interval inf') caltb = os.path.join(workdir, f"caltb_{tbg.strftime('%H%M')}-{ted.strftime('%H%M')}_dk.amp") if os.path.isdir(caltb): shutil.rmtree(caltb, ignore_errors=True) if iterbands: appendmode = False for sp, fghz in enumerate(freq): if os.path.isdir(caltb): appendmode = True # uvrange_uplim = uvrange_uplim_from_freq(fghz, x0=2.8875, x1=17.1875, y0=0.2, y1=0.8) * 3 gaincal(vis=msfile, caltable=caltb, selectdata=True, spw=f'{sp}', # uvrange=f"<{uvrange_uplim:.1f}Klambda", uvrange="", antenna="0~12&0~12", solint='inf', gaintable=caltbs_dk, combine="scan", interp="linear", refant="10", refantmode="flex", minsnr=1.0, gaintype="G", calmode="a", append=appendmode) else: gaincal(vis=msfile, caltable=caltb, selectdata=True, uvrange="", antenna="0~12&0~12", solint='inf', gaintable=caltbs_dk, combine="scan", interp="linear", refant="10", refantmode="flex", minsnr=1.0, gaintype="G", calmode="a") if os.path.isdir(caltb): caltbs_dk.append(caltb) # plotms(vis='temp_20211124/caltb_1545-2334_dk.amp', xaxis='time', yaxis='amp', # iteraxis='spw', coloraxis='antenna1', # plotrange=[-1, -1, -1, -1]) # clearcal(msfile) if len(caltbs_dk) > 0: log_print('INFO', 'Applying disk self-calibration solutions to the data and split to a new ms') applycal(vis=msfile, selectdata=True, antenna="0~12", gaintable=caltbs_dk, interp="linear", calwt=False, applymode="calonly") if os.path.isdir(msfile_diskscl): shutil.rmtree(msfile_diskscl, ignore_errors=True) split(msfile, outputvis=msfile_diskscl, datacolumn="corrected") for sp, dkcl in tqdm(enumerate(diskcl), desc='Inserting disk model', ascii=True): ft(vis=msfile_diskscl, spw=str(sp), field='', model="", nterms=1, reffreq="", complist=str(dkcl), incremental=False, usescratch=True) log_print('INFO', 'Removing the disk model from the data. The residual will be ARs only') uvsub(vis=msfile_diskscl) split(msfile_diskscl, outputvis=msfile_diskscled, datacolumn="corrected") shutil.rmtree(msfile_diskscl, ignore_errors=True) else: log_print('WARNING', 'Failed to create calibration table for disk self-calibration. Proceeding without self-calibration...') msfile_diskscled = msfile return msfile_diskscled
[docs] def shift_corr(mmsfiles, trange_series, spws, imagemodel, imagemodel_fits, reftime_master, workdir='./', pols='XX', overwrite=False, do_featureslfcal=False, do_diskslfcal=True, disk_params={}, do_sbdcal=True, verbose=True): """ Corrects smearing effects in solar observation data by aligning them with a model image. :param mmsfiles: Paths to the measurement set files to be corrected. :type mmsfiles: list of str :param trange_series: Each tuple contains start and end times for the measurement set files. :type trange_series: list of tuple :param spws: Spectral window indices to be considered. :type spws: list of string :param imagemodel: Path to the model image in CASA measurement set format. :type imagemodel: str :param imagemodel_fits: Path to the model image in FITS format. :type imagemodel_fits: str :param reftime_master: Reference time for the rotation of the model image. :type reftime_master: astropy.time.Time :param workdir: Working directory for output files, defaults to './'. :type workdir: str :param do_featureslfcal: Boolean flag to perform feature self-calibration, defaults to False. :type do_featureslfcal: bool :param pols: Polarization types to be considered, defaults to 'XX'. :type pols: str :param overwrite: Boolean flag to overwrite existing files, defaults to False. :type overwrite: bool :param do_diskslfcal: Boolean flag to perform disk self-calibration, defaults to True. :type do_diskslfcal: bool :param disk_params: Dictionary containing parameters for disk self-calibration. :type disk_params: dict :param do_sbdcal: Boolean flag to perform single-band delay calibration, defaults to True. :type do_sbdcal: bool :param verbose: Boolean flag to enable verbose output, defaults to True. :type verbose: bool :return: Paths to the corrected measurement set files. :rtype: list of str """ # Load the model image maps and calculate the reference time imagemodel_map = smap.Map(imagemodel_fits) if isinstance(reftime_master, str): reftime_master = Time(reftime_master) # Calculate mid-points of time ranges for rotation trange_series_mid = [(start + (end - start) / 2) for start, end in trange_series] mmsfiles_rot = [] total_slots = len(trange_series) if verbose: print("=" * 64) print("-" * 64) print(f"Processing {total_slots} sub-MSs for solar rotation correction.") trange_series_bg = trange_series[0][0] trange_series_ed = trange_series[-1][1] trange_series_str = f"{trange_series_bg.strftime('%H%M')}-{trange_series_ed.strftime('%H%M')}UT" reftime_master_str = f"{reftime_master.to_datetime().strftime('%H%M')}UT" imagemodel_rot_master = [] for sp, immodel_map, immodel, immodel_fits in zip(spws, imagemodel_map, imagemodel, imagemodel_fits): spwstr = format_spw(sp) immodel_rot_master = os.path.join(workdir, f"eovsa_{trange_series_str}.s{spwstr}.rot2_{reftime_master_str}.master.model") if overwrite: if os.path.isdir(immodel_rot_master): shutil.rmtree(immodel_rot_master, ignore_errors=True) if not os.path.isdir(immodel_rot_master): shutil.copytree(immodel, immodel_rot_master, dirs_exist_ok=True) solar_diff_rot_image(immodel_map, reftime_master, immodel_rot_master) imagemodel_rot_master.append(immodel_rot_master) for tidx, (mmsfile, time_range, time_mid) in enumerate(zip(mmsfiles, trange_series, trange_series_mid)): tbg, ted = time_range trange = trange2timerange(time_range) if mmsfile is None: mmsfiles_rot.append(None) if verbose: log_print('INFO', f"Corrected {tidx + 1} of {total_slots}: No sub-MS file found for this timerange. Skipping...") continue mmsfile_basename = os.path.splitext(mmsfile.rstrip('/'))[0] msfile_rot = f"{mmsfile_basename}.shift_corrected.ms" if overwrite: if os.path.isdir(msfile_rot): shutil.rmtree(msfile_rot, ignore_errors=True) else: if os.path.exists(msfile_rot): mmsfiles_rot.append(msfile_rot) if verbose: log_print('INFO', f"Corrected {tidx + 1} of {total_slots}: {mmsfile} → {msfile_rot} (already exists and was not re-created)") continue for sp, immodel_map, immodel in zip(spws, imagemodel_map, imagemodel): spwstr = format_spw(sp) imfile_model_rot = os.path.join(workdir, f"eovsa_{tbg.strftime('%H%M')}-{ted.strftime('%H%M')}UT.s{spwstr}.rot.model") if not os.path.isdir(msfile_rot): if overwrite: if os.path.isdir(imfile_model_rot): shutil.rmtree(imfile_model_rot, ignore_errors=True) if not os.path.isdir(imfile_model_rot): shutil.copytree(immodel, imfile_model_rot, dirs_exist_ok=True) if verbose: log_print('INFO', f"Rotate the model image ({os.path.basename(immodel)}) to {time_mid.strftime('%H:%M')}UT") solar_diff_rot_image(immodel_map, time_mid, imfile_model_rot) if verbose: log_print('INFO', f"Add the rotated model ({os.path.basename(imfile_model_rot)}) to the visibility data") ft(vis=mmsfile, model=imfile_model_rot, spw=sp, usescratch=True) if do_featureslfcal: if verbose: log_print('INFO', 'Performing feature self-calibration') mmsfile_slfcaled = f"{mmsfile_basename}.slfcaled.ms" caltb = os.path.join(workdir, f"caltb_{tbg.strftime('%H%M')}-{ted.strftime('%H%M')}.pha") if overwrite: if os.path.isdir(mmsfile_slfcaled): shutil.rmtree(mmsfile_slfcaled, ignore_errors=True) if os.path.isdir(caltb): shutil.rmtree(caltb, ignore_errors=True) if verbose: log_print('INFO', 'Attempt feature self-calibration starting with a uvrange > 1.5 klambda') if not os.path.isdir(caltb): if pols == 'XXYY': mstl.gaincalXY(vis=mmsfile, caltable=caltb, pols=pols, selectdata=True, timerange=trange, uvrange='>1.5klambda', combine="scan", antenna='0~12&0~12', refant='0', solint='inf', refantmode="strict", gaintype='G', minsnr=1.0, calmode='p', append=False) else: gaincal(vis=mmsfile, caltable=caltb, selectdata=True, timerange=trange, uvrange='>1.5klambda', combine="scan", antenna='0~12&0~12', refant='0', solint='inf', refantmode="strict", gaintype='G', minsnr=1.0, calmode='p', append=False) if not os.path.isdir(caltb): if verbose: log_print('INFO', 'The first feature self-calibration attempt fails, try again with no uvrange condition') if pols == 'XXYY': mstl.gaincalXY(vis=mmsfile, caltable=caltb, pols=pols, selectdata=True, timerange=trange, uvrange='', combine="scan", antenna='0~12&0~12', refant='0', solint='inf', refantmode="strict", gaintype='G', minsnr=1.0, calmode='p', append=False) else: gaincal(vis=mmsfile, caltable=caltb, selectdata=True, timerange=trange, uvrange='', combine="scan", antenna='0~12&0~12', refant='0', solint='inf', refantmode="strict", gaintype='G', minsnr=1.0, calmode='p', append=False) if do_sbdcal: caltb_k = os.path.join(workdir, f"caltb_{tbg.strftime('%H%M')}-{ted.strftime('%H%M')}.sbd") if os.path.isdir(caltb_k): os.system('rm -rf ' + caltb_k) if pols == 'XXYY': mstl.gaincalXY(vis=mmsfile, caltable=caltb_k, pols=pols, selectdata=True, timerange=trange, uvrange='', combine="scan", antenna='0~12&0~12', refant='0', solint='inf', refantmode="strict", gaintype='K', calmode='p', minblperant=4, minsnr=2, append=False) else: gaincal(vis=mmsfile, caltable=caltb_k, selectdata=True, timerange=trange, uvrange='', combine="scan", antenna='0~12&0~12', refant='0', solint='inf', refantmode="strict", gaintype='K', minsnr=2, calmode='p', minblperant=4, append=False) if os.path.isdir(caltb): if do_sbdcal: if os.path.isdir(caltb_k): caltb = [caltb, caltb_k] log_print('INFO', 'Applying feature self-calibration solutions to the data and split to a new ms') applycal(vis=mmsfile, selectdata=True, antenna="0~12", gaintable=caltb, interp="linear", calwt=False, applymode="calonly") # split the corrected data to a new ms mstl.splitX(mmsfile, outputvis=mmsfile_slfcaled, datacolumn="corrected", datacolumn2="model_data") shutil.rmtree(mmsfile, ignore_errors=True) mmsfile = mmsfile_slfcaled else: if verbose: log_print('INFO', f'Feature self-calibration failed for {mmsfile}. Proceeding without self-calibration...') uvsub(vis=mmsfile) # Leaves only the residual in the corrected data column delmod(vis=mmsfile) # Add the reference model back to the visibility data for sp, immodel_rot_master in zip(spws, imagemodel_rot_master): ft(vis=mmsfile, model=immodel_rot_master, spw=sp, usescratch=True) uvsub(vis=mmsfile, reverse=True) # Now the viz data contains residuals + corrected models # Generate the corrected MS file if os.path.isdir(msfile_rot): shutil.rmtree(msfile_rot, ignore_errors=True) mstl.splitX(vis=mmsfile, outputvis=msfile_rot, datacolumn='corrected', datacolumn2='model_data') shutil.rmtree(mmsfile, ignore_errors=True) if do_diskslfcal: msfile_rot = disk_slfcal(msfile_rot, tbg, ted, disk_params, workdir) mmsfiles_rot.append(msfile_rot) if verbose: log_print('INFO', f"Corrected {tidx + 1} of {total_slots}: {mmsfile} → {msfile_rot}") return mmsfiles_rot
[docs] def split_mms(msname, timerange_series, spw='', workdir='./', overwrite=False, verbose=True): """ Splits a measurement set into multiple subsets based on specified time ranges. :param msname: Path to the original measurement set. :type msname: str :param timerange_series: Time ranges for each split, with each tuple containing start and end datetime objects. :type timerange_series: list of tuple :param spw: Spectral window specification for the splitting process, defaults to ''. :type spw: str :param workdir: Target directory for saving the split measurement sets, defaults to './'. :type workdir: str :param overwrite: Boolean flag to overwrite existing files, defaults to False. :type overwrite: bool :param verbose: Boolean flag to enable verbose output, defaults to True. :type verbose: bool :return: Paths to the created split measurement sets, corresponding to each specified time range. :rtype: list of str """ tseries_beg = timerange_series[0][0] tseries_end = timerange_series[-1][1] tseries_range = f"{tseries_beg.strftime('%Y/%m/%d/%H:%M:%S')}-{tseries_end.strftime('%Y/%m/%d/%H:%M:%S')}UT" mmsfiles = [] total_slots = len(timerange_series) if verbose: log_print('INFO', f"Initiating split of {msname} at {tseries_range} into {total_slots} segments") for tidx, (start, end) in enumerate(timerange_series): timerange = f"{start.strftime('%Y/%m/%d/%H:%M:%S')}~{end.strftime('%Y/%m/%d/%H:%M:%S')}" outputvis = os.path.join(workdir, f"eovsa_{start.strftime('%H%M')}-{end.strftime('%H%M')}.ms") if overwrite: if os.path.isdir(outputvis): shutil.rmtree(outputvis, ignore_errors=True) else: if os.path.isdir(outputvis): mmsfiles.append(outputvis) if verbose: log_print('INFO', f"Sub-MS {tidx + 1}/{total_slots} created: {outputvis} (Time range: {timerange}) (already exists and was not re-created)") continue try: split(vis=msname, timerange=timerange, spw=spw, datacolumn='data', correlation='XX', outputvis=outputvis) mmsfiles.append(outputvis) if verbose: log_print('INFO', f"Sub-MS {tidx + 1}/{total_slots} created: {outputvis} (Time range: {timerange})") except Exception as e: if verbose: log_print('ERROR', f"Failed to create sub-MS {tidx + 1} for {timerange} due to {e}") mmsfiles.append(None) if verbose: processed_count = sum(1 for f in mmsfiles if f is not None) log_print('INFO', f"Completed: {processed_count}/{total_slots} sub-MSs successfully processed") return mmsfiles
[docs] def all_paths_exist(paths): count = sum(1 for path in paths if os.path.exists(path)) return count == len(paths)
[docs] def ant_trange(vis): ''' Figure out nominal times for tracking of old EOVSA antennas, and return time range in CASA format ''' from eovsapy import eovsa_array as ea from astropy.time import Time # Get timerange from the visibility file # msinfo = dict.fromkeys(['vis', 'scans', 'fieldids', 'btimes', 'btimestr', 'inttimes', 'ras', 'decs', 'observatory']) ms.open(vis) # metadata = ms.metadata() scans = ms.getscansummary() ms.done() sk = sorted(list(scans.keys())) vistrange = np.array([scans[sk[0]]['0']['BeginTime'], scans[sk[-1]]['0']['EndTime']]) # Get the Sun transit time, based on the date in the vis file name (must have UDByyyymmdd in the name) aa = ea.eovsa_array() date = vis.split('UDB')[-1][:8] slashdate = date[:4] + '/' + date[4:6] + '/' + date[6:8] aa.date = slashdate sun = aa.cat['Sun'] mjd_transit = Time(aa.next_transit(sun).datetime(), format='datetime').mjd # Construct timerange limits based on +/- 3h55m from transit time (when all dishes are nominally tracking) # and clip the visibility range not to exceed those limits mjdrange = np.clip(vistrange, mjd_transit - 0.1632, mjd_transit + 0.1632) trange = Time(mjdrange[0], format='mjd').iso[:19] + '~' + Time(mjdrange[1], format='mjd').iso[:19] trange = trange.replace('-', '/').replace(' ', '/') return trange
[docs] def format_spw(spw): """ Formats the spectral window (spw) string for file naming, ensuring start and end values are separated by a dash and zero-padded to two digits. :param str spw: The spectral window in the format "start~end", where start and end are integers. :return: A formatted string with start and end values zero-padded to two digits, separated by a dash. :rtype: str """ return '-'.join(['{:02d}'.format(int(sp_)) for sp_ in spw.split('~')])
[docs] def rm_imname_extensions(imname, keep_ext=[], verbose=False): """ Remove directories and files that match the base image name (`imname`) with specific extensions. :param imname: The base name of images/directories to remove specific extensions for. :type imname: str :param keep_ext: A list of extensions to keep. Default is an empty list. :type keep_ext: list :param verbose: If True, print detailed information about the removal process. Default is False. :type verbose: bool """ # List of known extensions to match and remove extensions = ['.psf', '.pb', '.residual', '.mask', '.prev.mask', '.sumwt', '.model', '.image', '.image.pbcor'] for ext in keep_ext: extensions.remove(ext) # Loop through each extension and remove the matching directories or files for ext in extensions: # Construct the full pattern for each extension, including directories and files pattern_dir = f"{imname}{ext}/" # Pattern for directories pattern_file = f"{imname}{ext}" # Pattern for files # Find all matches for the current extension matching_dirs = glob(pattern_dir) matching_files = glob(pattern_file) # Remove each matched directory for dir_path in matching_dirs: if os.path.isdir(dir_path): # Double-check if it's a directory if verbose: log_print('INFO', f"Removing directory: {dir_path}") shutil.rmtree(dir_path) else: if verbose: log_print('INFO', f"Expected a directory but found none: {dir_path}") # Remove each matched file for file_path in matching_files: if os.path.isfile(file_path): # Double-check if it's a file if verbose: log_print('INFO', f"Removing file: {file_path}") os.remove(file_path) else: if verbose: log_print('INFO', f"Expected a file but found none: {file_path}")
[docs] def check_image_zeros(imname): """ Check if the image contains non-zero data. """ ia = iatool() ia.open(imname) data = ia.getchunk(getmask=True) ia.close() return np.count_nonzero(data) <= 1
[docs] def format_param(param): if isinstance(param, str): return f'"{param}"' else: return str(param)
[docs] def run_tclean_automasking(vis, sp, trange, uvrange, datacolumn, imname, imsize, cell, stokes, scales, niter, reffreq, pbcor, savemodel, usemask, restoringbeam, sidelobethreshold, noisethreshold, lownoisethreshold, minbeamfrac, negativethreshold, growiterations): """ Wrapper function for the tclean task in CASA. :param str vis: Name of the visibility data set to be imaged. :param str sp: Spectral window selection. :param str trange: Time range selection. :param str uvrange: UV range selection. :param str datacolumn: Specifies which data column to use. :param str imname: Name of the output image. :param list imsize: Image size in pixels. :param list cell: Cell size specification. :param str stokes: Stokes parameters to image. :param list scales: Scales for multi-scale clean. :param int niter: Maximum number of iterations. :param float reffreq: Reference frequency for the image. :param bool pbcor: Specifies whether to perform primary beam correction. :param str savemodel: Specifies whether to save the model. :param bool usemask: Specifies whether to use auto-masking. :param list restoringbeam: Restoring beam parameters. :param float sidelobethreshold: Side lobe threshold for auto-masking. :param float noisethreshold: Noise threshold for auto-masking. :param float lownoisethreshold: Low noise threshold for auto-masking. :param float negativethreshold: Negative threshold for auto-masking. :param int growiterations: Number of grow iterations for auto-masking. """ params_str = ', '.join([ f'vis={format_param(vis)}', f'spw={format_param(sp)}', f'timerange={format_param(trange)}', f'uvrange={format_param(uvrange)}', f'datacolumn={format_param(datacolumn)}', f'imagename={format_param(imname)}', f'imsize={format_param(imsize)}', f'cell={format_param(cell)}', f'stokes={format_param(stokes)}', f'scales={format_param(scales)}', f'niter={format_param(niter)}', f'reffreq={format_param(reffreq)}', f'pbcor={format_param(pbcor)}', f'savemodel={format_param(savemodel)}', f'usemask={format_param(usemask)}', f'restoringbeam={format_param(restoringbeam)}', f'sidelobethreshold={format_param(sidelobethreshold)}', f'noisethreshold={format_param(noisethreshold)}', f'lownoisethreshold={format_param(lownoisethreshold)}', f'minbeamfrac={format_param(minbeamfrac)}', f'negativethreshold={format_param(negativethreshold)}', f'growiterations={format_param(growiterations)}' ]) log_print('INFO', f'Running tclean with parameters: {params_str}') tclean(vis=vis, selectdata=True, spw=sp, timerange=trange, uvrange=uvrange, antenna="0~12&0~12", datacolumn=datacolumn, imagename=imname, imsize=imsize, cell=cell, stokes=stokes, projection="SIN", specmode="mfs", interpolation="linear", deconvolver="multiscale", scales=scales, nterms=2, smallscalebias=0.6, restoration=True, weighting="briggs", robust=0.0, niter=niter, gain=0.05, reffreq=reffreq, savemodel=savemodel, pbcor=pbcor, veltype='radio', outframe='TOPO', restoringbeam=restoringbeam, usemask=usemask, pbmask=0.0, sidelobethreshold=sidelobethreshold, noisethreshold=noisethreshold, lownoisethreshold=lownoisethreshold, minbeamfrac=minbeamfrac, negativethreshold=negativethreshold, smoothfactor=1.0, cutthreshold=0.01, growiterations=growiterations, dogrowprune=True, minpercentchange=-1.0)
[docs] class FrequencySetup: """ Manages frequency setup based on observation date for radio astronomy imaging. This class is initialized with an observation time and calculates essential frequency parameters such as effective observing frequencies (eofreq) and spectral windows (spws) based on the observation date. It provides methods to calculate reference frequency and bandwidth for given spectral windows. :param tim: Observation time used to determine the frequency setup. :type tim: astropy.time.Time Attributes: - tim (astropy.time.Time): The observation time. - spw2band (numpy.ndarray): An array mapping spectral window indices to band numbers. - bandwidth (float): The bandwidth in GHz. - defaultfreq (numpy.ndarray): The default effective observing frequencies in GHz. - nbands (int): Number of bands. - eofreq (numpy.ndarray): Effective observing frequencies based on the observation time. - spws (list of str): Spectral window selections based on the observation time. :Example: >>> from astropy.time import Time >>> tim = Time('2022-01-01T00:00:00', format='isot') >>> freq_setup = FrequencySetup(tim) >>> crval, cdelt = freq_setup.get_reffreq_and_cdelt('5~10') >>> print(crval, cdelt) """ def __init__(self, tim=None): if tim is None: tim = Time.now()
[docs] self.tim = tim
[docs] self.spw2band = np.array([0, 1] + list(range(4, 52)))
[docs] self.bandwidth = 0.325 # 325 MHz
[docs] self.defaultfreq = 1.1 + self.bandwidth * (self.spw2band + 0.5)
if self.tim.mjd > 58536: self.nbands = 52 self.eofreq = self.defaultfreq self.spws = ['0~1', '2~4', '5~10', '11~20', '21~30', '31~40', '41~49'] else: self.bandwidth = 0.5 # 500 MHz self.nbands = 34 self.eofreq = 1.419 + np.arange(self.nbands) * self.bandwidth self.spws = ['1~3', '4~9', '10~16', '17~24', '25~30']
[docs] def get_reffreq_and_cdelt(self, spw): """ Calculates the reference frequency (CRVAL) and the frequency delta (CDELT) for a given spectral window range. This method takes a spectral window selection and computes the mean of the effective observing frequencies (eofreq) within that range as the reference frequency. It also calculates the bandwidth covered by the spectral window range as the frequency delta. :param spw: Spectral window selection, specified as a range 'start~end' or a single value. :type spw: str :return: A tuple containing the reference frequency and frequency delta, both in GHz. :rtype: (str, str) :Example: >>> crval, cdelt = freq_setup.get_reffreq_and_cdelt('5~10') >>> print(crval, cdelt) """ sp_st, sp_ed = (int(s) for s in spw.split('~')) if '~' in spw else (int(spw), int(spw)) crval = f'{np.mean(self.eofreq[sp_st:sp_ed + 1]):.4f}GHz' nband = (self.eofreq[sp_ed] - self.eofreq[sp_st]) / self.bandwidth + 1 cdelt = f'{nband * self.bandwidth:.4f}GHz' return crval, cdelt
[docs] def fd_images(vis, cleanup=False, image_marker='', timerange='', niter=None, cell=['2.5arcsec'], imsize=[1024], spws=['0~1', '2~4', '5~10', '11~20', '21~30', '31~43'], imgoutdir='./', bright=None, stokes="XX", uvrange='', toTb=True, pbcor=True, datacolumn='data', savemodel="none", usemask='auto-multithresh', overwrite=False, compress=False, dryrun=False): """ Generates full-disk images, optionally cleans up interim images, and performs image registration. This function creates full-disk solar images based on visibility data, performs an optional cleanup of interim images, and aligns the resulting images to a standard solar disk model. It supports dynamic image size, cell size, and spectral window (SPW) selection. The function also handles the creation of FITS files from the generated images, ensuring the reference frequency in the FITS header is calculated as the middle of the selected frequency range. :param str vis: Path to the visibility data. :param bool cleanup: If True, deletes interim images after processing. Default is False. :param str image_marker: Additional identifier for the image name. Default is ''. :param str timerange: Range of time to select from data. Default is ''. :param int niter: Number of iterations for the cleaning algorithm. If None, defaults to 5000. :param list cell: Size of the image cell, e.g., ['2.5arcsec']. :param list imsize: Dimensions of the output image, e.g., [1024]. :param list spws: Spectral windows to process, specified as ranges, e.g., ['0~1', '2~5']. :param str imgoutdir: Output directory for the resulting images and FITS files. Defaults to './'. :param list bright: Flags to indicate brightness processing per SPW. Defaults to all True. :param str stokes: Stokes parameter to use. Default is "XX". :param str uvrange: UV range to select from data. Default is ''. :param bool toTb: If True, converts image to temperature scale. Default is True. :param bool pbcor: If True, applies primary beam correction. Default is True. :param str datacolumn: Data column to use from the visibility data. Default is 'data'. :param str savemodel: Options for saving the model visibilities. Choices are 'none', 'virtual', and 'modelcolumn'. Default is 'none'. :param bool overwrite: If True, overwrites existing FITS files. Default is False. :param bool dryrun: If True, only returns the paths to the generated images. No actual image processing is performed. Default is False. :return: A tuple containing two lists: paths to the generated FITS files and paths to the CASA image files. :rtype: (list of str, list of str) .. note:: 1. The reference frequency in the FITS header is calculated as the middle of the selected frequency range. This ensures accurate representation of the frequency information in the generated images. 2. The function internally manages directory creation for images, applies multiscale cleaning based on initial beam size determination, and uses default or specified scales for image cleaning. Errors in beam size determination lead to fallback on default scales. The function finally aligns and converts the images to FITS format, with options for temperature scale conversion and phase center alignment. """ # Check if "images" directory exists (if not, create it and mark it for later deletion) if pbcor: imext = '.image.pbcor' else: imext = '.image' imgtmpdir = os.path.join(imgoutdir, 'images') try: if os.stat(imgtmpdir): rm_images = False # Mark as not removeable except: os.mkdir(imgtmpdir) if cleanup: rm_images = True # Mark as removeable else: rm_images = False # Mark as not removeable if timerange == '': trange = ant_trange(vis) (tstart, tend) = trange.split('~') tbg = Time(qa.quantity(tstart, 's')['value'], format='mjd').to_datetime() fitsname_prefix = f'eovsa_{tbg.strftime("%Y%m%d")}' else: trange = timerange (tstart, tend) = timerange.split('~') tbg = Time(qa.quantity(tstart, 's')['value'], format='mjd').to_datetime() ted = Time(qa.quantity(tend, 's')['value'], format='mjd').to_datetime() tdt = ted - tbg if tdt.total_seconds() >= 60: tdtstr = f'{int(tdt.total_seconds() / 60)}min' else: tdtstr = f'{int(tdt.total_seconds())}sec' fitsname_prefix = f'eovsa_{tbg.strftime("%Y%m%dT%H%M%S")}_{tdtstr}' freq_setup = FrequencySetup(Time(tbg)) # eofreq, spws, bandwidth = get_eofreq(vis) if image_marker != '': image_marker = image_marker.lstrip('.') image_marker = f'.{image_marker}' if toTb: fitsname_suffix = '.tb.fits' else: fitsname_suffix = '.fits' if niter is None: niter = 5000 if bright is None: bright = [True] * len(spws) imagefile = [] fitsfile = [] cellvalue = np.float_(cell[0].replace('arcsec', '')) for s, sp in enumerate(spws): if bright[s]: spwstr = format_spw(sp) imname = os.path.join(imgtmpdir, f"{fitsname_prefix}.s{spwstr}{image_marker}") outfits = os.path.join(imgoutdir, f"{fitsname_prefix}.s{spwstr}{image_marker}{fitsname_suffix}") if overwrite: if os.path.exists(outfits): os.remove(outfits) else: if os.path.exists(outfits): imagefile.append(imname + imext) fitsfile.append(outfits) log_print('INFO', f'Processing tclean for SPW {spwstr} -- (timerange: {timerange}) image_marker: {image_marker} -- {outfits} already exists. Skipping.') continue if dryrun: imagefile.append(imname + imext) fitsfile.append(outfits) continue log_print('INFO', f'Processing tclean for SPW {spwstr} -- (timerange: {timerange}) image_marker: {image_marker}') if os.path.exists(imname + imext): rm_imname_extensions(imname) reffreq, cdelt4_real = freq_setup.get_reffreq_and_cdelt(sp) bmsz = get_bmsize(float(reffreq.rstrip('GHz')), minbmsize=5.0) scalesfactor = [0, 1, 3] ## [ 0, 1xbeam, 3xbeam] scales = [np.ceil(l * bmsz / cellvalue) for l in scalesfactor] log_print('INFO', f'Applying scales {scales} for multiscale clean, determined by pre-defined beam size.') restoringbeam = [f'{bmsz:.3f}arcsec'] try: initial_params = {'sidelobethreshold': 1.5, 'noisethreshold': 2.5, 'lownoisethreshold': 1.5, 'minbeamfrac': 0.3, 'negativethreshold': 5.0, 'growiterations': 75} secondary_params = {'sidelobethreshold': 1.0, 'noisethreshold': 2.0, 'lownoisethreshold': 1.0, 'minbeamfrac': 0.2, 'negativethreshold': 3.0, 'growiterations': 75} final_params = {'sidelobethreshold': 0.5, 'noisethreshold': 1.0, 'lownoisethreshold': 0.5, 'minbeamfrac': 0.2, 'negativethreshold': 0.0, 'growiterations': 50} log_print('INFO', 'Initial tclean run') rm_imname_extensions(imname) # if len(eofreq) > 0: run_tclean_automasking(vis, sp, trange, uvrange, datacolumn, imname, imsize, cell, stokes, scales, niter, reffreq, pbcor, savemodel, usemask, restoringbeam, **initial_params) if check_image_zeros(imname + imext): log_print('INFO', 'Second tclean run with adjusted parameters') rm_imname_extensions(imname) run_tclean_automasking(vis, sp, trange, uvrange, datacolumn, imname, imsize, cell, stokes, scales, niter, reffreq, pbcor, savemodel, usemask, restoringbeam, **secondary_params) if check_image_zeros(imname + imext): log_print('INFO', 'Final tclean run with further adjusted parameters') rm_imname_extensions(imname) run_tclean_automasking(vis, sp, trange, uvrange, datacolumn, imname, imsize, cell, stokes, scales, niter, reffreq, pbcor, usemask, restoringbeam, savemodel, **final_params) if check_image_zeros(imname + imext): log_print('INFO', 'tclean run failed with automasking. Trying without automasking...') rm_imname_extensions(imname) tclean(vis=vis, selectdata=True, spw=sp, timerange=trange, uvrange=uvrange, antenna="0~12&0~12", datacolumn=datacolumn, imagename=imname, imsize=imsize, cell=cell, stokes=stokes, projection="SIN", specmode="mfs", interpolation="linear", deconvolver="multiscale", pbcor=True, restoration=True, weighting="briggs", robust=0.0, niter=niter, gain=0.05, savemodel=savemodel, usemask='user', pbmask=0.0) if check_image_zeros(imname + imext): log_print('WARNING', f'Final tclean run for SPW {spwstr} produced an image with all zeros. Skipping...') imagefile.append(None) fitsfile.append(None) ## the cdelt value of the frequency axis is not correct in the image header, so we need to correct it. ctype4 = imhead(imname + imext, mode='get', hdkey='ctype4') if ctype4 == 'Frequency': cdelt4 = qa.convert(imhead(imname + imext, mode='get', hdkey='cdelt4'), "GHz") if cdelt4_real != '': log_print('INFO', f"Correcting cdelt4 from {cdelt4['value']:.4f}GHz to {cdelt4_real}") cdelt4_new = qa.quantity(cdelt4_real) imhead(imname + imext, mode='put', hdkey='cdelt4', hdvalue=cdelt4_new) if os.path.exists(outfits): os.remove(outfits) imagefile.append(imname + imext) fitsfile.append(outfits) except Exception as e: log_print('ERROR', f'Failed to generate image for SPW {spwstr} due to {e}') imagefile.append(None) fitsfile.append(None) if dryrun: return fitsfile, imagefile # Check if all elements in imagefile are None if all(item is None for item in imagefile): log_print('WARNING', "All elements in imagefile are None. Skipping imreg.") else: hf.imreg(vis=vis, imagefile=imagefile, fitsfile=fitsfile, timerange=[trange] * len(fitsfile), toTb=toTb, usephacenter=False, overwrite=overwrite, docompress=compress) if image_marker == '.init': rm_imname_extensions(imname, keep_ext=[imext, '.model']) else: rm_imname_extensions(imname) if rm_images: shutil.rmtree(imgtmpdir) # Remove all images and the folder named images return fitsfile, imagefile
[docs] def merge_FITSfiles(fitsfilesin, outfits, exptime_weight=False, snr_weight=None,deselect_index=None, suppress_ondiskres=False, suppress_thrshd=0.3, overwrite=True): """ Merges multiple FITS files into a single output file by calculating the mean of stacked data. This function stacks the data from a list of input FITS files along a new axis, optionally applying exposure time weighting and suppression of on-disk residuals based on a threshold relative to the disk brightness temperature. The mean of the stacked data is then written to a specified output FITS file. Parameters ---------- fitsfilesin : list of str List of file paths to the input FITS files to be merged. outfits : str File path for the output FITS file where the merged data will be saved. exptime_weight : bool, optional If True, the exposure time is used as a weight for calculating the mean of the stacked data. Defaults to False. snr_weight : list of float, optional List of signal-to-noise ratios (SNRs) to use as weights for calculating the mean of the stacked data. If both exposure time and SNR weights are provided, the weights will be the product of the two. Defaults to None. deselect_index : list of int, optional index of the images to be deselected. Defaults to None. suppress_ondiskres : bool, optional If True, suppresses on-disk residuals based on `suppress_thrshd`. Defaults to False. suppress_thrshd : float, optional Threshold for suppression of on-disk residuals, expressed as a fraction of the disk brightness temperature. May suppress weak but real emission. Defaults to 0.3. overwrite : bool, optional If True, the output file is overwritten if it already exists. Defaults to True. Raises ------ ValueError If any of the input FITS files cannot be read. Returns ------- None Notes ----- The suppression of on-disk residuals is achieved by applying a sigmoid function to pixels within a specified range of the disk brightness temperature, effectively reducing the contribution of residuals while preserving the overall structure. Examples -------- Merge three FITS files without exposure time weighting and with on-disk residual suppression: >>> merge_FITSfiles(['sun_01.fits', 'sun_02.fits', 'sun_03.fits'], 'sun_merged.fits', ... exptime_weight=False, suppress_ondiskres=True, suppress_thrshd=0.3, overwrite=True) """ data_stack = [] exptimes = [] for file in fitsfilesin: meta_, data_ = ndfits.read(file) if data_ is not None: data = data_ meta = meta_ exptimes.append(meta['header']['EXPTIME']) data_stack.append(np.squeeze(data)) data_stack = np.dstack(data_stack) mdata_stack = ma.masked_invalid(data_stack) if suppress_ondiskres: import copy from sunpy.map.maputils import all_coordinates_from_map, coordinate_is_on_solar_disk def sigmoid(x): return 1 / (1 + np.exp(-x)) mdata_stack_ = copy.deepcopy(mdata_stack) tbdisk = meta['header']['tbdisk'] # Step 1: Identify the pixels in the range tbdisk to 1.1*tbdisk threshd = suppress_thrshd # 30% of the disk brightness temperature mask = np.logical_and(mdata_stack_ >= tbdisk, mdata_stack_ <= (1 + threshd) * tbdisk) # Step 2: Calculate the suppression factor # Define the transition boundaries lower_bound = tbdisk upper_bound = (1 + threshd) * tbdisk # Normalize the data to the range [0, 1] normalized_data = (mdata_stack_ - lower_bound) / (upper_bound - lower_bound) # Apply the sigmoid function to the normalized data suppression_factor = sigmoid(10 * (normalized_data - 0.5)) # Step 3: Apply the suppression factor to the pixels mdata_stack_[mask] *= suppression_factor[mask] eomap = meta['refmap'] hpc_coords = all_coordinates_from_map(eomap) mask_disk = (coordinate_is_on_solar_disk(hpc_coords)) # Expand mask_disk along the third axis to match the shape of mdata_stack mask_disk_expanded = np.repeat(mask_disk[..., np.newaxis], mdata_stack.shape[2], axis=2) mdata_stack[mask_disk_expanded] = mdata_stack_[mask_disk_expanded] if snr_weight is not None: weights = np.array(snr_weight) else: weights = np.ones(mdata_stack.shape[-1]) if exptime_weight: ## use the exposure time as the weight to calculate the mean weights = weights*np.array(exptimes) # Adjust the weights of the first and last images by halving them to mitigate edge effects. # weights[[0, -1]] /= 2 weights = weights / np.sum(weights) weights[deselect_index] = 0 for w, f in zip(weights, fitsfilesin): log_print('INFO', f'Weight: {w:.2f} for {os.path.basename(f)}') weights = weights[np.newaxis, np.newaxis, :] data_stack = np.nansum(mdata_stack * weights, axis=-1) meta['header']['EXPTIME'] = np.nansum(exptimes) date_obs = Time(meta['header']['date-obs']).datetime.replace(hour=20, minute=0, second=0) - timedelta( seconds=meta['header']['EXPTIME'] / 2.0) meta['header']['date-obs'] = date_obs.strftime('%Y-%m-%dT%H:%M:%S.%f') ndfits.write(outfits, data_stack, meta['header'], overwrite=overwrite) log_print('INFO', f'Merging {outfits}')
from multiprocessing import Pool from functools import partial
[docs] def process_time_block(tidx_ted_tbg, msfile_in, msname, subdir, total_blocks, tdt, tdtstr, spws, niter_init, reftime_master, do_diskslfcal, disk_params, pols='XX', do_sbdcal=False, overwrite=False): tidx, (tbg, ted) = tidx_ted_tbg timerange = trange2timerange([tbg, ted]) if isinstance(reftime_master, str): reftime_master = Time(reftime_master) if isinstance(tdt, int): ## assume tdt is in seconds tdt = timedelta(seconds=tdt) if isinstance(msfile_in, list): msfile = msfile_in[tidx] log_print('INFO', f"msfile_in is a list. Processing time block {tidx + 1} of {total_blocks} (Time range: {timerange}) ... ") else: msfile = msfile_in log_print('INFO', f"msfile_in is a file. Processing time block {tidx + 1} of {total_blocks} (Time range: {timerange}) ... ") combined_vis_sub = os.path.join(subdir, f'{msname}_shift_corrected.block{tidx + 1}.seg{tdtstr}.ms') tr_series = generate_trange_series(tbg, ted, tdt) mmsfiles = split_mms(msfile, tr_series, workdir=subdir, overwrite=overwrite, verbose=True) log_print('INFO', f"Dry run to check if the final FITS images exist for {timerange} ...") fitsfile, imagefile = fd_images(msfile, timerange=timerange, cleanup=False, niter=5000, spws=spws, pbcor=False, imgoutdir=subdir, dryrun=True) if all_paths_exist(fitsfile): log_print('INFO', f"Final FITS images exist for {timerange}. Skip this block.") return None # Skip processing if images exist fits_ref, img_ref = fd_images(msfile, timerange=timerange, cleanup=False, overwrite=overwrite, pbcor=False, image_marker='init', niter=niter_init, spws=spws, imgoutdir=subdir) # for l in enumerate(img_ref) img_ref_name = [l.rstrip('.image') for l in img_ref] img_ref_model = [f'{l}.model' for l in img_ref_name] img_ref_model_fits = [f'{l}.model.fits' for l in img_ref_name] hf.imreg(vis=msfile, imagefile=img_ref, fitsfile=fits_ref, timerange=[timerange] * len(spws), toTb=False, usephacenter=False, overwrite=overwrite, docompress=True) hf.imreg(vis=msfile, imagefile=img_ref_model, fitsfile=img_ref_model_fits, timerange=[timerange] * len(spws), toTb=False, usephacenter=False, overwrite=overwrite, docompress=True) mmsfiles_rot = shift_corr(mmsfiles, tr_series, spws, img_ref_model, img_ref_model_fits, reftime_master, workdir=subdir, do_featureslfcal=True, pols=pols, overwrite=overwrite, do_diskslfcal=do_diskslfcal, disk_params=disk_params, do_sbdcal=do_sbdcal, verbose=True) if os.path.isdir(combined_vis_sub): shutil.rmtree(combined_vis_sub, ignore_errors=True) concat(vis=[l for l in mmsfiles_rot if l is not None], concatvis=combined_vis_sub) log_print('INFO', f"Shift Corr: Time block {tidx + 1} of {total_blocks} completed.") os.system(f'touch {os.path.join(subdir, f"TimeBlock{tidx + 1}.shift_corr.done")}') return combined_vis_sub
[docs] def process_imaging_timerange(tidx_ted_tbg, msfile_in, spws, subdir, overwrite): tidx, (tbg, ted) = tidx_ted_tbg if isinstance(msfile_in, list): ## this is for running the code on pipeline server msfile = msfile_in[tidx] else: msfile = msfile_in if msfile is None: return None, None timerange = trange2timerange((tbg, ted)) fitsfile, imagefile = fd_images(msfile, timerange=timerange, pbcor=False, overwrite=overwrite, cleanup=False, niter=5000, spws=spws, imgoutdir=subdir, compress=True) log_print('INFO', f"Imaging: Time block {tidx + 1} completed.") os.system(f'touch {os.path.join(subdir, f"TimeBlock{tidx + 1}.imaging.done")}') return fitsfile, imagefile
[docs] def pipeline_run(vis, outputvis='', workdir=None, slfcaltbdir=None, imgoutdir=None, figoutdir=None, clearcache=False, clearlargecache=False, pols='XX', mergeFITSonly=False, verbose=True, do_diskslfcal=True, overwrite=False, niter_init=200, ncpu='auto', tr_series_imaging=None, spws_imaging=None, hanning=False, do_sbdcal=False): """ Executes the EOVSA data processing pipeline for solar observation data. :param vis: Path to the input measurement set (MS) data. :type vis: str :param outputvis: Output path for the processed MS, defaults to an empty string. :type outputvis: str, optional :param workdir: Working directory for intermediate data, defaults to None which sets it to '/data1/workdir'. :type workdir: str, optional :param slfcaltbdir: Directory for storing calibration tables, defaults to None. :type slfcaltbdir: str, optional :param imgoutdir: Output directory for image files, defaults to None. :type imgoutdir: str, optional :param figoutdir: Output directory for figures, defaults to None. :type figoutdir: str, optional :param clearcache: If True, clears all cache files after processing, defaults to False. :type clearcache: bool, optional :param clearlargecache: If True, clears the large cache after processing, defaults to False. If True, clearcache is ignored. :type clearlargecache: bool, optional :param pols: Polarization types to process, defaults to 'XX'. :type pols: str, optional :param mergeFITSonly: If True, skips processing and only merges FITS files, defaults to False. :type mergeFITSonly: bool, optional :param verbose: Enables verbose output during processing, defaults to True. :type verbose: bool, optional :param do_diskslfcal: If True, performs disk self-calibration, defaults to True. :type do_diskslfcal: bool, optional :param overwrite: If True, overwrites existing files, defaults to False. :type overwrite: bool, optional :param niter_init: Initial number of iterations for imaging, defaults to 200. :type niter_init: int, optional :param ncpu: Specifies the number of CPUs for parallel processing, defaults to 'auto'. :type ncpu: str or int, optional :param tr_series_imaging: Time ranges for imaging, defaults to None. :type tr_series_imaging: list of tuple, optional :param spws_imaging: Spectral windows selected for imaging, defaults to None. :type spws_imaging: list of str, optional :param hanning: If True, applies Hanning smoothing to the data, defaults to False. :type hanning: bool, optional :param do_sbdcal: Boolean flag to perform single-band delay calibration, defaults to False. :type do_sbdcal: bool :return: Path to the processed visibility data. :rtype: str :example: ## if you want to specify the spectral windows for imaging >>> spws_imaging = ['5~10', '11~20', '21~30'] ## if you want to specify the time range for imaging >>> from datetime import datetime, timedelta >>> from suncasa.eovsa import eovsa_synoptic_imaging_pipeline as esip >>> tbg_imaging = datetime(2024, 4, 8, 16, 58, 0) >>> ted_imaging = datetime(2024, 4, 8, 19, 00, 0) >>> tdt_imaging = timedelta(minutes=2) >>> tr_series_imaging = esip.generate_trange_series(tbg_imaging, ted_imaging, tdt_imaging) """ msfile = vis.rstrip('/') if workdir is None: workdir = './' os.chdir(workdir) if slfcaltbdir is None: slfcaltbdir = workdir + '/' if imgoutdir is None: imgoutdir = workdir + '/' ## todo figoutdir is obsolete. will be removed once the pipeline is stable. if figoutdir is None: figoutdir = workdir + '/' ## the msfile we use 60 min model image to correct the data in 10 min interval. the model image is shifted to the reftime_master (20:00 UT of each day). tdt_master = timedelta(hours=1) # tdt_master = timedelta(minutes=30) tdt = timedelta(minutes=10) viz_timerange = ant_trange(msfile) (tstart, tend) = viz_timerange.split('~') tbg_master = Time(qa.quantity(tstart, 's')['value'], format='mjd').to_datetime() ted_master = Time(qa.quantity(tend, 's')['value'], format='mjd').to_datetime() reftime_master = Time(datetime.combine(tbg_master.date(), time(20, 0))) tr_series_master = generate_trange_series(tbg_master, ted_master, tdt_master, snap_to_full_hour=True) tdtmststr = f'{int(tdt_master.total_seconds() / 60)}min' tdtstr = f'{int(tdt.total_seconds() / 60)}min' date_str = tbg_master.strftime('%Y%m%d') tdtmst_str = f'{int(tdt_master.total_seconds() / 60)}m' subdir = tbg_master.strftime('temp_%Y%m%d') freq_setup = FrequencySetup(Time(tbg_master)) spws = freq_setup.spws defaultfreq = freq_setup.defaultfreq freq = defaultfreq nbands = freq_setup.nbands slashdate = tbg_master.strftime('%Y/%m/%d') dsize, fdens = calc_diskmodel(slashdate, nbands, freq, defaultfreq) if os.path.isdir(subdir) == False: os.makedirs(subdir) msname, _ = os.path.splitext(msfile) msname = os.path.basename(msname) msfile_copy = os.path.join(subdir, f'{msname}.ms') if not os.path.exists(msfile_copy): if msfile.lower().endswith('.ms'): shutil.copytree(msfile, msfile_copy) elif msfile.lower().endswith('.ms.tar.gz'): os.system(f'tar -zxf {msfile} -C {subdir}') else: raise ValueError(f"Unsupported file format: {msfile}") msfile = msfile_copy diskxmlfile = msfile + '.SOLDISK.xml' disk_params = {'dsize': dsize, 'fdens': fdens, 'freq': freq, 'diskxmlfile': diskxmlfile} disk_params_converted = { key: value.tolist() if isinstance(value, np.ndarray) else value for key, value in disk_params.items() } combined_vis = os.path.join(subdir, f'{msname}_shift_corrected.b{tdtmststr}.s{tdtstr}.ms') if outputvis == '': outputvis = os.path.join(workdir, f'{msname}.b{tdtmststr}.s{tdtstr}.shift_corr.ms') outputvis = outputvis.rstrip('/') # combined_vis_disk = os.path.join(subdir, f'{msname}_shift_corrected.b{tdtmststr}.s{tdtstr}.disk.ms') ### --------------------------------------------------------------### ## pre-processing block. flagging, hanning smoothing... ### --------------------------------------------------------------### if not mergeFITSonly: run_start_time_pre_proc = datetime.now() if os.path.isdir(msfile + '.flagversions') == True: if verbose: log_print('INFO', f'Flagversion of {msfile} already exists. Skipped...') else: if verbose: log_print('INFO', f'Flagging any high amplitudes viz from flares or RFIs in {msfile}') flagmanager(msfile, mode='save', versionname='pipeline_init') flagdata(vis=msfile, mode="tfcrop", spw='', action='apply', display='', timecutoff=3.0, freqcutoff=2.0, maxnpieces=2, flagbackup=False) flagmanager(msfile, mode='save', versionname='pipeline_remove_RFI-and-BURSTS') if hanning: msfile_hanning = msfile + '.hanning' if not os.path.exists(msfile_hanning): if verbose: log_print('INFO', f'Perform hanningsmooth for {msfile}...') hanningsmooth(vis=msfile, datacolumn='data', outputvis=msfile_hanning) else: if verbose: log_print('INFO', f'The hanningsmoothed {msfile} already exists. Skipped...') msfile = msfile_hanning run_end_time_pre_proc = datetime.now() elapsed_time = run_end_time_pre_proc - run_start_time_pre_proc elapsed_time_pre_proc = elapsed_time.total_seconds() / 60 log_print('INFO', f"Pre-processing completed. Elapsed time: {elapsed_time_pre_proc:.1f} minutes") ### --------------------------------------------------------------### ## shift correction block. ### --------------------------------------------------------------### total_blocks = len(tr_series_master) if verbose: log_print('INFO', f"Processing {msfile} into {total_blocks} time blocks based on the viz time ranges...") run_start_time_shift_corr = datetime.now() if ncpu == 'auto': ncpu = total_blocks else: try: ncpu = int(ncpu) except ValueError: raise ValueError("ncpu must be an integer or 'auto'.") mmsfiles_rot_all = [] if not mergeFITSonly: if ncpu == 1: log_print('INFO', f"Using 1 CPU for serial processing ...") for tidx, (tbg, ted) in enumerate(tr_series_master): combined_vis_sub = process_time_block((tidx, (tbg, ted)), msfile_in=msfile, msname=msname, subdir=subdir, total_blocks=total_blocks, tdt=tdt, tdtstr=tdtstr, spws=spws, niter_init=niter_init, reftime_master=reftime_master, do_diskslfcal=do_diskslfcal, disk_params=disk_params, pols=pols, do_sbdcal=do_sbdcal, overwrite=overwrite) mmsfiles_rot_all.append(combined_vis_sub) else: log_print('INFO', f"Using {ncpu} CPUs for parallel processing ...") if script_mode: log_print('INFO', f"Running on pipeline server. Using 1 CPU for splitting ...") mmsfiles = split_mms(msfile, tr_series_master, workdir=subdir, overwrite=overwrite, verbose=True) msfile2proc = mmsfiles import subprocess import json # Directory for the temporary scripts and results script_dir = os.path.join(subdir, "temp_scripts") result_dir = os.path.join(subdir, "temp_results") os.makedirs(script_dir, exist_ok=True) os.makedirs(result_dir, exist_ok=True) # Generate and run a script for each time block processes = [] for tidx, (tbg, ted) in enumerate(tr_series_master): script_path = os.path.join(script_dir, f"process_block_{tidx}.py") result_path = os.path.join(result_dir, f"result_{tidx}.json") # Write the script with arguments for this time block with open(script_path, 'w') as f: f.write(f""" import json from suncasa.eovsa import eovsa_synoptic_imaging_pipeline as esip # Function arguments args = {{ "tidx_ted_tbg": ({tidx}, ("{tbg.strftime('%Y-%m-%d %H:%M:%S')}", "{ted.strftime('%Y-%m-%d %H:%M:%S')}")), "msfile_in": {repr(msfile2proc)}, "msname": {repr(msname)}, "subdir": {repr(subdir)}, "total_blocks": {total_blocks}, "tdt": {repr(tdt.seconds)}, "tdtstr": {repr(tdtstr)}, "spws": {spws}, "niter_init": {niter_init}, "reftime_master": {repr(reftime_master.iso)}, "do_diskslfcal": {do_diskslfcal}, "disk_params": {repr(disk_params_converted)}, "pols": {repr(pols)}, "do_sbdcal": {do_sbdcal}, "overwrite": {overwrite} }} # Run the process_time_block function and save the result result = esip.process_time_block(**args) # Save result to a JSON file with open({repr(result_path)}, 'w') as result_file: json.dump(result, result_file) """) # py_configfile = '/inti/.init_loadpython38' if is_on_inti else '/home/sjyu/.setenv_pyenv38SY' # py_exec = '/inti/software/anaconda3/envs/python38_env/bin/python' if is_on_inti else '/home/user/.pyenv/shims/python' py_config_files = ['/inti/.init_loadpython38', '$HOME/.setenv_pyenv38SY', '$HOME/.setenv_pyenv310'] # Find the first available configuration file py_configfile = None for py_configfile in py_config_files: if os.path.exists(py_configfile): break if py_configfile is None: raise FileNotFoundError("No valid Python configuration file found.") # Source the configuration file and find the Python executable try: result = subprocess.run( ["/bin/bash", "-c", f"source {py_configfile} && which python"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) py_exec = result.stdout.strip() if not py_exec: raise RuntimeError(f"Could not find Python executable after sourcing {py_configfile}.") except Exception as e: raise RuntimeError(f"Error sourcing {py_configfile} or finding Python executable: {e}") # Run the script in a new process p = subprocess.Popen( ["/bin/bash", "-c", f"source {py_configfile} && {py_exec} {script_path}" ] ) processes.append(p) # Wait for all processes to complete for p in processes: p.wait() # Collect results results = [] for tidx in range(len(tr_series_master)): result_path = os.path.join(result_dir, f"result_{tidx}.json") if os.path.exists(result_path): with open(result_path, 'r') as result_file: result = json.load(result_file) results.append(result) else: log_print('ERROR', f"No result found for block {tidx}") else: msfile2proc = msfile worker = partial(process_time_block, msfile_in=msfile2proc, msname=msname, subdir=subdir, total_blocks=total_blocks, tdt=tdt, tdtstr=tdtstr, spws=spws, niter_init=niter_init, reftime_master=reftime_master, do_diskslfcal=do_diskslfcal, disk_params=disk_params, pols=pols, do_sbdcal=do_sbdcal, overwrite=overwrite) with Pool(ncpu) as pool: results = pool.map(worker, enumerate(tr_series_master)) mmsfiles_rot_all = [res for res in results if res is not None] if os.path.isdir(combined_vis) == True: shutil.rmtree(combined_vis, ignore_errors=True) if overwrite: if os.path.exists(combined_vis): shutil.rmtree(combined_vis, ignore_errors=True) if not os.path.exists(combined_vis): concat(vis=[l for l in mmsfiles_rot_all if l is not None], concatvis=combined_vis) add_disk_before_imaging = False if add_disk_before_imaging: ## insert disk back combined_vis, diskcl = insertdiskmodel(combined_vis, dsize=dsize, fdens=fdens, xmlfile=diskxmlfile, active=False) ## add disk model back to the viz uvsub(vis=combined_vis, reverse=True) combined_vis_disk = combined_vis + '.disk' split(vis=combined_vis, outputvis=combined_vis_disk, datacolumn='corrected') combined_vis = combined_vis_disk run_end_time_shift_corr = datetime.now() elapsed_time = run_end_time_shift_corr - run_start_time_shift_corr elapsed_time_shift_corr = elapsed_time.total_seconds() / 60 log_print('INFO', f"Self-calibration processes completed. Elapsed time: {elapsed_time_shift_corr:.1f} minutes") ### --------------------------------------------------------------### ## imaging block. ### --------------------------------------------------------------### run_start_time_imaging = datetime.now() ## imaging the final combined ms file if spws_imaging is None: spws_imaging = spws if tr_series_imaging is None: tr_series_imaging = tr_series_master total_blocks_imaging = len(tr_series_imaging) if not mergeFITSonly: if ncpu == 1: for tidx, (tbg, ted) in enumerate(tr_series_imaging): timerange = trange2timerange((tbg, ted)) log_print('INFO', f"Imaging time block {tidx + 1} of {total_blocks_imaging} (Time range: {timerange}) ... ") fitsfile, imagefile = fd_images(combined_vis, timerange=timerange, pbcor=False, overwrite=overwrite, cleanup=False, niter=5000, spws=spws_imaging, # usemask='user', ## toggle this for single band imaging imgoutdir=subdir) else: if script_mode: msfiles_in = mmsfiles_rot_all # Directory for the temporary scripts and results script_dir = os.path.join(subdir, "temp_scripts_imaging") result_dir = os.path.join(subdir, "temp_results_imaging") os.makedirs(script_dir, exist_ok=True) os.makedirs(result_dir, exist_ok=True) # Generate and run a script for each time block processes = [] for tidx, (tbg, ted) in enumerate(tr_series_imaging): script_path = os.path.join(script_dir, f"process_imaging_block_{tidx}.py") result_path = os.path.join(result_dir, f"result_imaging_{tidx}.json") # Write the script with arguments for this time block with open(script_path, 'w') as f: f.write(f""" import json from suncasa.eovsa import eovsa_synoptic_imaging_pipeline as esip # Function arguments args = {{ "tidx_ted_tbg": ({tidx}, ("{tbg.strftime('%Y-%m-%d %H:%M:%S')}", "{ted.strftime('%Y-%m-%d %H:%M:%S')}")), "msfile_in": {repr(msfiles_in)}, "spws": {spws_imaging}, "subdir": {repr(subdir)}, "overwrite": {overwrite} }} # Run the process_imaging_timerange function and save the result result = esip.process_imaging_timerange(**args) # Save result to a JSON file with open({repr(result_path)}, 'w') as result_file: json.dump(result, result_file) """) # Run the script in a new process # py_configfile = '/inti/.init_loadpython38' if is_on_inti else '/home/sjyu/.setenv_pyenv38SY' # py_exec = '/inti/software/anaconda3/envs/python38_env/bin/python' if is_on_inti else '/home/user/.pyenv/shims/python' py_config_files = ['/inti/.init_loadpython38', '$HOME/.setenv_pyenv38SY', '$HOME/.setenv_pyenv310'] # Find the first available configuration file py_configfile = None for py_configfile in py_config_files: if os.path.exists(py_configfile): break if py_configfile is None: raise FileNotFoundError("No valid Python configuration file found.") # Source the configuration file and find the Python executable try: result = subprocess.run( ["/bin/bash", "-c", f"source {py_configfile} && which python"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) py_exec = result.stdout.strip() if not py_exec: raise RuntimeError(f"Could not find Python executable after sourcing {py_configfile}.") except Exception as e: raise RuntimeError(f"Error sourcing {py_configfile} or finding Python executable: {e}") p = subprocess.Popen([ "/bin/bash", "-c", f"source {py_configfile} && {py_exec} {script_path}" ]) processes.append(p) # Wait for all processes to complete for p in processes: p.wait() # Collect results imaging_results = [] for tidx in range(len(tr_series_imaging)): result_path = os.path.join(result_dir, f"result_imaging_{tidx}.json") if os.path.exists(result_path): with open(result_path, 'r') as result_file: result = json.load(result_file) imaging_results.append(result) else: log_print('ERROR', f"No result found for imaging block {tidx}") else: msfiles_in = combined_vis # Prepare partial function with pre-filled arguments process_with_params = partial(process_imaging_timerange, msfile_in=msfiles_in, spws=spws_imaging, subdir=subdir, overwrite=overwrite) with Pool(ncpu) as p: p.map(process_with_params, enumerate(tr_series_imaging)) run_end_time_imaging = datetime.now() elapsed_time = run_end_time_imaging - run_start_time_imaging elapsed_time_imaging = elapsed_time.total_seconds() / 60 log_print('INFO', f"Imaging processes completed. Elapsed time: {elapsed_time_imaging:.1f} minutes") ### --------------------------------------------------------------### ## post-processing block. adding disk back... ### --------------------------------------------------------------### ## merge the daily synoptic images to a single daily synoptic image run_start_time_post_proc = datetime.now() diskinfo = readdiskxml(diskxmlfile) syndaily_fitsfiles = [] ## filename convention ## eovsa.dailysynoptic.20211124T200000_UTC.s02-05.tb.fits ## eovsa.synoptic_180m.20211124T180000_UTC.s02-05.tb.fits snr_threshold = [15, 25, 50, 80, 80, 40, 20 ] # snr_threshold = [15, 25, 50, 80, 80, 40, 20 ] for sidx, spw in enumerate(spws_imaging): spwstr = format_spw(spw) eofiles_rot = sorted(glob(f"{subdir}/eovsa_{date_str}T*_??min.s{spwstr}.tb.fits")) data_stack = [] for file in eofiles_rot: meta_, data_ = ndfits.read(file) if data_ is not None: data = data_ meta = meta_ data_stack.append(np.squeeze(data)) rsun_pix = meta['header']['RSUN_OBS'] / meta['header']['CDELT1'] data_stack = np.dstack(data_stack) mdata_stack = ma.masked_invalid(data_stack) # Add SNR parameters and compute SNR crpix1 = meta['header']['CRPIX1'] crpix2 = meta['header']['CRPIX2'] noisy_indices, std_scores, residual_scores, snr_scores = detect_noisy_images( mdata_stack, rsun_pix, crpix1, crpix2, std_threshold=5.0, residual_threshold=5.0, snr_threshold=10, showplt=False) snr = 1 / snr_scores deselect_index = np.where(noisy_indices ==3)[0] eofiles_rot_disk = [] for eofile in eofiles_rot: datetimestr = os.path.basename(eofile).split('_')[1] synfitsfile = os.path.join(imgoutdir, f"eovsa.synoptic_{tdtmst_str}.{datetimestr}_UTC.s{spwstr}.tb.fits") eomap_disk, tb_disk, eofile_disk = image_adddisk(eofile, diskinfo) if eofile_disk is not None: shutil.move(eofile_disk, synfitsfile) log_print('INFO', f'Adding solar disk back to {synfitsfile}') eofiles_rot_disk.append(synfitsfile) syndaily_fitsfile = os.path.join(imgoutdir, f"eovsa.synoptic_daily.{date_str}T200000_UTC.s{spwstr}.tb.fits") try: merge_FITSfiles(eofiles_rot_disk, syndaily_fitsfile, exptime_weight=True, snr_weight=snr,deselect_index=deselect_index) syndaily_fitsfiles.append(syndaily_fitsfiles) except Exception as e: log_print('ERROR', f"Error merging FITS files: {e}. Skipping the spectral window {spwstr}.") ## todo synoptic_60m FITS images currently align to 20 UT. Need to rotate them to better reflect actual observation times. Also, the FITS files need to be compressed. ## remove the intermediate ms files for i in mmsfiles_rot_all: if os.path.isdir(i): shutil.rmtree(i, ignore_errors=True) if outputvis: if os.path.exists(outputvis): shutil.rmtree(outputvis) shutil.move(combined_vis, outputvis) combined_vis = outputvis newdiskxmlfile = '{}.SOLDISK.xml'.format(outputvis) if os.path.exists(newdiskxmlfile): os.remove(newdiskxmlfile) shutil.move(diskxmlfile, newdiskxmlfile) if clearlargecache: clearcache = False shutil.rmtree(os.path.join(subdir, 'images'), ignore_errors=True) shutil.rmtree(os.path.join(subdir, '*.ms'), ignore_errors=True) shutil.rmtree(os.path.join(subdir, '*.model'), ignore_errors=True) print(f'files in {os.path.join(subdir, "images")} removed.') print(f'files in {os.path.join(subdir, "*.ms")} removed.') print(f'files in {os.path.join(subdir, "*.model")} removed.') if clearcache: shutil.rmtree(subdir, ignore_errors=True) run_end_time_post_proc = datetime.now() elapsed_time = run_end_time_post_proc - run_start_time_post_proc elapsed_time_post_proc = elapsed_time.total_seconds() / 60 log_print('INFO', f"Merging completed. Elapsed time: {elapsed_time_post_proc:.1f} minutes") # Create a dictionary to store the time taken for each step time_dict = { "Pre-processing": elapsed_time_pre_proc, "Self-calibration": elapsed_time_shift_corr, "Imaging": elapsed_time_imaging, "Post-processing": elapsed_time_post_proc } # Calculate the total processing time total_time = sum(time_dict.values()) # Log the total processing time log_print('INFO', f"Total processing time: {total_time:.1f} minutes.") # Log the time taken for each step for step, tim in time_dict.items(): log_print('INFO', f" --> {step}: {tim:.1f} minutes.") return combined_vis
if __name__ == '__main__': ''' this code is trying to address the issue of smearing effect in all-day synthesis images. description of the issue: https://www.ovsa.njit.edu/wiki/index.php/All-Day_Synthesis_Issues This code is built on python 3.8. '''
[docs] description = 'this code is trying to address the issue of smearing effect in all-day synthesis images. Description of the issue: https://www.ovsa.njit.edu/wiki/index.php/All-Day_Synthesis_Issues. This code is built on python 3.8.'
parser = argparse.ArgumentParser( description="Executes the EOVSA data processing pipeline for solar observation data.") parser.add_argument('vis', type=str, help='Path to the input measurement set (MS) data.') parser.add_argument('--outputvis', type=str, default='', help='Output path for the processed MS.') parser.add_argument('--workdir', type=str, help='Working directory for intermediate data.') parser.add_argument('--slfcaltbdir', type=str, help='Directory for storing calibration tables.') parser.add_argument('--imgoutdir', type=str, help='Output directory for image files.') parser.add_argument('--figoutdir', type=str, help='Output directory for figures.') parser.add_argument('--clearcache', action='store_true', help='Clears all cache files after processing.') parser.add_argument('--clearlargecache', action='store_true', help='Clears the large cache files after processing. If True, clearcache will be set to False.') parser.add_argument('--pols', type=str, default='XX', help='Polarization types to process.') parser.add_argument('--mergeFITSonly', action='store_true', help='Skips processing and only merges FITS files.') # parser.add_argument('--verbose', action='store_true', help='Enables verbose output during processing.') # parser.add_argument('--do_diskslfcal', action='store_true', help='Performs disk self-calibration.') parser.add_argument('--overwrite', action='store_true', help='Overwrites existing files.') parser.add_argument('--niter_init', type=int, default=200, help='Initial number of iterations for imaging.') parser.add_argument('--ncpu', type=str, default='auto', help="Specifies the number of CPUs for parallel processing.") parser.add_argument('--tr_series_imaging', nargs='*', help='Time ranges for imaging, expects a list of tuples.') parser.add_argument('--spws_imaging', nargs='*', help='Spectral windows selected for imaging.') parser.add_argument('--hanning', action='store_true', help='Applies Hanning smoothing to the data.') parser.add_argument('--do_sbdcal', action='store_true', help='Perform single-band delay calibration.') args = parser.parse_args() pipeline_run( vis=args.vis, outputvis=args.outputvis, workdir=args.workdir, slfcaltbdir=args.slfcaltbdir, imgoutdir=args.imgoutdir, figoutdir=args.figoutdir, clearcache=args.clearcache, clearlargecache=args.clearlargecache, pols=args.pols, mergeFITSonly=args.mergeFITSonly, # verbose=args.verbose, # do_diskslfcal=args.do_diskslfcal, overwrite=args.overwrite, niter_init=args.niter_init, ncpu=args.ncpu, tr_series_imaging=args.tr_series_imaging, spws_imaging=args.spws_imaging, hanning=args.hanning, do_sbdcal=args.do_sbdcal )