##################### generated by xml-casa (v2) from pmaxfit.xml ###################
##################### b3be0c8ff5a66315d1c40770a10aa871 ##############################
from __future__ import absolute_import
from casashell.private.stack_manip import find_local as __sf__
from casashell.private.stack_manip import find_frame as _find_frame
from casatools.typecheck import validator as _pc
from casatools.coercetype import coerce as _coerce
from suncasatasks import pmaxfit as _pmaxfit_t
from collections import OrderedDict
import numpy
import sys
import os
import shutil
[docs]
def static_var(varname, value):
def decorate(func):
setattr(func, varname, value)
return func
return decorate
[docs]
class _pmaxfit:
"""
pmaxfit ---- Find maximum and do parabolic fit in the sky
PARAMETER SUMMARY
imagename Name of the input image
box Rectangular region(s) to select in direction plane. See "help par.box"
for details. Default is to use the entire direction plane.
eg "100, 120, 200, 220, 300, 300, 400, 400" to use two boxes.
OVERVIEW
This application finds the pixel with the maximum value in the region, and then uses function
findsources to generate a Componentlist with one component.
The method returns a dictionary with fours keys, 'succeeded', 'timestamps', 'imagenames'
and 'outputs'. The value of 'outputs' is a dictionary representing
a component list reflecting the fit results over multiple channels.
Both the 'outputs' dictionaries can be read into a component list tool (default tool is named cl)
using the fromrecord() method for easier inspection using tool methods, eg
FITTING OVER MULTIPLE CHANNELS
For fitting over multiple channels, the result of the previous successful fit is used as
the estimate for the next channel. The number of gaussians fit cannot be varied on a channel
by channel basis. Thus the variation of source structure should be reasonably smooth in
frequency to produce reliable fit results.
--------- parameter descriptions ---------------------------------------------
imagefiles A list of the input images
ncpu Number of cpu cores to use
box Rectangular region(s) to select in direction plane. See "help par.box" for details. Default is to use the entire direction plane.
width Half-width of fit grid
[1;42mRETURNS[1;m void
--------- examples -----------------------------------------------------------
EXAMPLE:
Here is how one might fit two gaussians to multiple channels of a cube using the fit
from the previous channel as the initial estimate for the next. It also illustrates
how one can specify a region in the associated continuum image as the region to use
as the fit for the channel.
begin{verbatim}
default pmaxfit
imagename = "co_cube.im"
# specify region using region from continuum
box = "100,120,200,200"
pmaxfit()
"""
[docs]
_info_group_ = """analysis"""
[docs]
_info_desc_ = """Find maximum and do parabolic fit in the sky"""
[docs]
__schema = {'imagefiles': {'type': 'cReqPathVec', 'coerce': [_coerce.to_list,_coerce.expand_pathvec]}, 'ncpu': {'type': 'cInt'}, 'box': {'type': 'cStr', 'coerce': _coerce.to_str}, 'width': {'type': 'cInt'}}
def __init__(self):
[docs]
self.__root_frame_ = None
[docs]
def __globals_(self):
if self.__root_frame_ is None:
self.__root_frame_ = _find_frame( )
assert self.__root_frame_ is not None, "could not find CASAshell global frame"
return self.__root_frame_
[docs]
def __to_string_(self,value):
if type(value) is str:
return "'%s'" % value
else:
return str(value)
[docs]
def __validate_(self,doc,schema):
return _pc.validate(doc,schema)
[docs]
def __do_inp_output(self,param_prefix,description_str,formatting_chars):
out = self.__stdout or sys.stdout
description = description_str.split( )
prefix_width = 23 + 10 + 4
output = [ ]
addon = ''
first_addon = True
while len(description) > 0:
## starting a new line.....................................................................
if len(output) == 0:
## for first line add parameter information............................................
if len(param_prefix)-formatting_chars > prefix_width - 1:
output.append(param_prefix)
continue
addon = param_prefix + ' #'
first_addon = True
addon_formatting = formatting_chars
else:
## for subsequent lines space over prefix width........................................
addon = (' ' * prefix_width) + '#'
first_addon = False
addon_formatting = 0
## if first word of description puts us over the screen width, bail........................
if len(addon + description[0]) - addon_formatting + 1 > self.term_width:
## if we're doing the first line make sure it's output.................................
if first_addon: output.append(addon)
break
while len(description) > 0:
## if the next description word puts us over break for the next line...................
if len(addon + description[0]) - addon_formatting + 1 > self.term_width: break
addon = addon + ' ' + description[0]
description.pop(0)
output.append(addon)
out.write('\n'.join(output) + '\n')
#--------- return nonsubparam values ----------------------------------------------
[docs]
def __imagefiles_dflt( self, glb ):
return [ ]
[docs]
def __imagefiles( self, glb ):
if 'imagefiles' in glb: return glb['imagefiles']
return [ ]
[docs]
def __ncpu_dflt( self, glb ):
return int(8)
[docs]
def __ncpu( self, glb ):
if 'ncpu' in glb: return glb['ncpu']
return int(8)
[docs]
def __box_dflt( self, glb ):
return ''
[docs]
def __box( self, glb ):
if 'box' in glb: return glb['box']
return ''
[docs]
def __width_dflt( self, glb ):
return int(5)
[docs]
def __width( self, glb ):
if 'width' in glb: return glb['width']
return int(5)
#--------- return inp/go default --------------------------------------------------
#--------- return subparam values -------------------------------------------------
#--------- subparam inp output ----------------------------------------------------
[docs]
def __imagefiles_inp(self):
description = ''
value = self.__imagefiles( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'imagefiles': value},{'imagefiles': self.__schema['imagefiles']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-10.10s = %s%-23s%s' % ('imagefiles',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
[docs]
def __ncpu_inp(self):
description = ''
value = self.__ncpu( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'ncpu': value},{'ncpu': self.__schema['ncpu']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-10.10s = %s%-23s%s' % ('ncpu',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
[docs]
def __box_inp(self):
description = ''
value = self.__box( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'box': value},{'box': self.__schema['box']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-10.10s = %s%-23s%s' % ('box',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
[docs]
def __width_inp(self):
description = ''
value = self.__width( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'width': value},{'width': self.__schema['width']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-10.10s = %s%-23s%s' % ('width',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
#--------- global default implementation-------------------------------------------
@static_var('state', __sf__('casa_inp_go_state'))
[docs]
def set_global_defaults(self):
self.set_global_defaults.state['last'] = self
glb = self.__globals_( )
if 'imagefiles' in glb: del glb['imagefiles']
if 'ncpu' in glb: del glb['ncpu']
if 'box' in glb: del glb['box']
if 'width' in glb: del glb['width']
#--------- inp function -----------------------------------------------------------
[docs]
def inp(self):
print("# pmaxfit -- %s" % self._info_desc_)
self.term_width, self.term_height = shutil.get_terminal_size(fallback=(80, 24))
self.__imagefiles_inp( )
self.__ncpu_inp( )
self.__box_inp( )
self.__width_inp( )
#--------- tget function ----------------------------------------------------------
@static_var('state', __sf__('casa_inp_go_state'))
[docs]
def tget(self,file=None):
from casashell.private.stack_manip import find_frame
from runpy import run_path
filename = None
if file is None:
if os.path.isfile("pmaxfit.last"):
filename = "pmaxfit.last"
elif isinstance(file, str):
if os.path.isfile(file):
filename = file
if filename is not None:
glob = find_frame( )
newglob = run_path( filename, init_globals={ } )
for i in newglob:
glob[i] = newglob[i]
self.tget.state['last'] = self
else:
print("could not find last file, setting defaults instead...")
self.set_global_defaults( )
[docs]
def __call__( self, imagefiles=None, ncpu=None, box=None, width=None ):
def noobj(s):
if s.startswith('<') and s.endswith('>'):
return "None"
else:
return s
_prefile = os.path.realpath('pmaxfit.pre')
_postfile = os.path.realpath('pmaxfit.last')
_return_result_ = None
_arguments = [imagefiles,ncpu,box,width]
_invocation_parameters = OrderedDict( )
if any(map(lambda x: x is not None,_arguments)):
# invoke python style
# set the non sub-parameters that are not None
local_global = { }
if imagefiles is not None: local_global['imagefiles'] = imagefiles
if ncpu is not None: local_global['ncpu'] = ncpu
if box is not None: local_global['box'] = box
if width is not None: local_global['width'] = width
# the invocation parameters for the non-subparameters can now be set - this picks up those defaults
_invocation_parameters['imagefiles'] = self.__imagefiles( local_global )
_invocation_parameters['ncpu'] = self.__ncpu( local_global )
_invocation_parameters['box'] = self.__box( local_global )
_invocation_parameters['width'] = self.__width( local_global )
# the sub-parameters can then be set. Use the supplied value if not None, else the function, which gets the appropriate default
else:
# invoke with inp/go semantics
_invocation_parameters['imagefiles'] = self.__imagefiles( self.__globals_( ) )
_invocation_parameters['ncpu'] = self.__ncpu( self.__globals_( ) )
_invocation_parameters['box'] = self.__box( self.__globals_( ) )
_invocation_parameters['width'] = self.__width( self.__globals_( ) )
try:
with open(_prefile,'w') as _f:
for _i in _invocation_parameters:
_f.write("%-10s = %s\n" % (_i,noobj(repr(_invocation_parameters[_i]))))
_f.write("#pmaxfit( ")
count = 0
for _i in _invocation_parameters:
_f.write("%s=%s" % (_i,noobj(repr(_invocation_parameters[_i]))))
count += 1
if count < len(_invocation_parameters): _f.write(",")
_f.write(" )\n")
except: pass
try:
_return_result_ = _pmaxfit_t( _invocation_parameters['imagefiles'],_invocation_parameters['ncpu'],_invocation_parameters['box'],_invocation_parameters['width'] )
except Exception as e:
from traceback import format_exc
from casatasks import casalog
casalog.origin('pmaxfit')
casalog.post("Exception Reported: Error in pmaxfit: %s" % str(e),'SEVERE')
casalog.post(format_exc( ))
_return_result_ = False
try:
os.rename(_prefile,_postfile)
except: pass
return _return_result_