Source code for ABXpy.task

"""This module is used for creating a new task and preprocessing.

This module contains the functions to specify and initialise a new ABX task,
compute and display the statistics, and generate the ABX triplets and pairs.

It can also be used from the command line. See task.py --help for the
documentation.

Usage
-----

From the command line:

.. code-block:: bash

    python task.py my_data.item -o column1 -a column2 column3 -b column4 \
        column5 -f "[attr == 0 for attr in column3_X]"

my_data.item is a special file containing an index of the database and a set
of labels or attributes. See input format [#TODO insert hypertext]
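
As an illustration, a hypothetical item file could look like this (the first
three columns must be named #file, onset and offset; the remaining columns
define arbitrary attributes, here the hypothetical columns used in the
command-line example above):

.. code-block:: text

    #file onset offset column1 column2 column3 column4 column5
    file1 0.325 0.525  on1     ac1     0       by1     by2
    file1 0.525 0.725  on2     ac1     1       by1     by2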

In python:

.. code-block:: python

    import ABXpy.task
    # create a new task and compute the statistics
    myTask = ABXpy.task.Task('data.item', 'on_label', 'across_feature',
                             'by_label', filters=my_filters,
                             regressors=my_regressors)
    print myTask.stats  # display statistics
    # generate a h5db file 'data.abx' containing all the triplets and pairs
    myTask.generate_triplets()
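
The filters and regressors arguments are lists of strings using the same
syntax as the -f and -r command-line options. A minimal sketch, reusing the
hypothetical columns of the command-line example above (where column3_X
refers to the value of column3 for the X item of each triplet):

.. code-block:: python

    # keep only triplets whose X item has column3 equal to 0
    my_filters = ["[attr == 0 for attr in column3_X]"]
    my_regressors = []  # no regressors in this sketch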

Example
-------

#TODO this example belongs on the front page or in the ABX module
documentation; to be moved.
An example of an ABX triplet:

+------+------+------+
|  A   |  B   |  X   |
+======+======+======+
| on_1 | on_2 | on_1 |
+------+------+------+
| ac_1 | ac_1 | ac_2 |
+------+------+------+
| by   | by   | by   |
+------+------+------+

A and X share the same 'on' attribute; A and B share the same 'across'
attribute; A, B and X share the same 'by' attribute.
"""
# -*- coding: utf-8 -*-

# make sure the rest of the ABXpy package is accessible
import os
import sys
package_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
if not(package_path in sys.path):
    sys.path.append(package_path)

import h5py
import numpy as np
import pandas as pd
import ABXpy.database.database as database
import ABXpy.h5tools.np2h5 as np2h5
import ABXpy.h5tools.h52np as h52np
import ABXpy.h5tools.h5_handler as h5_handler
import ABXpy.h5tools.h5io as h5io
import ABXpy.misc.type_fitting as type_fitting
import ABXpy.sideop.filter_manager as filter_manager
import ABXpy.sideop.regressor_manager as regressor_manager
import ABXpy.sampling.sampler as sampler
import ABXpy.misc.progress_display as progress_display

# FIXME many of the fixmes below should be filed as feature requests on
# GitHub instead of being kept as fixmes
"""
# FIXME get a memory and speed efficient mechanism for storing a task on disk
and loading it back (pickling doesn't work well)
# FIXME filter out empty  'on-across-by' blocks and empty 'by' blocks as soon
as possible (i.e. when computing stats)
# FIXME generate unique_pairs in separate file
# FIXME find a better scheme for naming 'by' datasets in HDF5 files (to remove
    the current warning)
# FIXME efficiently dealing with case where there is no across
# FIXME syntax to specify names for side-ops when computing them on the fly or
at the very least number of output (default is one)
# FIXME implementing file locking, md5 hash and path for integrity checks and
logging warnings using the standard logging library of python + a verbose stuff
# FIXME putting metadata in h5files + pretty print it
# FIXME dataset size for task file seems too big when filtering so as to get
only 3 different talkers ???
# FIXME allow specifying regressors and filters from within python using
something like (which should be integrated with the existing dbfun stuff):
# class ABX_context(object):
#		def __init__(self, db):
                        # init fields with None
#	context = ABX_context(db_file)
# def new_filter(context):
#	return [True for e in context.talker_A]
# FIXME allow other ways of providing the hierarchical db (directly in
# pandas format, etc.)
"""


"""More complicated FIXMES
# FIXME taking by datasets as the basic unit was a mistake, because cases
where there many small by datasets happen. Find a way to group them when
needed both in the computations and in the h5 files
# FIXME allow by sampling customization depending on the analyzes to be
# carried out
"""


class Task(object):
    """Define an ABX task for a given database.

    Attributes
    ----------
    stats : dict
        Contains several statistics about the task. The 3 main entries are:

        - nb_blocks: the number of blocks of ABX triplets sharing the same
          'on', 'across' and 'by' features.
        - nb_triplets: the number of triplets considered.
        - nb_by_levels: the number of blocks of ABX triplets sharing the
          same 'by' attribute.

    Parameters
    ----------
    db_name : str
        the filename of the database on which the ABX task is applied.
    on : str
        the 'on' attribute of the ABX task. A and X share the same 'on'
        attribute and B has a different one.
    across : list, optional
        a list of strings containing the 'across' attributes of the ABX
        task. A and B share the same 'across' attributes and X has a
        different one.
    by : list, optional
        a list of strings containing the 'by' attributes of the ABX task.
        A, B and X share the same 'by' attributes.
    filters : list, optional
        a list of strings specifying a filter on A, B or X.
    regressors : list, optional
        a list of strings specifying a regressor on A, B or X.
    verbose : int, optional
        display additional information if set greater than 0.
    verify : bool, optional
        verify the correctness of the database file, done by default.
    features : str, optional
        the features file. Add it to verify its consistency with the item
        file.
    """

    def __init__(self, db_name, on, across=None, by=None, filters=None,
                 regressors=None, verbose=0, verify=True, features=None):

        self.verbose = verbose
        assert os.path.exists(db_name), ('the item file {0} was not found:'
                                         .format(db_name))

        if across is None:
            across = []
        if by is None:
            by = []
        if filters is None:
            filters = []
        if regressors is None:
            regressors = []

        # check parameters
        # using several 'on' isn't supported by the toolbox
        assert isinstance(on, basestring), \
            'ON attribute must be specified by a string'
        on = [on]
        if isinstance(across, basestring):
            across = [across]
        if isinstance(by, basestring):
            by = [by]

        if verify:
            verifydb(db_name, features)

        # open database
        db, db_hierarchy, feat_db = database.load(db_name, features_info=True)

        # check that required columns are present
        cols = set(db.columns)
        message = (' argument is invalid, check that all the provided '
                   'attributes are defined in the database ' + db_name)
        # the argument of issuperset needs to be a list ...
        assert cols.issuperset(on), 'ON' + message
        assert cols.issuperset(across), 'ACROSS' + message
        assert cols.issuperset(by), 'BY' + message
        # FIXME add additional checks, for example that columns in BY,
        # ACROSS, ON are not the same ? (see task structure notes)
        # also that location columns are not used
        for col in cols:
            assert '_' not in col, \
                col + ': you cannot use underscore in column names'
            assert '#' not in col, \
                col + ': you cannot use \'#\' in column names'

        # if 'by' or 'across' are empty create appropriate dummy columns
        # (note that '#' is forbidden in user names for columns)
        if not by:
            db['#by'] = 0
            by = ['#by']
        if not across:
            db['#across'] = range(len(db))
            across = ['#across']
        # note that these additional columns are not in the db_hierarchy,
        # but I don't think this is problematic

        self.filters = filter_manager.FilterManager(db_hierarchy,
                                                    on, across, by,
                                                    filters)
        self.regressors = regressor_manager.RegressorManager(db,
                                                             db_hierarchy,
                                                             on, across, by,
                                                             regressors)
        self.sampling = False

        # prepare the database for generating the triplets
        self.by_dbs = {}
        self.feat_dbs = {}
        self.on_blocks = {}
        self.across_blocks = {}
        self.on_across_blocks = {}
        self.antiacross_blocks = {}
        by_groups = db.groupby(by)

        if self.verbose > 0:
            display = progress_display.ProgressDisplay()
            display.add('block', 'Preprocessing by block', len(by_groups))

        for by_key, by_frame in by_groups:
            if self.verbose > 0:
                display.update('block', 1)
                display.display()
            # allow to get by values as well as values of other variables
            # that are determined by these
            by_values = dict(by_frame.iloc[0])
            # apply 'by' filters
            if self.filters.by_filter(by_values):
                # get analogous feat_db
                by_feat_db = feat_db.iloc[by_frame.index]
                # drop indexes
                by_frame = by_frame.reset_index(drop=True)
                # reset_index to get an index relative to the 'by' db,
                # the original index could be conserved in an additional
                # 'index' column if necessary by removing the drop=True, but
                # this would add another constraint on the possible column
                # names
                by_feat_db = by_feat_db.reset_index(drop=True)
                # apply generic filters
                by_frame = self.filters.generic_filter(by_values, by_frame)
                self.by_dbs[by_key] = by_frame
                self.feat_dbs[by_key] = by_feat_db
                self.on_blocks[by_key] = self.by_dbs[by_key].groupby(on)
                self.across_blocks[by_key] = self.by_dbs[
                    by_key].groupby(across)
                self.on_across_blocks[by_key] = self.by_dbs[
                    by_key].groupby(on + across)
                if len(across) > 1:
                    self.antiacross_blocks[by_key] = dict()
                    for across_key in (self.across_blocks[by_key]
                                       .groups.iterkeys()):
                        b = True
                        for i, col in enumerate(across):
                            b = b * (by_frame[col] != across_key[i])
                        self.antiacross_blocks[by_key][
                            across_key] = by_frame[b].index

        # store parameters
        self.database = db_name
        self.db = db
        self.db_hierarchy = db_hierarchy
        self.on = on
        self.across = across
        self.by = by

        # determine the appropriate numeric type to represent indexes
        # (currently used only for numpy arrays and h5 storage, might also
        # be used for pandas frames)
        types = {}
        for key, db in self.by_dbs.iteritems():
            # len(db)-1 wouldn't work here because there could be missing
            # indexes due to generic filtering
            n = np.max(db.index.values)
            types[key] = type_fitting.fit_integer_type(n, is_signed=False)
        self.types = types

        # compute some statistics about the task
        self.compute_statistics()

    def compute_statistics(self, approximate=False):
        """Compute the statistics of the task

        The number of ABX triplets is exact in most cases if approximate is
        set to False. The other statistics can only be approximate in the
        case where there are A, B, X or ABX filters.

        Parameters
        ----------
        approximate : bool
            approximate the number of triplets
        """
        self.stats = {}
        self.stats['approximate'] = bool(self.filters.A or self.filters.B or
                                         self.filters.X or self.filters.ABX)
        self.stats['approximate_nb_triplets'] = approximate and self.stats[
            'approximate']
        self.stats['nb_by_levels'] = len(self.by_dbs)
        self.by_stats = {}

        if self.verbose > 0:
            display = progress_display.ProgressDisplay()
            display.add('block', 'Computing statistics for by block',
                        self.stats['nb_by_levels'])

        for by in self.by_dbs:
            if self.verbose > 0:
                display.update('block', 1)
                display.display()
            stats = {}
            stats['nb_items'] = len(self.by_dbs[by])
            stats['on_levels'] = self.on_blocks[by].size()
            stats['nb_on_levels'] = len(stats['on_levels'])
            stats['across_levels'] = self.across_blocks[by].size()
            stats['nb_across_levels'] = len(stats['across_levels'])
            stats['on_across_levels'] = self.on_across_blocks[by].size()
            stats['nb_on_across_levels'] = len(stats['on_across_levels'])
            self.by_stats[by] = stats
        self.stats['nb_blocks'] = sum([bystats['nb_on_across_levels']
                                       for bystats in self.by_stats.values()])

        if self.verbose > 0:
            display = progress_display.ProgressDisplay()
            display.add(
                'block', 'Computing statistics for by/on/across block',
                self.stats['nb_blocks'])

        for by, db in self.by_dbs.iteritems():
            stats = self.by_stats[by]
            stats['block_sizes'] = {}
            stats['nb_triplets'] = 0
            stats['nb_across_pairs'] = 0
            stats['nb_on_pairs'] = 0
            # iterate over on/across blocks
            for block_key, count in stats['on_across_levels'].iteritems():
                if self.verbose > 0:
                    display.update('block', 1)
                    display.display()
                block = self.on_across_blocks[by].groups[block_key]
                on_across_by_values = dict(db.ix[block[0]])
                # retrieve the on and across keys (as they are stored in
                # the pandas object)
                on, across = on_across_from_key(block_key)
                # apply the filter and check if the block is empty
                if self.filters.on_across_by_filter(on_across_by_values):
                    n_A = count
                    n_X = stats['on_levels'][on]
                    # FIXME quick fix to process the case with no across,
                    # but better done in a separate loop ...
                    if self.across == ['#across']:
                        n_B = stats['nb_items'] - n_X
                    else:
                        n_B = stats['across_levels'][across] - n_A
                    n_X = n_X - n_A
                    stats['nb_across_pairs'] += n_A * n_B
                    stats['nb_on_pairs'] += n_A * n_X
                    if ((approximate or
                         not(self.filters.A or self.filters.B or
                             self.filters.X or self.filters.ABX)) and
                            type(across) != tuple):
                        stats['nb_triplets'] += n_A * n_B * n_X
                        stats['block_sizes'][block_key] = n_A * n_B * n_X
                    else:
                        # count the exact number of triplets, could be
                        # further optimized because it isn't necessary to do
                        # the whole triplet generation, in particular in the
                        # case where there are no ABX filters
                        triplets = self.on_across_triplets(
                            by, on, across, block, on_across_by_values,
                            with_regressors=False)
                        stats['nb_triplets'] += triplets.shape[0]
                        stats['block_sizes'][block_key] = triplets.shape[0]
                else:
                    stats['block_sizes'][block_key] = 0
        self.stats['nb_triplets'] = sum(
            [bystats['nb_triplets'] for bystats in self.by_stats.values()])
        # FIXME: remove empty by blocks then remove empty on_across_by
        # blocks here, also reset self.n_blocks in consequence
        self.n_blocks = self.stats['nb_blocks']

    def on_across_triplets(self, by, on, across, on_across_block,
                           on_across_by_values, with_regressors=True):
        """Generate all possible triplets for a given by block.

        Given an on_across_block of the database and the parameters of the
        task, this function will generate the complete set of triplets and
        the regressors.

        Parameters
        ----------
        by : int
            The block index
        on, across : int
            The task attributes
        on_across_block : list
            the block
        on_across_by_values : dict
            the actual values
        with_regressors : bool, optional
            By default, True

        Returns
        -------
        triplets : numpy.Array
            the set of triplets generated
        regressors : numpy.Array
            the regressors generated
        """
        # find all possible A, B, X where A and X have the 'on' feature of
        # the block and A and B have the 'across' feature of the block
        A = np.array(on_across_block, dtype=self.types[by])
        on_set = set(self.on_blocks[by].groups[on])
        # FIXME quick fix to process the case with no across, but better
        # done in a separate loop ...
        if self.across == ['#across']:
            # in this case A is a singleton and B can be anything in the by
            # block that doesn't have the same 'on' as A
            B = np.array(
                list(set(self.by_dbs[by].index).difference(on_set)),
                dtype=self.types[by])
        else:
            B = self.across_blocks[by].groups[across]
            # remove B with the same 'on' as A
            B = np.array(list(set(B).difference(A)), dtype=self.types[by])
        # remove X with the same 'across' as A
        if type(across) is tuple:
            antiacross_set = set(self.antiacross_blocks[by][across])
            X = np.array(list(antiacross_set & on_set), dtype=self.types[by])
        else:
            X = np.array(list(on_set.difference(A)), dtype=self.types[by])

        # apply singleton filters
        db = self.by_dbs[by]
        if self.filters.A:
            A = self.filters.A_filter(on_across_by_values, db, A)
        if self.filters.B:
            B = self.filters.B_filter(on_across_by_values, db, B)
        if self.filters.X:
            X = self.filters.X_filter(on_across_by_values, db, X)

        # instantiate A, B, X regressors here
        if with_regressors:
            self.regressors.set_A_regressors(on_across_by_values, db, A)
            self.regressors.set_B_regressors(on_across_by_values, db, B)
            self.regressors.set_X_regressors(on_across_by_values, db, X)

        # A, B, X can then be combined efficiently in a full (or randomly
        # sampled) factorial design
        size = len(A) * len(B) * len(X)
        if size > 0:
            ind_type = type_fitting.fit_integer_type(size, is_signed=False)
            # if sampling in the absence of triplet filters, do it here
            if self.sampling and not(self.filters.ABX):
                indices = self.sampler.sample(size, dtype=ind_type)
            else:
                indices = np.arange(size, dtype=ind_type)
            # generate triplets from indices
            iX = np.mod(indices, len(X))
            iB = np.mod(np.divide(indices, len(X)), len(B))
            iA = np.divide(indices, len(B) * len(X))
            triplets = np.column_stack((A[iA], B[iB], X[iX]))
            # apply triplet filters
            if self.filters.ABX:
                triplets = self.filters.ABX_filter(
                    on_across_by_values, db, triplets)
                size = triplets.shape[0]
                # if sampling in the presence of triplet filters, do it here
                if self.sampling:
                    ind_type = type_fitting.fit_integer_type(
                        size, is_signed=False)
                    indices = self.sampler.sample(size, dtype=ind_type)
                    triplets = triplets[indices, :]
        else:
            triplets = np.empty(shape=(0, 3), dtype=self.types[by])
            indices = np.empty(shape=size, dtype=np.uint8)
            iA = indices
            iB = indices
            iX = indices

        if with_regressors:
            if self.regressors.ABX:
                # instantiate ABX regressors here
                self.regressors.set_ABX_regressors(
                    on_across_by_values, db, triplets)
            # self.regressors.XXX contains either (for by and on_across_by)
            # [[scalar_output_1_dbfun_1, scalar_output_2_dbfun_1, ...],
            #  [scalar_output_1_dbfun_2, ...], ...]
            # or:
            # [[np_array_output_1_dbfun_1, np_array_output_2_dbfun_1, ...],
            #  [np_array_output_1_dbfun_2, ...], ...]
            # FIXME change the manager API so that self.regressors.A
            # contains the data and not the list of dbfun_s ?
            regressors = {}
            scalar_names = self.regressors.by_names + \
                self.regressors.on_across_by_names
            scalar_regressors = self.regressors.by_regressors + \
                self.regressors.on_across_by_regressors
            for names, regs in zip(scalar_names, scalar_regressors):
                for name, reg in zip(names, regs):
                    regressors[name] = np.tile(np.array(reg),
                                               (np.size(triplets, 0), 1))
            for names, regs in zip(self.regressors.A_names,
                                   self.regressors.A_regressors):
                for name, reg in zip(names, regs):
                    regressors[name] = reg[iA]
            for names, regs in zip(self.regressors.B_names,
                                   self.regressors.B_regressors):
                for name, reg in zip(names, regs):
                    regressors[name] = reg[iB]
            for names, regs in zip(self.regressors.X_names,
                                   self.regressors.X_regressors):
                for name, reg in zip(names, regs):
                    regressors[name] = reg[iX]
            # FIXME implement this
            # for names, regs in zip(self.regressors.ABX_names,
            #                        self.regressors.ABX_regressors):
            #     for name, reg in zip(names, regs):
            #         regressors[name] = reg[indices, :]
            return triplets, regressors
        else:
            return triplets

    # FIXME add a mechanism to allow the specification of a random seed in
    # a way that would reliably produce the same triplets on different
    # machines (means a cross-platform random number generator + having its
    # state so as to be sure that no other random number generation calls
    # are altering the sequence)
    # FIXME in case of sampling, get rid of blocks with no samples ?

    def generate_triplets(self, output=None, sample=None):
        """Generate all possible triplets for the whole task and the
        associated pairs

        Generate the triplets and the pairs for an ABXpy.Task and store
        them in a h5db file.

        Parameters
        ----------
        output : filename, optional
            The output file. If not specified, it will automatically create
            a new file with the same name as the input file.
        sample : bool, optional
            apply the function on a sample of the task
        """
        # FIXME change this to a random file name to avoid overwriting
        # problems
        # default name for output file
        if output is None:
            (basename, _) = os.path.splitext(self.database)
            output = basename + '.abx'
        # FIXME use an object that guarantees that the stream will not be
        # perturbed by external code calling np.random
        # set up sampling if any
        self.total_n_triplets = self.stats['nb_triplets']
        if sample is not None:
            self.sampling = True
            if self.stats['approximate_nb_triplets']:
                raise ValueError('Cannot sample if the number of triplets '
                                 'is computed approximately')
            # FIXME for now just something as random as possible
            np.random.seed()
            N = self.total_n_triplets
            if sample < 1:  # proportion of triplets to be sampled
                sample = np.uint64(round(sample * N))
            self.sampler = sampler.IncrementalSampler(N, sample)
            self.n_triplets = sample
        else:
            self.sampling = False
            self.n_triplets = self.total_n_triplets

        if self.verbose > 0:
            display = progress_display.ProgressDisplay()
            display.add(
                'block', 'Computing triplets for by/on/across block',
                self.n_blocks)
            display.add(
                'triplets', 'Triplets considered:', self.total_n_triplets)
            display.add(
                'sampled_triplets', 'Triplets sampled:', self.n_triplets)

        # fill the output file with the list of needed ABX triplets, it is
        # done independently for each 'by' value
        for by, db in self.by_dbs.iteritems():
            # class for efficiently writing to datasets of the output file
            # (using a buffer under the hood)
            with np2h5.NP2H5(h5file=output) as fh:
                # FIXME test if not fixed size impacts performance a lot
                datasets, indexes = self.regressors.get_regressor_info()
                with (h5io.H5IO(
                        filename=output, datasets=datasets, indexes=indexes,
                        group='/regressors/' + str(by) + '/')) as out_regs:
                    if sample is not None:
                        n_rows = np.uint64(
                            round(sample *
                                  (self.by_stats[by]['nb_triplets'] /
                                   np.float(self.total_n_triplets))))
                    else:
                        n_rows = self.by_stats[by]['nb_triplets']
                    # non fixed_size datasets are necessary only when
                    # sampling is performed
                    out = fh.add_dataset(group='triplets', dataset=str(by),
                                         n_rows=n_rows, n_columns=3,
                                         item_type=self.types[by],
                                         fixed_size=False)
                    # allow to get by values as well as values of other
                    # variables that are determined by these
                    by_values = dict(db.iloc[0])
                    # instantiate by regressors here
                    self.regressors.set_by_regressors(by_values)
                    # iterate over on/across blocks
                    for block_key, block in (self.on_across_blocks[by]
                                             .groups.iteritems()):
                        if self.verbose > 0:
                            display.update('block', 1)
                        # allow to get on, across, by values as well as
                        # values of other variables that are determined by
                        # these
                        on_across_by_values = dict(db.ix[block[0]])
                        if ((self.filters
                             .on_across_by_filter(on_across_by_values))):
                            # instantiate on_across_by regressors here
                            self.regressors.set_on_across_by_regressors(
                                on_across_by_values)
                            on, across = on_across_from_key(block_key)
                            triplets, regressors = self.on_across_triplets(
                                by, on, across, block, on_across_by_values)
                            out.write(triplets)
                            out_regs.write(regressors, indexed=True)
                            if self.verbose > 0:
                                display.update(
                                    'sampled_triplets', triplets.shape[0])
                                display.update(
                                    'triplets',
                                    (self.by_stats[by]
                                     ['block_sizes'][block_key]))
                        if self.verbose > 0:
                            display.display()
        self.generate_pairs(output)

    # FIXME clean this function (maybe do a few well-separated
    # sub-functions for getting the pairs and making them unique)

    def generate_pairs(self, output=None):
        """Generate the pairs associated to the triplet list

        .. note:: This function is called by generate_triplets and should
           not be used independently
        """
        # FIXME change this to a random file name to avoid overwriting
        # problems
        # default name for output file
        if output is None:
            (basename, _) = os.path.splitext(self.database)
            output = basename + '.abx'
        # list all pairs
        all_empty = True
        for by, db in self.by_dbs.iteritems():
            # FIXME maybe care about this case earlier ?
            with h5py.File(output) as fh:
                not_empty = fh['/triplets/' + str(by)].size
            if not_empty:
                all_empty = False
                max_ind = np.max(db.index.values)
                pair_key_type = type_fitting.fit_integer_type(
                    (max_ind + 1) ** 2 - 1, is_signed=False)
                with h52np.H52NP(output) as f_in:
                    with np2h5.NP2H5(output) as f_out:
                        inp = f_in.add_dataset('triplets', str(by))
                        out = f_out.add_dataset(
                            'pairs', str(by), n_columns=1,
                            item_type=pair_key_type, fixed_size=False)
                        # FIXME replace this by a for loop by making h52np
                        # implement the iterable pattern with next()
                        # outputting inp.read()
                        try:
                            while True:
                                triplets = pair_key_type(inp.read())
                                n = triplets.shape[0]
                                ind = np.arange(n)
                                i1 = 2 * ind
                                i2 = 2 * ind + 1
                                # would need to amend np2h5 and h52np to
                                # remove the second dim...
                                pairs = np.empty(
                                    shape=(2 * n, 1), dtype=pair_key_type)
                                # FIXME change the encoding (and
                                # type_fitting) so that A,B and B,A have the
                                # same code ... (take a=min(a,b), b=max(a,b))
                                # FIXME but allow a flag to control the
                                # behavior to be able to enforce A,X and B,X
                                # order when using asymmetrical distance
                                # functions
                                pairs[i1, 0] = triplets[:, 0] + (
                                    max_ind + 1) * triplets[:, 2]  # AX
                                pairs[i2, 0] = triplets[:, 1] + (
                                    max_ind + 1) * triplets[:, 2]  # BX
                                # FIXME do a unique here already? Do not
                                # store the inverse mapping ? (could sort
                                # triplets on pair1, complete pair1, sort on
                                # pair2, complete pair 2 and shuffle ?)
                                out.write(pairs)
                        except StopIteration:
                            pass
                # sort pairs
                handler = h5_handler.H5Handler(output, '/pairs/', str(by))
                # memory: available RAM in MB, could be a param
                memory = 1000
                # estimate of the amount of data to be sorted
                with h5py.File(output) as fh:
                    n = fh['/pairs/' + str(by)].shape[0]
                    i = fh['/pairs/' + str(by)].dtype.itemsize
                    amount = n * i  # in bytes
                # harmonize units to KB:
                memory = 1000 * memory
                amount = amount / 1000.
                # be conservative: aim at using no more than 3/4 of the
                # available memory
                # if enough memory take one chunk (this will do an
                # unnecessary full write and read of the file... could be
                # optimized easily)
                if amount <= 0.75 * memory:
                    # would it be beneficial to have a large o_buffer_size
                    # as well ?
                    handler.sort(buffer_size=amount)
                # else take around 30 chunks if possible (this seems
                # efficient given the current implem, using a larger number
                # of chunks efficiently might be possible if the
                # chunk-reading part of the sort was cythonized ?)
                elif amount / 30. <= 0.75 * memory:
                    handler.sort(buffer_size=amount / 30.)
                # else take the minimum number of chunks possible given the
                # available RAM
                else:
                    handler.sort(buffer_size=0.75 * memory)
                # FIXME should have a unique function directly instead of
                # sorting + unique ?
                with h52np.H52NP(output) as f_in:
                    with np2h5.NP2H5(output) as f_out:
                        inp = f_in.add_dataset('pairs', str(by))
                        out = f_out.add_dataset(
                            'unique_pairs', str(by), n_columns=1,
                            item_type=pair_key_type, fixed_size=False)
                        try:
                            last = -1
                            while True:
                                pairs = inp.read()
                                pairs = np.unique(pairs)
                                # unique alters the shape
                                pairs = np.reshape(pairs,
                                                   (pairs.shape[0], 1))
                                if pairs[0, 0] == last:
                                    pairs = pairs[1:]
                                if pairs.size > 0:
                                    last = pairs[-1, 0]
                                    out.write(pairs)
                        except StopIteration:
                            pass
                with h5py.File(output) as fh:
                    del fh['/pairs/' + str(by)]
                # store for ulterior decoding
                with h5py.File(output) as fh:
                    fh['/unique_pairs'].attrs[str(by)] = max_ind + 1
                store = pd.HDFStore(output)
                # use append to make use of the table format, which is
                # better at handling strings without much space (fixed-size
                # format)
                store.append('/feat_dbs/' + str(by), self.feat_dbs[by],
                             expectedrows=len(self.feat_dbs[by]))
                store.close()
        # FIXME generate inverse mapping to triplets (1 and 2) ?
        if not(all_empty):
            with h5py.File(output) as fh:
                del fh['/pairs/']

    # number of triplets when triplets with the same on, across, by are
    # counted as one
    # FIXME current implementation won't work with A, B, X or ABX filters
    # FIXME lots of code in this function is replicated from
    # on_across_triplets, generate_triplets and/or compute_statistics: as
    # much as possible should be factored out, including the loop over by,
    # loop over on_across iteration structure

    def compute_nb_levels(self):
        if self.filters.A or self.filters.B or self.filters.X or \
                self.filters.ABX:
            raise ValueError(
                'Current implementation does not support computing '
                'nb_levels in the presence of A, B, X, or ABX filters')

        if self.verbose > 0:
            display = progress_display.ProgressDisplay()
            display.add(
                'block', 'Computing nb_levels for by block',
                self.stats['nb_by_levels'])

        for by, db in self.by_dbs.iteritems():
            if self.verbose > 0:
                display.update('block', 1)
                display.display()
            n = 0
            # iterate over on/across blocks
            for block_key, n_block in (self.by_stats[by]['on_across_levels']
                                       .iteritems()):
                block = self.on_across_blocks[by].groups[block_key]
                on_across_by_values = dict(db.ix[block[0]])
                on, across = on_across_from_key(block_key)
                if self.filters.on_across_by_filter(on_across_by_values):
                    # find all possible A, B, X where A and X have the 'on'
                    # feature of the block and A and B have the 'across'
                    # feature of the block
                    on_across_block = self.on_across_blocks[
                        by].groups[block_key]
                    A = np.array(on_across_block, dtype=self.types[by])
                    X = self.on_blocks[by].groups[on]
                    # FIXME quick fix to process the case with no across,
                    # but better done in a separate loop ...
                    if self.across == ['#across']:
                        # in this case A is a singleton and B can be
                        # anything in the by block that doesn't have the
                        # same 'on' as A
                        B = np.array(
                            list(set(self.by_dbs[by].index).difference(X)),
                            dtype=self.types[by])
                    else:
                        B = self.across_blocks[by].groups[across]
                        # remove B with the same 'on' as A
                        B = np.array(
                            list(set(B).difference(A)), dtype=self.types[by])
                    # remove X with the same 'across' as A
                    X = np.array(
                        list(set(X).difference(A)), dtype=self.types[by])
                    if B.size > 0 and X.size > 0:
                        # the case where there was no across specified is
                        # different
                        if self.across == ["#across"]:
                            grouping = self.on
                        else:
                            grouping = self.on + self.across
                        n_level_B = len(db.iloc[B].groupby(grouping).groups)
                        n_level_X = len(db.iloc[X].groupby(grouping).groups)
                        n = n + n_level_B * n_level_X
            self.by_stats[by]['nb_levels'] = n
        self.stats['nb_levels'] = sum([stats['nb_levels']
                                       for stats in self.by_stats.values()])

    def print_stats(self, filename=None, summarized=True):
        if filename is None:
            self.print_stats_to_stream(sys.stdout, summarized)
        else:
            with open(filename, 'w') as h:
                self.print_stats_to_stream(h, summarized)

    def print_stats_to_stream(self, stream, summarized):
        import pprint
        stream.write('\n\n###### Global stats ######\n\n')
        pprint.pprint(self.stats, stream)
        stream.write('\n\n###### by blocks stats ######\n\n')
        if not(summarized):
            for by, stats in self.by_stats.iteritems():
                stream.write('### by level: %s ###\n' % str(by))
                pprint.pprint(stats, stream)
        else:
            for by, stats in self.by_stats.iteritems():
                stream.write('### by level: %s ###\n' % str(by))
                stream.write('nb_triplets: %d\n' % stats['nb_triplets'])
                stream.write('nb_levels: %d\n' % stats['nb_levels'])
                stream.write('nb_across_pairs: %d\n' %
                             stats['nb_across_pairs'])
                stream.write('nb_on_pairs: %d\n' % stats['nb_on_pairs'])
                stream.write('nb_on_levels: %d\n' % stats['nb_on_levels'])
                stream.write('nb_across_levels: %d\n' %
                             stats['nb_across_levels'])
                stream.write('nb_on_across_levels: %d\n' %
                             stats['nb_on_across_levels'])


# utility function necessary because of current inconsistencies in pandas:
# you can't seem to index a dataframe with a tuple with only one element,
# even though tuples with more than one element are fine
def on_across_from_key(key):
    # if pandas was more consistent we could use key[:1] instead ...
    on = key[0]
    across = key[1:]
    if len(across) == 1:  # this is the problematic case
        across = across[0]
    return on, across


def verifydb(filename, features=None):
    with open(filename) as f:
        cols = str.split(f.readline())
        assert len(cols) > 4, 'the syntax of the item file is incorrect'
        assert cols[0] == '#file', 'The first column must be named #file'
        assert cols[1] == 'onset', 'The second column must be named onset'
        assert cols[2] == 'offset', 'The third column must be named offset'
        if features:
            h5f = h5py.File(features)
            files = h5f['features']['files'][:]
            for line in f:
                source = str.split(line, ' ')[0]
                assert source in files, ("The file " + source + " cannot "
                                         "be found in the feature file")
""" Command-line API Example call: task.py ./test.token --on word --across talker --by length --write_triplets """ # FIXME maybe some problems if wanting to pass some code directly on the # command-line if it contains something like s = "'a'==1 and 'b'==2" ? but # not a big deal ? # detects whether the script was called from command-line if __name__ == '__main__': import argparse # using lists as default value in the parser might be dangerous ? # probably not as long as it is not used more than once ? # parser (the usage string is specified explicitly because the default # does not show that the mandatory arguments must come before the # mandatory ones; otherwise parsing is not possible beacause optional # arguments can have various numbers of inputs) parser = argparse.ArgumentParser( usage="""%(prog)s database [output] -o ON [-a ACROSS [ACROSS ...]] \ [-b BY [BY ...]] [-f FILT [FILT ...]] [-r REG [REG ...]] [-s SAMPLING_AMOUNT\ _OR_PROPORTION] [--stats-only] [-h] [-v VERBOSE_LEVEL] [--no_verif] \ [--features FEATURE_FILE]""", description='ABX task specification') message = """must be defined by the database you are using (e.g. speaker \ or phonemes, if your database contains columns defining these attributes)""" # I/O files g1 = parser.add_argument_group('I/O files') g1.add_argument( 'database', help='main file of the database defining the items used to form ABX ' 'triplets and their attributes') g1.add_argument('output', nargs='?', default=None, help='optional: output file, where the results of the ' 'analysis will be put') # Task specification g2 = parser.add_argument_group('Task specification') g2.add_argument( '-o', '--on', required=True, help='ON attribute, ' + message) g2.add_argument('-a', '--across', nargs='+', default=[], help='optional: ACROSS attribute(s), ' + message) g2.add_argument('-b', '--by', nargs='+', default=[], help='optional: BY attribute(s), ' + message) g2.add_argument('-f', '--filt', nargs='+', default=[], help='optional: filter specification(s), ' + message) g2.add_argument('-s', '--sample', default=None, type=float, help='optional: if a real number in ]0;1[: sampling ' 'proportion, if a strictly positive integer: number ' 'of triplets to be sampled') # Regressors specification g3 = parser.add_argument_group('Regressors specification') g3.add_argument('-r', '--reg', nargs='+', default=[], help='optional: regressor specification(s), ' + message) # Computation parameters g4 = parser.add_argument_group('Computation parameters') g4.add_argument('--stats_only', default=False, action='store_true', help='add this flag if you only want some statistics ' 'about the specified task') g4.add_argument('-v', '--verbose', default=0, help='optional: level of verbosity required on the ' 'standard output') g4.add_argument('--no_verif', default=False, action='store_true', help='optional: skip the verification of the database ' 'file consistancy') g4.add_argument('--features', help='optional: feature file, verify the consistency ' 'of the feature file with the item file') args = parser.parse_args() if os.path.exists(args.output) and not args.stats_only: print("WARNING: Overwriting task file " + args.output) if not args.no_verif and args.features: print("WARNING: Cannot verify the consistency of the item file {0} " "with the features file because the features file was not " "provided") task = Task(args.database, args.on, args.across, args.by, args.filt, args.reg, args.verbose, not args.no_verif, args.features) if not(args.stats_only): # generate triplets and unique pairs 
task.generate_triplets(args.output, args.sample) else: task.print_stats()