""" Class for reading UNIX unformatted FORTRAN files. """ from __future__ import division from __future__ import print_function from __future__ import with_statement import numpy as np import os, sys, gzip, bz2 from utils import prod, cumsum0_enum_range_iter from lzma import LZMAFile class DataReader(object): """ Provide basic IO for reading from data buffer """ default_byte_order = '=' fortran_reclen = 4 reclen_dtype = np.dtype(np.uint32) sys_is_le = sys.byteorder == 'little' native_byteorder = '<' if sys_is_le else '>' class RecordSizeError(Exception): """ Exception raised when assert_eor finds EOR == False. This means that the data read does not match the record length. """ def __init__(self, reclen, pos): self.reclen = reclen self.pos = pos def __str__(self): """Return error message.""" return ("\n" + "Not at end of data record in\n" + "Poistion = {:d}\n" + "RecLen = {:d}" ).format( self.pos, self.reclen) class ReadError(Exception): """ Exception raised when data could not be read correctly. """ def __init__(self, message): self.message = message def __str__(self): """ Return error message. """ return ( "\n" + "Error reading record from file: {}".format( self.message)) def __init__(self, byteorder = default_byte_order, **kwargs): self._set_byteorder(byteorder = byteorder) def _set_byteorder(self, byteorder = default_byte_order): """ set up all data types for deserved byte order """ if byteorder == "=": byteorder = self.native_byteorder self.swapbyteorder = byteorder != self.native_byteorder self.byteorder = byteorder self.reclen_dtype = self.reclen_dtype.newbyteorder(self.byteorder) self.dtype_i4 = np.dtype(np.int32).newbyteorder(self.byteorder) self.dtype_f8 = np.dtype(np.float64).newbyteorder(self.byteorder) self.i4 = np.ndarray([1],dtype=self.dtype_i4) self.f8 = np.ndarray([1],dtype=self.dtype_f8) def _reverse_byteorder(self): """ Reverse data type byte order """ byteorder = '<' if self.byteorder == '>' else '>' self._set_byteorder(byteorder) def eor(self): """Return whether position is end of current reciord.""" return self.pos == self.reclen def assert_eor(self): """Throw exception if current position is not end of record. This can be used to deterim whether all data has been read, i.e., as a consistency check of the read data sizes""" if not self.eor(): raise self.RecordSizeError(self.reclen,self.pos) def skip_bytes(self, bytes): """Skip number of bytes on read.""" self.pos += bytes def get_data(self): """Just get all of the record data.""" if self.pos > 0: raise self.ReadError('[get_data] : Not at beginning of record') self.pos = self.reclen return self.data def get_buf(self,length): """Read some raw data.""" p = self.pos self.pos += length return self.data[p:p+length] def get_i4n(self): """Read one numpy int32.""" p = self.pos self.i4.data[0:4] = self.data[p:p+4] self.pos += 4 return self.i4[0] def get_f8n(self): """Get one numpy float64.""" p = self.pos self.f8.data[0:8] = self.data[p:p+8] self.pos += 8 return self.f8[0] def peek_f8(self, offset=0): """Get one numpy float64 at location offset relative to current position. Do not advance buffer pointer.""" p = self.pos + offset self.f8.data[0:8] = self.data[p:p+8] return float(self.f8[0]) def get_f8(self): """Get one float from float64.""" p = self.pos self.f8.data[0:8] = self.data[p:p+8] self.pos += 8 return float(self.f8[0]) def get_i4(self): """Read one int32 and return Python integer.""" p = self.pos self.i4.data[0:4] = self.data[p:p+4] self.pos += 4 return int(self.i4[0]) def get_f8an(self,dim): """Read a numpy float64 array.""" value = \ np.ndarray(dim, \ buffer = self.data,\ offset = self.pos, \ dtype = np.float64, order = 'F') value = value.copy() if self.swapbyteorder: value.byteswap(True) self.pos += 8 * prod(dim) return value def get_kep_parm(self,list): """Read a kepler parameter binary list.""" count = len(list) value = \ np.ndarray(count, \ buffer=self.data,\ offset=self.pos, \ dtype=np.float64) value = value.copy() if self.swapbyteorder: value.byteswap(True) ivalue = \ np.ndarray(count, \ buffer=value,\ offset=4, \ dtype=np.int32,\ strides=8) self.pos += 8 * count return (value[i] if l == 1 else ivalue[i] for i,l in enumerate(list)) def get_f8an_kep_i4an(self,dim): """Read i4 in f8 array for kepler. Pass the f8 dimension. Half the space seems wasted the way KEPLER treats this, the entire second half of each arry is empty. Here we shall just discard the 2nd part of the array on only return the requested dimension. Usually one would first read the f8, do a byteswap, then use the buffer for the integers, however, KEPLER stores the i4 in the f8 in the same format a big endian system would have, and hence the byte-swap is only needed on the i4 level. """ value = \ np.ndarray(dim, \ buffer = self.data,\ offset = self.pos, \ dtype = np.int32, order = 'F') value = value.copy() if self.swapbyteorder: value.byteswap(True) self.pos += 8 * prod(dim) return value def get_f8an1d0(self, length): """Read a 1D np.float64 array and front-pad with one 0.""" value = \ np.ndarray(length+1, \ dtype=np.dtype(np.float64)) p = self.pos len8 = 8*length value.data[8:8+len8] = self.data[p:p+len8] if self.swapbyteorder: value.byteswap(True) value[0] = 0 self.pos += len8 return value def get_f8an1d0n(self,length): """Read a 1D np.float64 array and pad with one 0 on both sides.""" value = \ np.ndarray(length+2, \ dtype=np.dtype(np.float64)) len8 = 8*length p = self.pos value.data[8:8+len8] = self.data[p:p+len8] if self.swapbyteorder: value.byteswap(True) value[[0,-1]] = 0 self.pos += len8 return value def get_i4an1d0n(self,length): """Read a 1D np.int32 array and pad with one 0 on both sides.""" value = \ np.ndarray(length+2, \ dtype=np.dtype(np.int32)) len4 = 4*length p = self.pos value.data[4:4+len4] = self.data[p:p+len4] if self.swapbyteorder: value.byteswap(True) value[[0,-1]] = 0 self.pos += len4 return value # this one should be able to replace all the 1D routines. def get_1dn(self,length,xtype=np.float64,lead=0,tail=0,load=False): """Read a 1D np array and pad with 0 on either side.""" if load: self.load() itemsize = xtype.itemsize totlen = length + lead + tail value = np.ndarray(totlen,dtype=xtype) p = self.pos i0 = itemsize * lead nb = itemsize * length value.data[i0:i0+nb] = self.data[p:p+nb] if self.swapbyteorder: value.byteswap(True) value[0:lead]=0 value[totlen-tail:totlen]=0 self.pos += nb if load: self.assert_eor() return value # this one should be able to replace all the multiple-dim routines. def get_an(self,dim,xtype=np.float64): """Read an np array of type xtype.""" itemsize = xtype.itemsize value = np.ndarray(dim,\ buffer = self.data,\ offset = self.pos, \ dtype = xtype, order = 'F') value = value.copy() if self.swapbyteorder: value.byteswap(True) self.pos += itemsize * prod(dim) return value def get_i4an(self,dim): """Read a np.int32 array.""" value = \ np.ndarray(dim, \ buffer = self.data,\ offset = self.pos, \ dtype = np.int32, order = 'F') value = value.copy() if self.swapbyteorder: value.byteswap(True) self.pos += 4 * prod(dim) return value def get_i4a(self,count): """Read a np.int32 and return Python int list.""" value = \ np.ndarray(count, \ buffer = self.data,\ offset = self.pos, \ dtype = self.dtype_i4, order = 'F') value = value.tolist() self.pos += 4 * count return value def get_f8a(self, count): """Read a np.float64 and return Python float list.""" value = \ np.ndarray(count, \ buffer = self.data,\ offset = self.pos, \ dtype = self.dtype_f8, order = 'F') value = value.tolist() self.pos += 8 * count return value def get_sal(self,lengths): """Read an array of srings of varying lengths.""" nl = len(lengths) value = np.ndarray(nl,dtype=object) j = self.pos for k,i,j in cumsum0_enum_range_iter(lengths,self.pos): value[k] = self.data[i:j] self.pos = j return value def get_san(self,dim,length=8): """Read an array of srings of same length.""" value = np.ndarray(dim,\ buffer = self.data, offset = self.pos, dtype=np.dtype((np.str,length)),\ order = 'F') value = value.copy() self.pos += length * prod(dim) return value def get_s(self,length=8): """Read a srings.""" value = np.ndarray(1,\ buffer = self.data, offset = self.pos, dtype=np.dtype((np.str,length)),\ order = 'F') self.pos += length return value[0] def get_UUID(self): """Read an UUID1 and return object.""" p = self.pos value = uuidtime.UUID(bytes=self.data[p:p+16]) self.pos += 16 return value def get_UUID_raw(self): """Read UUID raw data (16 bytes).""" p = self.pos value = self.data[p:p+16] self.pos += 16 return value def get_UUID_raw_a1d(self,count,load=False): """Read UUID raw data (16 bytes).""" if load: self.load() p = self.pos value = [self.data[i:i+16] for i in xrange(p, p + 16 * count, 16)] self.pos += 16 * count if load: self.assert_eor() return value class RecordReader(DataReader): """ class to read from buffer (one record) [could be more?] """ def __init__(self, data, reclen, pos, byetorder = DataReader.default_byte_order, *args, **kwargs): self.data = data self.reclen = reclen self.pos = pos self._set_byteorder(byteorder = byetorder) def load(self, byetorder = DataReader.default_byte_order): self._set_byteorder(byteorder = byetorder) class FortranRead(DataReader): """ Class for reading 'unformatted' Fortran binary files. This is in part based on a code from Laurens Keek (2010). Compressed files with .gz will be automatically uncompressed. This may fail, however, if the file is bigger tha 2GB or 4GB. Compressed files with .bz2 will be automatically uncompressed. This is currently rather inefficient because to determine the file size the entire stream need to be read first. """ class DataSizeError(Exception): """ Exception raised when assert_eor finds EOR == False. This means that the data read does not match the record length. """ def __init__(self, filename, reclen, pos): self.filename = filename self.reclen = reclen self.pos = pos def __str__(self): """Return error message.""" return ("Not at end of data record in file {}.\n"+\ "Poistion = {:d}\n"+\ "RecLen = {:d}")\ .format(self.filename, self.pos, self.reclen) class RecLenError(Exception): """ Exception raised when a record was not read correctly. This means that the record header does not match the record trailer (both should contain the length of the data area). """ def __init__(self, filename, message): self.filename = filename # name of file which caused an error self.message = message def __str__(self): """Return error message.""" return "Error reading record from file {}.\n{}"\ .format(self.filename,self.message) class FileReadError(Exception): """ Exception raised when data could not be read correctly. """ def __init__(self, filename, message): self.filename = filename # name of file which caused an error self.message = message def __str__(self): """ Return error message. """ return "Error reading record from file {}: {}"\ .format(self.filename,self.message) def __init__(self, filename, extension = None, tolerant = False, byteorder = None, *args, **kwargs): """ Initialize data fields and open file. byteorder: '<', '>', '=': little, big, native endian x86 have native '<' risc have native '>' None: try native, then check (default) tolerant: True: allow partial read of record False: throw exception if record is not read in full Maybe useful for testing. """ self.tolerant = tolerant self.open(filename, extension = extension) if byteorder == None: self._set_byteorder() if not self._check_byteorder(): self._reverse_byteorder() else: self._set_byteorder( byteorder = byteorder) def _check_byteorder(self): """ deterimine if file is opened in right endian """ ok = True ok &= self.filesize >= 8 pos0 = self.file.tell() if pos0 != 0: self.file.seek(0, os.SEEK_SET) reclen = self._read_reclen() ok &= (reclen >= 0) and (reclen <= self.filesize - 8) if ok: self.file.seek(reclen, os.SEEK_CUR) ok &= reclen == self._read_reclen() if pos0 != self.file.tell(): self.file.seek(pos0, os.SEEK_SET) return ok def open(self, filename, extension = None): """Open the file.""" self.filename = os.path.expandvars(os.path.expanduser(filename)) # TODO - add automatic file selection based on extension if not os.path.exists(self.filename): fn = self.filename + '.gz' if os.path.exists(fn): self.filename = fn else: fn = self.filename + '.bz2' if os.path.exists(fn): self.filename = fn else: raise IOError("File not found.") if self.filename.endswith('.gz'): self.compressed = True self.compress_mode = 'gz' self.file = gzip.open(self.filename,'rb') pos = self.file.myfileobj.tell() self.file.myfileobj.seek(-4, os.SEEK_END) self.stat = os.fstat(self.file.myfileobj.fileno()) self.filesize = np.ndarray(1, dtype = "= self.filesize def close(self): """Close the file.""" self.file.close() def rewind(self): """Rewind the file.""" self.file.seek(0,os.SEEK_SET) self._init() def _read_reclen(self): """Return the record length.""" # fast python 2 code: # self.i4.data[0:4] = self.file.read(4) # return int(self.i4[0]) # python 3 is more annoying # in particular, data now had chunks of size of the data tyoe # and is a memory view rather than 'buffer' # the following seemd to have worked, though # b4 = np.ndarray(4,buffer=i4,dtype=np.byte) [add as class attribute] # b4.data[:]= memoryview(self.file.read(4))[:] # the following works in both P2/3 but is slower. # it does create extra objects... return int(np.frombuffer(self.file.read(self.fortran_reclen),self.reclen_dtype)) def skip(self): """Read past a record without unpacking the data.""" self._load(False) def load(self): """Read a data record from file.""" self._load(True) def _load(self,load=True): """Read in data of a record or skip, and asvance to next.""" f = self.file reclen = self._read_reclen() if load: self.data = f.read(reclen) self.reclen = reclen else: f.seek(self.reclen,os.SEEK_CUR) self.data = '' self.reclen = 0 check = self._read_reclen() if (not self.tolerant) and (check != reclen): raise self.RecLenError(self.filename, "Header lenght does not match trailer length.") self.fpos += reclen + 2 * self.fortran_reclen self.rpos += 1 self.pos = 0 def assert_eof(self): """Throw exception if current position is not end of file. This can be use to deterime whether all data has been read, i.e., as a consistency check of the read data sizes. If the file is compressed, the initial file size cannot be used to determine the the size of the file.""" if not self.eof() and not self.compressed: raise self.DataSizeError(self.filename,self.filesize,self.fpos) def load_i4n(self): """Load and read one numpy int32.""" self.load() value = self.get_i4n() self.assert_eor() return value def load_i4(self): """Load and read np.int32 to return Python integer.""" self.load() value = self.get_i4() self.assert_eor() return value def load_f8an(self,dim): """Load and read a numpy float64 array.""" self.load() value = self.get_f8an(dim) self.assert_eor() return value def load_f8an_kep_i4an(self, dim): """Load and read i4 in f8 array for kepler.""" self.load() value = self.get_f8an_kep_i4an(dim) self.assert_eor() return value def load_f8an1d0n(self,length): """Load and read a 1D np.float64 array and pad with one 0 on both sides.""" self.load() value = self.get_f8an1d0(length) self.assert_eor() return value def load_i4an(self,dim): """Load and read a np.int32 array.""" self.load() value = self.get_i4an(dim) self.assert_eor() return value def load_san(self,dim,length=8): """Load and read np.int32 to return Python integer.""" self.load() value = self.get_san(dim,length) self.assert_eor() return value class FortranWrite: """ Class for writing 'unformatted' Fortran binary files. Compressed files with .gz will be automatically compressed. This may fail, however, if the file is bigger tha 2GB or 4GB. Compressed files with .bz2 will be automatically compressed. """ # I still need to find out how to efficiently work with a buffer # for wring internally - how to extend it, etc. The plan is to # always write an entire record at once. Currently it would # appear the largest FORTRAN files I have come with records for # ppnb in KEPLER, 5000 isotopes * 2000 zones * 8 bytes ... 80 MB # if pushing it ... usually < 16 MB. So we could start with that, # then extend if ever needed. default_byte_order = ">" fortran_reclen = 4 reclen_dtype=np.dtype(np.uint32) sys_is_le = sys.byteorder == "little" native_byteorder = "<" if sys_is_le else ">" initial_buf_size = 2**24 buf_grow_factor = 2 buf_grow_limit = 2**28 # as of 2011, even my best solid stae drive will take 0.5 s to write that much class WriteError(Exception): """ Exception raised when data could not be written correctly. """ def __init__(self, filename, message): self.filename = filename self.message = message def __str__(self): """ Return error message. """ return "Error writing record to file {}: {}"\ .format(self.filename, self.message) def __init__(self, filename, byteorder = default_byte_order, buf_size = initial_buf_size): """ Initialize data fields and open file. Optionally the byte order can be specified. The default is big endian. """ # TODO: Add append mode. # not sure how/whether this will work with compressed files if byteorder == "=": byteorder = self.native_byteorder self.swapbyteorder = byteorder != self.native_byteorder self.byteorder = byteorder self.reclen_dtype = self.reclen_dtype.newbyteorder(self.byteorder) self.open(filename) self.dtype_i4 = np.dtype(np.int32).newbyteorder(self.byteorder) self.dtype_f8 = np.dtype(np.float64).newbyteorder(self.byteorder) self.i4 = np.ndarray([1],dtype=self.dtype_i4) self.f8 = np.ndarray([1],dtype=self.dtype_f8) self.buf_size - buf_size self.buffer = bytearray(self.buf_size) def open(self, filename): """ Open the file for writing. """ self.filename = os.path.expandvars(os.path.expanduser(filename)) if self.filename.endswith('.gz'): self.compressed = True self.compress_mode = 'gz' self.file = gzip.open(filename,'wb') elif self.filename.endswith('.bz2'): self.compressed = True self.compress_mode = 'bz2' self.file = bz2.BZ2File(filename,'wb',2**16) else: self.file = open(filename,'wb',-1) self.stat = os.fstat(self.file.fileno()) self.filesize = self.stat.st_size self.compressed = False self._init() def _init(self): """Initialize the file position and data to empty.""" self.fpos = 0 self.pos = 0 self.rpos = 0 self.data = '' self.reclen = 0 def close(self): """Close the file.""" self.file.close() def rewind(self): """Rewind the file.""" self.file.seek(0,os.SEEK_SET) self._init() def _write_reclen(self): """Return the record length.""" # fast python 2 code: # self.i4.data[0:4] = self.file.read(4) # return int(self.i4[0]) # python 3 is more annoying # in particular, data now had chunks of size of the data tyoe # and is a memory view rather than 'buffer' # the following seemd to have worked, though # b4 = np.ndarray(4,buffer=i4,dtype=np.byte) [add as class attribute] # b4.data[:]= memoryview(self.file.read(4))[:] # the following works in both P2/3 but is slower. # it does create extra objects... return int(np.frombuffer(self.file.read(self.fortran_reclen),self.reclen_dtype)) def write(self): """ Write the data record to file. """ self._write() def _write(self): """ Write a data record to file. """ f = self.file self._write_reclen() self.data = f.write(self.data, reclen) self._write_reclen() self.data = '' self.reclen = 0 self.fpos += reclen + 2 * self.fortran_reclen self.rpos += 1 self.pos = 0 def _extend_buf(self): """ Grow write buffer as specified. """ if self.buf_size < self.buf_grow_limit: new_buf_size = 2 * self.buf_size else: new_buf_size = self.buf_sise + self.buf_grow_limit new_buffer = bytearray(new_buf_size) new_buffer[0:self.pos] = self.bufffer[0:self.pos] del self.buffer self.buffer = new_buffer self.buf_size = new_buf_size def _check_buf_size(self, size): if pos < self.buf_size - self.fortran_reclen: self._extend_buf() def put_data(self, data): """ Just put all the record data. """ if self.pos > 0: raise self.WriteError(self.ftag + "{}: not at beginning of record.".format(self.filename)) self.pos = self.reclen self.write() def put_buf(self, data, length): """ Write some raw data. """ p = self.pos self.pos += length self.data[p:p+length] = data