58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
85 static int send_remove(
struct dlm_rsb *
r);
88 static void __receive_convert_reply(
struct dlm_rsb *
r,
struct dlm_lkb *lkb,
91 static void do_purge(
struct dlm_ls *ls,
int nodeid,
int pid);
92 static void del_timeout(
struct dlm_lkb *lkb);
93 static void toss_rsb(
struct kref *
kref);
103 static const int __dlm_compat_matrix[8][8] = {
105 {1, 1, 1, 1, 1, 1, 1, 0},
106 {1, 1, 1, 1, 1, 1, 1, 0},
107 {1, 1, 1, 1, 1, 1, 0, 0},
108 {1, 1, 1, 1, 0, 0, 0, 0},
109 {1, 1, 1, 0, 1, 0, 0, 0},
110 {1, 1, 1, 0, 0, 0, 0, 0},
111 {1, 1, 0, 0, 0, 0, 0, 0},
112 {0, 0, 0, 0, 0, 0, 0, 0}
126 { -1, 1, 1, 1, 1, 1, 1, -1 },
127 { -1, 1, 1, 1, 1, 1, 1, 0 },
128 { -1, -1, 1, 1, 1, 1, 1, 0 },
129 { -1, -1, -1, 1, 1, 1, 1, 0 },
130 { -1, -1, -1, -1, 1, 1, 1, 0 },
131 { -1, 0, 0, 0, 0, 0, 1, 0 },
132 { -1, 0, 0, 0, 0, 0, 0, 0 },
133 { -1, 0, 0, 0, 0, 0, 0, 0 }
136 #define modes_compat(gr, rq) \
137 __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
141 return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
150 static const int __quecvt_compat_matrix[8][8] = {
152 {0, 0, 0, 0, 0, 0, 0, 0},
153 {0, 0, 1, 1, 1, 1, 1, 0},
154 {0, 0, 0, 1, 1, 1, 1, 0},
155 {0, 0, 0, 0, 1, 1, 1, 0},
156 {0, 0, 0, 1, 0, 1, 1, 0},
157 {0, 0, 0, 0, 0, 0, 1, 0},
158 {0, 0, 0, 0, 0, 0, 0, 0},
159 {0, 0, 0, 0, 0, 0, 0, 0}
164 printk(
KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
165 "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
172 static void dlm_print_rsb(
struct dlm_rsb *
r)
174 printk(
KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
187 printk(
KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
205 static inline void dlm_lock_recovery(
struct dlm_ls *ls)
220 static inline int can_be_queued(
struct dlm_lkb *lkb)
225 static inline int force_blocking_asts(
struct dlm_lkb *lkb)
230 static inline int is_demoted(
struct dlm_lkb *lkb)
235 static inline int is_altmode(
struct dlm_lkb *lkb)
240 static inline int is_granted(
struct dlm_lkb *lkb)
245 static inline int is_remote(
struct dlm_rsb *
r)
251 static inline int is_process_copy(
struct dlm_lkb *lkb)
256 static inline int is_master_copy(
struct dlm_lkb *lkb)
261 static inline int middle_conversion(
struct dlm_lkb *lkb)
269 static inline int down_conversion(
struct dlm_lkb *lkb)
274 static inline int is_overlap_unlock(
struct dlm_lkb *lkb)
279 static inline int is_overlap_cancel(
struct dlm_lkb *lkb)
284 static inline int is_overlap(
struct dlm_lkb *lkb)
290 static void queue_cast(
struct dlm_rsb *r,
struct dlm_lkb *lkb,
int rv)
292 if (is_master_copy(lkb))
314 static inline void queue_cast_overlap(
struct dlm_rsb *r,
struct dlm_lkb *lkb)
320 static void queue_bast(
struct dlm_rsb *r,
struct dlm_lkb *lkb,
int rqmode)
322 if (is_master_copy(lkb)) {
323 send_bast(r, lkb, rqmode);
336 static inline void hold_rsb(
struct dlm_rsb *r)
349 static void put_rsb(
struct dlm_rsb *r)
355 kref_put(&r->
res_ref, toss_rsb);
356 spin_unlock(&ls->
ls_rsbtbl[bucket].lock);
364 static int pre_rsb_struct(
struct dlm_ls *ls)
400 static int get_rsb_struct(
struct dlm_ls *ls,
char *
name,
int len,
438 static int rsb_cmp(
struct dlm_rsb *r,
const char *name,
int nlen)
443 memcpy(maxname, name, nlen);
456 rc = rsb_cmp(r, name, len);
545 static int find_rsb_dir(
struct dlm_ls *ls,
char *name,
int len,
547 int dir_nodeid,
int from_nodeid,
559 if (from_nodeid == dir_nodeid)
582 if (from_local || from_dir ||
583 (from_other && (dir_nodeid == our_nodeid))) {
589 error = pre_rsb_struct(ls);
624 log_debug(ls,
"find_rsb toss from_other %d master %d dir %d %s",
633 log_error(ls,
"find_rsb toss from_dir %d master %d",
651 error = rsb_insert(r, &ls->
ls_rsbtbl[b].keep);
660 if (error == -
EBADR && !create)
663 error = get_rsb_struct(ls, name, len, &r);
678 log_debug(ls,
"find_rsb new from_dir %d recreate %s",
685 if (from_other && (dir_nodeid != our_nodeid)) {
687 log_error(ls,
"find_rsb new from_other %d dir %d our %d %s",
688 from_nodeid, dir_nodeid, our_nodeid, r->
res_name);
695 log_debug(ls,
"find_rsb new from_other %d dir %d %s",
696 from_nodeid, dir_nodeid, r->
res_name);
699 if (dir_nodeid == our_nodeid) {
711 error = rsb_insert(r, &ls->
ls_rsbtbl[b].keep);
723 static int find_rsb_nodir(
struct dlm_ls *ls,
char *name,
int len,
725 int dir_nodeid,
int from_nodeid,
726 unsigned int flags,
struct dlm_rsb **r_ret)
734 error = pre_rsb_struct(ls);
766 log_error(ls,
"find_rsb toss from_nodeid %d master %d dir %d",
774 (dir_nodeid == our_nodeid)) {
777 log_error(ls,
"find_rsb toss our %d master %d dir %d",
785 error = rsb_insert(r, &ls->
ls_rsbtbl[b].keep);
794 error = get_rsb_struct(ls, name, len, &r);
806 r->
res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
809 error = rsb_insert(r, &ls->
ls_rsbtbl[b].keep);
817 static int find_rsb(
struct dlm_ls *ls,
char *name,
int len,
int from_nodeid,
818 unsigned int flags,
struct dlm_rsb **r_ret)
826 hash = jhash(name, len, 0);
831 if (dlm_no_directory(ls))
832 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
833 from_nodeid, flags, r_ret);
835 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
836 from_nodeid, flags, r_ret);
842 static int validate_master_nodeid(
struct dlm_ls *ls,
struct dlm_rsb *r,
845 if (dlm_no_directory(ls)) {
846 log_error(ls,
"find_rsb keep from_nodeid %d master %d dir %d",
859 log_debug(ls,
"validate master from_other %d master %d "
860 "dir %d first %x %s", from_nodeid,
870 log_error(ls,
"validate master from_dir %d master %d "
912 unsigned int flags,
int *r_nodeid,
int *
result)
919 int dir_nodeid,
error, toss_list = 0;
924 if (from_nodeid == our_nodeid) {
925 log_error(ls,
"dlm_master_lookup from our_nodeid %d flags %x",
930 hash = jhash(name, len, 0);
934 if (dir_nodeid != our_nodeid) {
935 log_error(ls,
"dlm_master_lookup from %d dir %d our %d h %x %d",
936 from_nodeid, dir_nodeid, our_nodeid, hash,
943 error = pre_rsb_struct(ls);
970 log_error(ls,
"dlm_master_lookup res_dir %d our %d %s",
987 log_error(ls,
"dlm_master_lookup fix_master on toss");
997 log_limit(ls,
"dlm_master_lookup from_master %d "
998 "master_nodeid %d res_nodeid %d first %x %s",
1003 log_error(ls,
"from_master %d our_master", from_nodeid);
1018 log_debug(ls,
"dlm_master_lookup master 0 to %d first %x %s",
1024 if (!from_master && !fix_master &&
1030 log_limit(ls,
"dlm_master_lookup from master %d flags %x "
1031 "first %x %s", from_nodeid, flags,
1052 error = get_rsb_struct(ls, name, len, &r);
1068 error = rsb_insert(r, &ls->
ls_rsbtbl[b].toss);
1078 *r_nodeid = from_nodeid;
1108 hash = jhash(name, len, 0);
1125 static void toss_rsb(
struct kref *
kref)
1143 static void unhold_rsb(
struct dlm_rsb *r)
1146 rv = kref_put(&r->
res_ref, toss_rsb);
1150 static void kill_rsb(
struct kref *kref)
1174 static void detach_lkb(
struct dlm_lkb *lkb)
1182 static int create_lkb(
struct dlm_ls *ls,
struct dlm_lkb **lkb_ret)
1216 log_error(ls,
"create_lkb idr error %d", rv);
1235 return lkb ? 0 : -
ENOENT;
1238 static void kill_lkb(
struct kref *kref)
1251 static int __put_lkb(
struct dlm_ls *ls,
struct dlm_lkb *lkb)
1256 if (kref_put(&lkb->
lkb_ref, kill_lkb)) {
1281 return __put_lkb(ls, lkb);
1287 static inline void hold_lkb(
struct dlm_lkb *lkb)
1297 static inline void unhold_lkb(
struct dlm_lkb *lkb)
1300 rv = kref_put(&lkb->
lkb_ref, kill_lkb);
1320 kref_get(&lkb->lkb_ref);
1326 lkb->lkb_status =
status;
1331 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1337 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1342 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1345 &r->res_convertqueue);
1363 add_lkb(r, lkb, sts);
1367 static int msg_reply_type(
int mstype)
1384 static int nodeid_warned(
int nodeid,
int num_nodes,
int *warned)
1393 if (warned[i] == nodeid)
1404 s64 debug_maxus = 0;
1405 u32 debug_scanned = 0;
1406 u32 debug_expired = 0;
1429 if (us > debug_maxus)
1434 warned = kzalloc(num_nodes *
sizeof(
int),
GFP_KERNEL);
1441 log_error(ls,
"waitwarn %x %lld %d us check connection to "
1442 "node %d", lkb->
lkb_id, (
long long)us,
1449 log_debug(ls,
"scan_waiters %u warn %u over %d us max %lld us",
1450 debug_scanned, debug_expired,
1451 dlm_config.ci_waitwarn_us, (
long long)debug_maxus);
1457 static int add_to_waiters(
struct dlm_lkb *lkb,
int mstype,
int to_nodeid)
1464 if (is_overlap_unlock(lkb) ||
1485 log_debug(ls,
"addwait %x cur %d overlap %d count %d f %x",
1503 log_error(ls,
"addwait error %x %d flags %x %d %d %s",
1515 static int _remove_from_waiters(
struct dlm_lkb *lkb,
int mstype,
1519 int overlap_done = 0;
1540 log_debug(ls,
"remwait %x cancel_reply wait_type %d",
1555 is_overlap_cancel(lkb) && ms && !ms->
m_result) {
1556 log_debug(ls,
"remwait %x convert_reply zap overlap_cancel",
1572 log_error(ls,
"remwait error %x remote %d %x msg %d flags %x no wait",
1584 log_error(ls,
"remwait error %x reply %d wait_type %d overlap",
1600 static int remove_from_waiters(
struct dlm_lkb *lkb,
int mstype)
1606 error = _remove_from_waiters(lkb, mstype,
NULL);
1621 error = _remove_from_waiters(lkb, ms->
m_type, ms);
1632 static void wait_pending_remove(
struct dlm_rsb *r)
1639 log_debug(ls,
"delay lookup for remove dir %d %s",
1655 static void shrink_bucket(
struct dlm_ls *ls,
int b)
1661 int remote_count = 0;
1676 if (!dlm_no_directory(ls) &&
1687 if (!dlm_no_directory(ls) &&
1705 if (!kref_put(&r->
res_ref, kill_rsb)) {
1729 for (i = 0; i < remote_count; i++) {
1737 log_debug(ls,
"remove_name not toss %s", name);
1743 log_debug(ls,
"remove_name master %d dir %d our %d %s",
1752 log_error(ls,
"remove_name dir %d master %d our %d %s",
1761 log_debug(ls,
"remove_name toss_time %lu now %lu %s",
1766 if (!kref_put(&r->
res_ref, kill_rsb)) {
1768 log_error(ls,
"remove_name in use %s", name);
1798 shrink_bucket(ls, i);
1799 if (dlm_locking_stopped(ls))
1805 static void add_timeout(
struct dlm_lkb *lkb)
1809 if (is_master_copy(lkb))
1829 static void del_timeout(
struct dlm_lkb *lkb)
1851 int do_cancel, do_warn;
1855 if (dlm_locking_stopped(ls))
1863 wait_us = ktime_to_us(ktime_sub(
ktime_get(),
1871 wait_us >=
dlm_config.ci_timewarn_cs * 10000)
1874 if (!do_cancel && !do_warn)
1881 if (!do_cancel && !do_warn)
1897 log_debug(ls,
"timeout cancel %x node %d %s",
1902 _cancel_lock(r, lkb);
1940 int b, len = r->
res_ls->ls_lvblen;
1961 }
else if (b == 0) {
1989 static void set_lvb_unlock(
struct dlm_rsb *r,
struct dlm_lkb *lkb)
2018 static void set_lvb_lock_pc(
struct dlm_rsb *r,
struct dlm_lkb *lkb,
2031 int len = receive_extralen(ms);
2060 set_lvb_unlock(r, lkb);
2061 _remove_lock(r, lkb);
2064 static void remove_lock_pc(
struct dlm_rsb *r,
struct dlm_lkb *lkb)
2066 _remove_lock(r, lkb);
2100 static int revert_lock_pc(
struct dlm_rsb *r,
struct dlm_lkb *lkb)
2102 return revert_lock(r, lkb);
2121 set_lvb_lock(r, lkb);
2122 _grant_lock(r, lkb);
2125 static void grant_lock_pc(
struct dlm_rsb *r,
struct dlm_lkb *lkb,
2128 set_lvb_lock_pc(r, lkb, ms);
2129 _grant_lock(r, lkb);
2136 static void grant_lock_pending(
struct dlm_rsb *r,
struct dlm_lkb *lkb)
2139 if (is_master_copy(lkb))
2142 queue_cast(r, lkb, 0);
2153 static void munge_demoted(
struct dlm_lkb *lkb)
2156 log_print(
"munge_demoted %x invalid modes gr %d rq %d",
2168 log_print(
"munge_altmode %x invalid reply type %d",
2183 static inline int first_in_list(
struct dlm_lkb *lkb,
struct list_head *head)
2249 static int conversion_deadlock_detect(
struct dlm_rsb *r,
struct dlm_lkb *lkb2)
2252 int lkb_is_ahead = 0;
2260 if (!lkb_is_ahead) {
2288 static int _can_be_granted(
struct dlm_rsb *r,
struct dlm_lkb *lkb,
int now,
2346 if (conv && recover)
2434 static int can_be_granted(
struct dlm_rsb *r,
struct dlm_lkb *lkb,
int now,
2435 int recover,
int *
err)
2444 rv = _can_be_granted(r, lkb, now, recover);
2454 if (is_convert && can_be_queued(lkb) &&
2455 conversion_deadlock_detect(r, lkb)) {
2463 log_print(
"can_be_granted deadlock %x now %d",
2485 rv = _can_be_granted(r, lkb, now, 0);
2505 static int grant_pending_convert(
struct dlm_rsb *r,
int high,
int *cw,
2506 unsigned int *count)
2510 int hi, demoted, quit, grant_restart, demote_restart;
2520 demoted = is_demoted(lkb);
2523 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2524 grant_lock_pending(r, lkb);
2531 if (!demoted && is_demoted(lkb)) {
2532 log_print(
"WARN: pending demoted %x node %d %s",
2539 log_print(
"WARN: pending deadlock %x node %d %s",
2553 if (demote_restart && !quit) {
2558 return max_t(
int, high, hi);
2561 static int grant_pending_wait(
struct dlm_rsb *r,
int high,
int *cw,
2562 unsigned int *count)
2567 if (can_be_granted(r, lkb, 0, 0,
NULL)) {
2568 grant_lock_pending(r, lkb);
2586 static int lock_requires_bast(
struct dlm_lkb *
gr,
int high,
int cw)
2595 !__dlm_compat_matrix[gr->
lkb_grmode+1][high+1])
2600 static void grant_pending_locks(
struct dlm_rsb *r,
unsigned int *count)
2606 if (!is_master(r)) {
2612 high = grant_pending_convert(r, high, &cw, count);
2613 high = grant_pending_wait(r, high, &cw, count);
2625 if (lkb->
lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2630 queue_bast(r, lkb, high);
2659 if (gr->
lkb_bastfn && modes_require_bast(gr, lkb)) {
2666 static void send_blocking_asts(
struct dlm_rsb *r,
struct dlm_lkb *lkb)
2671 static void send_blocking_asts_all(
struct dlm_rsb *r,
struct dlm_lkb *lkb)
2738 wait_pending_remove(r);
2741 send_lookup(r, lkb);
2745 static void process_lookup_list(
struct dlm_rsb *r)
2751 _request_lock(r, lkb);
2758 static void confirm_master(
struct dlm_rsb *r,
int error)
2769 process_lookup_list(r);
2786 _request_lock(r, lkb);
2796 int namelen,
unsigned long timeout_cs,
2797 void (*ast) (
void *astparam),
2799 void (*bast) (
void *astparam,
int mode),
2842 if (flags & DLM_LKF_CONVERT && !lksb->
sb_lkid)
2861 static int set_unlock_args(
uint32_t flags,
void *astarg,
struct dlm_args *args)
2875 static int validate_lock_args(
struct dlm_ls *ls,
struct dlm_lkb *lkb,
2880 if (args->
flags & DLM_LKF_CONVERT) {
2884 if (args->
flags & DLM_LKF_QUECVT &&
2895 if (is_overlap(lkb))
2912 log_debug(ls,
"validate_lock_args %d %x %x %x %d %d %s",
2926 static int validate_unlock_args(
struct dlm_lkb *lkb,
struct dlm_args *args)
2955 args->
flags & DLM_LKF_CANCEL ?
2966 if (args->
flags & DLM_LKF_CANCEL) {
2970 if (is_overlap(lkb))
3011 if (is_overlap_unlock(lkb))
3049 log_debug(ls,
"validate_unlock_args %d %x %x %x %x %d %s", rv,
3067 if (can_be_granted(r, lkb, 1, 0,
NULL)) {
3069 queue_cast(r, lkb, 0);
3073 if (can_be_queued(lkb)) {
3081 queue_cast(r, lkb, -
EAGAIN);
3086 static void do_request_effects(
struct dlm_rsb *r,
struct dlm_lkb *lkb,
3091 if (force_blocking_asts(lkb))
3092 send_blocking_asts_all(r, lkb);
3095 send_blocking_asts(r, lkb);
3107 if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3109 queue_cast(r, lkb, 0);
3119 revert_lock(r, lkb);
3131 if (is_demoted(lkb)) {
3133 if (_can_be_granted(r, lkb, 1, 0)) {
3135 queue_cast(r, lkb, 0);
3141 if (can_be_queued(lkb)) {
3150 queue_cast(r, lkb, -
EAGAIN);
3155 static void do_convert_effects(
struct dlm_rsb *r,
struct dlm_lkb *lkb,
3160 grant_pending_locks(r,
NULL);
3164 if (force_blocking_asts(lkb))
3165 send_blocking_asts_all(r, lkb);
3168 send_blocking_asts(r, lkb);
3175 remove_lock(r, lkb);
3180 static void do_unlock_effects(
struct dlm_rsb *r,
struct dlm_lkb *lkb,
3183 grant_pending_locks(r,
NULL);
3192 error = revert_lock(r, lkb);
3200 static void do_cancel_effects(
struct dlm_rsb *r,
struct dlm_lkb *lkb,
3204 grant_pending_locks(r,
NULL);
3220 error = set_master(r, lkb);
3230 error = send_request(r, lkb);
3232 error = do_request(r, lkb);
3235 do_request_effects(r, lkb, error);
3249 error = send_convert(r, lkb);
3251 error = do_convert(r, lkb);
3254 do_convert_effects(r, lkb, error);
3268 error = send_unlock(r, lkb);
3270 error = do_unlock(r, lkb);
3273 do_unlock_effects(r, lkb, error);
3287 error = send_cancel(r, lkb);
3289 error = do_cancel(r, lkb);
3292 do_cancel_effects(r, lkb, error);
3303 static int request_lock(
struct dlm_ls *ls,
struct dlm_lkb *lkb,
char *name,
3309 error = validate_lock_args(ls, lkb, args);
3313 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3322 error = _request_lock(r, lkb);
3329 static int convert_lock(
struct dlm_ls *ls,
struct dlm_lkb *lkb,
3340 error = validate_lock_args(ls, lkb, args);
3344 error = _convert_lock(r, lkb);
3351 static int unlock_lock(
struct dlm_ls *ls,
struct dlm_lkb *lkb,
3362 error = validate_unlock_args(lkb, args);
3366 error = _unlock_lock(r, lkb);
3373 static int cancel_lock(
struct dlm_ls *ls,
struct dlm_lkb *lkb,
3384 error = validate_unlock_args(lkb, args);
3388 error = _cancel_lock(r, lkb);
3404 unsigned int namelen,
3406 void (*ast) (
void *astarg),
3408 void (*bast) (
void *astarg,
int mode))
3419 dlm_lock_recovery(ls);
3422 error = find_lkb(ls, lksb->
sb_lkid, &lkb);
3424 error = create_lkb(ls, &lkb);
3429 error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
3430 astarg, bast, &args);
3435 error = convert_lock(ls, lkb, &args);
3437 error = request_lock(ls, lkb, name, namelen, &args);
3442 if (convert || error)
3467 dlm_lock_recovery(ls);
3469 error = find_lkb(ls, lkid, &lkb);
3473 error = set_unlock_args(flags, astarg, &args);
3477 if (flags & DLM_LKF_CANCEL)
3478 error = cancel_lock(ls, lkb, &args);
3480 error = unlock_lock(ls, lkb, &args);
3516 static int _create_message(
struct dlm_ls *ls,
int mb_len,
3517 int to_nodeid,
int mstype,
3519 struct dlm_mhandle **mh_ret)
3522 struct dlm_mhandle *mh;
3550 static int create_message(
struct dlm_rsb *r,
struct dlm_lkb *lkb,
3551 int to_nodeid,
int mstype,
3553 struct dlm_mhandle **mh_ret)
3569 mb_len += r->
res_ls->ls_lvblen;
3573 return _create_message(r->
res_ls, mb_len, to_nodeid, mstype,
3580 static int send_message(
struct dlm_mhandle *mh,
struct dlm_message *ms)
3631 static int send_common(
struct dlm_rsb *r,
struct dlm_lkb *lkb,
int mstype)
3634 struct dlm_mhandle *mh;
3635 int to_nodeid,
error;
3639 error = add_to_waiters(lkb, mstype, to_nodeid);
3643 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3647 send_args(r, lkb, ms);
3649 error = send_message(mh, ms);
3655 remove_from_waiters(lkb, msg_reply_type(mstype));
3671 if (!error && down_conversion(lkb)) {
3675 r->
res_ls->ls_stub_ms.m_result = 0;
3676 __receive_convert_reply(r, lkb, &r->
res_ls->ls_stub_ms);
3699 struct dlm_mhandle *mh;
3700 int to_nodeid,
error;
3704 error = create_message(r, lkb, to_nodeid,
DLM_MSG_GRANT, &ms, &mh);
3708 send_args(r, lkb, ms);
3712 error = send_message(mh, ms);
3717 static int send_bast(
struct dlm_rsb *r,
struct dlm_lkb *lkb,
int mode)
3720 struct dlm_mhandle *mh;
3721 int to_nodeid,
error;
3729 send_args(r, lkb, ms);
3733 error = send_message(mh, ms);
3741 struct dlm_mhandle *mh;
3742 int to_nodeid,
error;
3754 send_args(r, lkb, ms);
3756 error = send_message(mh, ms);
3766 static int send_remove(
struct dlm_rsb *r)
3769 struct dlm_mhandle *mh;
3770 int to_nodeid,
error;
3781 error = send_message(mh, ms);
3786 static int send_common_reply(
struct dlm_rsb *r,
struct dlm_lkb *lkb,
3790 struct dlm_mhandle *mh;
3791 int to_nodeid,
error;
3795 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3799 send_args(r, lkb, ms);
3803 error = send_message(mh, ms);
3808 static int send_request_reply(
struct dlm_rsb *r,
struct dlm_lkb *lkb,
int rv)
3813 static int send_convert_reply(
struct dlm_rsb *r,
struct dlm_lkb *lkb,
int rv)
3818 static int send_unlock_reply(
struct dlm_rsb *r,
struct dlm_lkb *lkb,
int rv)
3823 static int send_cancel_reply(
struct dlm_rsb *r,
struct dlm_lkb *lkb,
int rv)
3829 int ret_nodeid,
int rv)
3833 struct dlm_mhandle *mh;
3844 error = send_message(mh, ms);
3871 static int receive_extralen(
struct dlm_message *ms)
3876 static int receive_lvb(
struct dlm_ls *ls,
struct dlm_lkb *lkb,
3886 len = receive_extralen(ms);
3894 static void fake_bastfn(
void *astparam,
int mode)
3896 log_print(
"fake_bastfn should not be called");
3899 static void fake_astfn(
void *astparam)
3901 log_print(
"fake_astfn should not be called");
3904 static int receive_request_args(
struct dlm_ls *ls,
struct dlm_lkb *lkb,
3926 static int receive_convert_args(
struct dlm_ls *ls,
struct dlm_lkb *lkb,
3932 if (receive_lvb(ls, lkb, ms))
3941 static int receive_unlock_args(
struct dlm_ls *ls,
struct dlm_lkb *lkb,
3944 if (receive_lvb(ls, lkb, ms))
3971 if (!is_master_copy(lkb) || lkb->
lkb_nodeid != from)
3980 if (!is_process_copy(lkb) || lkb->
lkb_nodeid != from)
3985 if (!is_process_copy(lkb))
3997 "ignore invalid message %d from %d %x %x %x %d",
4003 static void send_repeat_remove(
struct dlm_ls *ls,
char *ms_name,
int len)
4007 struct dlm_mhandle *mh;
4012 memset(name, 0,
sizeof(name));
4013 memcpy(name, ms_name, len);
4015 hash = jhash(name, len, 0);
4020 log_error(ls,
"send_repeat_remove dir %d %s", dir_nodeid, name);
4026 log_error(ls,
"repeat_remove on keep %s", name);
4033 log_error(ls,
"repeat_remove on toss %s", name);
4045 rv = _create_message(ls,
sizeof(
struct dlm_message) + len,
4053 send_message(mh, ms);
4066 int error, namelen = 0;
4068 from_nodeid = ms->
m_header.h_nodeid;
4070 error = create_lkb(ls, &lkb);
4074 receive_flags(lkb, ms);
4076 error = receive_request_args(ls, lkb, ms);
4088 namelen = receive_extralen(ms);
4090 error = find_rsb(ls, ms->
m_extra, namelen, from_nodeid,
4091 R_RECEIVE_REQUEST, &r);
4100 error = validate_master_nodeid(ls, r, from_nodeid);
4110 error = do_request(r, lkb);
4111 send_request_reply(r, lkb, error);
4112 do_request_effects(r, lkb, error);
4140 log_limit(ls,
"receive_request %x from %d %d",
4141 ms->
m_lkid, from_nodeid, error);
4144 if (namelen && error == -
EBADR) {
4145 send_repeat_remove(ls, ms->
m_extra, namelen);
4149 setup_stub_lkb(ls, ms);
4158 int error, reply = 1;
4160 error = find_lkb(ls, ms->
m_remid, &lkb);
4165 log_error(ls,
"receive_convert %x remid %x recover_seq %llu "
4178 error = validate_message(lkb, ms);
4182 receive_flags(lkb, ms);
4184 error = receive_convert_args(ls, lkb, ms);
4186 send_convert_reply(r, lkb, error);
4190 reply = !down_conversion(lkb);
4192 error = do_convert(r, lkb);
4194 send_convert_reply(r, lkb, error);
4195 do_convert_effects(r, lkb, error);
4203 setup_stub_lkb(ls, ms);
4214 error = find_lkb(ls, ms->
m_remid, &lkb);
4219 log_error(ls,
"receive_unlock %x remid %x remote %d %x",
4231 error = validate_message(lkb, ms);
4235 receive_flags(lkb, ms);
4237 error = receive_unlock_args(ls, lkb, ms);
4239 send_unlock_reply(r, lkb, error);
4243 error = do_unlock(r, lkb);
4244 send_unlock_reply(r, lkb, error);
4245 do_unlock_effects(r, lkb, error);
4253 setup_stub_lkb(ls, ms);
4264 error = find_lkb(ls, ms->
m_remid, &lkb);
4268 receive_flags(lkb, ms);
4275 error = validate_message(lkb, ms);
4279 error = do_cancel(r, lkb);
4280 send_cancel_reply(r, lkb, error);
4281 do_cancel_effects(r, lkb, error);
4289 setup_stub_lkb(ls, ms);
4300 error = find_lkb(ls, ms->
m_remid, &lkb);
4309 error = validate_message(lkb, ms);
4313 receive_flags_reply(lkb, ms);
4314 if (is_altmode(lkb))
4315 munge_altmode(lkb, ms);
4316 grant_lock_pc(r, lkb, ms);
4317 queue_cast(r, lkb, 0);
4331 error = find_lkb(ls, ms->
m_remid, &lkb);
4340 error = validate_message(lkb, ms);
4355 int len,
error, ret_nodeid, from_nodeid, our_nodeid;
4357 from_nodeid = ms->
m_header.h_nodeid;
4360 len = receive_extralen(ms);
4366 if (!error && ret_nodeid == our_nodeid) {
4367 receive_request(ls, ms);
4370 send_lookup_reply(ls, ms, ret_nodeid, error);
4378 int rv, len, dir_nodeid, from_nodeid;
4380 from_nodeid = ms->
m_header.h_nodeid;
4382 len = receive_extralen(ms);
4385 log_error(ls,
"receive_remove from %d bad len %d",
4392 log_error(ls,
"receive_remove from %d bad nodeid %d",
4393 from_nodeid, dir_nodeid);
4406 memset(name, 0,
sizeof(name));
4409 hash = jhash(name, len, 0);
4420 log_error(ls,
"receive_remove from %d not found %s",
4427 log_error(ls,
"receive_remove keep from %d master %d",
4434 log_debug(ls,
"receive_remove from %d master %d first %x %s",
4442 log_error(ls,
"receive_remove toss from %d master %d",
4449 if (kref_put(&r->
res_ref, kill_rsb)) {
4454 log_error(ls,
"receive_remove from %d rsb ref error",
4471 int from_nodeid = ms->
m_header.h_nodeid;
4473 error = find_lkb(ls, ms->
m_remid, &lkb);
4481 error = validate_message(lkb, ms);
4488 log_error(ls,
"receive_request_reply %x remote %d %x result %d",
4508 queue_cast(r, lkb, -
EAGAIN);
4509 confirm_master(r, -
EAGAIN);
4516 receive_flags_reply(lkb, ms);
4518 if (is_altmode(lkb))
4519 munge_altmode(lkb, ms);
4524 grant_lock_pc(r, lkb, ms);
4525 queue_cast(r, lkb, 0);
4527 confirm_master(r, result);
4533 log_limit(ls,
"receive_request_reply %x from %d %d "
4534 "master %d dir %d first %x %s", lkb->
lkb_id,
4546 if (is_overlap(lkb)) {
4548 queue_cast_overlap(r, lkb);
4549 confirm_master(r, result);
4552 _request_lock(r, lkb);
4555 confirm_master(r, 0);
4560 log_error(ls,
"receive_request_reply %x error %d",
4564 if (is_overlap_unlock(lkb) && (result == 0 || result == -
EINPROGRESS)) {
4565 log_debug(ls,
"receive_request_reply %x result %d unlock",
4569 send_unlock(r, lkb);
4570 }
else if (is_overlap_cancel(lkb) && (result == -
EINPROGRESS)) {
4574 send_cancel(r, lkb);
4586 static void __receive_convert_reply(
struct dlm_rsb *r,
struct dlm_lkb *lkb,
4593 queue_cast(r, lkb, -
EAGAIN);
4597 receive_flags_reply(lkb, ms);
4598 revert_lock_pc(r, lkb);
4604 receive_flags_reply(lkb, ms);
4605 if (is_demoted(lkb))
4614 receive_flags_reply(lkb, ms);
4615 if (is_demoted(lkb))
4617 grant_lock_pc(r, lkb, ms);
4618 queue_cast(r, lkb, 0);
4638 error = validate_message(lkb, ms);
4643 error = remove_from_waiters_ms(lkb, ms);
4647 __receive_convert_reply(r, lkb, ms);
4658 error = find_lkb(ls, ms->
m_remid, &lkb);
4662 _receive_convert_reply(lkb, ms);
4675 error = validate_message(lkb, ms);
4680 error = remove_from_waiters_ms(lkb, ms);
4688 receive_flags_reply(lkb, ms);
4689 remove_lock_pc(r, lkb);
4708 error = find_lkb(ls, ms->
m_remid, &lkb);
4712 _receive_unlock_reply(lkb, ms);
4725 error = validate_message(lkb, ms);
4730 error = remove_from_waiters_ms(lkb, ms);
4738 receive_flags_reply(lkb, ms);
4739 revert_lock_pc(r, lkb);
4758 error = find_lkb(ls, ms->
m_remid, &lkb);
4762 _receive_cancel_reply(lkb, ms);
4771 int error, ret_nodeid;
4772 int do_lookup_list = 0;
4774 error = find_lkb(ls, ms->
m_lkid, &lkb);
4801 log_error(ls,
"receive_lookup_reply %x from %d ret %d "
4802 "master %d dir %d our %d first %x %s",
4813 }
else if (ret_nodeid == -1) {
4815 log_error(ls,
"receive_lookup_reply %x from %d bad ret_nodeid",
4826 if (is_overlap(lkb)) {
4827 log_debug(ls,
"receive_lookup_reply %x unlock %x",
4829 queue_cast_overlap(r, lkb);
4834 _request_lock(r, lkb);
4838 process_lookup_list(r);
4848 int error = 0, noent = 0;
4851 log_limit(ls,
"receive %d from non-member %d %x %x %d",
4862 error = receive_request(ls, ms);
4866 error = receive_convert(ls, ms);
4870 error = receive_unlock(ls, ms);
4875 error = receive_cancel(ls, ms);
4881 error = receive_request_reply(ls, ms);
4885 error = receive_convert_reply(ls, ms);
4889 error = receive_unlock_reply(ls, ms);
4893 error = receive_cancel_reply(ls, ms);
4900 error = receive_grant(ls, ms);
4905 error = receive_bast(ls, ms);
4911 receive_lookup(ls, ms);
4915 receive_remove(ls, ms);
4921 receive_lookup_reply(ls, ms);
4927 receive_purge(ls, ms);
4945 if (error == -
ENOENT && noent) {
4946 log_debug(ls,
"receive %d no %x remote %d %x saved_seq %u",
4949 }
else if (error == -
ENOENT) {
4950 log_error(ls,
"receive %d no %x remote %d %x saved_seq %u",
4955 dlm_dump_rsb_hash(ls, ms->
m_hash);
4959 log_error(ls,
"receive %d inval from %d lkid %x remid %x "
4977 if (dlm_locking_stopped(ls)) {
4982 log_limit(ls,
"receive %d from %d ignore old gen",
4990 _receive_message(ls, ms, 0);
5000 _receive_message(ls, ms, saved_seq);
5014 switch (hd->
h_cmd) {
5021 type = p->
rcom.rc_type;
5029 log_print(
"invalid h_nodeid %d from %d lockspace %x",
5038 "%u from %d cmd %d type %d\n",
5052 dlm_receive_message(ls, &p->
message, nodeid);
5060 static void recover_convert_waiter(
struct dlm_ls *ls,
struct dlm_lkb *lkb,
5063 if (middle_conversion(lkb)) {
5070 _receive_convert_reply(lkb, ms_stub);
5088 static int waiter_needs_recovery(
struct dlm_ls *ls,
struct dlm_lkb *lkb,
5091 if (dlm_no_directory(ls))
5110 int wait_type, stub_unlock_result, stub_cancel_result;
5115 log_error(ls,
"dlm_recover_waiters_pre no mem");
5129 log_debug(ls,
"waiter %x remote %x msg %d r_nodeid %d "
5130 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5148 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5161 if (is_overlap_cancel(lkb)) {
5164 stub_cancel_result = 0;
5166 if (is_overlap_unlock(lkb)) {
5169 stub_unlock_result = -
ENOENT;
5172 log_debug(ls,
"rwpre overlap %x %x %d %d %d",
5174 stub_cancel_result, stub_unlock_result);
5177 switch (wait_type) {
5184 recover_convert_waiter(ls, lkb, ms_stub);
5192 ms_stub->
m_result = stub_unlock_result;
5194 _receive_unlock_reply(lkb, ms_stub);
5203 ms_stub->
m_result = stub_cancel_result;
5205 _receive_cancel_reply(lkb, ms_stub);
5210 log_error(ls,
"invalid lkb wait_type %d %d",
5219 static struct dlm_lkb *find_resend_waiter(
struct dlm_ls *ls)
5259 int error = 0, mstype,
err, oc, ou;
5262 if (dlm_locking_stopped(ls)) {
5263 log_debug(ls,
"recover_waiters_post aborted");
5268 lkb = find_resend_waiter(ls);
5277 oc = is_overlap_cancel(lkb);
5278 ou = is_overlap_unlock(lkb);
5281 log_debug(ls,
"waiter %x remote %x msg %d r_nodeid %d "
5282 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5315 _unlock_lock(r, lkb);
5325 _request_lock(r, lkb);
5327 confirm_master(r, 0);
5330 _convert_lock(r, lkb);
5338 log_error(ls,
"waiter %x msg %d r_nodeid %d "
5339 "dir_nodeid %d overlap %d %d",
5351 static void purge_mstcpy_list(
struct dlm_ls *ls,
struct dlm_rsb *r,
5357 if (!is_master_copy(lkb))
5370 log_error(ls,
"purged mstcpy lkb not released");
5383 static void purge_dead_list(
struct dlm_ls *ls,
struct dlm_rsb *r,
5385 int nodeid_gone,
unsigned int *count)
5390 if (!is_master_copy(lkb))
5400 log_error(ls,
"purged dead lkb not released");
5415 int nodes_count = 0;
5416 int nodeid_gone = 0;
5417 unsigned int lkb_count = 0;
5424 nodeid_gone = memb->
nodeid;
5436 nodeid_gone, &lkb_count);
5438 nodeid_gone, &lkb_count);
5440 nodeid_gone, &lkb_count);
5449 log_debug(ls,
"dlm_recover_purge %u locks for %u nodes",
5450 lkb_count, nodes_count);
5453 static struct dlm_rsb *find_grant_rsb(
struct dlm_ls *ls,
int bucket)
5464 if (!is_master(r)) {
5469 spin_unlock(&ls->
ls_rsbtbl[bucket].lock);
5472 spin_unlock(&ls->
ls_rsbtbl[bucket].lock);
5497 unsigned int count = 0;
5498 unsigned int rsb_count = 0;
5499 unsigned int lkb_count = 0;
5502 r = find_grant_rsb(ls, bucket);
5513 grant_pending_locks(r, &count);
5516 confirm_master(r, 0);
5523 log_debug(ls,
"dlm_recover_grant %u locks on %u resources",
5524 lkb_count, rsb_count);
5527 static struct dlm_lkb *search_remid_list(
struct list_head *head,
int nodeid,
5539 static struct dlm_lkb *search_remid(
struct dlm_rsb *r,
int nodeid,
5557 static int receive_rcom_lock_args(
struct dlm_ls *ls,
struct dlm_lkb *lkb,
5592 middle_conversion(lkb)) {
5614 int from_nodeid = rc->
rc_header.h_nodeid;
5640 log_error(ls,
"dlm_recover_master_copy remote %d %x not dir",
5641 from_nodeid, remid);
5646 lkb = search_remid(r, from_nodeid, remid);
5652 error = create_lkb(ls, &lkb);
5656 error = receive_rcom_lock_args(ls, lkb, r, rc);
5681 if (error && error != -
EEXIST)
5682 log_debug(ls,
"dlm_recover_master_copy remote %d %x error %d",
5683 from_nodeid, remid, error);
5701 error = find_lkb(ls, lkid, &lkb);
5703 log_error(ls,
"dlm_recover_process_copy no %x remote %d %x %d",
5704 lkid, rc->
rc_header.h_nodeid, remid, result);
5712 if (!is_process_copy(lkb)) {
5713 log_error(ls,
"dlm_recover_process_copy bad %x remote %d %x %d",
5714 lkid, rc->
rc_header.h_nodeid, remid, result);
5728 log_debug(ls,
"dlm_recover_process_copy %x remote %d %x %d",
5729 lkid, rc->
rc_header.h_nodeid, remid, result);
5738 log_error(ls,
"dlm_recover_process_copy %x remote %d %x %d unk",
5739 lkid, rc->
rc_header.h_nodeid, remid, result);
5754 int mode,
uint32_t flags,
void *name,
unsigned int namelen,
5755 unsigned long timeout_cs)
5761 dlm_lock_recovery(ls);
5763 error = create_lkb(ls, &lkb);
5771 if (!ua->
lksb.sb_lvbptr) {
5783 error = set_lock_args(mode, &ua->
lksb, flags, namelen, timeout_cs,
5784 fake_astfn, ua, fake_bastfn, &args);
5792 error = request_lock(ls, lkb, name, namelen, &args);
5809 spin_lock(&ua->
proc->locks_spin);
5812 spin_unlock(&ua->
proc->locks_spin);
5820 unsigned long timeout_cs)
5827 dlm_lock_recovery(ls);
5829 error = find_lkb(ls, lkid, &lkb);
5840 if (!ua->
lksb.sb_lvbptr) {
5845 if (lvb_in && ua->
lksb.sb_lvbptr)
5855 error = set_lock_args(mode, &ua->
lksb, flags, 0, timeout_cs,
5856 fake_astfn, ua, fake_bastfn, &args);
5860 error = convert_lock(ls, lkb, &args);
5880 dlm_lock_recovery(ls);
5882 error = find_lkb(ls, lkid, &lkb);
5888 if (lvb_in && ua->
lksb.sb_lvbptr)
5894 error = set_unlock_args(flags, ua, &args);
5898 error = unlock_lock(ls, lkb, &args);
5908 spin_lock(&ua->
proc->locks_spin);
5912 spin_unlock(&ua->
proc->locks_spin);
5929 dlm_lock_recovery(ls);
5931 error = find_lkb(ls, lkid, &lkb);
5940 error = set_unlock_args(flags, ua, &args);
5944 error = cancel_lock(ls, lkb, &args);
5949 if (error == -
EBUSY)
5967 dlm_lock_recovery(ls);
5969 error = find_lkb(ls, lkid, &lkb);
5975 error = set_unlock_args(flags, ua, &args);
5985 error = validate_unlock_args(lkb, &args);
5990 error = _cancel_lock(r, lkb);
5998 if (error == -
EBUSY)
6010 static int orphan_proc_lock(
struct dlm_ls *ls,
struct dlm_lkb *lkb)
6020 set_unlock_args(0, lkb->
lkb_ua, &args);
6022 error = cancel_lock(ls, lkb, &args);
6031 static int unlock_proc_lock(
struct dlm_ls *ls,
struct dlm_lkb *lkb)
6038 error = unlock_lock(ls, lkb, &args);
6054 if (list_empty(&proc->
locks))
6083 dlm_lock_recovery(ls);
6086 lkb = del_proc_lock(ls, proc);
6091 orphan_proc_lock(ls, lkb);
6093 unlock_proc_lock(ls, lkb);
6129 if (!list_empty(&proc->
locks)) {
6140 unlock_proc_lock(ls, lkb);
6164 static void do_purge(
struct dlm_ls *ls,
int nodeid,
int pid)
6172 unlock_proc_lock(ls, lkb);
6179 static int send_purge(
struct dlm_ls *ls,
int nodeid,
int pid)
6182 struct dlm_mhandle *mh;
6185 error = _create_message(ls,
sizeof(
struct dlm_message), nodeid,
6192 return send_message(mh, ms);
6196 int nodeid,
int pid)
6201 error = send_purge(ls, nodeid, pid);
6203 dlm_lock_recovery(ls);
6205 purge_proc_locks(ls, proc);
6207 do_purge(ls, nodeid, pid);