Source code for StructuredData.SDshelllibBase

import inspect
import os.path
import pprint
import csv
import cStringIO
import internal.myyaml as myyaml
import Classes as SD

# Field delimiter handed to as_csv() when multi() renders csv output.
csvdelim = ";"

# Absolute paths of files registered with add_locked_file().
locked_files = set()
# Absolute paths of directories registered with add_locked_dir().
locked_dirs = set()

def use_file_lock(flag):
    """Switch file locking on or off.

    parameters:
        flag - the new value, it is stored in SD.use_lockfile
    """
    SD.use_lockfile = flag
def add_locked_file(filename):
    """Add the absolute path of a file to the set locked_files."""
    absolute = os.path.abspath(filename)
    locked_files.add(absolute)


def add_locked_dir(dir_):
    """Add the absolute path of a directory to the set locked_dirs."""
    absolute = os.path.abspath(dir_)
    locked_dirs.add(absolute)
def module_functions(modules):
    """Collect all functions found among the members of some modules.

    parameters:
        modules - a dict mapping module names to module objects
    returns:
        a dict mapping function names to function objects. A function
        "fun" found in a module named "mod" is stored with the key
        "mod.fun". If the module name is empty, the key is just "fun".
    """
    def qualified(prefix, name):
        # an empty module name means "no prefix"
        return ("%s.%s" % (prefix, name)) if prefix else name

    # getmembers applies the predicate itself, so only functions are
    # returned here.
    return {
        qualified(mod_name, name): func
        for (mod_name, mod) in modules.items()
        for (name, func) in inspect.getmembers(mod, inspect.isfunction)
    }
def is_readonly(path):
    """Tell whether the path (file) should not be written.

    The path is made absolute. Its parent directory is looked up in
    locked_dirs, then the path itself is looked up in locked_files. Only
    the immediate parent directory is examined.

    parameters:
        path - name of the file to check
    returns:
        None if the path may be written, otherwise a string with a
        message naming the read-only directory or file.
    """
    (directory, name) = os.path.split(os.path.abspath(path))
    directory = os.path.abspath(directory)
    if directory in locked_dirs:
        return "directory '%s' is read-only" % directory
    full_path = os.path.join(directory, name)
    if full_path not in locked_files:
        return None
    return "file '%s' is read-only" % full_path
def read_py(f):
    """Read a python expression from a file and return its value.

    parameters:
        f - name of the file to read
    returns:
        the object the content of the file evaluates to.
    """
    # the with block closes the file even when reading fails
    with open(f) as stream:
        text = stream.read()
    # SECURITY: eval() runs arbitrary code, only use this on trusted files.
    return eval(text)


def py(val):
    """Return val as a string formatted by pprint.pformat."""
    return pprint.pformat(val)
def prettyprint(filename, struct):
    """Pretty print a structure to a string or to a file.

    parameters:
        filename - name of the file to write. If this is None or "",
                   nothing is written and the formatted text is returned.
        struct   - the structure to print
    returns:
        the formatted text if no filename was given, else the filename.
    """
    if not filename:
        return py(struct)
    # open() instead of the python 2 only file() builtin; the with block
    # closes the stream even when pprint raises.
    with open(filename, 'w') as stream:
        pprint.pprint(struct, stream)
    return filename
def write_yml(filename, struct):
    """Write a structure as yaml to a string or to a file.

    parameters:
        filename - name of the file to write. If this is None or "",
                   the result of myyaml.write_string is returned.
        struct   - the structure to write
    returns:
        the yaml text if no filename was given, else the filename after
        myyaml.write_file was called.
    """
    if filename:
        myyaml.write_file(filename, struct)
        return filename
    return myyaml.write_string(struct)
def as_csv(iterator, delimiter):
    """Return rows as csv data.

    parameters:
        iterator  - an iterable of rows, each row is passed to csv.writer
        delimiter - the field delimiter passed to csv.writer
    returns:
        a string with one line per row, each line ends with os.linesep.
    """
    # cStringIO exists only in python 2. On python 3 the csv module
    # writes text, so io.StringIO is the matching buffer there.
    try:
        from cStringIO import StringIO
    except ImportError:
        from io import StringIO
    result = StringIO()
    try:
        csvwriter = csv.writer(result, delimiter=delimiter,
                               lineterminator=os.linesep)
        csvwriter.writerows(iterator)
        return result.getvalue()
    finally:
        # release the buffer even when a row cannot be written
        result.close()
