# Copyright (c) 1996, 1997, 1998 The Regents of the University of California.
# All rights reserved.  See legal notices for full text and disclaimer.

"""
Places you can store a time history

The API:
    m = Memory ()               -- histories in memory
    t = TextFile (filename)     -- histories in a text file
    c = ColumnarFile (filename) -- columned histories in a text file
    n = Event ()                -- no storage

Methods of HistoryMedium:
    begin_record (current_cycle, current_time)
    end_record ()
    write (name, value)
    external_name (name) = name used on the external medium
    history(name) -- if present, returns existing history of name

Attributes:
    registry -- a map of external names to cycle last written
    is_open -- true if the record is currently open
    record_number -- number of calls that have been made to begin_record ()
    cycle, time -- latest arguments to begin_record (cycle, time)
        (After an end_record () cycle and registry[name] should agree
        in all cases)
    write_cycle_and_time -- if true, cycle and time written first in each
        record (the flag is only set, never read, inside this module)

Descendents should call ancestor routines followed by their own action.
Each descendent should supply _write (self, ename, value) and
external_name (name) can be used to map user names to file names.
Record numbers begin at 1.
"""


class HistoryMediumError(Exception):
    """Error in History Medium package.

    This used to be a plain string raised as an exception; string
    exceptions are not supported by current interpreters, so it is now a
    real exception class.  ``except HistoryMediumError:`` keeps working.
    """


class HistoryMedium:
    "Storage medium for a time history."

    def initialize(self):
        "Initialize a history medium."
        self.registry = {}
        self.record_number = 0
        self.cycle = None
        self.time = None
        self.is_open = 0
        self.write_cycle_and_time = 1

    def begin_record(self, cycle, time):
        "Start recording a new record at time."
        self.record_number = self.record_number + 1
        self.is_open = 1
        self.cycle = cycle
        self.time = time

    def end_record(self):
        "Finish recording a new record."
        self.is_open = 0

    def external_name(self, name):
        "Return the name to actually use in the medium."
        return name

    def write(self, name, value):
        """Add a new value to the history named 'name'.

        Raises HistoryMediumError if the external name has already been
        written during the current cycle.
        """
        ename = self.external_name(name)
        if ename not in self.registry:
            # First sighting: pretend it was last written one cycle ago so
            # that the duplicate-write check below lets it through.
            self.registry[ename] = self.cycle - 1
        if self.registry[ename] >= self.cycle:
            raise HistoryMediumError(
                "More than one write of external name per record, name = "
                + ename)
        self._write(ename, value)
        self.registry[ename] = self.cycle

    def history(self, name):
        "History of name in this tag; tag must not be open"
        if self.is_open:
            raise HistoryMediumError(
                "Cannot call history when tag is open.")
        return self._get_history(name)

    def _get_history(self, name):
        "Default implementation raises exception."
        raise HistoryMediumError(
            "This tag type does not support method history(name).")


class Memory(HistoryMedium):
    "Storage medium in memory for a time history."

    def __init__(self):
        HistoryMedium.initialize(self)
        self.histories = {}

    def _write(self, ename, value):
        "Append value to the list kept for ename, creating it on demand."
        # The list is created on first use rather than only during record 1,
        # so a name that first shows up in a later record no longer fails
        # with KeyError.
        self.histories.setdefault(ename, []).append(value)

    def _get_history(self, name):
        "Get the history stored under name."
        try:
            return self.histories[name]
        except KeyError:
            raise HistoryMediumError(
                "No history for " + name + " in this tag.")

    def __repr__(self):
        return "Memory ()"

    __str__ = __repr__


