Package Products :: Package ZenUtils :: Module AmqpDataManager
[hide private]
[frames | no frames]

Source Code for Module Products.ZenUtils.AmqpDataManager

  1  ############################################################################## 
  2  #  
  3  # Copyright (C) Zenoss, Inc. 2010, all rights reserved. 
  4  #  
  5  # This content is made available according to terms specified in 
  6  # License.zenoss under the directory where your Zenoss product is installed. 
  7  #  
  8  ############################################################################## 
  9   
 10   
 11  import sys 
 12  import time 
 13  import logging 
 14  log = logging.getLogger('zen.AmqpDataManager') 
 15   
 16  # DataManager class for adding msg queueing to zope and other XA 
 17  # transaction cohorts.  Usage with nested_transaction: 
 18  # 
 19  #  with nested_transaction(AmqpDataManager(publisher.channel)) as txn: 
 20  #      # perform zope db commands 
 21  #      # perform SQLAlchemy db commands 
 22  #      publisher.publish(msg) 
 23  # 
class AmqpDataManager(object):
    """Data manager that ties an AMQP channel to a two-phase commit.

    Objects of this kind manage transactional storage on behalf of a
    transaction; here the "storage" is an AMQP channel.  The channel's
    ``tx_select()`` is called at construction time, and the transaction
    hooks then either ``tx_commit()`` (``tpc_finish``) or ``tx_rollback()``
    (``abort`` / ``tpc_abort``) whatever was published in between.
    ``tpc_begin``, ``commit`` and ``tpc_vote`` only write a debug log line.

    Note that when some data is modified, that data's data manager should
    join a transaction so that the data can be committed when the user
    commits the transaction.
    """

    def __init__(self, channel, txnmgr=None):
        """Bind the manager to *channel* and call its ``tx_select()``.

        channel -- object providing ``tx_select()``, ``tx_commit()``,
                   ``tx_rollback()`` and an ``is_open`` attribute (these are
                   the only members this class touches).
        txnmgr  -- optional transaction manager; it is only stored, this
                   class never calls it.
        """
        self.channel = channel
        self.channel.tx_select()
        # The transaction manager (TM) used by this data manager.  This is
        # a public attribute, intended for read-only use.  The value is
        # expected to be an ITransactionManager instance (or None).
        self.transaction_manager = txnmgr

    def abort(self, transaction):
        """Abort a transaction and forget all changes.

        Abort must be called outside of a two-phase commit; the transaction
        manager uses it for transactions that have not entered one yet.
        *transaction* is accepted for protocol compatibility and not used.

        The rollback is skipped when the channel is no longer open.  Any
        exception raised by ``tx_rollback()`` propagates to the caller.
        """
        # discard any messages that have been buffered
        log.debug("abort'ed")
        if self.channel.is_open:
            self.channel.tx_rollback()

    # Two-phase commit protocol.  These methods are called by the
    # ITransaction object associated with the transaction being committed.
    # The sequence of calls normally follows this regular expression:
    #     tpc_begin commit tpc_vote (tpc_finish | tpc_abort)

    def tpc_begin(self, transaction):
        """Begin commit of a transaction, starting the two-phase commit.

        *transaction* is the ITransaction instance associated with the
        transaction being committed.  Nothing is done besides logging.
        """
        # nothing special to do here
        log.debug("tpc_begin'ed")

    def commit(self, transaction):
        """Commit modifications to registered objects.

        In the data manager protocol this is where changes are prepared so
        that they persist if ``tpc_finish`` is called later and vanish if
        ``tpc_abort`` is called instead.  This implementation has nothing to
        prepare: the real work happens in ``tpc_finish``.
        """
        # nothing special to do here
        log.debug("commit'ed")

    def tpc_finish(self, transaction):
        """Indicate confirmation that the transaction is done.

        Calls ``tx_commit()`` on the channel.  *transaction* is the
        ITransaction instance associated with the transaction being
        committed.

        This should never fail, so an exception from ``tx_commit()`` is
        logged (with traceback) and deliberately NOT re-raised; callers
        cannot detect a failed commit through this method.
        """
        log.debug("tpc_finish'ed")
        try:
            self.channel.tx_commit()
        except Exception:
            log.exception("tpc_finish completed FAIL")
        else:
            log.debug("tpc_finish completed OK")

    def tpc_vote(self, transaction):
        """Verify that a data manager can commit the transaction.

        This is the last chance for a data manager to vote 'no', which is
        done by raising an exception.  This implementation never votes 'no'.

        *transaction* is the ITransaction instance associated with the
        transaction being committed.
        """
        # Nothing to do here
        log.debug("tpc_voted")

    def tpc_abort(self, transaction):
        """Abort a transaction that is inside a two-phase commit.

        Called by a transaction manager to end a two-phase commit on the
        data manager, abandoning everything done in the transaction.
        *transaction* is the ITransaction instance associated with the
        transaction being committed.

        This should never fail: every exception is logged and swallowed.
        """
        log.debug("tpc_abort'ed")
        try:
            # Same guard as abort(): a channel that is already closed has
            # nothing left to roll back, and trying would only produce a
            # spurious traceback in the log.  The check lives inside the
            # try so that even a failing is_open lookup cannot escape.
            if self.channel.is_open:
                self.channel.tx_rollback()
        except Exception as e:
            log.exception(e)
            log.debug("tpc_abort failed with exception")
        else:
            log.debug("tpc_abort completed")

    def sortKey(self):
        """Return a key to use for ordering registered DataManagers."""
        # this data manager must always go last ('~' is the highest
        # printable ASCII character, so it sorts after ordinary keys)
        return "~~~~~~~"

