58 #include <linux/types.h> 
   59 #include <linux/rbtree.h> 
   60 #include <linux/slab.h> 
   85 static int send_remove(
struct dlm_rsb *
r);
 
   88 static void __receive_convert_reply(
struct dlm_rsb *
r, 
struct dlm_lkb *lkb,
 
   91 static void do_purge(
struct dlm_ls *ls, 
int nodeid, 
int pid);
 
   92 static void del_timeout(
struct dlm_lkb *lkb);
 
   93 static void toss_rsb(
struct kref *
kref);
 
  103 static const int __dlm_compat_matrix[8][8] = {
 
  105         {1, 1, 1, 1, 1, 1, 1, 0},       
 
  106         {1, 1, 1, 1, 1, 1, 1, 0},       
 
  107         {1, 1, 1, 1, 1, 1, 0, 0},       
 
  108         {1, 1, 1, 1, 0, 0, 0, 0},       
 
  109         {1, 1, 1, 0, 1, 0, 0, 0},       
 
  110         {1, 1, 1, 0, 0, 0, 0, 0},       
 
  111         {1, 1, 0, 0, 0, 0, 0, 0},       
 
  112         {0, 0, 0, 0, 0, 0, 0, 0}        
 
  126         {  -1,  1,  1,  1,  1,  1,  1, -1 }, 
 
  127         {  -1,  1,  1,  1,  1,  1,  1,  0 }, 
 
  128         {  -1, -1,  1,  1,  1,  1,  1,  0 }, 
 
  129         {  -1, -1, -1,  1,  1,  1,  1,  0 }, 
 
  130         {  -1, -1, -1, -1,  1,  1,  1,  0 }, 
 
  131         {  -1,  0,  0,  0,  0,  0,  1,  0 }, 
 
  132         {  -1,  0,  0,  0,  0,  0,  0,  0 }, 
 
  133         {  -1,  0,  0,  0,  0,  0,  0,  0 }  
 
  136 #define modes_compat(gr, rq) \ 
  137     __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1] 
  141     return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
 
  150 static const int __quecvt_compat_matrix[8][8] = {
 
  152         {0, 0, 0, 0, 0, 0, 0, 0},       
 
  153         {0, 0, 1, 1, 1, 1, 1, 0},       
 
  154         {0, 0, 0, 1, 1, 1, 1, 0},       
 
  155         {0, 0, 0, 0, 1, 1, 1, 0},       
 
  156         {0, 0, 0, 1, 0, 1, 1, 0},       
 
  157         {0, 0, 0, 0, 0, 0, 1, 0},       
 
  158         {0, 0, 0, 0, 0, 0, 0, 0},       
 
  159         {0, 0, 0, 0, 0, 0, 0, 0}        
 
  164     printk(
KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x " 
  165            "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
 
  172 static void dlm_print_rsb(
struct dlm_rsb *
r)
 
  174     printk(
KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x " 
  187     printk(
KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
 
  205 static inline void dlm_lock_recovery(
struct dlm_ls *ls)
 
  220 static inline int can_be_queued(
struct dlm_lkb *lkb)
 
  225 static inline int force_blocking_asts(
struct dlm_lkb *lkb)
 
  230 static inline int is_demoted(
struct dlm_lkb *lkb)
 
  235 static inline int is_altmode(
struct dlm_lkb *lkb)
 
  240 static inline int is_granted(
struct dlm_lkb *lkb)
 
  245 static inline int is_remote(
struct dlm_rsb *
r)
 
  251 static inline int is_process_copy(
struct dlm_lkb *lkb)
 
  256 static inline int is_master_copy(
struct dlm_lkb *lkb)
 
  261 static inline int middle_conversion(
struct dlm_lkb *lkb)
 
  269 static inline int down_conversion(
struct dlm_lkb *lkb)
 
  274 static inline int is_overlap_unlock(
struct dlm_lkb *lkb)
 
  279 static inline int is_overlap_cancel(
struct dlm_lkb *lkb)
 
  284 static inline int is_overlap(
struct dlm_lkb *lkb)
 
  290 static void queue_cast(
struct dlm_rsb *r, 
struct dlm_lkb *lkb, 
int rv)
 
  292     if (is_master_copy(lkb))
 
  314 static inline void queue_cast_overlap(
struct dlm_rsb *r, 
struct dlm_lkb *lkb)
 
  320 static void queue_bast(
struct dlm_rsb *r, 
struct dlm_lkb *lkb, 
int rqmode)
 
  322     if (is_master_copy(lkb)) {
 
  323         send_bast(r, lkb, rqmode);
 
  336 static inline void hold_rsb(
struct dlm_rsb *r)
 
  349 static void put_rsb(
struct dlm_rsb *r)
 
  355     kref_put(&r->
res_ref, toss_rsb);
 
  356     spin_unlock(&ls->
ls_rsbtbl[bucket].lock);
 
  364 static int pre_rsb_struct(
struct dlm_ls *ls)
 
  400 static int get_rsb_struct(
struct dlm_ls *ls, 
char *
name, 
int len,
 
  438 static int rsb_cmp(
struct dlm_rsb *r, 
const char *name, 
int nlen)
 
  443     memcpy(maxname, name, nlen);
 
  456         rc = rsb_cmp(r, name, len);
 
  545 static int find_rsb_dir(
struct dlm_ls *ls, 
char *name, 
int len,
 
  547             int dir_nodeid, 
int from_nodeid,
 
  559         if (from_nodeid == dir_nodeid)
 
  582     if (from_local || from_dir ||
 
  583         (from_other && (dir_nodeid == our_nodeid))) {
 
  589         error = pre_rsb_struct(ls);
 
  624         log_debug(ls, 
"find_rsb toss from_other %d master %d dir %d %s",
 
  633         log_error(ls, 
"find_rsb toss from_dir %d master %d",
 
  651     error = rsb_insert(r, &ls->
ls_rsbtbl[b].keep);
 
  660     if (error == -
EBADR && !create)
 
  663     error = get_rsb_struct(ls, name, len, &r);
 
  678         log_debug(ls, 
"find_rsb new from_dir %d recreate %s",
 
  685     if (from_other && (dir_nodeid != our_nodeid)) {
 
  687         log_error(ls, 
"find_rsb new from_other %d dir %d our %d %s",
 
  688               from_nodeid, dir_nodeid, our_nodeid, r->
res_name);
 
  695         log_debug(ls, 
"find_rsb new from_other %d dir %d %s",
 
  696               from_nodeid, dir_nodeid, r->
res_name);
 
  699     if (dir_nodeid == our_nodeid) {
 
  711     error = rsb_insert(r, &ls->
ls_rsbtbl[b].keep);
 
  723 static int find_rsb_nodir(
struct dlm_ls *ls, 
char *name, 
int len,
 
  725               int dir_nodeid, 
int from_nodeid,
 
  726               unsigned int flags, 
struct dlm_rsb **r_ret)
 
  734     error = pre_rsb_struct(ls);
 
  766         log_error(ls, 
"find_rsb toss from_nodeid %d master %d dir %d",
 
  774         (dir_nodeid == our_nodeid)) {
 
  777         log_error(ls, 
"find_rsb toss our %d master %d dir %d",
 
  785     error = rsb_insert(r, &ls->
ls_rsbtbl[b].keep);
 
  794     error = get_rsb_struct(ls, name, len, &r);
 
  806     r->
res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
 
  809     error = rsb_insert(r, &ls->
ls_rsbtbl[b].keep);
 
  817 static int find_rsb(
struct dlm_ls *ls, 
char *name, 
int len, 
int from_nodeid,
 
  818             unsigned int flags, 
struct dlm_rsb **r_ret)
 
  826     hash = jhash(name, len, 0);
 
  831     if (dlm_no_directory(ls))
 
  832         return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
 
  833                       from_nodeid, flags, r_ret);
 
  835         return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
 
  836                       from_nodeid, flags, r_ret);
 
  842 static int validate_master_nodeid(
struct dlm_ls *ls, 
struct dlm_rsb *r,
 
  845     if (dlm_no_directory(ls)) {
 
  846         log_error(ls, 
"find_rsb keep from_nodeid %d master %d dir %d",
 
  859             log_debug(ls, 
"validate master from_other %d master %d " 
  860                   "dir %d first %x %s", from_nodeid,
 
  870             log_error(ls, 
"validate master from_dir %d master %d " 
  912               unsigned int flags, 
int *r_nodeid, 
int *
result)
 
  919     int dir_nodeid, 
error, toss_list = 0;
 
  924     if (from_nodeid == our_nodeid) {
 
  925         log_error(ls, 
"dlm_master_lookup from our_nodeid %d flags %x",
 
  930     hash = jhash(name, len, 0);
 
  934     if (dir_nodeid != our_nodeid) {
 
  935         log_error(ls, 
"dlm_master_lookup from %d dir %d our %d h %x %d",
 
  936               from_nodeid, dir_nodeid, our_nodeid, hash,
 
  943     error = pre_rsb_struct(ls);
 
  970         log_error(ls, 
"dlm_master_lookup res_dir %d our %d %s",
 
  987             log_error(ls, 
"dlm_master_lookup fix_master on toss");
 
  997         log_limit(ls, 
"dlm_master_lookup from_master %d " 
  998               "master_nodeid %d res_nodeid %d first %x %s",
 
 1003             log_error(ls, 
"from_master %d our_master", from_nodeid);
 
 1018         log_debug(ls, 
"dlm_master_lookup master 0 to %d first %x %s",
 
 1024     if (!from_master && !fix_master &&
 
 1030         log_limit(ls, 
"dlm_master_lookup from master %d flags %x " 
 1031               "first %x %s", from_nodeid, flags,
 
 1052     error = get_rsb_struct(ls, name, len, &r);
 
 1068     error = rsb_insert(r, &ls->
ls_rsbtbl[b].toss);
 
 1078     *r_nodeid = from_nodeid;
 
 1108     hash = jhash(name, len, 0);
 
 1125 static void toss_rsb(
struct kref *
kref)
 
 1143 static void unhold_rsb(
struct dlm_rsb *r)
 
 1146     rv = kref_put(&r->
res_ref, toss_rsb);
 
 1150 static void kill_rsb(
struct kref *kref)
 
 1174 static void detach_lkb(
struct dlm_lkb *lkb)
 
 1182 static int create_lkb(
struct dlm_ls *ls, 
struct dlm_lkb **lkb_ret)
 
 1216         log_error(ls, 
"create_lkb idr error %d", rv);
 
 1235     return lkb ? 0 : -
ENOENT;
 
 1238 static void kill_lkb(
struct kref *kref)
 
 1251 static int __put_lkb(
struct dlm_ls *ls, 
struct dlm_lkb *lkb)
 
 1256     if (kref_put(&lkb->
lkb_ref, kill_lkb)) {
 
 1281     return __put_lkb(ls, lkb);
 
 1287 static inline void hold_lkb(
struct dlm_lkb *lkb)
 
 1297 static inline void unhold_lkb(
struct dlm_lkb *lkb)
 
 1300     rv = kref_put(&lkb->
lkb_ref, kill_lkb);
 
 1320     kref_get(&lkb->lkb_ref);
 
 1326     lkb->lkb_status = 
status;
 
 1331             list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
 
 1337         lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
 
 1342             list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
 
 1345                       &r->res_convertqueue);
 
 1363     add_lkb(r, lkb, sts);
 
 1367 static int msg_reply_type(
int mstype)
 
 1384 static int nodeid_warned(
int nodeid, 
int num_nodes, 
int *warned)
 
 1393         if (warned[i] == nodeid)
 
 1404     s64 debug_maxus = 0;
 
 1405     u32 debug_scanned = 0;
 
 1406     u32 debug_expired = 0;
 
 1429         if (us > debug_maxus)
 
 1434             warned = kzalloc(num_nodes * 
sizeof(
int), 
GFP_KERNEL);
 
 1441         log_error(ls, 
"waitwarn %x %lld %d us check connection to " 
 1442               "node %d", lkb->
lkb_id, (
long long)us,
 
 1449         log_debug(ls, 
"scan_waiters %u warn %u over %d us max %lld us",
 
 1450               debug_scanned, debug_expired,
 
 1451               dlm_config.ci_waitwarn_us, (
long long)debug_maxus);
 
 1457 static int add_to_waiters(
struct dlm_lkb *lkb, 
int mstype, 
int to_nodeid)
 
 1464     if (is_overlap_unlock(lkb) ||
 
 1485         log_debug(ls, 
"addwait %x cur %d overlap %d count %d f %x",
 
 1503         log_error(ls, 
"addwait error %x %d flags %x %d %d %s",
 
 1515 static int _remove_from_waiters(
struct dlm_lkb *lkb, 
int mstype,
 
 1519     int overlap_done = 0;
 
 1540         log_debug(ls, 
"remwait %x cancel_reply wait_type %d",
 
 1555         is_overlap_cancel(lkb) && ms && !ms->
m_result) {
 
 1556         log_debug(ls, 
"remwait %x convert_reply zap overlap_cancel",
 
 1572     log_error(ls, 
"remwait error %x remote %d %x msg %d flags %x no wait",
 
 1584         log_error(ls, 
"remwait error %x reply %d wait_type %d overlap",
 
 1600 static int remove_from_waiters(
struct dlm_lkb *lkb, 
int mstype)
 
 1606     error = _remove_from_waiters(lkb, mstype, 
NULL);
 
 1621     error = _remove_from_waiters(lkb, ms->
m_type, ms);
 
 1632 static void wait_pending_remove(
struct dlm_rsb *r)
 
 1639         log_debug(ls, 
"delay lookup for remove dir %d %s",
 
 1655 static void shrink_bucket(
struct dlm_ls *ls, 
int b)
 
 1661     int remote_count = 0;
 
 1676         if (!dlm_no_directory(ls) &&
 
 1687         if (!dlm_no_directory(ls) &&
 
 1705         if (!kref_put(&r->
res_ref, kill_rsb)) {
 
 1729     for (i = 0; i < remote_count; i++) {
 
 1737             log_debug(ls, 
"remove_name not toss %s", name);
 
 1743             log_debug(ls, 
"remove_name master %d dir %d our %d %s",
 
 1752             log_error(ls, 
"remove_name dir %d master %d our %d %s",
 
 1761             log_debug(ls, 
"remove_name toss_time %lu now %lu %s",
 
 1766         if (!kref_put(&r->
res_ref, kill_rsb)) {
 
 1768             log_error(ls, 
"remove_name in use %s", name);
 
 1798         shrink_bucket(ls, i);
 
 1799         if (dlm_locking_stopped(ls))
 
 1805 static void add_timeout(
struct dlm_lkb *lkb)
 
 1809     if (is_master_copy(lkb))
 
 1829 static void del_timeout(
struct dlm_lkb *lkb)
 
 1851     int do_cancel, do_warn;
 
 1855         if (dlm_locking_stopped(ls))
 
 1863             wait_us = ktime_to_us(ktime_sub(
ktime_get(),
 
 1871                 wait_us >= 
dlm_config.ci_timewarn_cs * 10000)
 
 1874             if (!do_cancel && !do_warn)
 
 1881         if (!do_cancel && !do_warn)
 
 1897             log_debug(ls, 
"timeout cancel %x node %d %s",
 
 1902             _cancel_lock(r, lkb);
 
 1940     int b, len = r->
res_ls->ls_lvblen;
 
 1961     } 
else if (b == 0) {
 
 1989 static void set_lvb_unlock(
struct dlm_rsb *r, 
struct dlm_lkb *lkb)
 
 2018 static void set_lvb_lock_pc(
struct dlm_rsb *r, 
struct dlm_lkb *lkb,
 
 2031         int len = receive_extralen(ms);
 
 2060     set_lvb_unlock(r, lkb);
 
 2061     _remove_lock(r, lkb);
 
 2064 static void remove_lock_pc(
struct dlm_rsb *r, 
struct dlm_lkb *lkb)
 
 2066     _remove_lock(r, lkb);
 
 2100 static int revert_lock_pc(
struct dlm_rsb *r, 
struct dlm_lkb *lkb)
 
 2102     return revert_lock(r, lkb);
 
 2121     set_lvb_lock(r, lkb);
 
 2122     _grant_lock(r, lkb);
 
 2125 static void grant_lock_pc(
struct dlm_rsb *r, 
struct dlm_lkb *lkb,
 
 2128     set_lvb_lock_pc(r, lkb, ms);
 
 2129     _grant_lock(r, lkb);
 
 2136 static void grant_lock_pending(
struct dlm_rsb *r, 
struct dlm_lkb *lkb)
 
 2139     if (is_master_copy(lkb))
 
 2142         queue_cast(r, lkb, 0);
 
 2153 static void munge_demoted(
struct dlm_lkb *lkb)
 
 2156         log_print(
"munge_demoted %x invalid modes gr %d rq %d",
 
 2168         log_print(
"munge_altmode %x invalid reply type %d",
 
 2183 static inline int first_in_list(
struct dlm_lkb *lkb, 
struct list_head *head)
 
 2249 static int conversion_deadlock_detect(
struct dlm_rsb *r, 
struct dlm_lkb *lkb2)
 
 2252     int lkb_is_ahead = 0;
 
 2260         if (!lkb_is_ahead) {
 
 2288 static int _can_be_granted(
struct dlm_rsb *r, 
struct dlm_lkb *lkb, 
int now,
 
 2346     if (conv && recover)
 
 2434 static int can_be_granted(
struct dlm_rsb *r, 
struct dlm_lkb *lkb, 
int now,
 
 2435               int recover, 
int *
err)
 
 2444     rv = _can_be_granted(r, lkb, now, recover);
 
 2454     if (is_convert && can_be_queued(lkb) &&
 
 2455         conversion_deadlock_detect(r, lkb)) {
 
 2463                 log_print(
"can_be_granted deadlock %x now %d",
 
 2485         rv = _can_be_granted(r, lkb, now, 0);
 
 2505 static int grant_pending_convert(
struct dlm_rsb *r, 
int high, 
int *cw,
 
 2506                  unsigned int *count)
 
 2510     int hi, demoted, quit, grant_restart, demote_restart;
 
 2520         demoted = is_demoted(lkb);
 
 2523         if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
 
 2524             grant_lock_pending(r, lkb);
 
 2531         if (!demoted && is_demoted(lkb)) {
 
 2532             log_print(
"WARN: pending demoted %x node %d %s",
 
 2539             log_print(
"WARN: pending deadlock %x node %d %s",
 
 2553     if (demote_restart && !quit) {
 
 2558     return max_t(
int, high, hi);
 
 2561 static int grant_pending_wait(
struct dlm_rsb *r, 
int high, 
int *cw,
 
 2562                   unsigned int *count)
 
 2567         if (can_be_granted(r, lkb, 0, 0, 
NULL)) {
 
 2568             grant_lock_pending(r, lkb);
 
 2586 static int lock_requires_bast(
struct dlm_lkb *
gr, 
int high, 
int cw)
 
 2595         !__dlm_compat_matrix[gr->
lkb_grmode+1][high+1])
 
 2600 static void grant_pending_locks(
struct dlm_rsb *r, 
unsigned int *count)
 
 2606     if (!is_master(r)) {
 
 2612     high = grant_pending_convert(r, high, &cw, count);
 
 2613     high = grant_pending_wait(r, high, &cw, count);
 
 2625         if (lkb->
lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
 
 2630                 queue_bast(r, lkb, high);
 
 2659         if (gr->
lkb_bastfn && modes_require_bast(gr, lkb)) {
 
 2666 static void send_blocking_asts(
struct dlm_rsb *r, 
struct dlm_lkb *lkb)
 
 2671 static void send_blocking_asts_all(
struct dlm_rsb *r, 
struct dlm_lkb *lkb)
 
 2738     wait_pending_remove(r);
 
 2741     send_lookup(r, lkb);
 
 2745 static void process_lookup_list(
struct dlm_rsb *r)
 
 2751         _request_lock(r, lkb);
 
 2758 static void confirm_master(
struct dlm_rsb *r, 
int error)
 
 2769         process_lookup_list(r);
 
 2786             _request_lock(r, lkb);
 
 2796              int namelen, 
unsigned long timeout_cs,
 
 2797              void (*ast) (
void *astparam),
 
 2799              void (*bast) (
void *astparam, 
int mode),
 
 2842     if (flags & DLM_LKF_CONVERT && !lksb->
sb_lkid)
 
 2861 static int set_unlock_args(
uint32_t flags, 
void *astarg, 
struct dlm_args *args)
 
 2875 static int validate_lock_args(
struct dlm_ls *ls, 
struct dlm_lkb *lkb,
 
 2880     if (args->
flags & DLM_LKF_CONVERT) {
 
 2884         if (args->
flags & DLM_LKF_QUECVT &&
 
 2895         if (is_overlap(lkb))
 
 2912         log_debug(ls, 
"validate_lock_args %d %x %x %x %d %d %s",
 
 2926 static int validate_unlock_args(
struct dlm_lkb *lkb, 
struct dlm_args *args)
 
 2955                    args->
flags & DLM_LKF_CANCEL ?
 
 2966     if (args->
flags & DLM_LKF_CANCEL) {
 
 2970         if (is_overlap(lkb))
 
 3011         if (is_overlap_unlock(lkb))
 
 3049         log_debug(ls, 
"validate_unlock_args %d %x %x %x %x %d %s", rv,
 
 3067     if (can_be_granted(r, lkb, 1, 0, 
NULL)) {
 
 3069         queue_cast(r, lkb, 0);
 
 3073     if (can_be_queued(lkb)) {
 
 3081     queue_cast(r, lkb, -
EAGAIN);
 
 3086 static void do_request_effects(
struct dlm_rsb *r, 
struct dlm_lkb *lkb,
 
 3091         if (force_blocking_asts(lkb))
 
 3092             send_blocking_asts_all(r, lkb);
 
 3095         send_blocking_asts(r, lkb);
 
 3107     if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
 
 3109         queue_cast(r, lkb, 0);
 
 3119         revert_lock(r, lkb);
 
 3131     if (is_demoted(lkb)) {
 
 3133         if (_can_be_granted(r, lkb, 1, 0)) {
 
 3135             queue_cast(r, lkb, 0);
 
 3141     if (can_be_queued(lkb)) {
 
 3150     queue_cast(r, lkb, -
EAGAIN);
 
 3155 static void do_convert_effects(
struct dlm_rsb *r, 
struct dlm_lkb *lkb,
 
 3160         grant_pending_locks(r, 
NULL);
 
 3164         if (force_blocking_asts(lkb))
 
 3165             send_blocking_asts_all(r, lkb);
 
 3168         send_blocking_asts(r, lkb);
 
 3175     remove_lock(r, lkb);
 
 3180 static void do_unlock_effects(
struct dlm_rsb *r, 
struct dlm_lkb *lkb,
 
 3183     grant_pending_locks(r, 
NULL);
 
 3192     error = revert_lock(r, lkb);
 
 3200 static void do_cancel_effects(
struct dlm_rsb *r, 
struct dlm_lkb *lkb,
 
 3204         grant_pending_locks(r, 
NULL);
 
 3220     error = set_master(r, lkb);
 
 3230         error = send_request(r, lkb);
 
 3232         error = do_request(r, lkb);
 
 3235         do_request_effects(r, lkb, error);
 
 3249         error = send_convert(r, lkb);
 
 3251         error = do_convert(r, lkb);
 
 3254         do_convert_effects(r, lkb, error);
 
 3268         error = send_unlock(r, lkb);
 
 3270         error = do_unlock(r, lkb);
 
 3273         do_unlock_effects(r, lkb, error);
 
 3287         error = send_cancel(r, lkb);
 
 3289         error = do_cancel(r, lkb);
 
 3292         do_cancel_effects(r, lkb, error);
 
 3303 static int request_lock(
struct dlm_ls *ls, 
struct dlm_lkb *lkb, 
char *name,
 
 3309     error = validate_lock_args(ls, lkb, args);
 
 3313     error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
 
 3322     error = _request_lock(r, lkb);
 
 3329 static int convert_lock(
struct dlm_ls *ls, 
struct dlm_lkb *lkb,
 
 3340     error = validate_lock_args(ls, lkb, args);
 
 3344     error = _convert_lock(r, lkb);
 
 3351 static int unlock_lock(
struct dlm_ls *ls, 
struct dlm_lkb *lkb,
 
 3362     error = validate_unlock_args(lkb, args);
 
 3366     error = _unlock_lock(r, lkb);
 
 3373 static int cancel_lock(
struct dlm_ls *ls, 
struct dlm_lkb *lkb,
 
 3384     error = validate_unlock_args(lkb, args);
 
 3388     error = _cancel_lock(r, lkb);
 
 3404          unsigned int namelen,
 
 3406          void (*ast) (
void *astarg),
 
 3408          void (*bast) (
void *astarg, 
int mode))
 
 3419     dlm_lock_recovery(ls);
 
 3422         error = find_lkb(ls, lksb->
sb_lkid, &lkb);
 
 3424         error = create_lkb(ls, &lkb);
 
 3429     error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
 
 3430                   astarg, bast, &args);
 
 3435         error = convert_lock(ls, lkb, &args);
 
 3437         error = request_lock(ls, lkb, name, namelen, &args);
 
 3442     if (convert || error)
 
 3467     dlm_lock_recovery(ls);
 
 3469     error = find_lkb(ls, lkid, &lkb);
 
 3473     error = set_unlock_args(flags, astarg, &args);
 
 3477     if (flags & DLM_LKF_CANCEL)
 
 3478         error = cancel_lock(ls, lkb, &args);
 
 3480         error = unlock_lock(ls, lkb, &args);
 
 3516 static int _create_message(
struct dlm_ls *ls, 
int mb_len,
 
 3517                int to_nodeid, 
int mstype,
 
 3519                struct dlm_mhandle **mh_ret)
 
 3522     struct dlm_mhandle *mh;
 
 3550 static int create_message(
struct dlm_rsb *r, 
struct dlm_lkb *lkb,
 
 3551               int to_nodeid, 
int mstype,
 
 3553               struct dlm_mhandle **mh_ret)
 
 3569             mb_len += r->
res_ls->ls_lvblen;
 
 3573     return _create_message(r->
res_ls, mb_len, to_nodeid, mstype,
 
 3580 static int send_message(
struct dlm_mhandle *mh, 
struct dlm_message *ms)
 
 3631 static int send_common(
struct dlm_rsb *r, 
struct dlm_lkb *lkb, 
int mstype)
 
 3634     struct dlm_mhandle *mh;
 
 3635     int to_nodeid, 
error;
 
 3639     error = add_to_waiters(lkb, mstype, to_nodeid);
 
 3643     error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
 
 3647     send_args(r, lkb, ms);
 
 3649     error = send_message(mh, ms);
 
 3655     remove_from_waiters(lkb, msg_reply_type(mstype));
 
 3671     if (!error && down_conversion(lkb)) {
 
 3675         r->
res_ls->ls_stub_ms.m_result = 0;
 
 3676         __receive_convert_reply(r, lkb, &r->
res_ls->ls_stub_ms);
 
 3699     struct dlm_mhandle *mh;
 
 3700     int to_nodeid, 
error;
 
 3704     error = create_message(r, lkb, to_nodeid, 
DLM_MSG_GRANT, &ms, &mh);
 
 3708     send_args(r, lkb, ms);
 
 3712     error = send_message(mh, ms);
 
 3717 static int send_bast(
struct dlm_rsb *r, 
struct dlm_lkb *lkb, 
int mode)
 
 3720     struct dlm_mhandle *mh;
 
 3721     int to_nodeid, 
error;
 
 3729     send_args(r, lkb, ms);
 
 3733     error = send_message(mh, ms);
 
 3741     struct dlm_mhandle *mh;
 
 3742     int to_nodeid, 
error;
 
 3754     send_args(r, lkb, ms);
 
 3756     error = send_message(mh, ms);
 
 3766 static int send_remove(
struct dlm_rsb *r)
 
 3769     struct dlm_mhandle *mh;
 
 3770     int to_nodeid, 
error;
 
 3781     error = send_message(mh, ms);
 
 3786 static int send_common_reply(
struct dlm_rsb *r, 
struct dlm_lkb *lkb,
 
 3790     struct dlm_mhandle *mh;
 
 3791     int to_nodeid, 
error;
 
 3795     error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
 
 3799     send_args(r, lkb, ms);
 
 3803     error = send_message(mh, ms);
 
 3808 static int send_request_reply(
struct dlm_rsb *r, 
struct dlm_lkb *lkb, 
int rv)
 
 3813 static int send_convert_reply(
struct dlm_rsb *r, 
struct dlm_lkb *lkb, 
int rv)
 
 3818 static int send_unlock_reply(
struct dlm_rsb *r, 
struct dlm_lkb *lkb, 
int rv)
 
 3823 static int send_cancel_reply(
struct dlm_rsb *r, 
struct dlm_lkb *lkb, 
int rv)
 
 3829                  int ret_nodeid, 
int rv)
 
 3833     struct dlm_mhandle *mh;
 
 3844     error = send_message(mh, ms);
 
 3871 static int receive_extralen(
struct dlm_message *ms)
 
 3876 static int receive_lvb(
struct dlm_ls *ls, 
struct dlm_lkb *lkb,
 
 3886         len = receive_extralen(ms);
 
 3894 static void fake_bastfn(
void *astparam, 
int mode)
 
 3896     log_print(
"fake_bastfn should not be called");
 
 3899 static void fake_astfn(
void *astparam)
 
 3901     log_print(
"fake_astfn should not be called");
 
 3904 static int receive_request_args(
struct dlm_ls *ls, 
struct dlm_lkb *lkb,
 
 3926 static int receive_convert_args(
struct dlm_ls *ls, 
struct dlm_lkb *lkb,
 
 3932     if (receive_lvb(ls, lkb, ms))
 
 3941 static int receive_unlock_args(
struct dlm_ls *ls, 
struct dlm_lkb *lkb,
 
 3944     if (receive_lvb(ls, lkb, ms))
 
 3971         if (!is_master_copy(lkb) || lkb->
lkb_nodeid != from)
 
 3980         if (!is_process_copy(lkb) || lkb->
lkb_nodeid != from)
 
 3985         if (!is_process_copy(lkb))
 
 3997               "ignore invalid message %d from %d %x %x %x %d",
 
 4003 static void send_repeat_remove(
struct dlm_ls *ls, 
char *ms_name, 
int len)
 
 4007     struct dlm_mhandle *mh;
 
 4012     memset(name, 0, 
sizeof(name));
 
 4013     memcpy(name, ms_name, len);
 
 4015     hash = jhash(name, len, 0);
 
 4020     log_error(ls, 
"send_repeat_remove dir %d %s", dir_nodeid, name);
 
 4026         log_error(ls, 
"repeat_remove on keep %s", name);
 
 4033         log_error(ls, 
"repeat_remove on toss %s", name);
 
 4045     rv = _create_message(ls, 
sizeof(
struct dlm_message) + len,
 
 4053     send_message(mh, ms);
 
 4066     int error, namelen = 0;
 
 4068     from_nodeid = ms->
m_header.h_nodeid;
 
 4070     error = create_lkb(ls, &lkb);
 
 4074     receive_flags(lkb, ms);
 
 4076     error = receive_request_args(ls, lkb, ms);
 
 4088     namelen = receive_extralen(ms);
 
 4090     error = find_rsb(ls, ms->
m_extra, namelen, from_nodeid,
 
 4091              R_RECEIVE_REQUEST, &r);
 
 4100         error = validate_master_nodeid(ls, r, from_nodeid);
 
 4110     error = do_request(r, lkb);
 
 4111     send_request_reply(r, lkb, error);
 
 4112     do_request_effects(r, lkb, error);
 
 4140         log_limit(ls, 
"receive_request %x from %d %d",
 
 4141               ms->
m_lkid, from_nodeid, error);
 
 4144     if (namelen && error == -
EBADR) {
 
 4145         send_repeat_remove(ls, ms->
m_extra, namelen);
 
 4149     setup_stub_lkb(ls, ms);
 
 4158     int error, reply = 1;
 
 4160     error = find_lkb(ls, ms->
m_remid, &lkb);
 
 4165         log_error(ls, 
"receive_convert %x remid %x recover_seq %llu " 
 4178     error = validate_message(lkb, ms);
 
 4182     receive_flags(lkb, ms);
 
 4184     error = receive_convert_args(ls, lkb, ms);
 
 4186         send_convert_reply(r, lkb, error);
 
 4190     reply = !down_conversion(lkb);
 
 4192     error = do_convert(r, lkb);
 
 4194         send_convert_reply(r, lkb, error);
 
 4195     do_convert_effects(r, lkb, error);
 
 4203     setup_stub_lkb(ls, ms);
 
 4214     error = find_lkb(ls, ms->
m_remid, &lkb);
 
 4219         log_error(ls, 
"receive_unlock %x remid %x remote %d %x",
 
 4231     error = validate_message(lkb, ms);
 
 4235     receive_flags(lkb, ms);
 
 4237     error = receive_unlock_args(ls, lkb, ms);
 
 4239         send_unlock_reply(r, lkb, error);
 
 4243     error = do_unlock(r, lkb);
 
 4244     send_unlock_reply(r, lkb, error);
 
 4245     do_unlock_effects(r, lkb, error);
 
 4253     setup_stub_lkb(ls, ms);
 
 4264     error = find_lkb(ls, ms->
m_remid, &lkb);
 
 4268     receive_flags(lkb, ms);
 
 4275     error = validate_message(lkb, ms);
 
 4279     error = do_cancel(r, lkb);
 
 4280     send_cancel_reply(r, lkb, error);
 
 4281     do_cancel_effects(r, lkb, error);
 
 4289     setup_stub_lkb(ls, ms);
 
 4300     error = find_lkb(ls, ms->
m_remid, &lkb);
 
 4309     error = validate_message(lkb, ms);
 
 4313     receive_flags_reply(lkb, ms);
 
 4314     if (is_altmode(lkb))
 
 4315         munge_altmode(lkb, ms);
 
 4316     grant_lock_pc(r, lkb, ms);
 
 4317     queue_cast(r, lkb, 0);
 
 4331     error = find_lkb(ls, ms->
m_remid, &lkb);
 
 4340     error = validate_message(lkb, ms);
 
 4355     int len, 
error, ret_nodeid, from_nodeid, our_nodeid;
 
 4357     from_nodeid = ms->
m_header.h_nodeid;
 
 4360     len = receive_extralen(ms);
 
 4366     if (!error && ret_nodeid == our_nodeid) {
 
 4367         receive_request(ls, ms);
 
 4370     send_lookup_reply(ls, ms, ret_nodeid, error);
 
 4378     int rv, len, dir_nodeid, from_nodeid;
 
 4380     from_nodeid = ms->
m_header.h_nodeid;
 
 4382     len = receive_extralen(ms);
 
 4385         log_error(ls, 
"receive_remove from %d bad len %d",
 
 4392         log_error(ls, 
"receive_remove from %d bad nodeid %d",
 
 4393               from_nodeid, dir_nodeid);
 
 4406     memset(name, 0, 
sizeof(name));
 
 4409     hash = jhash(name, len, 0);
 
 4420             log_error(ls, 
"receive_remove from %d not found %s",
 
 4427             log_error(ls, 
"receive_remove keep from %d master %d",
 
 4434         log_debug(ls, 
"receive_remove from %d master %d first %x %s",
 
 4442         log_error(ls, 
"receive_remove toss from %d master %d",
 
 4449     if (kref_put(&r->
res_ref, kill_rsb)) {
 
 4454         log_error(ls, 
"receive_remove from %d rsb ref error",
 
 4471     int from_nodeid = ms->
m_header.h_nodeid;
 
 4473     error = find_lkb(ls, ms->
m_remid, &lkb);
 
 4481     error = validate_message(lkb, ms);
 
 4488         log_error(ls, 
"receive_request_reply %x remote %d %x result %d",
 
 4508         queue_cast(r, lkb, -
EAGAIN);
 
 4509         confirm_master(r, -
EAGAIN);
 
 4516         receive_flags_reply(lkb, ms);
 
 4518         if (is_altmode(lkb))
 
 4519             munge_altmode(lkb, ms);
 
 4524             grant_lock_pc(r, lkb, ms);
 
 4525             queue_cast(r, lkb, 0);
 
 4527         confirm_master(r, result);
 
 4533         log_limit(ls, 
"receive_request_reply %x from %d %d " 
 4534               "master %d dir %d first %x %s", lkb->
lkb_id,
 
 4546         if (is_overlap(lkb)) {
 
 4548             queue_cast_overlap(r, lkb);
 
 4549             confirm_master(r, result);
 
 4552             _request_lock(r, lkb);
 
 4555                 confirm_master(r, 0);
 
 4560         log_error(ls, 
"receive_request_reply %x error %d",
 
 4564     if (is_overlap_unlock(lkb) && (result == 0 || result == -
EINPROGRESS)) {
 
 4565         log_debug(ls, 
"receive_request_reply %x result %d unlock",
 
 4569         send_unlock(r, lkb);
 
 4570     } 
else if (is_overlap_cancel(lkb) && (result == -
EINPROGRESS)) {
 
 4574         send_cancel(r, lkb);
 
 4586 static void __receive_convert_reply(
struct dlm_rsb *r, 
struct dlm_lkb *lkb,
 
 4593         queue_cast(r, lkb, -
EAGAIN);
 
 4597         receive_flags_reply(lkb, ms);
 
 4598         revert_lock_pc(r, lkb);
 
 4604         receive_flags_reply(lkb, ms);
 
 4605         if (is_demoted(lkb))
 
 4614         receive_flags_reply(lkb, ms);
 
 4615         if (is_demoted(lkb))
 
 4617         grant_lock_pc(r, lkb, ms);
 
 4618         queue_cast(r, lkb, 0);
 
 4638     error = validate_message(lkb, ms);
 
 4643     error = remove_from_waiters_ms(lkb, ms);
 
 4647     __receive_convert_reply(r, lkb, ms);
 
 4658     error = find_lkb(ls, ms->
m_remid, &lkb);
 
 4662     _receive_convert_reply(lkb, ms);
 
 4675     error = validate_message(lkb, ms);
 
 4680     error = remove_from_waiters_ms(lkb, ms);
 
 4688         receive_flags_reply(lkb, ms);
 
 4689         remove_lock_pc(r, lkb);
 
 4708     error = find_lkb(ls, ms->
m_remid, &lkb);
 
 4712     _receive_unlock_reply(lkb, ms);
 
 4725     error = validate_message(lkb, ms);
 
 4730     error = remove_from_waiters_ms(lkb, ms);
 
 4738         receive_flags_reply(lkb, ms);
 
 4739         revert_lock_pc(r, lkb);
 
 4758     error = find_lkb(ls, ms->
m_remid, &lkb);
 
 4762     _receive_cancel_reply(lkb, ms);
 
 4771     int error, ret_nodeid;
 
 4772     int do_lookup_list = 0;
 
 4774     error = find_lkb(ls, ms->
m_lkid, &lkb);
 
 4801         log_error(ls, 
"receive_lookup_reply %x from %d ret %d " 
 4802               "master %d dir %d our %d first %x %s",
 
 4813     } 
else if (ret_nodeid == -1) {
 
 4815         log_error(ls, 
"receive_lookup_reply %x from %d bad ret_nodeid",
 
 4826     if (is_overlap(lkb)) {
 
 4827         log_debug(ls, 
"receive_lookup_reply %x unlock %x",
 
 4829         queue_cast_overlap(r, lkb);
 
 4834     _request_lock(r, lkb);
 
 4838         process_lookup_list(r);
 
 4848     int error = 0, noent = 0;
 
 4851         log_limit(ls, 
"receive %d from non-member %d %x %x %d",
 
 4862         error = receive_request(ls, ms);
 
 4866         error = receive_convert(ls, ms);
 
 4870         error = receive_unlock(ls, ms);
 
 4875         error = receive_cancel(ls, ms);
 
 4881         error = receive_request_reply(ls, ms);
 
 4885         error = receive_convert_reply(ls, ms);
 
 4889         error = receive_unlock_reply(ls, ms);
 
 4893         error = receive_cancel_reply(ls, ms);
 
 4900         error = receive_grant(ls, ms);
 
 4905         error = receive_bast(ls, ms);
 
 4911         receive_lookup(ls, ms);
 
 4915         receive_remove(ls, ms);
 
 4921         receive_lookup_reply(ls, ms);
 
 4927         receive_purge(ls, ms);
 
 4945     if (error == -
ENOENT && noent) {
 
 4946         log_debug(ls, 
"receive %d no %x remote %d %x saved_seq %u",
 
 4949     } 
else if (error == -
ENOENT) {
 
 4950         log_error(ls, 
"receive %d no %x remote %d %x saved_seq %u",
 
 4955             dlm_dump_rsb_hash(ls, ms->
m_hash);
 
 4959         log_error(ls, 
"receive %d inval from %d lkid %x remid %x " 
 4977     if (dlm_locking_stopped(ls)) {
 
 4982             log_limit(ls, 
"receive %d from %d ignore old gen",
 
 4990         _receive_message(ls, ms, 0);
 
 5000     _receive_message(ls, ms, saved_seq);
 
 5014     switch (hd->
h_cmd) {
 
 5021         type = p->
rcom.rc_type;
 
 5029         log_print(
"invalid h_nodeid %d from %d lockspace %x",
 
 5038                 "%u from %d cmd %d type %d\n",
 
 5052         dlm_receive_message(ls, &p->
message, nodeid);
 
 5060 static void recover_convert_waiter(
struct dlm_ls *ls, 
struct dlm_lkb *lkb,
 
 5063     if (middle_conversion(lkb)) {
 
 5070         _receive_convert_reply(lkb, ms_stub);
 
 5088 static int waiter_needs_recovery(
struct dlm_ls *ls, 
struct dlm_lkb *lkb,
 
 5091     if (dlm_no_directory(ls))
 
 5110     int wait_type, stub_unlock_result, stub_cancel_result;
 
 5115         log_error(ls, 
"dlm_recover_waiters_pre no mem");
 
 5129             log_debug(ls, 
"waiter %x remote %x msg %d r_nodeid %d " 
 5130                   "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
 
 5148         if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
 
 5161             if (is_overlap_cancel(lkb)) {
 
 5164                     stub_cancel_result = 0;
 
 5166             if (is_overlap_unlock(lkb)) {
 
 5169                     stub_unlock_result = -
ENOENT;
 
 5172             log_debug(ls, 
"rwpre overlap %x %x %d %d %d",
 
 5174                   stub_cancel_result, stub_unlock_result);
 
 5177         switch (wait_type) {
 
 5184             recover_convert_waiter(ls, lkb, ms_stub);
 
 5192             ms_stub->
m_result = stub_unlock_result;
 
 5194             _receive_unlock_reply(lkb, ms_stub);
 
 5203             ms_stub->
m_result = stub_cancel_result;
 
 5205             _receive_cancel_reply(lkb, ms_stub);
 
 5210             log_error(ls, 
"invalid lkb wait_type %d %d",
 
 5219 static struct dlm_lkb *find_resend_waiter(
struct dlm_ls *ls)
 
 5259     int error = 0, mstype, 
err, oc, ou;
 
 5262         if (dlm_locking_stopped(ls)) {
 
 5263             log_debug(ls, 
"recover_waiters_post aborted");
 
 5268         lkb = find_resend_waiter(ls);
 
 5277         oc = is_overlap_cancel(lkb);
 
 5278         ou = is_overlap_unlock(lkb);
 
 5281         log_debug(ls, 
"waiter %x remote %x msg %d r_nodeid %d " 
 5282               "lkb_nodeid %d wait_nodeid %d dir_nodeid %d " 
 5315                     _unlock_lock(r, lkb);
 
 5325                 _request_lock(r, lkb);
 
 5327                     confirm_master(r, 0);
 
 5330                 _convert_lock(r, lkb);
 
 5338             log_error(ls, 
"waiter %x msg %d r_nodeid %d " 
 5339                   "dir_nodeid %d overlap %d %d",
 
 5351 static void purge_mstcpy_list(
struct dlm_ls *ls, 
struct dlm_rsb *r,
 
 5357         if (!is_master_copy(lkb))
 
 5370             log_error(ls, 
"purged mstcpy lkb not released");
 
 5383 static void purge_dead_list(
struct dlm_ls *ls, 
struct dlm_rsb *r,
 
 5385                 int nodeid_gone, 
unsigned int *count)
 
 5390         if (!is_master_copy(lkb))
 
 5400                 log_error(ls, 
"purged dead lkb not released");
 
 5415     int nodes_count = 0;
 
 5416     int nodeid_gone = 0;
 
 5417     unsigned int lkb_count = 0;
 
 5424         nodeid_gone = memb->
nodeid;
 
 5436                     nodeid_gone, &lkb_count);
 
 5438                     nodeid_gone, &lkb_count);
 
 5440                     nodeid_gone, &lkb_count);
 
 5449         log_debug(ls, 
"dlm_recover_purge %u locks for %u nodes",
 
 5450               lkb_count, nodes_count);
 
 5453 static struct dlm_rsb *find_grant_rsb(
struct dlm_ls *ls, 
int bucket)
 
 5464         if (!is_master(r)) {
 
 5469         spin_unlock(&ls->
ls_rsbtbl[bucket].lock);
 
 5472     spin_unlock(&ls->
ls_rsbtbl[bucket].lock);
 
 5497     unsigned int count = 0;
 
 5498     unsigned int rsb_count = 0;
 
 5499     unsigned int lkb_count = 0;
 
 5502         r = find_grant_rsb(ls, bucket);
 
 5513         grant_pending_locks(r, &count);
 
 5516         confirm_master(r, 0);
 
 5523         log_debug(ls, 
"dlm_recover_grant %u locks on %u resources",
 
 5524               lkb_count, rsb_count);
 
 5527 static struct dlm_lkb *search_remid_list(
struct list_head *head, 
int nodeid,
 
 5539 static struct dlm_lkb *search_remid(
struct dlm_rsb *r, 
int nodeid,
 
 5557 static int receive_rcom_lock_args(
struct dlm_ls *ls, 
struct dlm_lkb *lkb,
 
 5592         middle_conversion(lkb)) {
 
 5614     int from_nodeid = rc->
rc_header.h_nodeid;
 
 5640         log_error(ls, 
"dlm_recover_master_copy remote %d %x not dir",
 
 5641               from_nodeid, remid);
 
 5646     lkb = search_remid(r, from_nodeid, remid);
 
 5652     error = create_lkb(ls, &lkb);
 
 5656     error = receive_rcom_lock_args(ls, lkb, r, rc);
 
 5681     if (error && error != -
EEXIST)
 
 5682         log_debug(ls, 
"dlm_recover_master_copy remote %d %x error %d",
 
 5683               from_nodeid, remid, error);
 
 5701     error = find_lkb(ls, lkid, &lkb);
 
 5703         log_error(ls, 
"dlm_recover_process_copy no %x remote %d %x %d",
 
 5704               lkid, rc->
rc_header.h_nodeid, remid, result);
 
 5712     if (!is_process_copy(lkb)) {
 
 5713         log_error(ls, 
"dlm_recover_process_copy bad %x remote %d %x %d",
 
 5714               lkid, rc->
rc_header.h_nodeid, remid, result);
 
 5728         log_debug(ls, 
"dlm_recover_process_copy %x remote %d %x %d",
 
 5729               lkid, rc->
rc_header.h_nodeid, remid, result);
 
 5738         log_error(ls, 
"dlm_recover_process_copy %x remote %d %x %d unk",
 
 5739               lkid, rc->
rc_header.h_nodeid, remid, result);
 
 5754              int mode, 
uint32_t flags, 
void *name, 
unsigned int namelen,
 
 5755              unsigned long timeout_cs)
 
 5761     dlm_lock_recovery(ls);
 
 5763     error = create_lkb(ls, &lkb);
 
 5771         if (!ua->
lksb.sb_lvbptr) {
 
 5783     error = set_lock_args(mode, &ua->
lksb, flags, namelen, timeout_cs,
 
 5784                   fake_astfn, ua, fake_bastfn, &args);
 
 5792     error = request_lock(ls, lkb, name, namelen, &args);
 
 5809     spin_lock(&ua->
proc->locks_spin);
 
 5812     spin_unlock(&ua->
proc->locks_spin);
 
 5820              unsigned long timeout_cs)
 
 5827     dlm_lock_recovery(ls);
 
 5829     error = find_lkb(ls, lkid, &lkb);
 
 5840         if (!ua->
lksb.sb_lvbptr) {
 
 5845     if (lvb_in && ua->
lksb.sb_lvbptr)
 
 5855     error = set_lock_args(mode, &ua->
lksb, flags, 0, timeout_cs,
 
 5856                   fake_astfn, ua, fake_bastfn, &args);
 
 5860     error = convert_lock(ls, lkb, &args);
 
 5880     dlm_lock_recovery(ls);
 
 5882     error = find_lkb(ls, lkid, &lkb);
 
 5888     if (lvb_in && ua->
lksb.sb_lvbptr)
 
 5894     error = set_unlock_args(flags, ua, &args);
 
 5898     error = unlock_lock(ls, lkb, &args);
 
 5908     spin_lock(&ua->
proc->locks_spin);
 
 5912     spin_unlock(&ua->
proc->locks_spin);
 
 5929     dlm_lock_recovery(ls);
 
 5931     error = find_lkb(ls, lkid, &lkb);
 
 5940     error = set_unlock_args(flags, ua, &args);
 
 5944     error = cancel_lock(ls, lkb, &args);
 
 5949     if (error == -
EBUSY)
 
 5967     dlm_lock_recovery(ls);
 
 5969     error = find_lkb(ls, lkid, &lkb);
 
 5975     error = set_unlock_args(flags, ua, &args);
 
 5985     error = validate_unlock_args(lkb, &args);
 
 5990     error = _cancel_lock(r, lkb);
 
 5998     if (error == -
EBUSY)
 
 6010 static int orphan_proc_lock(
struct dlm_ls *ls, 
struct dlm_lkb *lkb)
 
 6020     set_unlock_args(0, lkb->
lkb_ua, &args);
 
 6022     error = cancel_lock(ls, lkb, &args);
 
 6031 static int unlock_proc_lock(
struct dlm_ls *ls, 
struct dlm_lkb *lkb)
 
 6038     error = unlock_lock(ls, lkb, &args);
 
 6054     if (list_empty(&proc->
locks))
 
 6083     dlm_lock_recovery(ls);
 
 6086         lkb = del_proc_lock(ls, proc);
 
 6091             orphan_proc_lock(ls, lkb);
 
 6093             unlock_proc_lock(ls, lkb);
 
 6129         if (!list_empty(&proc->
locks)) {
 
 6140         unlock_proc_lock(ls, lkb);
 
 6164 static void do_purge(
struct dlm_ls *ls, 
int nodeid, 
int pid)
 
 6172         unlock_proc_lock(ls, lkb);
 
 6179 static int send_purge(
struct dlm_ls *ls, 
int nodeid, 
int pid)
 
 6182     struct dlm_mhandle *mh;
 
 6185     error = _create_message(ls, 
sizeof(
struct dlm_message), nodeid,
 
 6192     return send_message(mh, ms);
 
 6196            int nodeid, 
int pid)
 
 6201         error = send_purge(ls, nodeid, pid);
 
 6203         dlm_lock_recovery(ls);
 
 6205             purge_proc_locks(ls, proc);
 
 6207             do_purge(ls, nodeid, pid);