class TextFile(HistoryMedium):
    "History storage in a text file."

    def __init__(self, filename=""):
        HistoryMedium.initialize(self)
        self.set_name(filename)
        self.writer = None
        self.write_cycle_and_time = 0

    def __del__(self):
        # Close a file left open by an unfinished record.  getattr guards
        # against an instance whose __init__ never got as far as 'writer'.
        writer = getattr(self, "writer", None)
        self.writer = None
        if writer is not None:
            writer.close()

    def set_name(self, newname):
        "Set file name to newname"
        self.filename = newname
        # Restart the per-file count so the next record truncates the file.
        self.file_record_number = 0

    def begin_record(self, cycle, time):
        """Open the file and write the 'Record n<TAB>cycle<TAB>time' line.

        The first record written to a file name opens it with mode "w";
        later records open it with mode "a".  Raises HistoryMediumError
        if no file name has been set.
        """
        if not self.filename:
            raise HistoryMediumError(
                "No filename set for text file tag.")
        HistoryMedium.begin_record(self, cycle, time)
        self.file_record_number = self.file_record_number + 1
        if self.file_record_number == 1:
            mode = "w"
        else:
            mode = "a"
        self.writer = open(self.filename, mode)
        self.writer.write(
            "Record " + repr(self.record_number)
            + "\t" + repr(cycle)
            + "\t" + repr(time) + "\n")

    def end_record(self):
        "Terminate the record with a blank line and close the file."
        f = self.writer
        self.writer = None
        try:
            f.write("\n")
        finally:
            # Close even when the final write fails.
            f.close()
        HistoryMedium.end_record(self)

    def _write(self, name, value):
        "Write one '<TAB>name<TAB>repr(value)' line."
        self.writer.write("\t" + name + "\t" + repr(value) + "\n")

    def __repr__(self):
        return "TextFile (" + repr(self.filename) + ")"

    __str__ = __repr__


class ColumnarFile(HistoryMedium):
    "A text file history medium consisting of labeled columns."

    def __init__(self, filename=""):
        HistoryMedium.initialize(self)
        self.set_name(filename)
        self.writer = None
        self.size = 0       # number of columns
        self.names = []     # column labels, in column order
        self.catalog = {}   # column label -> index into names / values
        self.values = []    # latest value written for each column

    def __repr__(self):
        return "ColumnarFile (" + repr(self.filename) + ")"

    __str__ = __repr__

    def set_name(self, newname):
        "Set file name to newname"
        self.filename = newname
        # Restart the per-file count so the next record truncates the file
        # and writes the header line again.
        self.file_record_number = 0

    def begin_record(self, cycle, time):
        """Open the file for the coming record.

        The first record written to a file name opens it with mode "w";
        later records open it with mode "a".  Raises HistoryMediumError
        if no file name has been set.
        """
        if not self.filename:
            raise HistoryMediumError(
                "No filename set for columnar file tag.")
        HistoryMedium.begin_record(self, cycle, time)
        self.file_record_number = self.file_record_number + 1
        if self.file_record_number == 1:
            mode = "w"
        else:
            mode = "a"
        self.writer = open(self.filename, mode)

    def end_record(self):
        """Write the tab-separated row of current values and close the file.

        For the first record of a file the row is preceded by a header
        line holding the column names.
        """
        h = self.writer
        try:
            if self.file_record_number == 1:
                h.write("\t".join(self.names))
                h.write("\n")
            h.write("\t".join([repr(v) for v in self.values]))
            h.write("\n")
        finally:
            # Close even when a write fails.
            h.close()
            self.writer = None
        HistoryMedium.end_record(self)

    def _write(self, ename, value):
        """Store value in the column labelled ename.

        Columns are created only while record 1 is being written; a name
        not seen then raises RuntimeError in any later record.
        """
        if self.record_number == 1:
            self.size = self.size + 1
            self.names.append(ename)
            self.values.append(value)
            self.catalog[ename] = self.size - 1
        else:
            try:
                j = self.catalog[ename]
            except KeyError:
                raise RuntimeError("Unexpected name in columnar history.")
            self.values[j] = value


class Event(HistoryMedium):
    "No storage history medium, used for event scheduling."

    def __init__(self):
        HistoryMedium.initialize(self)
        self.write_cycle_and_time = 0

    def _write(self, ename, value):
        "Discard the value."
        pass

    def __repr__(self):
        return "Event ()"

    __str__ = __repr__


if __name__ == "__main__":
    mem = Memory()
    t = TextFile("history_medium_out.txt")
    e = Event()
    for i in [1, 2, 3, 4]:
        for h in [mem, t, e]:
            h.begin_record(i, i / 10.)
            x = .2 * i
            h.write("x", x)
            h.write("y", x * x)
            h.end_record()
    # history is a method: call it rather than subscripting it.
    print("History of x " + repr(mem.history("x")))
    # NOTE(review): under Python 2, input() evaluates whatever is typed.
    input("Enter a number to quit: ")
    raise SystemExit