# Copyright (C) 2013 Ian Harry
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
"""
This module is responsible for setting up the segment generation stage of
workflows. For details about this module and its capabilities see here:
https://ldas-jobs.ligo.caltech.edu/~cbc/docs/pycbc/ahope/segments.html
"""
import os, sys, shutil, stat, copy, itertools
import logging
from six.moves.urllib.request import pathname2url
from six.moves.urllib.parse import urljoin, urlunparse
import lal
from ligo import segments
from ligo.segments import utils as segmentsUtils
from glue.ligolw import table, lsctables, ligolw
from pycbc.workflow.core import Executable, FileList, Node, SegFile, make_analysis_dir, make_external_call, File
from pycbc.workflow.core import resolve_url
from pycbc.workflow.jobsetup import LigolwAddExecutable, LigoLWCombineSegsExecutable
[docs]def get_science_segments(workflow, out_dir, tags=None):
"""
Get the analyzable segments after applying ini specified vetoes.
Parameters
-----------
workflow : Workflow object
Instance of the workflow object
out_dir : path
Location to store output files
tags : list of strings
Used to retrieve subsections of the ini file for
configuration options.
Returns
--------
sci_seg_file : workflow.core.SegFile instance
The segment file combined from all ifos containing the science segments.
sci_segs : Ifo keyed dict of ligo.segments.segmentlist instances
The science segs for each ifo, keyed by ifo
sci_seg_name : str
The name with which science segs are stored in the output XML file.
"""
if tags is None:
tags = []
logging.info('Starting generation of science segments')
make_analysis_dir(out_dir)
start_time = workflow.analysis_time[0]
end_time = workflow.analysis_time[1]
# NOTE: Should this be overrideable in the config file?
sci_seg_name = "SCIENCE"
sci_segs = {}
sci_seg_dict = segments.segmentlistdict()
sci_seg_summ_dict = segments.segmentlistdict()
for ifo in workflow.ifos:
curr_sci_segs, curr_sci_xml, curr_seg_name = get_sci_segs_for_ifo(ifo,
workflow.cp, start_time, end_time, out_dir, tags)
sci_seg_dict[ifo + ':' + sci_seg_name] = curr_sci_segs
sci_segs[ifo] = curr_sci_segs
sci_seg_summ_dict[ifo + ':' + sci_seg_name] = \
curr_sci_xml.seg_summ_dict[ifo + ':' + curr_seg_name]
sci_seg_file = SegFile.from_segment_list_dict(sci_seg_name,
sci_seg_dict, extension='xml',
valid_segment=workflow.analysis_time,
seg_summ_dict=sci_seg_summ_dict,
directory=out_dir, tags=tags)
logging.info('Done generating science segments')
return sci_seg_file, sci_segs, sci_seg_name
[docs]def get_files_for_vetoes(workflow, out_dir,
runtime_names=None, in_workflow_names=None, tags=None):
"""
Get the various sets of veto segments that will be used in this analysis.
Parameters
-----------
workflow : Workflow object
Instance of the workflow object
out_dir : path
Location to store output files
runtime_names : list
Veto category groups with these names in the [workflow-segment] section
of the ini file will be generated now.
in_workflow_names : list
Veto category groups with these names in the [workflow-segment] section
of the ini file will be generated in the workflow. If a veto category
appears here and in runtime_names, it will be generated now.
tags : list of strings
Used to retrieve subsections of the ini file for
configuration options.
Returns
--------
veto_seg_files : FileList
List of veto segment files generated
"""
if tags is None:
tags = []
if runtime_names is None:
runtime_names = []
if in_workflow_names is None:
in_workflow_names = []
logging.info('Starting generating veto files for analysis')
make_analysis_dir(out_dir)
start_time = workflow.analysis_time[0]
end_time = workflow.analysis_time[1]
save_veto_definer(workflow.cp, out_dir, tags)
now_cat_sets = []
for name in runtime_names:
cat_sets = parse_cat_ini_opt(workflow.cp.get_opt_tags(
'workflow-segments', name, tags))
now_cat_sets.extend(cat_sets)
now_cats = set()
for cset in now_cat_sets:
now_cats = now_cats.union(cset)
later_cat_sets = []
for name in in_workflow_names:
cat_sets = parse_cat_ini_opt(workflow.cp.get_opt_tags(
'workflow-segments', name, tags))
later_cat_sets.extend(cat_sets)
later_cats = set()
for cset in later_cat_sets:
later_cats = later_cats.union(cset)
# Avoid duplication
later_cats = later_cats - now_cats
veto_gen_job = create_segs_from_cats_job(workflow.cp, out_dir,
workflow.ifo_string, tags=tags)
cat_files = FileList()
for ifo in workflow.ifos:
for category in now_cats:
cat_files.append(get_veto_segs(workflow, ifo,
cat_to_veto_def_cat(category),
start_time, end_time, out_dir,
veto_gen_job, execute_now=True,
tags=tags))
for category in later_cats:
cat_files.append(get_veto_segs(workflow, ifo,
cat_to_veto_def_cat(category),
start_time, end_time, out_dir,
veto_gen_job, tags=tags,
execute_now=False))
logging.info('Done generating veto segments')
return cat_files
[docs]def get_analyzable_segments(workflow, sci_segs, cat_files, out_dir, tags=None):
"""
Get the analyzable segments after applying ini specified vetoes and any
other restrictions on the science segs, e.g. a minimum segment length, or
demanding that only coincident segments are analysed.
Parameters
-----------
workflow : Workflow object
Instance of the workflow object
sci_segs : Ifo-keyed dictionary of glue.segmentlists
The science segments for each ifo to which the vetoes, or any other
restriction, will be applied.
cat_files : FileList of SegFiles
The category veto files generated by get_veto_segs
out_dir : path
Location to store output files
tags : list of strings
Used to retrieve subsections of the ini file for
configuration options.
Returns
--------
sci_ok_seg_file : workflow.core.SegFile instance
The segment file combined from all ifos containing the analyzable
science segments.
sci_ok_segs : Ifo keyed dict of ligo.segments.segmentlist instances
The analyzable science segs for each ifo, keyed by ifo
sci_ok_seg_name : str
The name with which analyzable science segs are stored in the output
XML file.
"""
if tags is None:
tags = []
logging.info('Starting reducing to analysable science segments')
make_analysis_dir(out_dir)
# NOTE: Should this be overrideable in the config file?
sci_ok_seg_name = "SCIENCE_OK"
sci_ok_seg_dict = segments.segmentlistdict()
sci_ok_segs = {}
cat_sets = parse_cat_ini_opt(workflow.cp.get_opt_tags('workflow-segments',
'segments-science-veto', tags))
if len(cat_sets) > 1:
raise ValueError('Provide only 1 category group to determine'
' analyzable segments')
cat_set = cat_sets[0]
for ifo in workflow.ifos:
curr_segs = copy.copy(sci_segs[ifo])
files = cat_files.find_output_with_ifo(ifo)
for category in cat_set:
veto_def_cat = cat_to_veto_def_cat(category)
file_list = files.find_output_with_tag('VETO_CAT%d' %(veto_def_cat))
if len(file_list) > 1:
err_msg = "Found more than one veto file for %s " %(ifo,)
err_msg += "and category %s." %(category,)
raise ValueError(err_msg)
if len(file_list) == 0:
err_msg = "Found no veto files for %s " %(ifo,)
err_msg += "and category %s." %(category,)
raise ValueError(err_msg)
curr_veto_file = file_list[0]
cat_segs = curr_veto_file.return_union_seglist()
curr_segs -= cat_segs
curr_segs.coalesce()
sci_ok_seg_dict[ifo + ':' + sci_ok_seg_name] = curr_segs
sci_ok_seg_file = SegFile.from_segment_list_dict(sci_ok_seg_name,
sci_ok_seg_dict, extension='xml',
valid_segment=workflow.analysis_time,
directory=out_dir, tags=tags)
if workflow.cp.has_option_tags("workflow-segments",
"segments-minimum-segment-length", tags):
min_seg_length = int( workflow.cp.get_opt_tags("workflow-segments",
"segments-minimum-segment-length", tags) )
sci_ok_seg_file.remove_short_sci_segs(min_seg_length)
# FIXME: Another test we can do is limit to coinc time +/- some window
# this should *not* be set through segments-method, but currently
# is not implemented
#segments_method = workflow.cp.get_opt_tags("workflow-segments",
# "segments-method", tags)
#if segments_method == 'ALL_SINGLE_IFO_TIME':
# pass
#elif segments_method == 'COINC_TIME':
# cum_segs = None
# for ifo in sci_segs:
# if cum_segs is not None:
# cum_segs = (cum_segs & sci_segs[ifo]).coalesce()
# else:
# cum_segs = sci_segs[ifo]
#
# for ifo in sci_segs:
# sci_segs[ifo] = cum_segs
#else:
# raise ValueError("Invalid segments-method, %s. Options are "
# "ALL_SINGLE_IFO_TIME and COINC_TIME" % segments_method)
for ifo in workflow.ifos:
sci_ok_segs[ifo] = \
sci_ok_seg_file.segment_dict[ifo + ':' + sci_ok_seg_name]
logging.info('Done generating analyzable science segments')
return sci_ok_seg_file, sci_ok_segs, sci_ok_seg_name
[docs]def get_cumulative_veto_group_files(workflow, option, cat_files,
out_dir, execute_now=True, tags=None):
"""
Get the cumulative veto files that define the different backgrounds
we want to analyze, defined by groups of vetos.
Parameters
-----------
workflow : Workflow object
Instance of the workflow object
option : str
ini file option to use to get the veto groups
cat_files : FileList of SegFiles
The category veto files generated by get_veto_segs
out_dir : path
Location to store output files
execute_now : Boolean
If true outputs are generated at runtime. Else jobs go into the workflow
and are generated then.
tags : list of strings
Used to retrieve subsections of the ini file for
configuration options.
Returns
--------
seg_files : workflow.core.FileList instance
The cumulative segment files for each veto group.
names : list of strings
The segment names for the corresponding seg_file
cat_files : workflow.core.FileList instance
The list of individual category veto files
"""
if tags is None:
tags = []
logging.info("Starting generating vetoes for groups in %s" %(option))
make_analysis_dir(out_dir)
cat_sets = parse_cat_ini_opt(workflow.cp.get_opt_tags('workflow-segments',
option, tags))
cum_seg_files = FileList()
names = []
for cat_set in cat_sets:
segment_name = "CUMULATIVE_CAT_%s" % (''.join(sorted(cat_set)))
logging.info('getting information for %s' % segment_name)
categories = [cat_to_veto_def_cat(c) for c in cat_set]
cum_seg_files += [get_cumulative_segs(workflow, categories, cat_files,
out_dir, execute_now=execute_now,
segment_name=segment_name, tags=tags)]
names.append(segment_name)
logging.info("Done generating vetoes for groups in %s" %(option))
return cum_seg_files, names, cat_files
[docs]def setup_segment_generation(workflow, out_dir, tag=None):
"""
This function is the gateway for setting up the segment generation steps in a
workflow. It is designed to be able to support multiple ways of obtaining
these segments and to combine/edit such files as necessary for analysis.
The current modules have the capability to generate files at runtime or to
generate files that are not needed for workflow generation within the workflow.
Parameters
-----------
workflow : pycbc.workflow.core.Workflow
The workflow instance that the coincidence jobs will be added to.
This instance also contains the ifos for which to attempt to obtain
segments for this analysis and the start and end times to search for
segments over.
out_dir : path
The directory in which output will be stored.
tag : string, optional (default=None)
Use this to specify a tag. This can be used if this module is being
called more than once to give call specific configuration (by setting
options in [workflow-datafind-${TAG}] rather than [workflow-datafind]). This
is also used to tag the Files returned by the class to uniqueify
the Files and uniqueify the actual filename.
FIXME: Filenames may not be unique with current codes!
Returns
-------
segsToAnalyse : dictionay of ifo-keyed glue.segment.segmentlist instances
This will contain the times that your code should analyse. By default this
is science time - CAT_1 vetoes. (This default could be changed if desired)
segFilesList : pycbc.workflow.core.FileList of SegFile instances
These are representations of the various segment files that were constructed
at this stage of the workflow and may be needed at later stages of the
analysis (e.g. for performing DQ vetoes). If the file was generated at
run-time the segment lists contained within these files will be an attribute
of the instance. (If it will be generated in the workflow it will not be
because I am not psychic).
"""
logging.info("Entering segment generation module")
make_analysis_dir(out_dir)
cp = workflow.cp
# Parse for options in ini file
segmentsMethod = cp.get_opt_tags("workflow-segments",
"segments-method", [tag])
# These only needed if calling setup_segment_gen_mixed
if segmentsMethod in ['AT_RUNTIME','CAT2_PLUS_DAG','CAT3_PLUS_DAG',
'CAT4_PLUS_DAG']:
veto_cats = cp.get_opt_tags("workflow-segments",
"segments-veto-categories", [tag])
max_veto_cat = max([int(c) for c in veto_cats.split(',')])
veto_categories = range(1, max_veto_cat + 1)
if cp.has_option_tags("workflow-segments",
"segments-generate-coincident-segments", [tag]):
generate_coincident_segs = True
else:
generate_coincident_segs = False
# Need to curl the veto-definer file
vetoDefUrl = cp.get_opt_tags("workflow-segments",
"segments-veto-definer-url", [tag])
vetoDefBaseName = os.path.basename(vetoDefUrl)
vetoDefNewPath = os.path.join(out_dir, vetoDefBaseName)
resolve_url(vetoDefUrl,out_dir)
# and update location
cp.set("workflow-segments", "segments-veto-definer-file",
vetoDefNewPath)
if cp.has_option_tags("workflow-segments",
"segments-minimum-segment-length", [tag]):
minSegLength = int( cp.get_opt_tags("workflow-segments",
"segments-minimum-segment-length", [tag]) )
else:
minSegLength = 0
if segmentsMethod == "AT_RUNTIME":
max_veto = 1000
elif segmentsMethod == "CAT2_PLUS_DAG":
max_veto = 1
elif segmentsMethod == "CAT3_PLUS_DAG":
max_veto = 2
elif segmentsMethod == "CAT4_PLUS_DAG":
max_veto = 3
else:
msg = "Entry segments-method in [workflow-segments] does not have "
msg += "expected value. Valid values are AT_RUNTIME, CAT4_PLUS_DAG, "
msg += "CAT2_PLUS_DAG or CAT3_PLUS_DAG."
raise ValueError(msg)
logging.info("Generating segments with setup_segment_gen_mixed")
segFilesList = setup_segment_gen_mixed(workflow, veto_categories,
out_dir, max_veto, tag=tag,
generate_coincident_segs=generate_coincident_segs)
logging.info("Segments obtained")
# This creates the segsToAnalyse from the segFilesList. Currently it uses
# the 'SCIENCE_OK' segFilesList, which is science - CAT_1 in
# setup_segment_gen_mixed.
# This also applies the minimum science length
segsToAnalyse = {}
for ifo in workflow.ifos:
analSegs = segFilesList.find_output_with_ifo(ifo)
analSegs = analSegs.find_output_with_tag('SCIENCE_OK')
assert len(analSegs) == 1
analSegs = analSegs[0]
if analSegs.segment_list:
if minSegLength:
analSegs.remove_short_sci_segs(minSegLength)
analSegs.to_segment_xml(override_file_if_exists=True)
segsToAnalyse[ifo] = analSegs.segment_list
else:
msg = "No science segments found for ifo %s. " %(ifo)
msg += "If this is unexpected check the files that were dumped "
msg += "in the %s directory. Also the " %(out_dir)
msg += "commands that can be used to reproduce some of these "
msg += "in %s/*.sh" %(os.path.join(out_dir,'logs'))
logging.warn(msg)
logging.info("Leaving segment generation module")
return segsToAnalyse, segFilesList
[docs]def setup_segment_gen_mixed(workflow, veto_categories, out_dir,
maxVetoAtRunTime, tag=None,
generate_coincident_segs=True):
"""
This function will generate veto files for each ifo and for each veto
category.
It can generate these vetoes at run-time or in the workflow (or do some at
run-time and some in the workflow). However, the CAT_1 vetoes and science
time must be generated at run time as they are needed to plan the workflow.
CATs 2 and higher *may* be needed for other workflow construction.
It can also combine these files to create a set of cumulative,
multi-detector veto files, which can be used in ligolw_thinca and in
pipedown. Again these can be created at run time or within the workflow.
Parameters
-----------
workflow : pycbc.workflow.core.Workflow
The Workflow instance that the coincidence jobs will be added to.
This instance also contains the ifos for which to attempt to obtain
segments for this analysis and the start and end times to search for
segments over.
veto_categories : list of ints
List of veto categories to generate segments for. If this stops being
integers, this can be changed here.
out_dir : path
The directory in which output will be stored.
maxVetoAtRunTime : int
Generate veto files at run time up to this category. Veto categories
beyond this in veto_categories will be generated in the workflow.
If we move to a model where veto
categories are not explicitly cumulative, this will be rethought.
tag : string, optional (default=None)
Use this to specify a tag. This can be used if this module is being
called more than once to give call specific configuration (by setting
options in [workflow-datafind-${TAG}] rather than [workflow-datafind]). This
is also used to tag the Files returned by the class to uniqueify
the Files and uniqueify the actual filename.
FIXME: Filenames may not be unique with current codes!
generate_coincident_segs : boolean, optional (default = True)
If given this module will generate a set of coincident, cumulative veto
files that can be used with ligolw_thinca and pipedown.
Returns
-------
segFilesList : dictionary of pycbc.workflow.core.SegFile instances
These are representations of the various segment files that were
constructed
at this stage of the workflow and may be needed at later stages of the
analysis (e.g. for performing DQ vetoes). If the file was generated at
run-time the segment lists contained within these files will be an
attribute
of the instance. (If it will be generated in the workflow it will
not be because I am not psychic).
"""
cp = workflow.cp
segFilesList = FileList([])
start_time = workflow.analysis_time[0]
end_time = workflow.analysis_time[1]
segValidSeg = workflow.analysis_time
# Will I need to add some jobs to the workflow?
vetoGenJob = create_segs_from_cats_job(cp, out_dir, workflow.ifo_string)
for ifo in workflow.ifos:
logging.info("Generating science segments for ifo %s" %(ifo))
currSciSegs, currSciXmlFile, _ = get_sci_segs_for_ifo(ifo, cp,
start_time, end_time, out_dir, tags=tag)
segFilesList.append(currSciXmlFile)
for category in veto_categories:
if category > maxVetoAtRunTime:
msg = "Adding creation of CAT_%d segments " %(category)
msg += "for ifo %s to workflow." %(ifo)
logging.info(msg)
execute_status = False
if category <= maxVetoAtRunTime:
logging.info("Generating CAT_%d segments for ifo %s." \
%(category,ifo))
execute_status = True
currVetoXmlFile = get_veto_segs(workflow, ifo, category,
start_time, end_time, out_dir,
vetoGenJob,
execute_now=execute_status)
segFilesList.append(currVetoXmlFile)
# Store the CAT_1 veto segs for use below
if category == 1:
cat1Segs = currVetoXmlFile.return_union_seglist()
analysedSegs = currSciSegs - cat1Segs
analysedSegs.coalesce()
analysedSegDict = segments.segmentlistdict()
analysedSegDict[ifo + ':SCIENCE_OK'] = analysedSegs
analysedXmlFile = os.path.join(out_dir,
"%s-SCIENCE_OK_SEGMENTS.xml" %(ifo.upper()) )
currUrl = urlunparse(['file', 'localhost', analysedXmlFile,
None, None, None])
if tag:
currTags = [tag, 'SCIENCE_OK']
else:
currTags = ['SCIENCE_OK']
currFile = SegFile(ifo, 'SEGMENTS', analysedSegs,
segment_dict=analysedSegDict, file_url=currUrl,
tags=currTags)
segFilesList.append(currFile)
currFile.to_segment_xml()
if generate_coincident_segs:
# Need to make some combined category veto files to use when vetoing
# segments and triggers.
ifo_string = workflow.ifo_string
categories = []
cum_cat_files = []
for category in veto_categories:
categories.append(category)
# Set file name in workflow standard
if tag:
currTags = [tag]
else:
currTags = []
# And actually make the file (or queue it in the workflow)
logging.info("Generating combined, cumulative CAT_%d segments."\
%(category))
if category <= maxVetoAtRunTime:
execute_status = True
else:
execute_status = False
currSegFile = get_cumulative_segs(workflow, categories,
segFilesList, out_dir,
execute_now=execute_status, tags=currTags)
segFilesList.append(currSegFile)
cum_cat_files.append(currSegFile)
# Create a combined file
# Set file tag in workflow standard
if tag:
currTags = [tag, 'COMBINED_CUMULATIVE_SEGMENTS']
else:
currTags = ['COMBINED_CUMULATIVE_SEGMENTS']
combined_veto_file = os.path.join(out_dir,
'%s-CUMULATIVE_ALL_CATS_SEGMENTS.xml' \
%(ifo_string) )
curr_url = urlunparse(['file', 'localhost',
combined_veto_file, None, None, None])
curr_file = SegFile(ifo_string, 'SEGMENTS', segValidSeg,
file_url=curr_url, tags=currTags)
for category in veto_categories:
if category <= maxVetoAtRunTime:
execute_status = True
break
else:
execute_status = False
add_cumulative_files(workflow, curr_file, cum_cat_files, out_dir,
execute_now=execute_status)
segFilesList.append(curr_file)
return segFilesList
[docs]def get_sci_segs_for_ifo(ifo, cp, start_time, end_time, out_dir, tags=None):
"""
Obtain science segments for the selected ifo
Parameters
-----------
ifo : string
The string describing the ifo to obtain science times for.
start_time : gps time (either int/LIGOTimeGPS)
The time at which to begin searching for segments.
end_time : gps time (either int/LIGOTimeGPS)
The time at which to stop searching for segments.
out_dir : path
The directory in which output will be stored.
tag : string, optional (default=None)
Use this to specify a tag. This can be used if this module is being
called more than once to give call specific configuration (by setting
options in [workflow-datafind-${TAG}] rather than [workflow-datafind]).
This is also used to tag the Files returned by the class to uniqueify
the Files and uniqueify the actual filename.
Returns
--------
sci_segs : ligo.segments.segmentlist
The segmentlist generated by this call
sci_xml_file : pycbc.workflow.core.SegFile
The workflow File object corresponding to this science segments file.
out_sci_seg_name : string
The name of the output segment list in the output XML file.
"""
if tags is None:
tags = []
seg_valid_seg = segments.segment([start_time,end_time])
sci_seg_name = cp.get_opt_tags(
"workflow-segments", "segments-%s-science-name" %(ifo.lower()), tags)
sci_seg_url = cp.get_opt_tags(
"workflow-segments", "segments-database-url", tags)
# NOTE: ligolw_segment_query returns slightly strange output. The output
# segment list is put in with name "RESULT". So this is hardcoded here
out_sci_seg_name = "RESULT"
if tags:
sci_xml_file_path = os.path.join(
out_dir, "%s-SCIENCE_SEGMENTS_%s.xml" \
%(ifo.upper(), '_'.join(tags)))
tag_list=tags + ['SCIENCE']
else:
sci_xml_file_path = os.path.join(
out_dir, "%s-SCIENCE_SEGMENTS.xml" %(ifo.upper()) )
tag_list = ['SCIENCE']
if file_needs_generating(sci_xml_file_path, cp, tags=tags):
seg_find_call = [ resolve_url(cp.get("executables","segment_query"),
permissions=stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR),
"--query-segments",
"--segment-url", sci_seg_url,
"--gps-start-time", str(start_time),
"--gps-end-time", str(end_time),
"--include-segments", sci_seg_name,
"--output-file", sci_xml_file_path ]
make_external_call(seg_find_call, out_dir=os.path.join(out_dir,'logs'),
out_basename='%s-science-call' %(ifo.lower()) )
# Yes its yucky to generate a file and then read it back in.
sci_xml_file_path = os.path.abspath(sci_xml_file_path)
sci_xml_file = SegFile.from_segment_xml(sci_xml_file_path, tags=tag_list,
valid_segment=seg_valid_seg)
# NOTE: ligolw_segment_query returns slightly strange output. The output
# segment_summary output does not use RESULT. Therefore move the
# segment_summary across.
sci_xml_file.seg_summ_dict[ifo.upper() + ":" + out_sci_seg_name] = \
sci_xml_file.seg_summ_dict[':'.join(sci_seg_name.split(':')[0:2])]
sci_segs = sci_xml_file.return_union_seglist()
return sci_segs, sci_xml_file, out_sci_seg_name
[docs]def get_veto_segs(workflow, ifo, category, start_time, end_time, out_dir,
veto_gen_job, tags=None, execute_now=False):
"""
Obtain veto segments for the selected ifo and veto category and add the job
to generate this to the workflow.
Parameters
-----------
workflow: pycbc.workflow.core.Workflow
An instance of the Workflow class that manages the workflow.
ifo : string
The string describing the ifo to generate vetoes for.
category : int
The veto category to generate vetoes for.
start_time : gps time (either int/LIGOTimeGPS)
The time at which to begin searching for segments.
end_time : gps time (either int/LIGOTimeGPS)
The time at which to stop searching for segments.
out_dir : path
The directory in which output will be stored.
vetoGenJob : Job
The veto generation Job class that will be used to create the Node.
tag : string, optional (default=None)
Use this to specify a tag. This can be used if this module is being
called more than once to give call specific configuration (by setting
options in [workflow-datafind-${TAG}] rather than [workflow-datafind]).
This is also used to tag the Files returned by the class to uniqueify
the Files and uniqueify the actual filename.
FIXME: Filenames may not be unique with current codes!
execute_now : boolean, optional
If true, jobs are executed immediately. If false, they are added to the
workflow to be run later.
Returns
--------
veto_def_file : pycbc.workflow.core.SegFile
The workflow File object corresponding to this DQ veto file.
"""
if tags is None:
tags = []
seg_valid_seg = segments.segment([start_time,end_time])
# FIXME: This job needs an internet connection and X509_USER_PROXY
# For internet connection, it may need a headnode (ie universe local)
# For X509_USER_PROXY, I don't know what pegasus is doing
node = Node(veto_gen_job)
node.add_opt('--veto-categories', str(category))
node.add_opt('--ifo-list', ifo)
node.add_opt('--gps-start-time', str(start_time))
node.add_opt('--gps-end-time', str(end_time))
if tags:
veto_xml_file_name = "%s-VETOTIME_CAT%d_%s-%d-%d.xml" \
%(ifo, category, '_'.join(tags), start_time,
end_time-start_time)
else:
veto_xml_file_name = "%s-VETOTIME_CAT%d-%d-%d.xml" \
%(ifo, category, start_time, end_time-start_time)
veto_xml_file_path = os.path.abspath(os.path.join(out_dir,
veto_xml_file_name))
curr_url = urlunparse(['file', 'localhost',
veto_xml_file_path, None, None, None])
if tags:
curr_tags = tags + ['VETO_CAT%d' %(category)]
else:
curr_tags = ['VETO_CAT%d' %(category)]
if file_needs_generating(veto_xml_file_path, workflow.cp, tags=tags):
if execute_now:
workflow.execute_node(node, verbatim_exe = True)
veto_xml_file = SegFile.from_segment_xml(veto_xml_file_path,
tags=curr_tags,
valid_segment=seg_valid_seg)
else:
veto_xml_file = SegFile(ifo, 'SEGMENTS', seg_valid_seg,
file_url=curr_url, tags=curr_tags)
node._add_output(veto_xml_file)
workflow.add_node(node)
else:
node.executed = True
for fil in node._outputs:
fil.node = None
veto_xml_file = SegFile.from_segment_xml(veto_xml_file_path,
tags=curr_tags,
valid_segment=seg_valid_seg)
return veto_xml_file
[docs]def create_segs_from_cats_job(cp, out_dir, ifo_string, tags=None):
"""
This function creates the CondorDAGJob that will be used to run
ligolw_segments_from_cats as part of the workflow
Parameters
-----------
cp : pycbc.workflow.configuration.WorkflowConfigParser
The in-memory representation of the configuration (.ini) files
out_dir : path
Directory in which to put output files
ifo_string : string
String containing all active ifos, ie. "H1L1V1"
tag : list of strings, optional (default=None)
Use this to specify a tag(s). This can be used if this module is being
called more than once to give call specific configuration (by setting
options in [workflow-datafind-${TAG}] rather than [workflow-datafind]).
This is also used to tag the Files returned by the class to uniqueify
the Files and uniqueify the actual filename.
FIXME: Filenames may not be unique with current codes!
Returns
--------
job : Job instance
The Job instance that will run segments_from_cats jobs
"""
if tags is None:
tags = []
seg_server_url = cp.get_opt_tags("workflow-segments",
"segments-database-url", tags)
veto_def_file = cp.get_opt_tags("workflow-segments",
"segments-veto-definer-file", tags)
job = Executable(cp, 'segments_from_cats', universe='local',
ifos=ifo_string, out_dir=out_dir, tags=tags)
job.add_opt('--separate-categories')
job.add_opt('--segment-url', seg_server_url)
job.add_opt('--veto-file', veto_def_file)
# FIXME: Would like the proxy in the Workflow instance
# FIXME: Explore using the x509 condor commands
# If the user has a proxy set in the environment, add it to the job
return job
[docs]def get_cumulative_segs(workflow, categories, seg_files_list, out_dir,
tags=None, execute_now=False, segment_name=None):
"""
Function to generate one of the cumulative, multi-detector segment files
as part of the workflow.
Parameters
-----------
workflow: pycbc.workflow.core.Workflow
An instance of the Workflow class that manages the workflow.
categories : int
The veto categories to include in this cumulative veto.
seg_files_list : Listionary of SegFiles
The list of segment files to be used as input for combining.
out_dir : path
The directory to write output to.
tags : list of strings, optional
A list of strings that is used to identify this job
execute_now : boolean, optional
If true, jobs are executed immediately. If false, they are added to the
workflow to be run later.
segment_name : str
The name of the combined, cumulative segments in the output file.
"""
if tags is None:
tags = []
add_inputs = FileList([])
valid_segment = workflow.analysis_time
if segment_name is None:
segment_name = 'VETO_CAT%d_CUMULATIVE' % (categories[-1])
cp = workflow.cp
# calculate the cumulative veto files for a given ifo
for ifo in workflow.ifos:
cum_job = LigoLWCombineSegsExecutable(cp, 'ligolw_combine_segments',
out_dir=out_dir, tags=[segment_name]+tags, ifos=ifo)
inputs = []
files = seg_files_list.find_output_with_ifo(ifo)
for category in categories:
file_list = files.find_output_with_tag('VETO_CAT%d' %(category))
inputs+=file_list
cum_node = cum_job.create_node(valid_segment, inputs, segment_name)
if file_needs_generating(cum_node.output_files[0].cache_entry.path,
workflow.cp, tags=tags):
if execute_now:
workflow.execute_node(cum_node)
else:
workflow.add_node(cum_node)
else:
cum_node.executed = True
for fil in cum_node._outputs:
fil.node = None
fil.PFN(urljoin('file:', pathname2url(fil.storage_path)),
site='local')
add_inputs += cum_node.output_files
# add cumulative files for each ifo together
name = '%s_VETO_SEGMENTS' %(segment_name)
outfile = File(workflow.ifos, name, workflow.analysis_time,
directory=out_dir, extension='xml',
tags=[segment_name] + tags)
add_job = LigolwAddExecutable(cp, 'llwadd', ifos=ifo, out_dir=out_dir,
tags=tags)
add_node = add_job.create_node(valid_segment, add_inputs, output=outfile)
if file_needs_generating(add_node.output_files[0].cache_entry.path,
workflow.cp, tags=tags):
if execute_now:
workflow.execute_node(add_node)
else:
workflow.add_node(add_node)
else:
add_node.executed = True
for fil in add_node._outputs:
fil.node = None
fil.PFN(urljoin('file:', pathname2url(fil.storage_path)),
site='local')
return outfile
[docs]def add_cumulative_files(workflow, output_file, input_files, out_dir,
execute_now=False, tags=None):
"""
Function to combine a set of segment files into a single one. This function
will not merge the segment lists but keep each separate.
Parameters
-----------
workflow: pycbc.workflow.core.Workflow
An instance of the Workflow class that manages the workflow.
output_file: pycbc.workflow.core.File
The output file object
input_files: pycbc.workflow.core.FileList
This list of input segment files
out_dir : path
The directory to write output to.
execute_now : boolean, optional
If true, jobs are executed immediately. If false, they are added to the
workflow to be run later.
tags : list of strings, optional
A list of strings that is used to identify this job
"""
if tags is None:
tags = []
llwadd_job = LigolwAddExecutable(workflow.cp, 'llwadd',
ifo=output_file.ifo_list, out_dir=out_dir, tags=tags)
add_node = llwadd_job.create_node(output_file.segment, input_files,
output=output_file)
if file_needs_generating(add_node.output_files[0].cache_entry.path,
workflow.cp, tags=tags):
if execute_now:
workflow.execute_node(add_node)
else:
workflow.add_node(add_node)
else:
add_node.executed = True
for fil in add_node._outputs:
fil.node = None
fil.PFN(urljoin('file:', pathname2url(fil.storage_path)),
site='local')
return add_node.output_files[0]
[docs]def find_playground_segments(segs):
'''Finds playground time in a list of segments.
Playground segments include the first 600s of every 6370s stride starting
at GPS time 729273613.
Parameters
----------
segs : segmentfilelist
A segmentfilelist to find playground segments.
Returns
-------
outlist : segmentfilelist
A segmentfilelist with all playground segments during the input
segmentfilelist (ie. segs).
'''
# initializations
start_s2 = 729273613
playground_stride = 6370
playground_length = 600
outlist = segments.segmentlist()
# loop over segments
for seg in segs:
start = seg[0]
end = seg[1]
# the first playground segment whose end is after the start of seg
playground_start = start_s2 + playground_stride * ( 1 + \
int(start-start_s2-playground_length) / playground_stride)
while playground_start < end:
# find start of playground segment
if playground_start > start:
ostart = playground_start
else:
ostart = start
playground_end = playground_start + playground_length
# find end of playground segment
if playground_end < end:
oend = playground_end
else:
oend = end
# append segment
x = segments.segment(ostart, oend)
outlist.append(x)
# increment
playground_start = playground_start + playground_stride
return outlist
[docs]def get_triggered_coherent_segment(workflow, sciencesegs):
"""
Construct the coherent network on and off source segments. Can switch to
construction of segments for a single IFO search when coherent segments
are insufficient for a search.
Parameters
-----------
workflow : pycbc.workflow.core.Workflow
The workflow instance that the calculated segments belong to.
sciencesegs : dict
Dictionary of all science segments within analysis time.
Returns
--------
onsource : ligo.segments.segmentlistdict
A dictionary containing the on source segments for network IFOs
offsource : ligo.segments.segmentlistdict
A dictionary containing the off source segments for network IFOs
"""
# Load parsed workflow config options
cp = workflow.cp
triggertime = int(os.path.basename(cp.get('workflow', 'trigger-time')))
minduration = int(os.path.basename(cp.get('workflow-exttrig_segments',
'min-duration')))
maxduration = int(os.path.basename(cp.get('workflow-exttrig_segments',
'max-duration')))
onbefore = int(os.path.basename(cp.get('workflow-exttrig_segments',
'on-before')))
onafter = int(os.path.basename(cp.get('workflow-exttrig_segments',
'on-after')))
padding = int(os.path.basename(cp.get('workflow-exttrig_segments',
'pad-data')))
if cp.has_option("workflow-condition_strain", "do-gating"):
padding += int(os.path.basename(cp.get("condition_strain",
"pad-data")))
quanta = int(os.path.basename(cp.get('workflow-exttrig_segments',
'quanta')))
# Check available data segments meet criteria specified in arguments
commonsegs = sciencesegs.extract_common(sciencesegs.keys())
offsrclist = commonsegs[tuple(commonsegs.keys())[0]]
if len(offsrclist) > 1:
logging.info("Removing network segments that do not contain trigger "
"time")
for seg in offsrclist:
if triggertime in seg:
offsrc = seg
else:
offsrc = offsrclist[0]
if abs(offsrc) < minduration + 2 * padding:
fail = segments.segment([triggertime - minduration / 2. - padding,
triggertime + minduration / 2. + padding])
logging.warning("Available network segment shorter than minimum "
"allowed duration.")
return None, fail
# Will segment duration be the maximum desired length or not?
if abs(offsrc) >= maxduration + 2 * padding:
logging.info("Available network science segment duration (%ds) is "
"greater than the maximum allowed segment length (%ds). "
"Truncating..." % (abs(offsrc), maxduration))
else:
logging.info("Available network science segment duration (%ds) is "
"less than the maximum allowed segment length (%ds)."
% (abs(offsrc), maxduration))
logging.info("%ds of padding applied at beginning and end of segment."
% padding)
# Construct on-source
onstart = triggertime - onbefore
onend = triggertime + onafter
oncentre = onstart + ((onbefore + onafter) / 2)
onsrc = segments.segment(onstart, onend)
logging.info("Constructed ON-SOURCE: duration %ds (%ds before to %ds after"
" trigger)."
% (abs(onsrc), triggertime - onsrc[0],
onsrc[1] - triggertime))
onsrc = segments.segmentlist([onsrc])
# Maximal, centred coherent network segment
idealsegment = segments.segment(int(oncentre - padding -
0.5 * maxduration),
int(oncentre + padding +
0.5 * maxduration))
# Construct off-source
if (idealsegment in offsrc):
offsrc = idealsegment
elif idealsegment[1] not in offsrc:
offsrc &= segments.segment(offsrc[1] - maxduration - 2 * padding,
offsrc[1])
elif idealsegment[0] not in offsrc:
offsrc &= segments.segment(offsrc[0],
offsrc[0] + maxduration + 2 * padding)
# Trimming off-source
excess = (abs(offsrc) - 2 * padding) % quanta
if excess != 0:
logging.info("Trimming %ds excess time to make OFF-SOURCE duration a "
"multiple of %ds" % (excess, quanta))
offset = (offsrc[0] + abs(offsrc) / 2.) - oncentre
if 2 * abs(offset) > excess:
if offset < 0:
offsrc &= segments.segment(offsrc[0] + excess,
offsrc[1])
elif offset > 0:
offsrc &= segments.segment(offsrc[0],
offsrc[1] - excess)
assert abs(offsrc) % quanta == 2 * padding
else:
logging.info("This will make OFF-SOURCE symmetrical about trigger "
"time.")
start = int(offsrc[0] - offset + excess / 2)
end = int(offsrc[1] - offset - round(float(excess) / 2))
offsrc = segments.segment(start, end)
assert abs(offsrc) % quanta == 2 * padding
logging.info("Constructed OFF-SOURCE: duration %ds (%ds before to %ds "
"after trigger)."
% (abs(offsrc) - 2 * padding,
triggertime - offsrc[0] - padding,
offsrc[1] - triggertime - padding))
offsrc = segments.segmentlist([offsrc])
# Put segments into segmentlistdicts
onsource = segments.segmentlistdict()
offsource = segments.segmentlistdict()
ifos = ''
for iifo in sciencesegs.keys():
ifos += str(iifo)
onsource[iifo] = onsrc
offsource[iifo] = offsrc
return onsource, offsource
[docs]def generate_triggered_segment(workflow, out_dir, sciencesegs):
cp = workflow.cp
if cp.has_option("workflow", "allow-single-ifo-search"):
min_ifos = 1
else:
min_ifos = 2
triggertime = int(os.path.basename(cp.get('workflow', 'trigger-time')))
minbefore = int(os.path.basename(cp.get('workflow-exttrig_segments',
'min-before')))
minafter = int(os.path.basename(cp.get('workflow-exttrig_segments',
'min-after')))
minduration = int(os.path.basename(cp.get('workflow-exttrig_segments',
'min-duration')))
onbefore = int(os.path.basename(cp.get('workflow-exttrig_segments',
'on-before')))
onafter = int(os.path.basename(cp.get('workflow-exttrig_segments',
'on-after')))
padding = int(os.path.basename(cp.get('workflow-exttrig_segments',
'pad-data')))
if cp.has_option("workflow-condition_strain", "do-gating"):
padding += int(os.path.basename(cp.get("condition_strain",
"pad-data")))
# How many IFOs meet minimum data requirements?
min_seg = segments.segment(triggertime - onbefore - minbefore - padding,
triggertime + onafter + minafter + padding)
scisegs = segments.segmentlistdict({ifo: sciencesegs[ifo]
for ifo in sciencesegs.keys() if min_seg in sciencesegs[ifo]
and abs(sciencesegs[ifo]) >= minduration})
# Find highest number of IFOs that give an acceptable coherent segment
num_ifos = len(scisegs.keys())
while num_ifos >= min_ifos:
# Consider all combinations for a given number of IFOs
ifo_combos = itertools.combinations(scisegs.keys(), num_ifos)
onsource = {}
offsource = {}
for ifo_combo in ifo_combos:
ifos = "".join(ifo_combo)
logging.info("Calculating optimal segment for %s.", ifos)
segs = segments.segmentlistdict({ifo: scisegs[ifo]
for ifo in ifo_combo})
onsource[ifos], offsource[ifos] = get_triggered_coherent_segment(\
workflow, segs)
# Which combination gives the longest coherent segment?
valid_combs = [iifos for iifos in onsource.keys()
if onsource[iifos] is not None]
if len(valid_combs) == 0:
# If none, offsource dict will contain segments showing criteria
# that have not been met, for use in plotting
if len(offsource.keys()) > 1:
seg_lens = {ifos: abs(next(offsource[ifos].values())[0])
for ifos in offsource.keys()}
best_comb = max(seg_lens.iterkeys(),
key=(lambda key: seg_lens[key]))
else:
best_comb = tuple(offsource.keys())[0]
logging.info("No combination of %d IFOs with suitable science "
"segment.", num_ifos)
else:
# Identify best analysis segment
if len(valid_combs) > 1:
seg_lens = {ifos: abs(next(offsource[ifos].values())[0])
for ifos in valid_combs}
best_comb = max(seg_lens.iterkeys(),
key=(lambda key: seg_lens[key]))
else:
best_comb = valid_combs[0]
logging.info("Calculated science segments.")
offsourceSegfile = os.path.join(out_dir, "offSourceSeg.txt")
segmentsUtils.tosegwizard(open(offsourceSegfile, "w"),
list(offsource[best_comb].values())[0])
onsourceSegfile = os.path.join(out_dir, "onSourceSeg.txt")
segmentsUtils.tosegwizard(file(onsourceSegfile, "w"),
list(onsource[best_comb].values())[0])
bufferleft = int(cp.get('workflow-exttrig_segments',
'num-buffer-before'))
bufferright = int(cp.get('workflow-exttrig_segments',
'num-buffer-after'))
onlen = onbefore + onafter
bufferSegment = segments.segment(\
triggertime - onbefore - bufferleft * onlen,
triggertime + onafter + bufferright * onlen)
bufferSegfile = os.path.join(out_dir, "bufferSeg.txt")
segmentsUtils.tosegwizard(file(bufferSegfile, "w"),
segments.segmentlist([bufferSegment]))
return onsource[best_comb], offsource[best_comb]
num_ifos -= 1
logging.warning("No suitable science segments available.")
try:
return None, offsource[best_comb]
except UnboundLocalError:
return None, min_seg
[docs]def save_veto_definer(cp, out_dir, tags=None):
""" Retrieve the veto definer file and save it locally
Parameters
-----------
cp : ConfigParser instance
out_dir : path
tags : list of strings
Used to retrieve subsections of the ini file for
configuration options.
"""
if tags is None:
tags = []
make_analysis_dir(out_dir)
veto_def_url = cp.get_opt_tags("workflow-segments",
"segments-veto-definer-url", tags)
veto_def_base_name = os.path.basename(veto_def_url)
veto_def_new_path = os.path.abspath(os.path.join(out_dir,
veto_def_base_name))
# Don't need to do this if already done
resolve_url(veto_def_url,out_dir)
# and update location
cp.set("workflow-segments", "segments-veto-definer-file", veto_def_new_path)
return veto_def_new_path
[docs]def parse_cat_ini_opt(cat_str):
""" Parse a cat str from the ini file into a list of sets """
if cat_str == "":
return []
cat_groups = cat_str.split(',')
cat_sets = []
for group in cat_groups:
group = group.strip()
cat_sets += [set(c for c in group)]
return cat_sets
[docs]def cat_to_veto_def_cat(val):
""" Convert a category character to the corresponding value in the veto
definer file.
Parameters
----------
str : single character string
The input category character
Returns
-------
pipedown_str : str
The pipedown equivalent notation that can be passed to programs
that expect this definition.
"""
if val == '1':
return 1
if val == '2':
return 2
if val == '3':
return 4
if val == 'H':
return 3
else:
raise ValueError('Invalid Category Choice')
[docs]def file_needs_generating(file_path, cp, tags=None):
"""
This job tests the file location and determines if the file should be
generated now or if an error should be raised. This uses the
generate_segment_files variable, global to this module, which is described
above and in the documentation.
Parameters
-----------
file_path : path
Location of file to check
cp : ConfigParser
The associated ConfigParser from which the
segments-generate-segment-files variable is returned.
It is recommended for most applications to use the default option by
leaving segments-generate-segment-files blank, which will regenerate
all segment files at runtime. Only use this facility if you need it.
Choices are
* 'always' : DEFAULT: All files will be generated even if they already exist.
* 'if_not_present': Files will be generated if they do not already exist. Pre-existing files will be read in and used.
* 'error_on_duplicate': Files will be generated if they do not already exist. Pre-existing files will raise a failure.
* 'never': Pre-existing files will be read in and used. If no file exists the code will fail.
Returns
--------
int
1 = Generate the file. 0 = File already exists, use it. Other cases
will raise an error.
"""
if tags is None:
tags = []
if cp.has_option_tags("workflow-segments",
"segments-generate-segment-files", tags):
value = cp.get_opt_tags("workflow-segments",
"segments-generate-segment-files", tags)
generate_segment_files = value
else:
generate_segment_files = 'always'
# Does the file exist
if os.path.isfile(file_path):
if generate_segment_files in ['if_not_present', 'never']:
return 0
elif generate_segment_files == 'always':
err_msg = "File %s already exists. " %(file_path,)
err_msg += "Regenerating and overwriting."
logging.warn(err_msg)
return 1
elif generate_segment_files == 'error_on_duplicate':
err_msg = "File %s already exists. " %(file_path,)
err_msg += "Refusing to overwrite file and exiting."
raise ValueError(err_msg)
else:
err_msg = 'Global variable generate_segment_files must be one of '
err_msg += '"always", "if_not_present", "error_on_duplicate", '
err_msg += '"never". Got %s.' %(generate_segment_files,)
raise ValueError(err_msg)
else:
if generate_segment_files in ['always', 'if_not_present',
'error_on_duplicate']:
return 1
elif generate_segment_files == 'never':
err_msg = 'File %s does not exist. ' %(file_path,)
raise ValueError(err_msg)
else:
err_msg = 'Global variable generate_segment_files must be one of '
err_msg += '"always", "if_not_present", "error_on_duplicate", '
err_msg += '"never". Got %s.' %(generate_segment_files,)
raise ValueError(err_msg)
[docs]def get_segments_file(workflow, name, option_name, out_dir):
"""Get cumulative segments from option name syntax for each ifo.
Use syntax of configparser string to define the resulting segment_file
e.x. option_name = +up_flag1,+up_flag2,+up_flag3,-down_flag1,-down_flag2
Each ifo may have a different string and is stored separately in the file.
Flags which add time must precede flags which subtract time.
Parameters
----------
workflow: pycbc.workflow.Workflow
name: string
Name of the segment list being created
option_name: str
Name of option in the associated config parser to get the flag list
returns
--------
seg_file: pycbc.workflow.SegFile
SegFile intance that points to the segment xml file on disk.
"""
from pycbc.dq import query_str
make_analysis_dir(out_dir)
cp = workflow.cp
start = workflow.analysis_time[0]
end = workflow.analysis_time[1]
# Check for veto definer file
veto_definer = None
if cp.has_option("workflow-segments", "segments-veto-definer-url"):
veto_definer = save_veto_definer(workflow.cp, out_dir, [])
# Check for provided server
server = "https://segments.ligo.org"
if cp.has_option("workflow-segments", "segments-database-url"):
server = cp.get("workflow-segments",
"segments-database-url")
source = "any"
if cp.has_option("workflow-segments", "segments-source"):
source = cp.get("workflow-segments", "segments-source")
if source == "file":
local_file_path = \
resolve_url(cp.get("workflow-segments", option_name+"-file"))
pfn = os.path.join(out_dir, os.path.basename(local_file_path))
shutil.move(local_file_path, pfn)
return SegFile.from_segment_xml(pfn)
segs = {}
for ifo in workflow.ifos:
flag_str = cp.get_opt_tags("workflow-segments", option_name, [ifo])
key = ifo + ':' + name
segs[key] = query_str(ifo, flag_str, start, end,
source=source, server=server,
veto_definer=veto_definer)
logging.info("%s: got %s flags", ifo, option_name)
return SegFile.from_segment_list_dict(name, segs,
extension='.xml',
valid_segment=workflow.analysis_time,
directory=out_dir)