Source code for fixedrec.fixedrec

# -*- coding: utf-8 -*-
import os


[docs]class RecordFileError(Exception): "Base Exception for block files."
[docs]class RecordFile(object): """Low level fixed-size record file. Record numbers are zero-based. """ def __init__(self, fname, blocksize=4, overwrite=False): """Initialize a new RecordFile object. :param fname: string :param overwrite: bool (True overwrite existing file) """ self.fname = fname if hasattr(fname, 'strpath'): self.fname = fname.strpath self.blocksize = blocksize self.overwrite = overwrite if not fname: raise RecordFileError("Missing file name.") if blocksize < 2: raise RecordFileError("Block size must be greater than 2 bytes.") self.fp = self.open(self.fname, overwrite) def __repr__(self): # pragma: no cover return '\n'.join(list(self)) def __str__(self): return ''.join(list(self)) def __enter__(self): return self def __exit__(self, type, value, tb): self.close()
[docs] def count(self): """Return number of records in the fils. """ return self._eof() / self.blocksize
[docs] def goto_recnum(self, n): """Position the file at record `n`, if `n` == -1, then go to the end of the file (writing then becomes appending). It is possible to go past the end of the file and write a record, causing the file to grow. """ if n == -1: self.fp.seek(0, 2) else: self.fp.seek(n * self.blocksize, 0)
[docs] def truncate(self, recnum=None): "Truncate the file at `recnum` (ie. `recnum` will be gone)." if recnum is not None: self.goto_recnum(recnum) self.fp.truncate()
[docs] def goto_recnum_relative(self, n): """Advance forward or backward (negative `n`) `n` records. """ self.fp.seek(n * self.blocksize, 1)
[docs] def goto_first_record(self): "Go to start of first record." self.fp.seek(0, 0)
[docs] def goto_last_record(self): "Go to the starting position of the last record." self.goto_recnum(len(self) - 1)
[docs] def read(self): "Read a block at the current position." return self.fp.read(self.blocksize)
[docs] def write(self, data, flush=True): "Write data to file at current position." if len(data) != self.blocksize: raise RecordFileError( "Tried to write data (%d) which " % len(data) + "didn't fit in blocksize (%d)." % self.blocksize) self.fp.write(data) if flush: self.fp.flush()
def _eof(self): "Return the position at the end of the file" self.fp.seek(0, 2) eof = self.fp.tell() return eof @property def curpos(self): "Return the current position." return self.fp.tell() / self.blocksize def __iter__(self): "Yield all records." eof = self._eof() self.goto_first_record() while self.fp.tell() < eof: data = self.read() yield data def __len__(self): "Returns the number of records in the file." return self.count() def __getitem__(self, n): "Get record number `n`." self.goto_recnum(n) return self.read() def __setitem__(self, n, data): "Set record number `n`." self.goto_recnum(n) data = data if isinstance(data, list) else [data] for rec in data: self.write(rec)
[docs] def swap(self, a, b): "Swap records at positions `a` and `b`." self[a], self[b] = self[b], self[a]
def __delitem__(self, n): """Move record to the end of the file, then truncate the file. """ length = len(self) - 1 self.swap(n, length) self.truncate(length)
[docs] def open(self, fname, overwrite): "Open or create the file." if overwrite: fp = self.create_new_file(fname) else: try: fp = self.open_existing_file(fname) except IOError as e: if not (e.errno == 2 and e.strerror == 'No such file or directory'): raise # pragma: no cover fp = self.create_new_file(fname) return fp
[docs] def open_existing_file(self, fname): "Open the file for read and write if possible." if os.access(fname, os.W_OK): fp = open(fname, 'r+b') else: fp = open(fname, 'rb') return fp
[docs] def create_new_file(self, fname): """Create a new keystore file (overwriting/deleting any existing file. Args: fname (str): Filename to use. Returns: (file): The created file. """ return open(fname, 'w+b')
[docs] def close(self): "Close file and write statusrec." self.fp.flush() self.fp.close()
def __del__(self): try: self.close() except: pass