# Source code for pmaxfit

##################### generated by xml-casa (v2) from pmaxfit.xml ###################
##################### b3be0c8ff5a66315d1c40770a10aa871 ##############################
from __future__ import absolute_import
from casashell.private.stack_manip import find_local as __sf__
from casashell.private.stack_manip import find_frame as _find_frame
from casatools.typecheck import validator as _pc
from casatools.coercetype import coerce as _coerce
from suncasatasks import pmaxfit as _pmaxfit_t
from collections import OrderedDict
import numpy
import sys
import os

import shutil

def static_var(varname, value):
    """Build a decorator that attaches ``value`` to a function as an attribute.

    Parameters:
        varname: name of the attribute to set on the decorated function.
        value: object stored under that attribute.

    Returns:
        A decorator which sets the attribute and hands back the very same
        function object (no wrapper is created).
    """
    def _attach(target):
        # Mutate the function in place and return it unchanged otherwise.
        setattr(target, varname, value)
        return target

    return _attach
class _pmaxfit:
    """
    pmaxfit ---- Find maximum and do parabolic fit in the sky

    PARAMETER SUMMARY

    imagename      Name of the input image
    box            Rectangular region(s) to select in direction plane. See
                   "help par.box" for details. Default is to use the entire
                   direction plane. eg "100, 120, 200, 220, 300, 300, 400, 400"
                   to use two boxes.

    OVERVIEW

    This application finds the pixel with the maximum value in the region, and
    then uses function findsources to generate a Componentlist with one
    component. The method returns a dictionary with four keys, 'succeeded',
    'timestamps', 'imagenames' and 'outputs'. The value of 'outputs' is a
    dictionary representing a component list reflecting the fit results over
    multiple channels. Both the 'outputs' dictionaries can be read into a
    component list tool (default tool is named cl) using the fromrecord()
    method for easier inspection using tool methods.

    FITTING OVER MULTIPLE CHANNELS

    For fitting over multiple channels, the result of the previous successful
    fit is used as the estimate for the next channel. The number of gaussians
    fit cannot be varied on a channel by channel basis. Thus the variation of
    source structure should be reasonably smooth in frequency to produce
    reliable fit results.

    --------- parameter descriptions ---------------------------------------------

    imagefiles     A list of the input images
    ncpu           Number of cpu cores to use
    box            Rectangular region(s) to select in direction plane. See
                   "help par.box" for details. Default is to use the entire
                   direction plane.
    width          Half-width of fit grid
    RETURNS        void

    --------- examples -----------------------------------------------------------

    EXAMPLE:

    Here is how one might fit two gaussians to multiple channels of a cube
    using the fit from the previous channel as the initial estimate for the
    next. It also illustrates how one can specify a region in the associated
    continuum image as the region to use as the fit for the channel.

        default pmaxfit
        imagename = "co_cube.im"
        # specify region using region from continuum
        box = "100,120,200,200"
        pmaxfit()

    """
    # NOTE(review): the summary and the example above mention ``imagename``,
    # while the parameter this wrapper actually accepts is ``imagefiles``.
    # Presumably the task XML is out of date -- confirm against pmaxfit.xml.

    _info_group_ = """analysis"""
    _info_desc_ = """Find maximum and do parabolic fit in the sky"""

    # Per-parameter validation schema handed to the casatools validator.
    __schema = {
        'imagefiles': {'type': 'cReqPathVec', 'coerce': [_coerce.to_list, _coerce.expand_pathvec]},
        'ncpu': {'type': 'cInt'},
        'box': {'type': 'cStr', 'coerce': _coerce.to_str},
        'width': {'type': 'cInt'},
    }

    def __init__(self):
        self.__stdout = None        # optional replacement for sys.stdout in inp output
        self.__stderr = None
        self.__root_frame_ = None   # cached result of _find_frame(), filled lazily

    def __globals_(self):
        """Return the CASAshell global frame, looking it up once and caching it."""
        if self.__root_frame_ is None:
            self.__root_frame_ = _find_frame()
            assert self.__root_frame_ is not None, "could not find CASAshell global frame"
        return self.__root_frame_

    def __to_string_(self, value):
        """Render ``value`` for the inp listing; strings are wrapped in single quotes."""
        if isinstance(value, str):
            return "'%s'" % value
        return str(value)

    def __validate_(self, doc, schema):
        """Validate ``doc`` against ``schema`` with the casatools validator."""
        return _pc.validate(doc, schema)

    def __do_inp_output(self, param_prefix, description_str, formatting_chars):
        """Write one parameter entry of the ``inp`` listing.

        Parameters:
            param_prefix: the already formatted ``name = value`` text, possibly
                containing terminal escape sequences.
            description_str: free-text description, word-wrapped after a ``#``.
            formatting_chars: number of non-printing characters contained in
                ``param_prefix`` (excluded from the width computations).

        The wrapped lines are written to ``self.__stdout`` if set, otherwise to
        ``sys.stdout``. ``self.term_width`` (set by ``inp``) is read whenever
        the description is non-empty.
        """
        out = self.__stdout or sys.stdout
        description = description_str.split()
        prefix_width = 23 + 10 + 4
        output = []
        while description:
            ## starting a new line
            if not output:
                ## for first line add parameter information
                if len(param_prefix) - formatting_chars > prefix_width - 1:
                    # Prefix too wide to share a line with the description.
                    output.append(param_prefix)
                    continue
                addon = param_prefix + ' #'
                first_addon = True
                addon_formatting = formatting_chars
            else:
                ## for subsequent lines space over prefix width
                addon = (' ' * prefix_width) + '#'
                first_addon = False
                addon_formatting = 0
            ## if first word of description puts us over the screen width, bail
            if len(addon + description[0]) - addon_formatting + 1 > self.term_width:
                ## if we're doing the first line make sure it's output
                if first_addon:
                    output.append(addon)
                break
            while description:
                ## if the next description word puts us over break for the next line
                if len(addon + description[0]) - addon_formatting + 1 > self.term_width:
                    break
                addon = addon + ' ' + description[0]
                description.pop(0)
            output.append(addon)
        if not output:
            # Empty description: the loop above never ran, so without this the
            # parameter would be listed as a blank line.
            output.append(param_prefix)
        out.write('\n'.join(output) + '\n')

    #--------- return nonsubparam values ----------------------------------------------
    def __imagefiles_dflt(self, glb):
        return []

    def __imagefiles(self, glb):
        """Return ``imagefiles`` from ``glb`` or its default (a new empty list)."""
        return glb['imagefiles'] if 'imagefiles' in glb else []

    def __ncpu_dflt(self, glb):
        return int(8)

    def __ncpu(self, glb):
        """Return ``ncpu`` from ``glb`` or its default (8)."""
        return glb['ncpu'] if 'ncpu' in glb else int(8)

    def __box_dflt(self, glb):
        return ''

    def __box(self, glb):
        """Return ``box`` from ``glb`` or its default (empty string)."""
        return glb['box'] if 'box' in glb else ''

    def __width_dflt(self, glb):
        return int(5)

    def __width(self, glb):
        """Return ``width`` from ``glb`` or its default (5)."""
        return glb['width'] if 'width' in glb else int(5)

    #--------- return inp/go default --------------------------------------------------
    #--------- return subparam values -------------------------------------------------
    #--------- subparam inp output ----------------------------------------------------
    def __param_inp(self, name, value, description=''):
        """Shared body of the per-parameter ``*_inp`` methods.

        Values failing schema validation are wrapped in the escape sequences
        '\\x1B[91m' ... '\\x1B[0m' before being written.
        """
        is_valid = self.__validate_({name: value}, {name: self.__schema[name]})
        pre, post = ('', '') if is_valid else ('\x1B[91m', '\x1B[0m')
        self.__do_inp_output(
            '%-10.10s = %s%-23s%s' % (name, pre, self.__to_string_(value), post),
            description,
            len(pre) + len(post),
        )

    def __imagefiles_inp(self):
        self.__param_inp('imagefiles', self.__imagefiles(self.__globals_()))

    def __ncpu_inp(self):
        self.__param_inp('ncpu', self.__ncpu(self.__globals_()))

    def __box_inp(self):
        self.__param_inp('box', self.__box(self.__globals_()))

    def __width_inp(self):
        self.__param_inp('width', self.__width(self.__globals_()))

    #--------- global default implementation-------------------------------------------
    @static_var('state', __sf__('casa_inp_go_state'))
    def set_global_defaults(self):
        """Remove this task's parameters from the global frame and mark the task as 'last'."""
        self.set_global_defaults.state['last'] = self
        glb = self.__globals_()
        for name in ('imagefiles', 'ncpu', 'box', 'width'):
            if name in glb:
                del glb[name]

    #--------- inp function -----------------------------------------------------------
    def inp(self):
        """Print the task header followed by one entry per parameter."""
        print("# pmaxfit -- %s" % self._info_desc_)
        self.term_width, self.term_height = shutil.get_terminal_size(fallback=(80, 24))
        self.__imagefiles_inp()
        self.__ncpu_inp()
        self.__box_inp()
        self.__width_inp()

    #--------- tget function ----------------------------------------------------------
    @static_var('state', __sf__('casa_inp_go_state'))
    def tget(self, file=None):
        """Load parameter values from a saved file into the global frame.

        Parameters:
            file: path of the file to load. When None, "pmaxfit.last" in the
                current directory is used if it exists. A non-string value, or
                a path that is not an existing file, counts as "not found".

        When no file is found a message is printed and the global defaults
        are restored via ``set_global_defaults``.
        """
        from casashell.private.stack_manip import find_frame
        from runpy import run_path
        filename = None
        if file is None:
            if os.path.isfile("pmaxfit.last"):
                filename = "pmaxfit.last"
        elif isinstance(file, str):
            if os.path.isfile(file):
                filename = file
        if filename is not None:
            glob = find_frame()
            # NOTE(review): run_path executes the file as Python code; only
            # point tget at files you trust.
            newglob = run_path(filename, init_globals={})
            for key in newglob:
                glob[key] = newglob[key]
            self.tget.state['last'] = self
        else:
            print("could not find last file, setting defaults instead...")
            self.set_global_defaults()

    def __call__(self, imagefiles=None, ncpu=None, box=None, width=None):
        """Run the pmaxfit task.

        If at least one argument is not None the call uses "python style":
        supplied arguments are used and the rest take their defaults.
        If every argument is None, "inp/go" semantics apply and all values
        are read from the CASAshell global frame.

        The invocation is recorded in 'pmaxfit.pre', which is renamed to
        'pmaxfit.last' after the task has run; both steps are best-effort.

        Returns:
            Whatever the underlying task returns, or False if it raised an
            exception (the error is posted to the CASA log).
        """
        def noobj(s):
            # repr() of arbitrary objects looks like '<...>'; store None instead.
            if s.startswith('<') and s.endswith('>'):
                return "None"
            return s

        _prefile = os.path.realpath('pmaxfit.pre')
        _postfile = os.path.realpath('pmaxfit.last')
        _return_result_ = None
        _arguments = [imagefiles, ncpu, box, width]
        _invocation_parameters = OrderedDict()
        if any(arg is not None for arg in _arguments):
            # invoke python style
            # set the non sub-parameters that are not None
            local_global = {}
            if imagefiles is not None:
                local_global['imagefiles'] = imagefiles
            if ncpu is not None:
                local_global['ncpu'] = ncpu
            if box is not None:
                local_global['box'] = box
            if width is not None:
                local_global['width'] = width
            scope = local_global
        else:
            # invoke with inp/go semantics
            scope = self.__globals_()

        # the invocation parameters for the non-subparameters can now be set -
        # this picks up the defaults for anything missing from the scope
        _invocation_parameters['imagefiles'] = self.__imagefiles(scope)
        _invocation_parameters['ncpu'] = self.__ncpu(scope)
        _invocation_parameters['box'] = self.__box(scope)
        _invocation_parameters['width'] = self.__width(scope)

        try:
            with open(_prefile, 'w') as _f:
                for _i in _invocation_parameters:
                    _f.write("%-10s = %s\n" % (_i, noobj(repr(_invocation_parameters[_i]))))
                _f.write("#pmaxfit( ")
                _f.write(",".join(
                    "%s=%s" % (_i, noobj(repr(_invocation_parameters[_i])))
                    for _i in _invocation_parameters
                ))
                _f.write(" )\n")
        except Exception:
            # Recording the invocation is best-effort; never block the task on it.
            pass

        try:
            _return_result_ = _pmaxfit_t(
                _invocation_parameters['imagefiles'],
                _invocation_parameters['ncpu'],
                _invocation_parameters['box'],
                _invocation_parameters['width'],
            )
        except Exception as e:
            from traceback import format_exc
            from casatasks import casalog
            casalog.origin('pmaxfit')
            casalog.post("Exception Reported: Error in pmaxfit: %s" % str(e), 'SEVERE')
            casalog.post(format_exc())
            _return_result_ = False

        try:
            os.rename(_prefile, _postfile)
        except OSError:
            # e.g. the pre-file could not be written above; nothing to promote.
            pass
        return _return_result_
# Module-level callable task object: an instance of the _pmaxfit wrapper class.
pmaxfit = _pmaxfit()