Linux Kernel 3.7.1
journal.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * journal.c
5  *
6  * Defines functions of journalling api
7  *
8  * Copyright (C) 2003, 2004 Oracle. All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 021110-1307, USA.
24  */
25 
26 #include <linux/fs.h>
27 #include <linux/types.h>
28 #include <linux/slab.h>
29 #include <linux/highmem.h>
30 #include <linux/kthread.h>
31 #include <linux/time.h>
32 #include <linux/random.h>
33 
34 #include <cluster/masklog.h>
35 
36 #include "ocfs2.h"
37 
38 #include "alloc.h"
39 #include "blockcheck.h"
40 #include "dir.h"
41 #include "dlmglue.h"
42 #include "extent_map.h"
43 #include "heartbeat.h"
44 #include "inode.h"
45 #include "journal.h"
46 #include "localalloc.h"
47 #include "slot_map.h"
48 #include "super.h"
49 #include "sysfile.h"
50 #include "uptodate.h"
51 #include "quota.h"
52 
53 #include "buffer_head_io.h"
54 #include "ocfs2_trace.h"
55 
56 DEFINE_SPINLOCK(trans_inc_lock);
57 
58 #define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000
59 
60 static int ocfs2_force_read_journal(struct inode *inode);
61 static int ocfs2_recover_node(struct ocfs2_super *osb,
62  int node_num, int slot_num);
63 static int __ocfs2_recovery_thread(void *arg);
64 static int ocfs2_commit_cache(struct ocfs2_super *osb);
65 static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota);
66 static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb,
67  int dirty, int replayed);
68 static int ocfs2_trylock_journal(struct ocfs2_super *osb,
69  int slot_num);
70 static int ocfs2_recover_orphans(struct ocfs2_super *osb,
71  int slot);
72 static int ocfs2_commit_thread(void *arg);
73 static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
74  int slot_num,
75  struct ocfs2_dinode *la_dinode,
76  struct ocfs2_dinode *tl_dinode,
77  struct ocfs2_quota_recovery *qrec);
78 
79 static inline int ocfs2_wait_on_mount(struct ocfs2_super *osb)
80 {
81  return __ocfs2_wait_on_mount(osb, 0);
82 }
83 
84 static inline int ocfs2_wait_on_quotas(struct ocfs2_super *osb)
85 {
86  return __ocfs2_wait_on_mount(osb, 1);
87 }
88 
89 /*
90  * This replay_map is to track online/offline slots, so we could recover
91  * offline slots during recovery and mount
92  */
93 
95  REPLAY_UNNEEDED = 0, /* Replay is not needed, so ignore this map */
96  REPLAY_NEEDED, /* Replay slots marked in rm_replay_slots */
97  REPLAY_DONE /* Replay was already queued */
98 };
99 
101  unsigned int rm_slots;
103  unsigned char rm_replay_slots[0];
104 };
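/*
 * [Editor's note, not in the original source] rm_replay_slots is a
 * zero-length (flexible) array: the map and its per-slot flags are
 * allocated in one shot, as ocfs2_compute_replay_slots() below does with
 * kzalloc(sizeof(struct ocfs2_replay_map) + max_slots * sizeof(char)).
 */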
105 
106 static void ocfs2_replay_map_set_state(struct ocfs2_super *osb, int state)
107 {
108  if (!osb->replay_map)
109  return;
110 
111  /* If we've already queued the replay, we don't have any more to do */
112  if (osb->replay_map->rm_state == REPLAY_DONE)
113  return;
114 
115  osb->replay_map->rm_state = state;
116 }
117 
118 int ocfs2_compute_replay_slots(struct ocfs2_super *osb)
119 {
120  struct ocfs2_replay_map *replay_map;
121  int i, node_num;
122 
123  /* If replay map is already set, we don't do it again */
124  if (osb->replay_map)
125  return 0;
126 
127  replay_map = kzalloc(sizeof(struct ocfs2_replay_map) +
128  (osb->max_slots * sizeof(char)), GFP_KERNEL);
129 
130  if (!replay_map) {
131  mlog_errno(-ENOMEM);
132  return -ENOMEM;
133  }
134 
135  spin_lock(&osb->osb_lock);
136 
137  replay_map->rm_slots = osb->max_slots;
138  replay_map->rm_state = REPLAY_UNNEEDED;
139 
140  /* set rm_replay_slots for offline slot(s) */
141  for (i = 0; i < replay_map->rm_slots; i++) {
142  if (ocfs2_slot_to_node_num_locked(osb, i, &node_num) == -ENOENT)
143  replay_map->rm_replay_slots[i] = 1;
144  }
145 
146  osb->replay_map = replay_map;
147  spin_unlock(&osb->osb_lock);
148  return 0;
149 }
150 
151 void ocfs2_queue_replay_slots(struct ocfs2_super *osb)
152 {
153  struct ocfs2_replay_map *replay_map = osb->replay_map;
154  int i;
155 
156  if (!replay_map)
157  return;
158 
159  if (replay_map->rm_state != REPLAY_NEEDED)
160  return;
161 
162  for (i = 0; i < replay_map->rm_slots; i++)
163  if (replay_map->rm_replay_slots[i])
164  ocfs2_queue_recovery_completion(osb->journal, i, NULL,
165  NULL, NULL);
166  replay_map->rm_state = REPLAY_DONE;
167 }
168 
169 void ocfs2_free_replay_slots(struct ocfs2_super *osb)
170 {
171  struct ocfs2_replay_map *replay_map = osb->replay_map;
172 
173  if (!osb->replay_map)
174  return;
175 
176  kfree(replay_map);
177  osb->replay_map = NULL;
178 }
179 
180 int ocfs2_recovery_init(struct ocfs2_super *osb)
181 {
182  struct ocfs2_recovery_map *rm;
183 
184  mutex_init(&osb->recovery_lock);
185  osb->disable_recovery = 0;
186  osb->recovery_thread_task = NULL;
187  init_waitqueue_head(&osb->recovery_event);
188 
189  rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
190  osb->max_slots * sizeof(unsigned int),
191  GFP_KERNEL);
192  if (!rm) {
193  mlog_errno(-ENOMEM);
194  return -ENOMEM;
195  }
196 
197  rm->rm_entries = (unsigned int *)((char *)rm +
198  sizeof(struct ocfs2_recovery_map));
199  osb->recovery_map = rm;
200 
201  return 0;
202 }
203 
204 /* we can't grab the goofy sem lock from inside wait_event, so we use
205  * memory barriers to make sure that we'll see the null task before
206  * being woken up */
207 static int ocfs2_recovery_thread_running(struct ocfs2_super *osb)
208 {
209  mb();
210  return osb->recovery_thread_task != NULL;
211 }
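/*
 * [Editor's note, not in the original source] The mb() above pairs with
 * the mb() in __ocfs2_recovery_thread() below, which clears
 * recovery_thread_task before waking recovery_event, so a waiter that
 * observes the wakeup also observes the NULL task pointer.
 */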
212 
213 void ocfs2_recovery_exit(struct ocfs2_super *osb)
214 {
215  struct ocfs2_recovery_map *rm;
216 
217  /* disable any new recovery threads and wait for any currently
218  * running ones to exit. Do this before setting the vol_state. */
219  mutex_lock(&osb->recovery_lock);
220  osb->disable_recovery = 1;
221  mutex_unlock(&osb->recovery_lock);
222  wait_event(osb->recovery_event, !ocfs2_recovery_thread_running(osb));
223 
224  /* At this point, we know that no more recovery threads can be
225  * launched, so wait for any recovery completion work to
226  * complete. */
227  flush_workqueue(ocfs2_wq);
228 
229  /*
230  * Now that recovery is shut down, and the osb is about to be
231  * freed, the osb_lock is not taken here.
232  */
233  rm = osb->recovery_map;
234  /* XXX: Should we bug if there are dirty entries? */
235 
236  kfree(rm);
237 }
238 
239 static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
240  unsigned int node_num)
241 {
242  int i;
243  struct ocfs2_recovery_map *rm = osb->recovery_map;
244 
245  assert_spin_locked(&osb->osb_lock);
246 
247  for (i = 0; i < rm->rm_used; i++) {
248  if (rm->rm_entries[i] == node_num)
249  return 1;
250  }
251 
252  return 0;
253 }
254 
255 /* Behaves like test-and-set. Returns the previous value */
256 static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
257  unsigned int node_num)
258 {
259  struct ocfs2_recovery_map *rm = osb->recovery_map;
260 
261  spin_lock(&osb->osb_lock);
262  if (__ocfs2_recovery_map_test(osb, node_num)) {
263  spin_unlock(&osb->osb_lock);
264  return 1;
265  }
266 
267  /* XXX: Can this be exploited? Not from o2dlm... */
268  BUG_ON(rm->rm_used >= osb->max_slots);
269 
270  rm->rm_entries[rm->rm_used] = node_num;
271  rm->rm_used++;
272  spin_unlock(&osb->osb_lock);
273 
274  return 0;
275 }
276 
277 static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
278  unsigned int node_num)
279 {
280  int i;
281  struct ocfs2_recovery_map *rm = osb->recovery_map;
282 
283  spin_lock(&osb->osb_lock);
284 
285  for (i = 0; i < rm->rm_used; i++) {
286  if (rm->rm_entries[i] == node_num)
287  break;
288  }
289 
290  if (i < rm->rm_used) {
291  /* XXX: be careful with the pointer math */
292  memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]),
293  (rm->rm_used - i - 1) * sizeof(unsigned int));
294  rm->rm_used--;
295  }
296 
297  spin_unlock(&osb->osb_lock);
298 }
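/*
 * [Editor's note, not in the original source] The recovery map above is a
 * plain unsorted array used as a set: __ocfs2_recovery_map_test() scans
 * for membership, _map_set() appends, and _map_clear() compacts with
 * memmove(). With at most max_slots entries, the O(n) operations are cheap.
 */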
299 
300 static int ocfs2_commit_cache(struct ocfs2_super *osb)
301 {
302  int status = 0;
303  unsigned int flushed;
304  struct ocfs2_journal *journal = NULL;
305 
306  journal = osb->journal;
307 
308  /* Flush all pending commits and checkpoint the journal. */
309  down_write(&journal->j_trans_barrier);
310 
311  flushed = atomic_read(&journal->j_num_trans);
312  trace_ocfs2_commit_cache_begin(flushed);
313  if (flushed == 0) {
314  up_write(&journal->j_trans_barrier);
315  goto finally;
316  }
317 
318  jbd2_journal_lock_updates(journal->j_journal);
319  status = jbd2_journal_flush(journal->j_journal);
320  jbd2_journal_unlock_updates(journal->j_journal);
321  if (status < 0) {
322  up_write(&journal->j_trans_barrier);
323  mlog_errno(status);
324  goto finally;
325  }
326 
327  ocfs2_inc_trans_id(journal);
328 
329  flushed = atomic_read(&journal->j_num_trans);
330  atomic_set(&journal->j_num_trans, 0);
331  up_write(&journal->j_trans_barrier);
332 
333  trace_ocfs2_commit_cache_end(journal->j_trans_id, flushed);
334 
335  ocfs2_wake_downconvert_thread(osb);
336  wake_up(&journal->j_checkpointed);
337 finally:
338  return status;
339 }
340 
341 handle_t *ocfs2_start_trans(struct ocfs2_super *osb, int max_buffs)
342 {
343  journal_t *journal = osb->journal->j_journal;
344  handle_t *handle;
345 
346  BUG_ON(!osb || !osb->journal->j_journal);
347 
348  if (ocfs2_is_hard_readonly(osb))
349  return ERR_PTR(-EROFS);
350 
351  BUG_ON(osb->journal->j_state == OCFS2_JOURNAL_FREE);
352  BUG_ON(max_buffs <= 0);
353 
354  /* Nested transaction? Just return the handle... */
355  if (journal_current_handle())
356  return jbd2_journal_start(journal, max_buffs);
357 
358  sb_start_intwrite(osb->sb);
359 
360  down_read(&osb->journal->j_trans_barrier);
361 
362  handle = jbd2_journal_start(journal, max_buffs);
363  if (IS_ERR(handle)) {
364  up_read(&osb->journal->j_trans_barrier);
365  sb_end_intwrite(osb->sb);
366 
367  mlog_errno(PTR_ERR(handle));
368 
369  if (is_journal_aborted(journal)) {
370  ocfs2_abort(osb->sb, "Detected aborted journal");
371  handle = ERR_PTR(-EROFS);
372  }
373  } else {
374  if (!ocfs2_mount_local(osb))
375  atomic_inc(&(osb->journal->j_num_trans));
376  }
377 
378  return handle;
379 }
380 
381 int ocfs2_commit_trans(struct ocfs2_super *osb,
382  handle_t *handle)
383 {
384  int ret, nested;
385  struct ocfs2_journal *journal = osb->journal;
386 
387  BUG_ON(!handle);
388 
389  nested = handle->h_ref > 1;
390  ret = jbd2_journal_stop(handle);
391  if (ret < 0)
392  mlog_errno(ret);
393 
394  if (!nested) {
395  up_read(&journal->j_trans_barrier);
396  sb_end_intwrite(osb->sb);
397  }
398 
399  return ret;
400 }
401 
402 /*
403  * 'nblocks' is what you want to add to the current transaction.
404  *
405  * This might call jbd2_journal_restart() which will commit dirty buffers
406  * and then restart the transaction. Before calling
407  * ocfs2_extend_trans(), any changed blocks should have been
408  * dirtied. After calling it, all blocks which need to be changed must
409  * go through another set of journal_access/journal_dirty calls.
410  *
411  * WARNING: This will not release any semaphores or disk locks taken
412  * during the transaction, so make sure they were taken *before*
413  * start_trans or we'll have ordering deadlocks.
414  *
415  * WARNING2: Note that we do *not* drop j_trans_barrier here. This is
416  * good because transaction ids haven't yet been recorded on the
417  * cluster locks associated with this handle.
418  */
419 int ocfs2_extend_trans(handle_t *handle, int nblocks)
420 {
421  int status, old_nblocks;
422 
423  BUG_ON(!handle);
424  BUG_ON(nblocks < 0);
425 
426  if (!nblocks)
427  return 0;
428 
429  old_nblocks = handle->h_buffer_credits;
430 
431  trace_ocfs2_extend_trans(old_nblocks, nblocks);
432 
433 #ifdef CONFIG_OCFS2_DEBUG_FS
434  status = 1;
435 #else
436  status = jbd2_journal_extend(handle, nblocks);
437  if (status < 0) {
438  mlog_errno(status);
439  goto bail;
440  }
441 #endif
442 
443  if (status > 0) {
444  trace_ocfs2_extend_trans_restart(old_nblocks + nblocks);
445  status = jbd2_journal_restart(handle,
446  old_nblocks + nblocks);
447  if (status < 0) {
448  mlog_errno(status);
449  goto bail;
450  }
451  }
452 
453  status = 0;
454 bail:
455  return status;
456 }
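/*
 * [Editor's sketch, not in the original source] A minimal, hypothetical
 * caller showing the intended lifecycle of the API above: start a
 * transaction, declare write access on a buffer, modify it, mark it
 * dirty, then commit. 'example_update_dinode', 'ci' and 'bh' are
 * illustrative names; OCFS2_INODE_UPDATE_CREDITS is assumed to be the
 * usual credit count for a dinode update. ocfs2_extend_trans() would be
 * called before further journal_access calls if more credits were needed.
 */
static int example_update_dinode(struct ocfs2_super *osb,
				 struct ocfs2_caching_info *ci,
				 struct buffer_head *bh)
{
	handle_t *handle;
	int status;

	handle = ocfs2_start_trans(osb, OCFS2_INODE_UPDATE_CREDITS);
	if (IS_ERR(handle))
		return PTR_ERR(handle);

	/* Declare intent to modify the dinode block under this handle. */
	status = ocfs2_journal_access_di(handle, ci, bh,
					 OCFS2_JOURNAL_ACCESS_WRITE);
	if (status < 0)
		goto out_commit;

	/* ... modify bh->b_data here ... */

	ocfs2_journal_dirty(handle, bh);
out_commit:
	ocfs2_commit_trans(osb, handle);
	return status;
}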
457 
458 struct ocfs2_triggers {
459  struct jbd2_buffer_trigger_type ot_triggers;
460  size_t ot_offset;
461 };
462 
463 static inline struct ocfs2_triggers *to_ocfs2_trigger(struct jbd2_buffer_trigger_type *triggers)
464 {
465  return container_of(triggers, struct ocfs2_triggers, ot_triggers);
466 }
467 
468 static void ocfs2_frozen_trigger(struct jbd2_buffer_trigger_type *triggers,
469  struct buffer_head *bh,
470  void *data, size_t size)
471 {
472  struct ocfs2_triggers *ot = to_ocfs2_trigger(triggers);
473 
474  /*
475  * We aren't guaranteed to have the superblock here, so we
476  * must unconditionally compute the ecc data.
477  * __ocfs2_journal_access() will only set the triggers if
478  * metaecc is enabled.
479  */
480  ocfs2_block_check_compute(data, size, data + ot->ot_offset);
481 }
482 
483 /*
484  * Quota blocks have their own trigger because the struct ocfs2_block_check
485  * offset depends on the blocksize.
486  */
487 static void ocfs2_dq_frozen_trigger(struct jbd2_buffer_trigger_type *triggers,
488  struct buffer_head *bh,
489  void *data, size_t size)
490 {
491  struct ocfs2_disk_dqtrailer *dqt =
492  ocfs2_block_dqtrailer(size, data);
493 
494  /*
495  * We aren't guaranteed to have the superblock here, so we
496  * must unconditionally compute the ecc data.
497  * __ocfs2_journal_access() will only set the triggers if
498  * metaecc is enabled.
499  */
500  ocfs2_block_check_compute(data, size, &dqt->dq_check);
501 }
502 
503 /*
504  * Directory blocks also have their own trigger because the
505  * struct ocfs2_block_check offset depends on the blocksize.
506  */
507 static void ocfs2_db_frozen_trigger(struct jbd2_buffer_trigger_type *triggers,
508  struct buffer_head *bh,
509  void *data, size_t size)
510 {
511  struct ocfs2_dir_block_trailer *trailer =
512  ocfs2_dir_trailer_from_size(size, data);
513 
514  /*
515  * We aren't guaranteed to have the superblock here, so we
516  * must unconditionally compute the ecc data.
517  * __ocfs2_journal_access() will only set the triggers if
518  * metaecc is enabled.
519  */
520  ocfs2_block_check_compute(data, size, &trailer->db_check);
521 }
522 
523 static void ocfs2_abort_trigger(struct jbd2_buffer_trigger_type *triggers,
524  struct buffer_head *bh)
525 {
526  mlog(ML_ERROR,
527  "ocfs2_abort_trigger called by JBD2. bh = 0x%lx, "
528  "bh->b_blocknr = %llu\n",
529  (unsigned long)bh,
530  (unsigned long long)bh->b_blocknr);
531 
532  /* We aren't guaranteed to have the superblock here - but if we
533  * don't, it'll just crash. */
534  ocfs2_error(bh->b_assoc_map->host->i_sb,
535  "JBD2 has aborted our journal, ocfs2 cannot continue\n");
536 }
537 
538 static struct ocfs2_triggers di_triggers = {
539  .ot_triggers = {
540  .t_frozen = ocfs2_frozen_trigger,
541  .t_abort = ocfs2_abort_trigger,
542  },
543  .ot_offset = offsetof(struct ocfs2_dinode, i_check),
544 };
545 
546 static struct ocfs2_triggers eb_triggers = {
547  .ot_triggers = {
548  .t_frozen = ocfs2_frozen_trigger,
549  .t_abort = ocfs2_abort_trigger,
550  },
551  .ot_offset = offsetof(struct ocfs2_extent_block, h_check),
552 };
553 
554 static struct ocfs2_triggers rb_triggers = {
555  .ot_triggers = {
556  .t_frozen = ocfs2_frozen_trigger,
557  .t_abort = ocfs2_abort_trigger,
558  },
559  .ot_offset = offsetof(struct ocfs2_refcount_block, rf_check),
560 };
561 
562 static struct ocfs2_triggers gd_triggers = {
563  .ot_triggers = {
564  .t_frozen = ocfs2_frozen_trigger,
565  .t_abort = ocfs2_abort_trigger,
566  },
567  .ot_offset = offsetof(struct ocfs2_group_desc, bg_check),
568 };
569 
570 static struct ocfs2_triggers db_triggers = {
571  .ot_triggers = {
572  .t_frozen = ocfs2_db_frozen_trigger,
573  .t_abort = ocfs2_abort_trigger,
574  },
575 };
576 
577 static struct ocfs2_triggers xb_triggers = {
578  .ot_triggers = {
579  .t_frozen = ocfs2_frozen_trigger,
580  .t_abort = ocfs2_abort_trigger,
581  },
582  .ot_offset = offsetof(struct ocfs2_xattr_block, xb_check),
583 };
584 
585 static struct ocfs2_triggers dq_triggers = {
586  .ot_triggers = {
587  .t_frozen = ocfs2_dq_frozen_trigger,
588  .t_abort = ocfs2_abort_trigger,
589  },
590 };
591 
592 static struct ocfs2_triggers dr_triggers = {
593  .ot_triggers = {
594  .t_frozen = ocfs2_frozen_trigger,
595  .t_abort = ocfs2_abort_trigger,
596  },
597  .ot_offset = offsetof(struct ocfs2_dx_root_block, dr_check),
598 };
599 
600 static struct ocfs2_triggers dl_triggers = {
601  .ot_triggers = {
602  .t_frozen = ocfs2_frozen_trigger,
603  .t_abort = ocfs2_abort_trigger,
604  },
605  .ot_offset = offsetof(struct ocfs2_dx_leaf, dl_check),
606 };
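/*
 * [Editor's note, not in the original source] Each fixed-offset trigger
 * above only differs in where struct ocfs2_block_check lives inside its
 * block type. A hypothetical new metadata block 'struct ocfs2_foo_block'
 * with an fb_check field would follow the same pattern:
 *
 *	static struct ocfs2_triggers foo_triggers = {
 *		.ot_triggers = {
 *			.t_frozen = ocfs2_frozen_trigger,
 *			.t_abort = ocfs2_abort_trigger,
 *		},
 *		.ot_offset = offsetof(struct ocfs2_foo_block, fb_check),
 *	};
 */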
607 
608 static int __ocfs2_journal_access(handle_t *handle,
609  struct ocfs2_caching_info *ci,
610  struct buffer_head *bh,
611  struct ocfs2_triggers *triggers,
612  int type)
613 {
614  int status;
615  struct ocfs2_super *osb =
616  OCFS2_SB(ocfs2_metadata_cache_get_super(ci));
617 
618  BUG_ON(!ci || !ci->ci_ops);
619  BUG_ON(!handle);
620  BUG_ON(!bh);
621 
622  trace_ocfs2_journal_access(
623  (unsigned long long)ocfs2_metadata_cache_owner(ci),
624  (unsigned long long)bh->b_blocknr, type, bh->b_size);
625 
626  /* we can safely remove this assertion after testing. */
627  if (!buffer_uptodate(bh)) {
628  mlog(ML_ERROR, "giving me a buffer that's not uptodate!\n");
629  mlog(ML_ERROR, "b_blocknr=%llu\n",
630  (unsigned long long)bh->b_blocknr);
631  BUG();
632  }
633 
634  /* Set the current transaction information on the ci so
635  * that the locking code knows whether it can drop its locks
636  * on this ci or not. We're protected from the commit
637  * thread updating the current transaction id until
638  * ocfs2_commit_trans() because ocfs2_start_trans() took
639  * j_trans_barrier for us. */
640  ocfs2_set_ci_lock_trans(osb->journal, ci);
641 
642  ocfs2_metadata_cache_io_lock(ci);
643  switch (type) {
644  case OCFS2_JOURNAL_ACCESS_CREATE:
645  case OCFS2_JOURNAL_ACCESS_WRITE:
646  status = jbd2_journal_get_write_access(handle, bh);
647  break;
648 
649  case OCFS2_JOURNAL_ACCESS_UNDO:
650  status = jbd2_journal_get_undo_access(handle, bh);
651  break;
652 
653  default:
654  status = -EINVAL;
655  mlog(ML_ERROR, "Unknown access type!\n");
656  }
657  if (!status && ocfs2_meta_ecc(osb) && triggers)
658  jbd2_journal_set_triggers(bh, &triggers->ot_triggers);
659  ocfs2_metadata_cache_io_unlock(ci);
660 
661  if (status < 0)
662  mlog(ML_ERROR, "Error %d getting %d access to buffer!\n",
663  status, type);
664 
665  return status;
666 }
667 
668 int ocfs2_journal_access_di(handle_t *handle, struct ocfs2_caching_info *ci,
669  struct buffer_head *bh, int type)
670 {
671  return __ocfs2_journal_access(handle, ci, bh, &di_triggers, type);
672 }
673 
674 int ocfs2_journal_access_eb(handle_t *handle, struct ocfs2_caching_info *ci,
675  struct buffer_head *bh, int type)
676 {
677  return __ocfs2_journal_access(handle, ci, bh, &eb_triggers, type);
678 }
679 
680 int ocfs2_journal_access_rb(handle_t *handle, struct ocfs2_caching_info *ci,
681  struct buffer_head *bh, int type)
682 {
683  return __ocfs2_journal_access(handle, ci, bh, &rb_triggers,
684  type);
685 }
686 
687 int ocfs2_journal_access_gd(handle_t *handle, struct ocfs2_caching_info *ci,
688  struct buffer_head *bh, int type)
689 {
690  return __ocfs2_journal_access(handle, ci, bh, &gd_triggers, type);
691 }
692 
693 int ocfs2_journal_access_db(handle_t *handle, struct ocfs2_caching_info *ci,
694  struct buffer_head *bh, int type)
695 {
696  return __ocfs2_journal_access(handle, ci, bh, &db_triggers, type);
697 }
698 
699 int ocfs2_journal_access_xb(handle_t *handle, struct ocfs2_caching_info *ci,
700  struct buffer_head *bh, int type)
701 {
702  return __ocfs2_journal_access(handle, ci, bh, &xb_triggers, type);
703 }
704 
705 int ocfs2_journal_access_dq(handle_t *handle, struct ocfs2_caching_info *ci,
706  struct buffer_head *bh, int type)
707 {
708  return __ocfs2_journal_access(handle, ci, bh, &dq_triggers, type);
709 }
710 
711 int ocfs2_journal_access_dr(handle_t *handle, struct ocfs2_caching_info *ci,
712  struct buffer_head *bh, int type)
713 {
714  return __ocfs2_journal_access(handle, ci, bh, &dr_triggers, type);
715 }
716 
717 int ocfs2_journal_access_dl(handle_t *handle, struct ocfs2_caching_info *ci,
718  struct buffer_head *bh, int type)
719 {
720  return __ocfs2_journal_access(handle, ci, bh, &dl_triggers, type);
721 }
722 
723 int ocfs2_journal_access(handle_t *handle, struct ocfs2_caching_info *ci,
724  struct buffer_head *bh, int type)
725 {
726  return __ocfs2_journal_access(handle, ci, bh, NULL, type);
727 }
728 
729 void ocfs2_journal_dirty(handle_t *handle, struct buffer_head *bh)
730 {
731  int status;
732 
733  trace_ocfs2_journal_dirty((unsigned long long)bh->b_blocknr);
734 
735  status = jbd2_journal_dirty_metadata(handle, bh);
736  BUG_ON(status);
737 }
738 
739 #define OCFS2_DEFAULT_COMMIT_INTERVAL (HZ * JBD2_DEFAULT_MAX_COMMIT_AGE)
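/*
 * [Editor's note, not in the original source] JBD2_DEFAULT_MAX_COMMIT_AGE
 * is 5, so this default works out to HZ * 5: the journal is committed
 * every 5 seconds unless a commit= mount option overrides it via
 * osb->osb_commit_interval below.
 */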
740 
741 void ocfs2_set_journal_params(struct ocfs2_super *osb)
742 {
743  journal_t *journal = osb->journal->j_journal;
744  unsigned long commit_interval = OCFS2_DEFAULT_COMMIT_INTERVAL;
745 
746  if (osb->osb_commit_interval)
747  commit_interval = osb->osb_commit_interval;
748 
749  write_lock(&journal->j_state_lock);
750  journal->j_commit_interval = commit_interval;
751  if (osb->s_mount_opt & OCFS2_MOUNT_BARRIER)
752  journal->j_flags |= JBD2_BARRIER;
753  else
754  journal->j_flags &= ~JBD2_BARRIER;
755  write_unlock(&journal->j_state_lock);
756 }
757 
758 int ocfs2_journal_init(struct ocfs2_journal *journal, int *dirty)
759 {
760  int status = -1;
761  struct inode *inode = NULL; /* the journal inode */
762  journal_t *j_journal = NULL;
763  struct ocfs2_dinode *di = NULL;
764  struct buffer_head *bh = NULL;
765  struct ocfs2_super *osb;
766  int inode_lock = 0;
767 
768  BUG_ON(!journal);
769 
770  osb = journal->j_osb;
771 
772  /* already have the inode for our journal */
773  inode = ocfs2_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE,
774  osb->slot_num);
775  if (inode == NULL) {
776  status = -EACCES;
777  mlog_errno(status);
778  goto done;
779  }
780  if (is_bad_inode(inode)) {
781  mlog(ML_ERROR, "access error (bad inode)\n");
782  iput(inode);
783  inode = NULL;
784  status = -EACCES;
785  goto done;
786  }
787 
788  SET_INODE_JOURNAL(inode);
789  OCFS2_I(inode)->ip_open_count++;
790 
791  /* Skip recovery waits here - journal inode metadata never
792  * changes in a live cluster so it can be considered an
793  * exception to the rule. */
794  status = ocfs2_inode_lock_full(inode, &bh, 1, OCFS2_META_LOCK_RECOVERY);
795  if (status < 0) {
796  if (status != -ERESTARTSYS)
797  mlog(ML_ERROR, "Could not get lock on journal!\n");
798  goto done;
799  }
800 
801  inode_lock = 1;
802  di = (struct ocfs2_dinode *)bh->b_data;
803 
804  if (inode->i_size < OCFS2_MIN_JOURNAL_SIZE) {
805  mlog(ML_ERROR, "Journal file size (%lld) is too small!\n",
806  inode->i_size);
807  status = -EINVAL;
808  goto done;
809  }
810 
811  trace_ocfs2_journal_init(inode->i_size,
812  (unsigned long long)inode->i_blocks,
813  OCFS2_I(inode)->ip_clusters);
814 
815  /* call the kernel's journal init function now */
816  j_journal = jbd2_journal_init_inode(inode);
817  if (j_journal == NULL) {
818  mlog(ML_ERROR, "Linux journal layer error\n");
819  status = -EINVAL;
820  goto done;
821  }
822 
823  trace_ocfs2_journal_init_maxlen(j_journal->j_maxlen);
824 
825  *dirty = (le32_to_cpu(di->id1.journal1.ij_flags) &
826  OCFS2_JOURNAL_DIRTY_FL);
827 
828  journal->j_journal = j_journal;
829  journal->j_inode = inode;
830  journal->j_bh = bh;
831 
832  ocfs2_set_journal_params(osb);
833 
834  journal->j_state = OCFS2_JOURNAL_LOADED;
835 
836  status = 0;
837 done:
838  if (status < 0) {
839  if (inode_lock)
840  ocfs2_inode_unlock(inode, 1);
841  brelse(bh);
842  if (inode) {
843  OCFS2_I(inode)->ip_open_count--;
844  iput(inode);
845  }
846  }
847 
848  return status;
849 }
850 
851 static void ocfs2_bump_recovery_generation(struct ocfs2_dinode *di)
852 {
853  le32_add_cpu(&(di->id1.journal1.ij_recovery_generation), 1);
854 }
855 
856 static u32 ocfs2_get_recovery_generation(struct ocfs2_dinode *di)
857 {
858  return le32_to_cpu(di->id1.journal1.ij_recovery_generation);
859 }
860 
861 static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb,
862  int dirty, int replayed)
863 {
864  int status;
865  unsigned int flags;
866  struct ocfs2_journal *journal = osb->journal;
867  struct buffer_head *bh = journal->j_bh;
868  struct ocfs2_dinode *fe;
869 
870  fe = (struct ocfs2_dinode *)bh->b_data;
871 
872  /* The journal bh on the osb always comes from ocfs2_journal_init()
873  * and was validated there inside ocfs2_inode_lock_full(). It's a
874  * code bug if we mess it up. */
875  BUG_ON(!OCFS2_IS_VALID_DINODE(fe));
876 
877  flags = le32_to_cpu(fe->id1.journal1.ij_flags);
878  if (dirty)
879  flags |= OCFS2_JOURNAL_DIRTY_FL;
880  else
881  flags &= ~OCFS2_JOURNAL_DIRTY_FL;
882  fe->id1.journal1.ij_flags = cpu_to_le32(flags);
883 
884  if (replayed)
885  ocfs2_bump_recovery_generation(fe);
886 
887  ocfs2_compute_meta_ecc(osb->sb, bh->b_data, &fe->i_check);
888  status = ocfs2_write_block(osb, bh, INODE_CACHE(journal->j_inode));
889  if (status < 0)
890  mlog_errno(status);
891 
892  return status;
893 }
894 
895 /*
896  * If the journal has been kmalloc'd it needs to be freed after this
897  * call.
898  */
899 void ocfs2_journal_shutdown(struct ocfs2_super *osb)
900 {
901  struct ocfs2_journal *journal = NULL;
902  int status = 0;
903  struct inode *inode = NULL;
904  int num_running_trans = 0;
905 
906  BUG_ON(!osb);
907 
908  journal = osb->journal;
909  if (!journal)
910  goto done;
911 
912  inode = journal->j_inode;
913 
914  if (journal->j_state != OCFS2_JOURNAL_LOADED)
915  goto done;
916 
917  /* need to inc inode use count - jbd2_journal_destroy will iput. */
918  if (!igrab(inode))
919  BUG();
920 
921  num_running_trans = atomic_read(&(osb->journal->j_num_trans));
922  trace_ocfs2_journal_shutdown(num_running_trans);
923 
924  /* Do a commit_cache here. It will flush our journal, *and*
925  * release any locks that are still held.
926  * set the SHUTDOWN flag and release the trans lock.
927  * the commit thread will take the trans lock for us below. */
928  journal->j_state = OCFS2_JOURNAL_IN_SHUTDOWN;
929 
930  /* The OCFS2_JOURNAL_IN_SHUTDOWN will signal to commit_cache to not
931  * drop the trans_lock (which we want to hold until we
932  * completely destroy the journal. */
933  if (osb->commit_task) {
934  /* Wait for the commit thread */
935  trace_ocfs2_journal_shutdown_wait(osb->commit_task);
936  kthread_stop(osb->commit_task);
937  osb->commit_task = NULL;
938  }
939 
940  BUG_ON(atomic_read(&(osb->journal->j_num_trans)) != 0);
941 
942  if (ocfs2_mount_local(osb)) {
943  jbd2_journal_lock_updates(journal->j_journal);
944  status = jbd2_journal_flush(journal->j_journal);
945  jbd2_journal_unlock_updates(journal->j_journal);
946  if (status < 0)
947  mlog_errno(status);
948  }
949 
950  if (status == 0) {
951  /*
952  * Do not toggle if flush was unsuccessful otherwise
953  * will leave dirty metadata in a "clean" journal
954  */
955  status = ocfs2_journal_toggle_dirty(osb, 0, 0);
956  if (status < 0)
957  mlog_errno(status);
958  }
959 
960  /* Shutdown the kernel journal system */
961  jbd2_journal_destroy(journal->j_journal);
962  journal->j_journal = NULL;
963 
964  OCFS2_I(inode)->ip_open_count--;
965 
966  /* unlock our journal */
967  ocfs2_inode_unlock(inode, 1);
968 
969  brelse(journal->j_bh);
970  journal->j_bh = NULL;
971 
972  journal->j_state = OCFS2_JOURNAL_FREE;
973 
974 // up_write(&journal->j_trans_barrier);
975 done:
976  if (inode)
977  iput(inode);
978 }
979 
980 static void ocfs2_clear_journal_error(struct super_block *sb,
981  journal_t *journal,
982  int slot)
983 {
984  int olderr;
985 
986  olderr = jbd2_journal_errno(journal);
987  if (olderr) {
988  mlog(ML_ERROR, "File system error %d recorded in "
989  "journal %u.\n", olderr, slot);
990  mlog(ML_ERROR, "File system on device %s needs checking.\n",
991  sb->s_id);
992 
993  jbd2_journal_ack_err(journal);
994  jbd2_journal_clear_err(journal);
995  }
996 }
997 
998 int ocfs2_journal_load(struct ocfs2_journal *journal, int local, int replayed)
999 {
1000  int status = 0;
1001  struct ocfs2_super *osb;
1002 
1003  BUG_ON(!journal);
1004 
1005  osb = journal->j_osb;
1006 
1007  status = jbd2_journal_load(journal->j_journal);
1008  if (status < 0) {
1009  mlog(ML_ERROR, "Failed to load journal!\n");
1010  goto done;
1011  }
1012 
1013  ocfs2_clear_journal_error(osb->sb, journal->j_journal, osb->slot_num);
1014 
1015  status = ocfs2_journal_toggle_dirty(osb, 1, replayed);
1016  if (status < 0) {
1017  mlog_errno(status);
1018  goto done;
1019  }
1020 
1021  /* Launch the commit thread */
1022  if (!local) {
1023  osb->commit_task = kthread_run(ocfs2_commit_thread, osb,
1024  "ocfs2cmt");
1025  if (IS_ERR(osb->commit_task)) {
1026  status = PTR_ERR(osb->commit_task);
1027  osb->commit_task = NULL;
1028  mlog(ML_ERROR, "unable to launch ocfs2commit thread, "
1029  "error=%d", status);
1030  goto done;
1031  }
1032  } else
1033  osb->commit_task = NULL;
1034 
1035 done:
1036  return status;
1037 }
1038 
1039 
1040 /* 'full' flag tells us whether we clear out all blocks or if we just
1041  * mark the journal clean */
1042 int ocfs2_journal_wipe(struct ocfs2_journal *journal, int full)
1043 {
1044  int status;
1045 
1046  BUG_ON(!journal);
1047 
1048  status = jbd2_journal_wipe(journal->j_journal, full);
1049  if (status < 0) {
1050  mlog_errno(status);
1051  goto bail;
1052  }
1053 
1054  status = ocfs2_journal_toggle_dirty(journal->j_osb, 0, 0);
1055  if (status < 0)
1056  mlog_errno(status);
1057 
1058 bail:
1059  return status;
1060 }
1061 
1062 static int ocfs2_recovery_completed(struct ocfs2_super *osb)
1063 {
1064  int empty;
1065  struct ocfs2_recovery_map *rm = osb->recovery_map;
1066 
1067  spin_lock(&osb->osb_lock);
1068  empty = (rm->rm_used == 0);
1069  spin_unlock(&osb->osb_lock);
1070 
1071  return empty;
1072 }
1073 
1074 void ocfs2_wait_for_recovery(struct ocfs2_super *osb)
1075 {
1076  wait_event(osb->recovery_event, ocfs2_recovery_completed(osb));
1077 }
1078 
1079 /*
1080  * JBD might read a cached version of another node's journal file. We
1081  * don't want this as this file changes often and we get no
1082  * notification on those changes. The only way to be sure that we've
1083  * got the most up to date version of those blocks then is to force
1084  * read them off disk. Just searching through the buffer cache won't
1085  * work as there may be pages backing this file which are still marked
1086  * up to date. We know things can't change on this file underneath us
1087  * as we have the lock by now :)
1088  */
1089 static int ocfs2_force_read_journal(struct inode *inode)
1090 {
1091  int status = 0;
1092  int i;
1093  u64 v_blkno, p_blkno, p_blocks, num_blocks;
1094 #define CONCURRENT_JOURNAL_FILL 32ULL
1095  struct buffer_head *bhs[CONCURRENT_JOURNAL_FILL];
1096 
1097  memset(bhs, 0, sizeof(struct buffer_head *) * CONCURRENT_JOURNAL_FILL);
1098 
1099  num_blocks = ocfs2_blocks_for_bytes(inode->i_sb, inode->i_size);
1100  v_blkno = 0;
1101  while (v_blkno < num_blocks) {
1102  status = ocfs2_extent_map_get_blocks(inode, v_blkno,
1103  &p_blkno, &p_blocks, NULL);
1104  if (status < 0) {
1105  mlog_errno(status);
1106  goto bail;
1107  }
1108 
1109  if (p_blocks > CONCURRENT_JOURNAL_FILL)
1110  p_blocks = CONCURRENT_JOURNAL_FILL;
1111 
1112  /* We are reading journal data which should not
1113  * be put in the uptodate cache */
1114  status = ocfs2_read_blocks_sync(OCFS2_SB(inode->i_sb),
1115  p_blkno, p_blocks, bhs);
1116  if (status < 0) {
1117  mlog_errno(status);
1118  goto bail;
1119  }
1120 
1121  for(i = 0; i < p_blocks; i++) {
1122  brelse(bhs[i]);
1123  bhs[i] = NULL;
1124  }
1125 
1126  v_blkno += p_blocks;
1127  }
1128 
1129 bail:
1130  for(i = 0; i < CONCURRENT_JOURNAL_FILL; i++)
1131  brelse(bhs[i]);
1132  return status;
1133 }
1134 
1135 struct ocfs2_la_recovery_item {
1136  struct list_head lri_list;
1137  int lri_slot;
1138  struct ocfs2_dinode *lri_la_dinode;
1139  struct ocfs2_dinode *lri_tl_dinode;
1140  struct ocfs2_quota_recovery *lri_qrec;
1141 };
1142 
1143 /* Does the second half of the recovery process. By this point, the
1144  * node is marked clean and can actually be considered recovered,
1145  * hence it's no longer in the recovery map, but there's still some
1146  * cleanup we can do which shouldn't happen within the recovery thread
1147  * as locking in that context becomes very difficult if we are to take
1148  * recovering nodes into account.
1149  *
1150  * NOTE: This function can and will sleep on recovery of other nodes
1151  * during cluster locking, just like any other ocfs2 process.
1152  */
1153 void ocfs2_complete_recovery(struct work_struct *work)
1154 {
1155  int ret = 0;
1156  struct ocfs2_journal *journal =
1157  container_of(work, struct ocfs2_journal, j_recovery_work);
1158  struct ocfs2_super *osb = journal->j_osb;
1159  struct ocfs2_dinode *la_dinode, *tl_dinode;
1160  struct ocfs2_la_recovery_item *item, *n;
1161  struct ocfs2_quota_recovery *qrec;
1162  LIST_HEAD(tmp_la_list);
1163 
1164  trace_ocfs2_complete_recovery(
1165  (unsigned long long)OCFS2_I(journal->j_inode)->ip_blkno);
1166 
1167  spin_lock(&journal->j_lock);
1168  list_splice_init(&journal->j_la_cleanups, &tmp_la_list);
1169  spin_unlock(&journal->j_lock);
1170 
1171  list_for_each_entry_safe(item, n, &tmp_la_list, lri_list) {
1172  list_del_init(&item->lri_list);
1173 
1174  ocfs2_wait_on_quotas(osb);
1175 
1176  la_dinode = item->lri_la_dinode;
1177  tl_dinode = item->lri_tl_dinode;
1178  qrec = item->lri_qrec;
1179 
1180  trace_ocfs2_complete_recovery_slot(item->lri_slot,
1181  la_dinode ? le64_to_cpu(la_dinode->i_blkno) : 0,
1182  tl_dinode ? le64_to_cpu(tl_dinode->i_blkno) : 0,
1183  qrec);
1184 
1185  if (la_dinode) {
1186  ret = ocfs2_complete_local_alloc_recovery(osb,
1187  la_dinode);
1188  if (ret < 0)
1189  mlog_errno(ret);
1190 
1191  kfree(la_dinode);
1192  }
1193 
1194  if (tl_dinode) {
1195  ret = ocfs2_complete_truncate_log_recovery(osb,
1196  tl_dinode);
1197  if (ret < 0)
1198  mlog_errno(ret);
1199 
1200  kfree(tl_dinode);
1201  }
1202 
1203  ret = ocfs2_recover_orphans(osb, item->lri_slot);
1204  if (ret < 0)
1205  mlog_errno(ret);
1206 
1207  if (qrec) {
1208  ret = ocfs2_finish_quota_recovery(osb, qrec,
1209  item->lri_slot);
1210  if (ret < 0)
1211  mlog_errno(ret);
1212  /* Recovery info is already freed now */
1213  }
1214 
1215  kfree(item);
1216  }
1217 
1218  trace_ocfs2_complete_recovery_end(ret);
1219 }
1220 
1221 /* NOTE: This function always eats your references to la_dinode and
1222  * tl_dinode, either manually on error, or by passing them to
1223  * ocfs2_complete_recovery */
1224 static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
1225  int slot_num,
1226  struct ocfs2_dinode *la_dinode,
1227  struct ocfs2_dinode *tl_dinode,
1228  struct ocfs2_quota_recovery *qrec)
1229 {
1230  struct ocfs2_la_recovery_item *item;
1231 
1232  item = kmalloc(sizeof(struct ocfs2_la_recovery_item), GFP_NOFS);
1233  if (!item) {
1234  /* Though we wish to avoid it, we are in fact safe in
1235  * skipping local alloc cleanup as fsck.ocfs2 is more
1236  * than capable of reclaiming unused space. */
1237  if (la_dinode)
1238  kfree(la_dinode);
1239 
1240  if (tl_dinode)
1241  kfree(tl_dinode);
1242 
1243  if (qrec)
1244  ocfs2_free_quota_recovery(qrec);
1245 
1246  mlog_errno(-ENOMEM);
1247  return;
1248  }
1249 
1250  INIT_LIST_HEAD(&item->lri_list);
1251  item->lri_la_dinode = la_dinode;
1252  item->lri_slot = slot_num;
1253  item->lri_tl_dinode = tl_dinode;
1254  item->lri_qrec = qrec;
1255 
1256  spin_lock(&journal->j_lock);
1257  list_add_tail(&item->lri_list, &journal->j_la_cleanups);
1258  queue_work(ocfs2_wq, &journal->j_recovery_work);
1259  spin_unlock(&journal->j_lock);
1260 }
1261 
1262 /* Called by the mount code to queue the last part of
1263  * recovery for its own and offline slot(s). */
1264 void ocfs2_complete_mount_recovery(struct ocfs2_super *osb)
1265 {
1266  struct ocfs2_journal *journal = osb->journal;
1267 
1268  if (ocfs2_is_hard_readonly(osb))
1269  return;
1270 
1271  /* No need to queue up our truncate_log as regular cleanup will catch
1272  * that */
1273  ocfs2_queue_recovery_completion(journal, osb->slot_num,
1274  osb->local_alloc_copy, NULL, NULL);
1275  ocfs2_schedule_truncate_log_flush(osb, 0);
1276 
1277  osb->local_alloc_copy = NULL;
1278  osb->dirty = 0;
1279 
1280  /* queue to recover orphan slots for all offline slots */
1281  ocfs2_replay_map_set_state(osb, REPLAY_NEEDED);
1282  ocfs2_queue_replay_slots(osb);
1283  ocfs2_free_replay_slots(osb);
1284 }
1285 
1286 void ocfs2_complete_quota_recovery(struct ocfs2_super *osb)
1287 {
1288  if (osb->quota_rec) {
1289  ocfs2_queue_recovery_completion(osb->journal,
1290  osb->slot_num,
1291  NULL,
1292  NULL,
1293  osb->quota_rec);
1294  osb->quota_rec = NULL;
1295  }
1296 }
1297 
1298 static int __ocfs2_recovery_thread(void *arg)
1299 {
1300  int status, node_num, slot_num;
1301  struct ocfs2_super *osb = arg;
1302  struct ocfs2_recovery_map *rm = osb->recovery_map;
1303  int *rm_quota = NULL;
1304  int rm_quota_used = 0, i;
1305  struct ocfs2_quota_recovery *qrec;
1306 
1307  status = ocfs2_wait_on_mount(osb);
1308  if (status < 0) {
1309  goto bail;
1310  }
1311 
1312  rm_quota = kzalloc(osb->max_slots * sizeof(int), GFP_NOFS);
1313  if (!rm_quota) {
1314  status = -ENOMEM;
1315  goto bail;
1316  }
1317 restart:
1318  status = ocfs2_super_lock(osb, 1);
1319  if (status < 0) {
1320  mlog_errno(status);
1321  goto bail;
1322  }
1323 
1324  status = ocfs2_compute_replay_slots(osb);
1325  if (status < 0)
1326  mlog_errno(status);
1327 
1328  /* queue recovery for our own slot */
1329  ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL,
1330  NULL, NULL);
1331 
1332  spin_lock(&osb->osb_lock);
1333  while (rm->rm_used) {
1334  /* It's always safe to remove entry zero, as we won't
1335  * clear it until ocfs2_recover_node() has succeeded. */
1336  node_num = rm->rm_entries[0];
1337  spin_unlock(&osb->osb_lock);
1338  slot_num = ocfs2_node_num_to_slot(osb, node_num);
1339  trace_ocfs2_recovery_thread_node(node_num, slot_num);
1340  if (slot_num == -ENOENT) {
1341  status = 0;
1342  goto skip_recovery;
1343  }
1344 
1345  /* It is a bit subtle with quota recovery. We cannot do it
1346  * immediately because we have to obtain cluster locks from
1347  * quota files and we also don't want to just skip it because
1348  * then quota usage would be out of sync until some node takes
1349  * the slot. So we remember which nodes need quota recovery
1350  * and when everything else is done, we recover quotas. */
1351  for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++);
1352  if (i == rm_quota_used)
1353  rm_quota[rm_quota_used++] = slot_num;
1354 
1355  status = ocfs2_recover_node(osb, node_num, slot_num);
1356 skip_recovery:
1357  if (!status) {
1358  ocfs2_recovery_map_clear(osb, node_num);
1359  } else {
1360  mlog(ML_ERROR,
1361  "Error %d recovering node %d on device (%u,%u)!\n",
1362  status, node_num,
1363  MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
1364  mlog(ML_ERROR, "Volume requires unmount.\n");
1365  }
1366 
1367  spin_lock(&osb->osb_lock);
1368  }
1369  spin_unlock(&osb->osb_lock);
1370  trace_ocfs2_recovery_thread_end(status);
1371 
1372  /* Refresh all journal recovery generations from disk */
1373  status = ocfs2_check_journals_nolocks(osb);
1374  status = (status == -EROFS) ? 0 : status;
1375  if (status < 0)
1376  mlog_errno(status);
1377 
1378  /* Now it is right time to recover quotas... We have to do this under
1379  * superblock lock so that no one can start using the slot (and crash)
1380  * before we recover it */
1381  for (i = 0; i < rm_quota_used; i++) {
1382  qrec = ocfs2_begin_quota_recovery(osb, rm_quota[i]);
1383  if (IS_ERR(qrec)) {
1384  status = PTR_ERR(qrec);
1385  mlog_errno(status);
1386  continue;
1387  }
1388  ocfs2_queue_recovery_completion(osb->journal, rm_quota[i],
1389  NULL, NULL, qrec);
1390  }
1391 
1392  ocfs2_super_unlock(osb, 1);
1393 
1394  /* queue recovery for offline slots */
1395  ocfs2_queue_replay_slots(osb);
1396 
1397 bail:
1398  mutex_lock(&osb->recovery_lock);
1399  if (!status && !ocfs2_recovery_completed(osb)) {
1400  mutex_unlock(&osb->recovery_lock);
1401  goto restart;
1402  }
1403 
1404  ocfs2_free_replay_slots(osb);
1405  osb->recovery_thread_task = NULL;
1406  mb(); /* sync with ocfs2_recovery_thread_running */
1407  wake_up(&osb->recovery_event);
1408 
1409  mutex_unlock(&osb->recovery_lock);
1410 
1411  if (rm_quota)
1412  kfree(rm_quota);
1413 
1414  /* no one is calling kthread_stop() for us so the kthread() api
1415  * requires that we call do_exit(). And it isn't exported, but
1416  * complete_and_exit() seems to be a minimal wrapper around it. */
1417  complete_and_exit(NULL, status);
1418  return status;
1419 }
1420 
1421 void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num)
1422 {
1423  mutex_lock(&osb->recovery_lock);
1424 
1425  trace_ocfs2_recovery_thread(node_num, osb->node_num,
1426  osb->disable_recovery, osb->recovery_thread_task,
1427  osb->disable_recovery ?
1428  -1 : ocfs2_recovery_map_set(osb, node_num));
1429 
1430  if (osb->disable_recovery)
1431  goto out;
1432 
1433  if (osb->recovery_thread_task)
1434  goto out;
1435 
1436  osb->recovery_thread_task = kthread_run(__ocfs2_recovery_thread, osb,
1437  "ocfs2rec");
1438  if (IS_ERR(osb->recovery_thread_task)) {
1439  mlog_errno((int)PTR_ERR(osb->recovery_thread_task));
1440  osb->recovery_thread_task = NULL;
1441  }
1442 
1443 out:
1444  mutex_unlock(&osb->recovery_lock);
1445  wake_up(&osb->recovery_event);
1446 }
1447 
1448 static int ocfs2_read_journal_inode(struct ocfs2_super *osb,
1449  int slot_num,
1450  struct buffer_head **bh,
1451  struct inode **ret_inode)
1452 {
1453  int status = -EACCES;
1454  struct inode *inode = NULL;
1455 
1456  BUG_ON(slot_num >= osb->max_slots);
1457 
1458  inode = ocfs2_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE,
1459  slot_num);
1460  if (!inode || is_bad_inode(inode)) {
1461  mlog_errno(status);
1462  goto bail;
1463  }
1464  SET_INODE_JOURNAL(inode);
1465 
1466  status = ocfs2_read_inode_block_full(inode, bh, OCFS2_BH_IGNORE_CACHE);
1467  if (status < 0) {
1468  mlog_errno(status);
1469  goto bail;
1470  }
1471 
1472  status = 0;
1473 
1474 bail:
1475  if (inode) {
1476  if (status || !ret_inode)
1477  iput(inode);
1478  else
1479  *ret_inode = inode;
1480  }
1481  return status;
1482 }
1483 
1484 /* Does the actual journal replay and marks the journal inode as
1485  * clean. Will only replay if the journal inode is marked dirty. */
1486 static int ocfs2_replay_journal(struct ocfs2_super *osb,
1487  int node_num,
1488  int slot_num)
1489 {
1490  int status;
1491  int got_lock = 0;
1492  unsigned int flags;
1493  struct inode *inode = NULL;
1494  struct ocfs2_dinode *fe;
1495  journal_t *journal = NULL;
1496  struct buffer_head *bh = NULL;
1497  u32 slot_reco_gen;
1498 
1499  status = ocfs2_read_journal_inode(osb, slot_num, &bh, &inode);
1500  if (status) {
1501  mlog_errno(status);
1502  goto done;
1503  }
1504 
1505  fe = (struct ocfs2_dinode *)bh->b_data;
1506  slot_reco_gen = ocfs2_get_recovery_generation(fe);
1507  brelse(bh);
1508  bh = NULL;
1509 
1510  /*
1511  * As the fs recovery is asynchronous, there is a small chance that
1512  * another node mounted (and recovered) the slot before the recovery
1513  * thread could get the lock. To handle that, we dirty read the journal
1514  * inode for that slot to get the recovery generation. If it is
1515  * different than what we expected, the slot has been recovered.
1516  * If not, it needs recovery.
1517  */
1518  if (osb->slot_recovery_generations[slot_num] != slot_reco_gen) {
1519  trace_ocfs2_replay_journal_recovered(slot_num,
1520  osb->slot_recovery_generations[slot_num], slot_reco_gen);
1521  osb->slot_recovery_generations[slot_num] = slot_reco_gen;
1522  status = -EBUSY;
1523  goto done;
1524  }
1525 
1526  /* Continue with recovery as the journal has not yet been recovered */
1527 
1528  status = ocfs2_inode_lock_full(inode, &bh, 1, OCFS2_META_LOCK_RECOVERY);
1529  if (status < 0) {
1530  trace_ocfs2_replay_journal_lock_err(status);
1531  if (status != -ERESTARTSYS)
1532  mlog(ML_ERROR, "Could not lock journal!\n");
1533  goto done;
1534  }
1535  got_lock = 1;
1536 
1537  fe = (struct ocfs2_dinode *) bh->b_data;
1538 
1539  flags = le32_to_cpu(fe->id1.journal1.ij_flags);
1540  slot_reco_gen = ocfs2_get_recovery_generation(fe);
1541 
1542  if (!(flags & OCFS2_JOURNAL_DIRTY_FL)) {
1543  trace_ocfs2_replay_journal_skip(node_num);
1544  /* Refresh recovery generation for the slot */
1545  osb->slot_recovery_generations[slot_num] = slot_reco_gen;
1546  goto done;
1547  }
1548 
1549  /* we need to run complete recovery for offline orphan slots */
1550  ocfs2_replay_map_set_state(osb, REPLAY_NEEDED);
1551 
1552  printk(KERN_NOTICE "ocfs2: Begin replay journal (node %d, slot %d) on "\
1553  "device (%u,%u)\n", node_num, slot_num, MAJOR(osb->sb->s_dev),
1554  MINOR(osb->sb->s_dev));
1555 
1556  OCFS2_I(inode)->ip_clusters = le32_to_cpu(fe->i_clusters);
1557 
1558  status = ocfs2_force_read_journal(inode);
1559  if (status < 0) {
1560  mlog_errno(status);
1561  goto done;
1562  }
1563 
1564  journal = jbd2_journal_init_inode(inode);
1565  if (journal == NULL) {
1566  mlog(ML_ERROR, "Linux journal layer error\n");
1567  status = -EIO;
1568  goto done;
1569  }
1570 
1571  status = jbd2_journal_load(journal);
1572  if (status < 0) {
1573  mlog_errno(status);
1574  if (!igrab(inode))
1575  BUG();
1576  jbd2_journal_destroy(journal);
1577  goto done;
1578  }
1579 
1580  ocfs2_clear_journal_error(osb->sb, journal, slot_num);
1581 
1582  /* wipe the journal */
1583  jbd2_journal_lock_updates(journal);
1584  status = jbd2_journal_flush(journal);
1585  jbd2_journal_unlock_updates(journal);
1586  if (status < 0)
1587  mlog_errno(status);
1588 
1589  /* This will mark the node clean */
1590  flags = le32_to_cpu(fe->id1.journal1.ij_flags);
1591  flags &= ~OCFS2_JOURNAL_DIRTY_FL;
1592  fe->id1.journal1.ij_flags = cpu_to_le32(flags);
1593 
1594  /* Increment recovery generation to indicate successful recovery */
1595  ocfs2_bump_recovery_generation(fe);
1596  osb->slot_recovery_generations[slot_num] =
1597  ocfs2_get_recovery_generation(fe);
1598 
1599  ocfs2_compute_meta_ecc(osb->sb, bh->b_data, &fe->i_check);
1600  status = ocfs2_write_block(osb, bh, INODE_CACHE(inode));
1601  if (status < 0)
1602  mlog_errno(status);
1603 
1604  if (!igrab(inode))
1605  BUG();
1606 
1607  jbd2_journal_destroy(journal);
1608 
1609  printk(KERN_NOTICE "ocfs2: End replay journal (node %d, slot %d) on "\
1610  "device (%u,%u)\n", node_num, slot_num, MAJOR(osb->sb->s_dev),
1611  MINOR(osb->sb->s_dev));
1612 done:
1613  /* drop the lock on this node's journal */
1614  if (got_lock)
1615  ocfs2_inode_unlock(inode, 1);
1616 
1617  if (inode)
1618  iput(inode);
1619 
1620  brelse(bh);
1621 
1622  return status;
1623 }
1624 
1625 /*
1626  * Do the most important parts of node recovery:
1627  * - Replay its journal
1628  * - Stamp a clean local allocator file
1629  * - Stamp a clean truncate log
1630  * - Mark the node clean
1631  *
1632  * If this function completes without error, a node in OCFS2 can be
1633  * said to have been safely recovered. As a result, failure during the
1634  * second part of a node's recovery process (local alloc recovery) is
1635  * far less concerning.
1636  */
1637 static int ocfs2_recover_node(struct ocfs2_super *osb,
1638  int node_num, int slot_num)
1639 {
1640  int status = 0;
1641  struct ocfs2_dinode *la_copy = NULL;
1642  struct ocfs2_dinode *tl_copy = NULL;
1643 
1644  trace_ocfs2_recover_node(node_num, slot_num, osb->node_num);
1645 
1646  /* Should not ever be called to recover ourselves -- in that
1647  * case we should've called ocfs2_journal_load instead. */
1648  BUG_ON(osb->node_num == node_num);
1649 
1650  status = ocfs2_replay_journal(osb, node_num, slot_num);
1651  if (status < 0) {
1652  if (status == -EBUSY) {
1653  trace_ocfs2_recover_node_skip(slot_num, node_num);
1654  status = 0;
1655  goto done;
1656  }
1657  mlog_errno(status);
1658  goto done;
1659  }
1660 
1661  /* Stamp a clean local alloc file AFTER recovering the journal... */
1662  status = ocfs2_begin_local_alloc_recovery(osb, slot_num, &la_copy);
1663  if (status < 0) {
1664  mlog_errno(status);
1665  goto done;
1666  }
1667 
1668  /* An error from begin_truncate_log_recovery is not
1669  * serious enough to warrant halting the rest of
1670  * recovery. */
1671  status = ocfs2_begin_truncate_log_recovery(osb, slot_num, &tl_copy);
1672  if (status < 0)
1673  mlog_errno(status);
1674 
1675  /* Likewise, this would be a strange but ultimately not so
1676  * harmful place to get an error... */
1677  status = ocfs2_clear_slot(osb, slot_num);
1678  if (status < 0)
1679  mlog_errno(status);
1680 
1681  /* This will kfree the memory pointed to by la_copy and tl_copy */
1682  ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
1683  tl_copy, NULL);
1684 
1685  status = 0;
1686 done:
1687 
1688  return status;
1689 }
1690 
1691 /* Test node liveness by trylocking his journal. If we get the lock,
1692  * we drop it here. Return 0 if we got the lock, -EAGAIN if node is
1693  * still alive (we couldn't get the lock) and < 0 on error. */
1694 static int ocfs2_trylock_journal(struct ocfs2_super *osb,
1695  int slot_num)
1696 {
1697  int status, flags;
1698  struct inode *inode = NULL;
1699 
1700  inode = ocfs2_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE,
1701  slot_num);
1702  if (inode == NULL) {
1703  mlog(ML_ERROR, "access error\n");
1704  status = -EACCES;
1705  goto bail;
1706  }
1707  if (is_bad_inode(inode)) {
1708  mlog(ML_ERROR, "access error (bad inode)\n");
1709  iput(inode);
1710  inode = NULL;
1711  status = -EACCES;
1712  goto bail;
1713  }
1714  SET_INODE_JOURNAL(inode);
1715 
1716  flags = OCFS2_META_LOCK_RECOVERY | OCFS2_META_LOCK_NOQUEUE;
1717  status = ocfs2_inode_lock_full(inode, NULL, 1, flags);
1718  if (status < 0) {
1719  if (status != -EAGAIN)
1720  mlog_errno(status);
1721  goto bail;
1722  }
1723 
1724  ocfs2_inode_unlock(inode, 1);
1725 bail:
1726  if (inode)
1727  iput(inode);
1728 
1729  return status;
1730 }
1731 
1732 /* Call this underneath ocfs2_super_lock. It also assumes that the
1733  * slot info struct has been updated from disk. */
1734 int ocfs2_mark_dead_nodes(struct ocfs2_super *osb)
1735 {
1736  unsigned int node_num;
1737  int status, i;
1738  u32 gen;
1739  struct buffer_head *bh = NULL;
1740  struct ocfs2_dinode *di;
1741 
1742  /* This is called with the super block cluster lock, so we
1743  * know that the slot map can't change underneath us. */
1744 
1745  for (i = 0; i < osb->max_slots; i++) {
1746  /* Read journal inode to get the recovery generation */
1747  status = ocfs2_read_journal_inode(osb, i, &bh, NULL);
1748  if (status) {
1749  mlog_errno(status);
1750  goto bail;
1751  }
1752  di = (struct ocfs2_dinode *)bh->b_data;
1753  gen = ocfs2_get_recovery_generation(di);
1754  brelse(bh);
1755  bh = NULL;
1756 
1757  spin_lock(&osb->osb_lock);
1758  osb->slot_recovery_generations[i] = gen;
1759 
1760  trace_ocfs2_mark_dead_nodes(i,
1761  osb->slot_recovery_generations[i]);
1762 
1763  if (i == osb->slot_num) {
1764  spin_unlock(&osb->osb_lock);
1765  continue;
1766  }
1767 
1768  status = ocfs2_slot_to_node_num_locked(osb, i, &node_num);
1769  if (status == -ENOENT) {
1770  spin_unlock(&osb->osb_lock);
1771  continue;
1772  }
1773 
1774  if (__ocfs2_recovery_map_test(osb, node_num)) {
1775  spin_unlock(&osb->osb_lock);
1776  continue;
1777  }
1778  spin_unlock(&osb->osb_lock);
1779 
1780  /* Ok, we have a slot occupied by another node which
1781  * is not in the recovery map. We trylock his journal
1782  * file here to test if he's alive. */
1783  status = ocfs2_trylock_journal(osb, i);
1784  if (!status) {
1785  /* Since we're called from mount, we know that
1786  * the recovery thread can't race us on
1787  * setting / checking the recovery bits. */
1788  ocfs2_recovery_thread(osb, node_num);
1789  } else if ((status < 0) && (status != -EAGAIN)) {
1790  mlog_errno(status);
1791  goto bail;
1792  }
1793  }
1794 
1795  status = 0;
1796 bail:
1797  return status;
1798 }
1799 
1800 /*
1801  * Scan timer should get fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT. Add some
1802  * randomness to the timeout to minimize multiple nodes firing the timer at the
1803  * same time.
1804  */
1805 static inline unsigned long ocfs2_orphan_scan_timeout(void)
1806 {
1807  unsigned long time;
1808 
1809  get_random_bytes(&time, sizeof(time));
1810  time = ORPHAN_SCAN_SCHEDULE_TIMEOUT + (time % 5000);
1811  return msecs_to_jiffies(time);
1812 }
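/*
 * [Editor's note, not in the original source] With
 * ORPHAN_SCAN_SCHEDULE_TIMEOUT at 300000 ms, the value above falls in
 * [300000, 305000) ms, so each node rearms its scan timer 300-305
 * seconds out; the sub-5-second jitter is what keeps the nodes from
 * firing in lockstep.
 */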
1813 
1814 /*
1815  * ocfs2_queue_orphan_scan calls ocfs2_queue_recovery_completion for
1816  * every slot, queuing a recovery of the slot on the ocfs2_wq thread. This
1817  * is done to catch any orphans that are left over in orphan directories.
1818  *
1819  * It scans all slots, even ones that are in use. It does so to handle the
1820  * case described below:
1821  *
1822  * Node 1 has an inode it was using. The dentry went away due to memory
1823  * pressure. Node 1 closes the inode, but it's on the free list. The node
1824  * has the open lock.
1825  * Node 2 unlinks the inode. It grabs the dentry lock to notify others,
1826  * but node 1 has no dentry and doesn't get the message. It trylocks the
1827  * open lock, sees that another node has a PR, and does nothing.
1828  * Later node 2 runs its orphan dir. It igets the inode, trylocks the
1829  * open lock, sees the PR still, and does nothing.
1830  * Basically, we have to trigger an orphan iput on node 1. The only way
1831  * for this to happen is if node 1 runs node 2's orphan dir.
1832  *
1833  * ocfs2_queue_orphan_scan gets called every ORPHAN_SCAN_SCHEDULE_TIMEOUT
1834  * seconds. It gets an EX lock on os_lockres and checks sequence number
1835  * stored in LVB. If the sequence number has changed, it means some other
1836  * node has done the scan. This node skips the scan and tracks the
1837  * sequence number. If the sequence number didn't change, it means a scan
1838  * hasn't happened. The node queues a scan and increments the
1839  * sequence number in the LVB.
1840  */
1841 static void ocfs2_queue_orphan_scan(struct ocfs2_super *osb)
1842 {
1843  struct ocfs2_orphan_scan *os;
1844  int status, i;
1845  u32 seqno = 0;
1846 
1847  os = &osb->osb_orphan_scan;
1848 
1849  if (atomic_read(&os->os_state) == ORPHAN_SCAN_INACTIVE)
1850  goto out;
1851 
1852  trace_ocfs2_queue_orphan_scan_begin(os->os_count, os->os_seqno,
1853  atomic_read(&os->os_state));
1854 
1855  status = ocfs2_orphan_scan_lock(osb, &seqno);
1856  if (status < 0) {
1857  if (status != -EAGAIN)
1858  mlog_errno(status);
1859  goto out;
1860  }
1861 
1862  /* Do not queue the tasks if the volume is being unmounted */
1863  if (atomic_read(&os->os_state) == ORPHAN_SCAN_INACTIVE)
1864  goto unlock;
1865 
1866  if (os->os_seqno != seqno) {
1867  os->os_seqno = seqno;
1868  goto unlock;
1869  }
1870 
1871  for (i = 0; i < osb->max_slots; i++)
1872  ocfs2_queue_recovery_completion(osb->journal, i, NULL, NULL,
1873  NULL);
1874  /*
1875  * We queued a recovery on orphan slots, increment the sequence
1876  * number and update LVB so other node will skip the scan for a while
1877  */
1878  seqno++;
1879  os->os_count++;
1880  os->os_scantime = CURRENT_TIME;
1881 unlock:
1882  ocfs2_orphan_scan_unlock(osb, seqno);
1883 out:
1884  trace_ocfs2_queue_orphan_scan_end(os->os_count, os->os_seqno,
1885  atomic_read(&os->os_state));
1886  return;
1887 }
1888 
1889 /* Worker task that gets fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT milliseconds */
1890 static void ocfs2_orphan_scan_work(struct work_struct *work)
1891 {
1892  struct ocfs2_orphan_scan *os;
1893  struct ocfs2_super *osb;
1894 
1895  os = container_of(work, struct ocfs2_orphan_scan,
1896  os_orphan_scan_work.work);
1897  osb = os->os_osb;
1898 
1899  mutex_lock(&os->os_lock);
1900  ocfs2_queue_orphan_scan(osb);
1901  if (atomic_read(&os->os_state) == ORPHAN_SCAN_ACTIVE)
1902  queue_delayed_work(ocfs2_wq, &os->os_orphan_scan_work,
1903  ocfs2_orphan_scan_timeout());
1904  mutex_unlock(&os->os_lock);
1905 }
1906 
1907 void ocfs2_orphan_scan_stop(struct ocfs2_super *osb)
1908 {
1909  struct ocfs2_orphan_scan *os;
1910 
1911  os = &osb->osb_orphan_scan;
1912  if (atomic_read(&os->os_state) == ORPHAN_SCAN_ACTIVE) {
1913  atomic_set(&os->os_state, ORPHAN_SCAN_INACTIVE);
1914  mutex_lock(&os->os_lock);
1915  cancel_delayed_work(&os->os_orphan_scan_work);
1916  mutex_unlock(&os->os_lock);
1917  }
1918 }
1919 
1920 void ocfs2_orphan_scan_init(struct ocfs2_super *osb)
1921 {
1922  struct ocfs2_orphan_scan *os;
1923 
1924  os = &osb->osb_orphan_scan;
1925  os->os_osb = osb;
1926  os->os_count = 0;
1927  os->os_seqno = 0;
1928  mutex_init(&os->os_lock);
1929  INIT_DELAYED_WORK(&os->os_orphan_scan_work, ocfs2_orphan_scan_work);
1930 }
1931 
1932 void ocfs2_orphan_scan_start(struct ocfs2_super *osb)
1933 {
1934  struct ocfs2_orphan_scan *os;
1935 
1936  os = &osb->osb_orphan_scan;
1937  os->os_scantime = CURRENT_TIME;
1938  if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb))
1939  atomic_set(&os->os_state, ORPHAN_SCAN_INACTIVE);
1940  else {
1941  atomic_set(&os->os_state, ORPHAN_SCAN_ACTIVE);
1942  queue_delayed_work(ocfs2_wq, &os->os_orphan_scan_work,
1943  ocfs2_orphan_scan_timeout());
1944  }
1945 }
1946 
1947 struct ocfs2_orphan_filldir_priv {
1948  struct inode *head;
1949  struct ocfs2_super *osb;
1950 };
1951 
1952 static int ocfs2_orphan_filldir(void *priv, const char *name, int name_len,
1953  loff_t pos, u64 ino, unsigned type)
1954 {
1955  struct ocfs2_orphan_filldir_priv *p = priv;
1956  struct inode *iter;
1957 
1958  if (name_len == 1 && !strncmp(".", name, 1))
1959  return 0;
1960  if (name_len == 2 && !strncmp("..", name, 2))
1961  return 0;
1962 
1963  /* Skip bad inodes so that recovery can continue */
1964  iter = ocfs2_iget(p->osb, ino,
1965  OCFS2_FI_FLAG_ORPHAN_RECOVERY, 0);
1966  if (IS_ERR(iter))
1967  return 0;
1968 
1969  trace_ocfs2_orphan_filldir((unsigned long long)OCFS2_I(iter)->ip_blkno);
1970  /* No locking is required for the next_orphan queue as there
1971  * is only ever a single process doing orphan recovery. */
1972  OCFS2_I(iter)->ip_next_orphan = p->head;
1973  p->head = iter;
1974 
1975  return 0;
1976 }
1977 
1978 static int ocfs2_queue_orphans(struct ocfs2_super *osb,
1979  int slot,
1980  struct inode **head)
1981 {
1982  int status;
1983  struct inode *orphan_dir_inode = NULL;
1984  struct ocfs2_orphan_filldir_priv priv;
1985  loff_t pos = 0;
1986 
1987  priv.osb = osb;
1988  priv.head = *head;
1989 
1990  orphan_dir_inode = ocfs2_get_system_file_inode(osb,
1991  ORPHAN_DIR_SYSTEM_INODE,
1992  slot);
1993  if (!orphan_dir_inode) {
1994  status = -ENOENT;
1995  mlog_errno(status);
1996  return status;
1997  }
1998 
1999  mutex_lock(&orphan_dir_inode->i_mutex);
2000  status = ocfs2_inode_lock(orphan_dir_inode, NULL, 0);
2001  if (status < 0) {
2002  mlog_errno(status);
2003  goto out;
2004  }
2005 
2006  status = ocfs2_dir_foreach(orphan_dir_inode, &pos, &priv,
2007  ocfs2_orphan_filldir);
2008  if (status) {
2009  mlog_errno(status);
2010  goto out_cluster;
2011  }
2012 
2013  *head = priv.head;
2014 
2015 out_cluster:
2016  ocfs2_inode_unlock(orphan_dir_inode, 0);
2017 out:
2018  mutex_unlock(&orphan_dir_inode->i_mutex);
2019  iput(orphan_dir_inode);
2020  return status;
2021 }
2022 
2023 static int ocfs2_orphan_recovery_can_continue(struct ocfs2_super *osb,
2024  int slot)
2025 {
2026  int ret;
2027 
2028  spin_lock(&osb->osb_lock);
2029  ret = !osb->osb_orphan_wipes[slot];
2030  spin_unlock(&osb->osb_lock);
2031  return ret;
2032 }
2033 
2034 static void ocfs2_mark_recovering_orphan_dir(struct ocfs2_super *osb,
2035  int slot)
2036 {
2037  spin_lock(&osb->osb_lock);
2038  /* Mark ourselves such that new processes in delete_inode()
2039  * know to quit early. */
2040  ocfs2_node_map_set_bit(osb, &osb->osb_recovering_orphan_dirs, slot);
2041  while (osb->osb_orphan_wipes[slot]) {
2042  /* If any processes are already in the middle of an
2043  * orphan wipe on this dir, then we need to wait for
2044  * them. */
2045  spin_unlock(&osb->osb_lock);
2046  wait_event_interruptible(osb->osb_wipe_event,
2047  ocfs2_orphan_recovery_can_continue(osb, slot));
2048  spin_lock(&osb->osb_lock);
2049  }
2050  spin_unlock(&osb->osb_lock);
2051 }
2052 
2053 static void ocfs2_clear_recovering_orphan_dir(struct ocfs2_super *osb,
2054  int slot)
2055 {
2056  ocfs2_node_map_clear_bit(osb, &osb->osb_recovering_orphan_dirs, slot);
2057 }
2058 
2059 /*
2060  * Orphan recovery. Each mounted node has its own orphan dir which we
2061  * must run during recovery. Our strategy here is to build a list of
2062  * the inodes in the orphan dir and iget/iput them. The VFS does
2063  * (most) of the rest of the work.
2064  *
2065  * Orphan recovery can happen at any time, not just mount so we have a
2066  * couple of extra considerations.
2067  *
2068  * - We grab as many inodes as we can under the orphan dir lock -
2069  * doing iget() outside the orphan dir risks getting a reference on
2070  * an invalid inode.
2071  * - We must be sure not to deadlock with other processes on the
2072  * system wanting to run delete_inode(). This can happen when they go
2073  * to lock the orphan dir and the orphan recovery process attempts to
2074  * iget() inside the orphan dir lock. This can be avoided by
2075  * advertising our state to ocfs2_delete_inode().
2076  */
2077 static int ocfs2_recover_orphans(struct ocfs2_super *osb,
2078  int slot)
2079 {
2080  int ret = 0;
2081  struct inode *inode = NULL;
2082  struct inode *iter;
2083  struct ocfs2_inode_info *oi;
2084 
2085  trace_ocfs2_recover_orphans(slot);
2086 
2087  ocfs2_mark_recovering_orphan_dir(osb, slot);
2088  ret = ocfs2_queue_orphans(osb, slot, &inode);
2089  ocfs2_clear_recovering_orphan_dir(osb, slot);
2090 
2091  /* Error here should be noted, but we want to continue with as
2092  * many queued inodes as we've got. */
2093  if (ret)
2094  mlog_errno(ret);
2095 
2096  while (inode) {
2097  oi = OCFS2_I(inode);
2098  trace_ocfs2_recover_orphans_iput(
2099  (unsigned long long)oi->ip_blkno);
2100 
2101  iter = oi->ip_next_orphan;
2102 
2103  spin_lock(&oi->ip_lock);
2104  /* The remote delete code may have set these on the
2105  * assumption that the other node would wipe them
2106  * successfully. If they are still in the node's
2107  * orphan dir, we need to reset that state. */
2108  oi->ip_flags &= ~(OCFS2_INODE_DELETED|OCFS2_INODE_SKIP_DELETE);
2109 
2110  /* Set the proper information to get us going into
2111  * ocfs2_delete_inode. */
2112  oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
2113  spin_unlock(&oi->ip_lock);
2114 
2115  iput(inode);
2116 
2117  inode = iter;
2118  }
2119 
2120  return ret;
2121 }
2122 
2123 static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota)
2124 {
2125  /* This check is good because ocfs2 will wait on our recovery
2126  * thread before changing it to something other than MOUNTED
2127  * or DISABLED. */
2128  wait_event(osb->osb_mount_event,
2129  (!quota && atomic_read(&osb->vol_state) == VOLUME_MOUNTED) ||
2130  atomic_read(&osb->vol_state) == VOLUME_MOUNTED_QUOTAS ||
2131  atomic_read(&osb->vol_state) == VOLUME_DISABLED);
2132 
2133  /* If there's an error on mount, then we may never get to the
2134  * MOUNTED flag, but this is set right before
2135  * dismount_volume() so we can trust it. */
2136  if (atomic_read(&osb->vol_state) == VOLUME_DISABLED) {
2137  trace_ocfs2_wait_on_mount(VOLUME_DISABLED);
2138  mlog(0, "mount error, exiting!\n");
2139  return -EBUSY;
2140  }
2141 
2142  return 0;
2143 }
2144 
2145 static int ocfs2_commit_thread(void *arg)
2146 {
2147  int status;
2148  struct ocfs2_super *osb = arg;
2149  struct ocfs2_journal *journal = osb->journal;
2150 
2151  /* we can trust j_num_trans here because _should_stop() is only set in
2152  * shutdown and nobody other than ourselves should be able to start
2153  * transactions. committing on shutdown might take a few iterations
2154  * as final transactions put deleted inodes on the list */
2155  while (!(kthread_should_stop() &&
2156  atomic_read(&journal->j_num_trans) == 0)) {
2157 
2158  wait_event_interruptible(osb->checkpoint_event,
2159  atomic_read(&journal->j_num_trans)
2160  || kthread_should_stop());
2161 
2162  status = ocfs2_commit_cache(osb);
2163  if (status < 0)
2164  mlog_errno(status);
2165 
2166  if (kthread_should_stop() && atomic_read(&journal->j_num_trans)){
2167  mlog(ML_KTHREAD,
2168  "commit_thread: %u transactions pending on "
2169  "shutdown\n",
2170  atomic_read(&journal->j_num_trans));
2171  }
2172  }
2173 
2174  return 0;
2175 }
2176 
2177 /* Reads all the journal inodes without taking any cluster locks. Used
2178  * for hard readonly access to determine whether any journal requires
2179  * recovery. Also used to refresh the recovery generation numbers after
2180  * a journal has been recovered by another node.
2181  */
2182 int ocfs2_check_journals_nolocks(struct ocfs2_super *osb)
2183 {
2184  int ret = 0;
2185  unsigned int slot;
2186  struct buffer_head *di_bh = NULL;
2187  struct ocfs2_dinode *di;
2188  int journal_dirty = 0;
2189 
2190  for(slot = 0; slot < osb->max_slots; slot++) {
2191  ret = ocfs2_read_journal_inode(osb, slot, &di_bh, NULL);
2192  if (ret) {
2193  mlog_errno(ret);
2194  goto out;
2195  }
2196 
2197  di = (struct ocfs2_dinode *) di_bh->b_data;
2198 
2199  osb->slot_recovery_generations[slot] =
2200  ocfs2_get_recovery_generation(di);
2201 
2202  if (le32_to_cpu(di->id1.journal1.ij_flags) &
2203  OCFS2_JOURNAL_DIRTY_FL)
2204  journal_dirty = 1;
2205 
2206  brelse(di_bh);
2207  di_bh = NULL;
2208  }
2209 
2210 out:
2211  if (journal_dirty)
2212  ret = -EROFS;
2213  return ret;
2214 }