Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#!/usr/bin/env python
# Blake VandeMerwe 9/27/2012 -- Program 2
from collections import OrderedDict as oDict
from collections import namedtuple

# One parsed row of the ticker file.  Fields are stored as the raw text
# tokens read from the row; consumers convert them (e.g. with float()).
Segment = namedtuple('Segment', [
    'company',
    'ticker',
    'price',
    'dollarChange',
    'percentChange',
    'ytdChange',
    'w52high',
    'w52low',
    'peRatio',
])

# A line containing this marker starts a new chunk; the text after the
# marker becomes the chunk's label.
CHUNK_IDENT = 'Last updated'
class Observer(object):
    """Base class for objects that receive snapshot chunks.

    Subclasses override ``name`` and implement ``update``.
    """

    # Identifier for the observer; LocalStocks.register() uses it as the
    # dictionary key, so two observers with the same name replace each other.
    name = "Observer base"

    def update(self, data, dataChunk):
        """Handle one chunk of a snapshot.

        Args:
            data: Label of the chunk.
            dataChunk: Sequence of records belonging to that chunk.

        Raises:
            NotImplementedError: Always; subclasses must override.
        """
        raise NotImplementedError("Method not implemented")
class Average(Observer):
    """Observer that appends each chunk's mean price to ``Average.dat``."""

    name = "Average"
    oFile = "Average.dat"

    def update(self, data, dataChunk):
        """Append ``'<data>, Average Price: <mean>'`` to the output file.

        Args:
            data: Label of the chunk, written verbatim at the start of
                the line.
            dataChunk: Sequence of records whose ``price`` attribute can
                be converted with ``float()``.

        Raises:
            ValueError: If a ``price`` cannot be converted to float.
        """
        # An empty chunk has no defined average; skip it instead of
        # dividing by zero.
        if not dataChunk:
            return

        # Compute before opening the file so a bad price does not leave a
        # freshly created, empty output file behind.
        averagePrice = sum(float(s.price) for s in dataChunk) / len(dataChunk)

        with open(Average.oFile, 'a') as f:
            f.write('%s, Average Price: %s\n' % (data, averagePrice))
class Change10(Observer):
    """Observer that logs records whose percent change exceeds 10.

    Output is appended to ``Change10.dat``.
    """

    name = "Change10"
    oFile = "Change10.dat"

    def update(self, data, dataChunk):
        """Append the chunk label and every large mover to the output file.

        For each chunk this writes ``'<data>:'``, then one
        ``'<ticker> <price> <percentChange>'`` line per record whose
        absolute percent change is strictly greater than 10, then a blank
        separator line.

        Args:
            data: Label of the chunk.
            dataChunk: Sequence of records with ``ticker``, ``price`` and
                ``percentChange`` attributes; ``percentChange`` must be
                convertible with ``float()``.
        """
        with open(Change10.oFile, 'a') as f:
            # One-element tuple so a tuple-valued label cannot be
            # misread as multiple format arguments.
            f.write('%s:\n' % (data,))
            for s in dataChunk:
                if abs(float(s.percentChange)) > 10:
                    f.write('%s %s %s\n' % (s.ticker, s.price, s.percentChange))
            # Blank line separates consecutive chunks in the file.
            f.write('\n')
class Selections(Observer):
    """Observer that logs the full record of every watched ticker.

    Output is appended to ``Selections.dat``.
    """

    name = "Selections"
    oFile = "Selections.dat"

    def __init__(self, toWatch=None):
        """Create the observer.

        Args:
            toWatch: Collection of ticker symbols to report.  Defaults to
                an empty list.
        """
        # A fresh list per instance: a mutable default argument would be
        # shared by every Selections created without arguments.
        self.watch = [] if toWatch is None else toWatch

    def update(self, data, dataChunk):
        """Append the chunk label and all watched records to the output file.

        For each chunk this writes ``'<data>:'``, then one line per record
        whose ``ticker`` is in ``self.watch`` (all fields joined by single
        spaces), then a blank separator line.

        Args:
            data: Label of the chunk.
            dataChunk: Sequence of records that have a ``ticker``
                attribute and iterate over string fields.
        """
        with open(Selections.oFile, 'a') as f:
            f.write('%s:\n' % (data,))
            for s in dataChunk:
                if s.ticker in self.watch:
                    f.write('%s\n' % (' '.join(s).strip()))
            # Blank line separates consecutive chunks in the file.
            f.write('\n')
