1
2
3
4
5
6
7
8
9
10
11 import sys
12 import time
13 import logging
14 log = logging.getLogger('zen.AmqpDataManager')
15
16
17
18
19
20
21
22
23
25 """Objects that manage transactional storage.
26
27 These objects may manage data for other objects, or they may manage
28 non-object storages, such as relational databases. For example,
29 a ZODB.Connection.
30
31 Note that when some data is modified, that data's data manager should
32 join a transaction so that data can be committed when the user commits
33 the transaction.
34 """
35
def __init__(self, channel, txnmgr = None):
    """Bind this data manager to an AMQP channel.

    channel is stored on the instance and its tx_select() method is
    invoked immediately (presumably switching the channel into AMQP
    transactional mode -- confirm against the channel library).

    txnmgr is an optional transaction manager; it is only recorded as
    the transaction_manager attribute here.
    """
    self.channel = channel
    # Called right after the attribute is set, so the instance already
    # references the channel even if tx_select() raises.
    channel.tx_select()
    self.transaction_manager = txnmgr
40
41
42
43
44
45
46
47
def abort(self, transaction):
    """Discard all pending changes for a transaction.

    The transaction manager calls this for transactions that have not
    yet entered a two-phase commit; it must not be called while a
    two-phase commit is in progress.

    transaction is accepted for interface compatibility and is not
    used by this implementation.

    The channel is rolled back only when it reports itself as open;
    otherwise the call just logs and returns.
    """
    log.debug("abort'ed")
    channel = self.channel
    if not channel.is_open:
        # No rollback is attempted on a channel that is not open.
        return
    channel.tx_rollback()
60
61
62
63
64
65
67 """Begin commit of a transaction, starting the two-phase commit.
68
69 transaction is the ITransaction instance associated with the
70 transaction being committed.
71 """
72
73 log.debug("tpc_begin'ed")
74
def commit(self, transaction):
    """Stage this transaction's modifications for a later tpc_finish.

    Per the data manager contract, changes saved here become
    persistent only if tpc_finish is called afterwards; if tpc_abort
    is called instead they must not persist. Conflict detection and
    handling belong to this step as well.

    This implementation issues no channel operation at this point: it
    only emits a debug log record. transaction is not used.
    """
    message = "commit'ed"
    log.debug(message)
88
89
91 """Indicate confirmation that the transaction is done.
92
93 Make all changes to objects modified by this transaction persist.
94
95 transaction is the ITransaction instance associated with the
96 transaction being committed.
97
98 This should never fail. If this raises an exception, the
99 database is not expected to maintain consistency; it's a
100 serious error.
101 """
102 log.debug("tpc_finish'ed")
103 try:
104 self.channel.tx_commit()
105 except Exception as e:
106 log.exception("tpc_finish completed FAIL")
107 else:
108 log.debug("tpc_finish completed OK")
109
110
112 """Verify that a data manager can commit the transaction.
113
114 This is the last chance for a data manager to vote 'no'. A
115 data manager votes 'no' by raising an exception.
116
117 transaction is the ITransaction instance associated with the
118 transaction being committed.
119 """
120
121 log.debug("tpc_voted")
122
123
125 """Abort a transaction.
126
127 This is called by a transaction manager to end a two-phase commit on
128 the data manager. Abandon all changes to objects modified by this
129 transaction.
130
131 transaction is the ITransaction instance associated with the
132 transaction being committed.
133
134 This should never fail.
135 """
136 log.debug("tpc_abort'ed")
137 try:
138 self.channel.tx_rollback()
139 except Exception as e:
140 log.exception(e)
141 log.debug("tpc_abort failed with exception")
142 else:
143 log.debug("tpc_abort completed")
144
145
147 """Return a key to use for ordering registered DataManagers.
148 """
149
150
151 return "~~~~~~~"
152
153
154
155
156
157
158
161 self.datamgr = AmqpDataManager(channel)
162 self.txnid = int(time.clock()*1e6) % sys.maxint
163
166
def __exit__(self, type, value, traceback):
    """Finish the transaction when the ``with`` block ends.

    type, value and traceback are the standard context-manager
    arguments; only type is inspected.

    If the block completed normally (type is None), the data manager
    is driven through tpc_begin, commit, tpc_vote and tpc_finish, each
    called with this object's txnid. If any of those raises,
    tpc_abort is called and the exception is re-raised.

    If the block raised, the data manager's abort is called. A failure
    during that abort is logged and then suppressed so it cannot mask
    the exception already propagating out of the block.

    Always returns None, so an exception from the ``with`` body is
    never swallowed.
    """
    datamgr = self.datamgr
    txnid = self.txnid

    if type is not None:
        # Best-effort rollback: the with-body's exception is the one
        # the caller needs to see, so an abort failure is recorded in
        # the log instead of being silently discarded or raised.
        try:
            datamgr.abort(txnid)
        except Exception:
            log.exception("abort failed while exiting AMQP transaction")
        return

    try:
        datamgr.tpc_begin(txnid)
        datamgr.commit(txnid)
        datamgr.tpc_vote(txnid)
        datamgr.tpc_finish(txnid)
    except Exception:
        # Any failure in the two-phase sequence abandons the
        # transaction before the error is passed on to the caller.
        datamgr.tpc_abort(txnid)
        raise
182