def aligned(val):
    """Format key-value pairs as lines "key: value" with aligned colons.

    parameters:
        val - a mapping (any object with an "items" method) or an
              iterable of (key, value) pairs
    returns:
        a string with one line per key, sorted by key. All keys are
        padded to the length of the longest key.
    raises:
        AssertionError if val has neither "items" nor "__iter__".
    """
    if hasattr(val, "items"):
        pairs = val.items()
    elif hasattr(val, "__iter__"):
        pairs = val
    else:
        raise AssertionError("unsupported val for align: %s" % repr(val))
    mapping = {}
    for (key, value) in pairs:
        mapping[key] = value
    # the extra 0 gives a width of 0 when there are no keys at all
    width = max([len(key) for key in mapping] + [0])
    lines = ["%s: %s" % (key.ljust(width), mapping[key])
             for key in sorted(mapping)]
    return "\n".join(lines)


def yaml(val):
    """Return val as a yaml string created by myyaml.write_string."""
    return myyaml.write_string(val)


def multi(format_, val, delimiter=";"):
    """Return val in one of several formats.

    parameters:
        format_   - "raw" (val is returned unchanged), "yaml", "py" or
                    "csv"
        val       - the value to format
        delimiter - see the review note at the csv branch below
    returns:
        the formatted value. For an unknown format error() is called and
        None is returned.
    """
    if format_ == "raw":
        return val
    if format_ == "yaml":
        return yaml(val)
    if format_ == 'py':
        return py(val)
    if format_ == 'csv':
        # NOTE(review): the delimiter argument is not used here, the
        # module level csvdelim is passed instead - confirm this is
        # intended.
        return as_csv(val, csvdelim)
    # NOTE(review): error() is not defined in the visible part of this
    # file - verify that it exists.
    error(ValueError, "ERROR: unexpected format_:%s" % format_)
    return None
class MultiStringOption(object):
    """A helper class for string options.

    An option spec is a string of parts separated by ":". Each known part
    belongs to a slot, which is an index in the result list. At most one
    part may be given for each slot; slots without a part get their
    default value.

    Here are some examples:

    >>> m= MultiStringOption({"a":0, "b":0, "c":0, "x":1, "y":1},[None,"y"])
    >>> m.parse("x:b")
    ['b', 'x']
    >>> m.parse("x:b:a")
    Traceback (most recent call last):
        ...
    ValueError: contradicting part 'a' in spec 'x:b:a'
    >>> m.parse("a")
    ['a', 'y']
    >>> m.parse("x")
    [None, 'x']
    >>> m.parse("z")
    Traceback (most recent call last):
        ...
    ValueError: unknown part 'z' in spec 'z'
    """

    def __init__(self, values, defaults):
        """constructor.

        parameters:
            values   - a dict mapping each allowed part to the index of
                       its slot. Parts are converted to lower case before
                       they are looked up here.
            defaults - a list with the default value of each slot, its
                       length defines the number of slots
        """
        self._values = values
        self._defaults = defaults

    def __repr__(self):
        """return a repr string for the object."""
        # built from the attributes the constructor really sets
        return "%s(%s, %s)" % (type(self).__name__,
                               repr(self._values),
                               repr(self._defaults))

    def parse(self, spec):
        """parse a spec string.

        parameters:
            spec - specification string, parts are separated by ":".
                   Empty parts are ignored.
        returns:
            a list with one entry for each slot, either the lower case
            part given for that slot or the default of that slot.
        raises:
            ValueError if a part is unknown or if two parts belong to the
            same slot.
        """
        results = [None] * len(self._defaults)
        for part in spec.split(":"):
            if part == "":
                continue
            key = part.lower()
            idx = self._values.get(key)
            if idx is None:
                raise ValueError("unknown part '%s' in spec '%s'" %
                                 (part, spec))
            if results[idx] is not None:
                raise ValueError("contradicting part '%s' in spec '%s'" %
                                 (part, spec))
            results[idx] = key
        # fill the slots that were not mentioned in the spec
        return [default if found is None else found
                for (found, default) in zip(results, self._defaults)]