#
# usage outside of zope transaction
#  with AmqpTransaction(publisher.channel) as txn:
#      publisher.publish(msg)
#      publisher.publish(msg2)
#
class AmqpTransaction(object):
    """Context manager that drives an AmqpDataManager by hand.

    For use outside of a zope transaction:

        with AmqpTransaction(publisher.channel) as txn:
            publisher.publish(msg)
            publisher.publish(msg2)

    Leaving the ``with`` block normally runs the full two-phase commit
    sequence on the data manager; leaving it through an exception calls
    the data manager's ``abort`` instead.
    """

    def __init__(self, channel):
        """Wrap *channel* in an AmqpDataManager and pick a transaction id.

        channel -- passed unchanged to ``AmqpDataManager``.
        """
        self.datamgr = AmqpDataManager(channel)
        self.txnid = self._new_txnid()

    @staticmethod
    def _new_txnid():
        """Return a clock-derived integer id in ``[0, max native int)``.

        The id is only handed to the data manager hooks as their
        ``transaction`` argument.
        """
        # time.clock() was removed in Python 3.8 and sys.maxint in Python 3.
        # Prefer the original names when they exist so behaviour on older
        # interpreters is unchanged; otherwise use the closest equivalents.
        clock = getattr(time, 'clock', time.time)
        limit = getattr(sys, 'maxint', sys.maxsize)
        return int(clock() * 1e6) % limit

    def __enter__(self):
        """Return the transaction object itself."""
        return self

    def __exit__(self, type, value, traceback):
        """Commit on a clean exit, abort when the block raised.

        type, value, traceback -- the usual context-manager exception
        triple; all three are None when the block finished normally.

        Returns False, so an exception raised inside the ``with`` block is
        never suppressed.  An exception raised by one of the commit steps
        is re-raised after ``tpc_abort`` has been called.
        """
        if type is None:
            try:
                self.datamgr.tpc_begin(self.txnid)
                self.datamgr.commit(self.txnid)
                self.datamgr.tpc_vote(self.txnid)
                self.datamgr.tpc_finish(self.txnid)
            except Exception:
                self.datamgr.tpc_abort(self.txnid)
                raise
        else:
            try:
                self.datamgr.abort(self.txnid)
            except Exception:
                # Best effort only: the exception from the with-block is
                # the one the caller must see, so a failing abort is
                # recorded in the log instead of replacing it.
                log.exception("abort failed")
        return False
182