class LocalStocks(object):
    """Subject of the observer pattern: parses ticker text and notifies observers.

    Attributes:
        data: Ordered mapping of chunk label -> list of ``Segment`` rows
            that have been parsed but not yet delivered.
        observers: Mapping of observer ``name`` -> observer object.
    """

    # Number of fixed trailing columns on each row (ticker .. peRatio);
    # everything to the left of them is the company name.
    _FIXED_FIELDS = 8

    def __init__(self, registers=None):
        """Create the subject and register any initial observers.

        Args:
            registers: Optional iterable of observers to register.
        """
        self.data = oDict()
        self.observers = {}
        for observer in (registers or []):
            self.register(observer)

    @staticmethod
    def _parseChunk(rows):
        """Convert the raw text rows of one chunk into ``Segment`` records.

        Rows are split on whitespace.  The last eight tokens are, in
        order, ticker, price, dollarChange, percentChange, ytdChange,
        w52high, w52low and peRatio; any tokens before them are joined
        with single spaces to form the company name.  Rows with no tokens
        are skipped.

        Raises:
            IndexError: If a non-empty row has fewer than eight tokens.
        """
        parsed = []
        for row in rows:
            tokens = row.split()
            if not tokens:
                continue
            if len(tokens) < LocalStocks._FIXED_FIELDS:
                # Same exception type the positional indexing used to
                # raise, but with the offending row in the message.
                raise IndexError(
                    'expected at least %d fields, got %d: %r'
                    % (LocalStocks._FIXED_FIELDS, len(tokens), row))
            (ticker, price, dollarChange, percentChange,
             ytdChange, w52high, w52low, peRatio) = tokens[-LocalStocks._FIXED_FIELDS:]
            parsed.append(Segment(
                company=' '.join(tokens[:-LocalStocks._FIXED_FIELDS]).strip(),
                ticker=ticker,
                price=price,
                dollarChange=dollarChange,
                percentChange=percentChange,
                ytdChange=ytdChange,
                w52high=w52high,
                w52low=w52low,
                peRatio=peRatio))
        return parsed

    def snapshot(self, data=''):
        """Parse ticker lines into labelled chunks and notify observers.

        A line containing ``CHUNK_IDENT`` starts a new chunk whose label
        is the stripped text after the marker.  Every other line is added
        to the current chunk; lines seen before any marker go to the
        chunk labelled ``''``.  Chunks keep their order of first
        appearance.  The parsed result replaces ``self.data`` and
        ``notify()`` is called.

        Args:
            data: Iterable of text lines.

        Raises:
            IndexError: If a non-empty data row has fewer than eight
                whitespace-separated tokens.
        """
        chunks = oDict()
        label = ''
        for line in data:
            if CHUNK_IDENT in line:
                label = line.split(CHUNK_IDENT)[-1].strip()
            else:
                chunks.setdefault(label, []).append(line.strip())

        # Only values are replaced here, so iterating the keys is safe.
        for key in chunks:
            chunks[key] = self._parseChunk(chunks[key])

        self.data = chunks
        self.notify()

    def register(self, observer):
        """Add ``observer`` under its ``name``, replacing any same-named one."""
        self.observers[observer.name] = observer

    def remove(self, observer):
        """Remove the observer stored under ``observer.name``.

        Raises:
            KeyError: If no observer with that name is registered.
        """
        del self.observers[observer.name]

    def notify(self):
        """Deliver every pending chunk to every observer, then discard them.

        Each observer's ``update(label, rows)`` is called once per chunk,
        in chunk order.  Chunks are removed from ``self.data`` only after
        all deliveries have completed, so an exception raised by an
        observer leaves the pending data in place.
        """
        labels = list(self.data)
        for label in labels:
            rows = self.data[label]
            for observer in self.observers.values():
                observer.update(label, rows)
        for label in labels:
            del self.data[label]
def main():
    """Run the three observers over the contents of ``ticker.dat``.

    Reads ``ticker.dat`` from the current directory and feeds its lines
    to a ``LocalStocks`` with ``Average``, ``Selections`` and
    ``Change10`` registered; each observer appends to its own output
    file.
    """
    watched = ['ALL', 'BA', 'BC', 'GBEL', 'KFT', 'MCD', 'TR', 'WAG']
    LS = LocalStocks([Average(), Selections(watched), Change10()])
    with open('ticker.dat') as f:
        LS.snapshot(f.readlines())


if __name__ == "__main__":
    main()
Add Comment
Please, Sign In to add comment