target: Fix bug in handling of FILEIO + block_device resize ops
[linux-flexiantxendom0-3.2.10.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "lowcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89                                     struct dlm_message *ms);
90 static int receive_extralen(struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void del_timeout(struct dlm_lkb *lkb);
93
94 /*
95  * Lock compatibilty matrix - thanks Steve
96  * UN = Unlocked state. Not really a state, used as a flag
97  * PD = Padding. Used to make the matrix a nice power of two in size
98  * Other states are the same as the VMS DLM.
99  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
100  */
101
102 static const int __dlm_compat_matrix[8][8] = {
103       /* UN NL CR CW PR PW EX PD */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
106         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
107         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
108         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
109         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
110         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
111         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
112 };
113
114 /*
115  * This defines the direction of transfer of LVB data.
116  * Granted mode is the row; requested mode is the column.
117  * Usage: matrix[grmode+1][rqmode+1]
118  * 1 = LVB is returned to the caller
119  * 0 = LVB is written to the resource
120  * -1 = nothing happens to the LVB
121  */
122
123 const int dlm_lvb_operations[8][8] = {
124         /* UN   NL  CR  CW  PR  PW  EX  PD*/
125         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
126         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
127         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
128         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
129         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
130         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
133 };
134
135 #define modes_compat(gr, rq) \
136         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137
138 int dlm_modes_compat(int mode1, int mode2)
139 {
140         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141 }
142
143 /*
144  * Compatibility matrix for conversions with QUECVT set.
145  * Granted mode is the row; requested mode is the column.
146  * Usage: matrix[grmode+1][rqmode+1]
147  */
148
149 static const int __quecvt_compat_matrix[8][8] = {
150       /* UN NL CR CW PR PW EX PD */
151         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
152         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
153         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
154         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
155         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
156         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
157         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
158         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
159 };
160
161 void dlm_print_lkb(struct dlm_lkb *lkb)
162 {
163         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
164                "     status %d rqmode %d grmode %d wait_type %d\n",
165                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
167                lkb->lkb_grmode, lkb->lkb_wait_type);
168 }
169
170 static void dlm_print_rsb(struct dlm_rsb *r)
171 {
172         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
173                r->res_nodeid, r->res_flags, r->res_first_lkid,
174                r->res_recover_locks_count, r->res_name);
175 }
176
177 void dlm_dump_rsb(struct dlm_rsb *r)
178 {
179         struct dlm_lkb *lkb;
180
181         dlm_print_rsb(r);
182
183         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
184                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
185         printk(KERN_ERR "rsb lookup list\n");
186         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
187                 dlm_print_lkb(lkb);
188         printk(KERN_ERR "rsb grant queue:\n");
189         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
190                 dlm_print_lkb(lkb);
191         printk(KERN_ERR "rsb convert queue:\n");
192         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
193                 dlm_print_lkb(lkb);
194         printk(KERN_ERR "rsb wait queue:\n");
195         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
196                 dlm_print_lkb(lkb);
197 }
198
199 /* Threads cannot use the lockspace while it's being recovered */
200
201 static inline void dlm_lock_recovery(struct dlm_ls *ls)
202 {
203         down_read(&ls->ls_in_recovery);
204 }
205
206 void dlm_unlock_recovery(struct dlm_ls *ls)
207 {
208         up_read(&ls->ls_in_recovery);
209 }
210
211 int dlm_lock_recovery_try(struct dlm_ls *ls)
212 {
213         return down_read_trylock(&ls->ls_in_recovery);
214 }
215
216 static inline int can_be_queued(struct dlm_lkb *lkb)
217 {
218         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
219 }
220
221 static inline int force_blocking_asts(struct dlm_lkb *lkb)
222 {
223         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
224 }
225
226 static inline int is_demoted(struct dlm_lkb *lkb)
227 {
228         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
229 }
230
231 static inline int is_altmode(struct dlm_lkb *lkb)
232 {
233         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
234 }
235
236 static inline int is_granted(struct dlm_lkb *lkb)
237 {
238         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
239 }
240
241 static inline int is_remote(struct dlm_rsb *r)
242 {
243         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
244         return !!r->res_nodeid;
245 }
246
247 static inline int is_process_copy(struct dlm_lkb *lkb)
248 {
249         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
250 }
251
252 static inline int is_master_copy(struct dlm_lkb *lkb)
253 {
254         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
255                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
256         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
257 }
258
259 static inline int middle_conversion(struct dlm_lkb *lkb)
260 {
261         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
262             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
263                 return 1;
264         return 0;
265 }
266
267 static inline int down_conversion(struct dlm_lkb *lkb)
268 {
269         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
270 }
271
272 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
273 {
274         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
275 }
276
277 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
278 {
279         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
280 }
281
282 static inline int is_overlap(struct dlm_lkb *lkb)
283 {
284         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
285                                   DLM_IFL_OVERLAP_CANCEL));
286 }
287
288 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
289 {
290         if (is_master_copy(lkb))
291                 return;
292
293         del_timeout(lkb);
294
295         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296
297         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
298            timeout caused the cancel then return -ETIMEDOUT */
299         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
300                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
301                 rv = -ETIMEDOUT;
302         }
303
304         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
305                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
306                 rv = -EDEADLK;
307         }
308
309         dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
310 }
311
312 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
313 {
314         queue_cast(r, lkb,
315                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
316 }
317
318 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
319 {
320         if (is_master_copy(lkb)) {
321                 send_bast(r, lkb, rqmode);
322         } else {
323                 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
324         }
325 }
326
327 /*
328  * Basic operations on rsb's and lkb's
329  */
330
331 static int pre_rsb_struct(struct dlm_ls *ls)
332 {
333         struct dlm_rsb *r1, *r2;
334         int count = 0;
335
336         spin_lock(&ls->ls_new_rsb_spin);
337         if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
338                 spin_unlock(&ls->ls_new_rsb_spin);
339                 return 0;
340         }
341         spin_unlock(&ls->ls_new_rsb_spin);
342
343         r1 = dlm_allocate_rsb(ls);
344         r2 = dlm_allocate_rsb(ls);
345
346         spin_lock(&ls->ls_new_rsb_spin);
347         if (r1) {
348                 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
349                 ls->ls_new_rsb_count++;
350         }
351         if (r2) {
352                 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
353                 ls->ls_new_rsb_count++;
354         }
355         count = ls->ls_new_rsb_count;
356         spin_unlock(&ls->ls_new_rsb_spin);
357
358         if (!count)
359                 return -ENOMEM;
360         return 0;
361 }
362
363 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
364    unlock any spinlocks, go back and call pre_rsb_struct again.
365    Otherwise, take an rsb off the list and return it. */
366
367 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
368                           struct dlm_rsb **r_ret)
369 {
370         struct dlm_rsb *r;
371         int count;
372
373         spin_lock(&ls->ls_new_rsb_spin);
374         if (list_empty(&ls->ls_new_rsb)) {
375                 count = ls->ls_new_rsb_count;
376                 spin_unlock(&ls->ls_new_rsb_spin);
377                 log_debug(ls, "find_rsb retry %d %d %s",
378                           count, dlm_config.ci_new_rsb_count, name);
379                 return -EAGAIN;
380         }
381
382         r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
383         list_del(&r->res_hashchain);
384         /* Convert the empty list_head to a NULL rb_node for tree usage: */
385         memset(&r->res_hashnode, 0, sizeof(struct rb_node));
386         ls->ls_new_rsb_count--;
387         spin_unlock(&ls->ls_new_rsb_spin);
388
389         r->res_ls = ls;
390         r->res_length = len;
391         memcpy(r->res_name, name, len);
392         mutex_init(&r->res_mutex);
393
394         INIT_LIST_HEAD(&r->res_lookup);
395         INIT_LIST_HEAD(&r->res_grantqueue);
396         INIT_LIST_HEAD(&r->res_convertqueue);
397         INIT_LIST_HEAD(&r->res_waitqueue);
398         INIT_LIST_HEAD(&r->res_root_list);
399         INIT_LIST_HEAD(&r->res_recover_list);
400
401         *r_ret = r;
402         return 0;
403 }
404
405 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
406 {
407         char maxname[DLM_RESNAME_MAXLEN];
408
409         memset(maxname, 0, DLM_RESNAME_MAXLEN);
410         memcpy(maxname, name, nlen);
411         return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
412 }
413
414 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
415                         unsigned int flags, struct dlm_rsb **r_ret)
416 {
417         struct rb_node *node = tree->rb_node;
418         struct dlm_rsb *r;
419         int error = 0;
420         int rc;
421
422         while (node) {
423                 r = rb_entry(node, struct dlm_rsb, res_hashnode);
424                 rc = rsb_cmp(r, name, len);
425                 if (rc < 0)
426                         node = node->rb_left;
427                 else if (rc > 0)
428                         node = node->rb_right;
429                 else
430                         goto found;
431         }
432         *r_ret = NULL;
433         return -EBADR;
434
435  found:
436         if (r->res_nodeid && (flags & R_MASTER))
437                 error = -ENOTBLK;
438         *r_ret = r;
439         return error;
440 }
441
442 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
443 {
444         struct rb_node **newn = &tree->rb_node;
445         struct rb_node *parent = NULL;
446         int rc;
447
448         while (*newn) {
449                 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
450                                                res_hashnode);
451
452                 parent = *newn;
453                 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
454                 if (rc < 0)
455                         newn = &parent->rb_left;
456                 else if (rc > 0)
457                         newn = &parent->rb_right;
458                 else {
459                         log_print("rsb_insert match");
460                         dlm_dump_rsb(rsb);
461                         dlm_dump_rsb(cur);
462                         return -EEXIST;
463                 }
464         }
465
466         rb_link_node(&rsb->res_hashnode, parent, newn);
467         rb_insert_color(&rsb->res_hashnode, tree);
468         return 0;
469 }
470
471 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
472                        unsigned int flags, struct dlm_rsb **r_ret)
473 {
474         struct dlm_rsb *r;
475         int error;
476
477         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r);
478         if (!error) {
479                 kref_get(&r->res_ref);
480                 goto out;
481         }
482         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
483         if (error)
484                 goto out;
485
486         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
487         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
488         if (error)
489                 return error;
490
491         if (dlm_no_directory(ls))
492                 goto out;
493
494         if (r->res_nodeid == -1) {
495                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
496                 r->res_first_lkid = 0;
497         } else if (r->res_nodeid > 0) {
498                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
499                 r->res_first_lkid = 0;
500         } else {
501                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
502                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
503         }
504  out:
505         *r_ret = r;
506         return error;
507 }
508
509 /*
510  * Find rsb in rsbtbl and potentially create/add one
511  *
512  * Delaying the release of rsb's has a similar benefit to applications keeping
513  * NL locks on an rsb, but without the guarantee that the cached master value
514  * will still be valid when the rsb is reused.  Apps aren't always smart enough
515  * to keep NL locks on an rsb that they may lock again shortly; this can lead
516  * to excessive master lookups and removals if we don't delay the release.
517  *
518  * Searching for an rsb means looking through both the normal list and toss
519  * list.  When found on the toss list the rsb is moved to the normal list with
520  * ref count of 1; when found on normal list the ref count is incremented.
521  */
522
523 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
524                     unsigned int flags, struct dlm_rsb **r_ret)
525 {
526         struct dlm_rsb *r = NULL;
527         uint32_t hash, bucket;
528         int error;
529
530         if (namelen > DLM_RESNAME_MAXLEN) {
531                 error = -EINVAL;
532                 goto out;
533         }
534
535         if (dlm_no_directory(ls))
536                 flags |= R_CREATE;
537
538         hash = jhash(name, namelen, 0);
539         bucket = hash & (ls->ls_rsbtbl_size - 1);
540
541  retry:
542         if (flags & R_CREATE) {
543                 error = pre_rsb_struct(ls);
544                 if (error < 0)
545                         goto out;
546         }
547
548         spin_lock(&ls->ls_rsbtbl[bucket].lock);
549
550         error = _search_rsb(ls, name, namelen, bucket, flags, &r);
551         if (!error)
552                 goto out_unlock;
553
554         if (error == -EBADR && !(flags & R_CREATE))
555                 goto out_unlock;
556
557         /* the rsb was found but wasn't a master copy */
558         if (error == -ENOTBLK)
559                 goto out_unlock;
560
561         error = get_rsb_struct(ls, name, namelen, &r);
562         if (error == -EAGAIN) {
563                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
564                 goto retry;
565         }
566         if (error)
567                 goto out_unlock;
568
569         r->res_hash = hash;
570         r->res_bucket = bucket;
571         r->res_nodeid = -1;
572         kref_init(&r->res_ref);
573
574         /* With no directory, the master can be set immediately */
575         if (dlm_no_directory(ls)) {
576                 int nodeid = dlm_dir_nodeid(r);
577                 if (nodeid == dlm_our_nodeid())
578                         nodeid = 0;
579                 r->res_nodeid = nodeid;
580         }
581         error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep);
582  out_unlock:
583         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
584  out:
585         *r_ret = r;
586         return error;
587 }
588
589 /* This is only called to add a reference when the code already holds
590    a valid reference to the rsb, so there's no need for locking. */
591
592 static inline void hold_rsb(struct dlm_rsb *r)
593 {
594         kref_get(&r->res_ref);
595 }
596
597 void dlm_hold_rsb(struct dlm_rsb *r)
598 {
599         hold_rsb(r);
600 }
601
602 static void toss_rsb(struct kref *kref)
603 {
604         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
605         struct dlm_ls *ls = r->res_ls;
606
607         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
608         kref_init(&r->res_ref);
609         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
610         rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
611         r->res_toss_time = jiffies;
612         if (r->res_lvbptr) {
613                 dlm_free_lvb(r->res_lvbptr);
614                 r->res_lvbptr = NULL;
615         }
616 }
617
618 /* When all references to the rsb are gone it's transferred to
619    the tossed list for later disposal. */
620
621 static void put_rsb(struct dlm_rsb *r)
622 {
623         struct dlm_ls *ls = r->res_ls;
624         uint32_t bucket = r->res_bucket;
625
626         spin_lock(&ls->ls_rsbtbl[bucket].lock);
627         kref_put(&r->res_ref, toss_rsb);
628         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
629 }
630
631 void dlm_put_rsb(struct dlm_rsb *r)
632 {
633         put_rsb(r);
634 }
635
636 /* See comment for unhold_lkb */
637
638 static void unhold_rsb(struct dlm_rsb *r)
639 {
640         int rv;
641         rv = kref_put(&r->res_ref, toss_rsb);
642         DLM_ASSERT(!rv, dlm_dump_rsb(r););
643 }
644
645 static void kill_rsb(struct kref *kref)
646 {
647         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
648
649         /* All work is done after the return from kref_put() so we
650            can release the write_lock before the remove and free. */
651
652         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
653         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
654         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
655         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
656         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
657         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
658 }
659
660 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
661    The rsb must exist as long as any lkb's for it do. */
662
663 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
664 {
665         hold_rsb(r);
666         lkb->lkb_resource = r;
667 }
668
669 static void detach_lkb(struct dlm_lkb *lkb)
670 {
671         if (lkb->lkb_resource) {
672                 put_rsb(lkb->lkb_resource);
673                 lkb->lkb_resource = NULL;
674         }
675 }
676
677 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
678 {
679         struct dlm_lkb *lkb;
680         int rv, id;
681
682         lkb = dlm_allocate_lkb(ls);
683         if (!lkb)
684                 return -ENOMEM;
685
686         lkb->lkb_nodeid = -1;
687         lkb->lkb_grmode = DLM_LOCK_IV;
688         kref_init(&lkb->lkb_ref);
689         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
690         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
691         INIT_LIST_HEAD(&lkb->lkb_time_list);
692         INIT_LIST_HEAD(&lkb->lkb_cb_list);
693         mutex_init(&lkb->lkb_cb_mutex);
694         INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
695
696  retry:
697         rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
698         if (!rv)
699                 return -ENOMEM;
700
701         spin_lock(&ls->ls_lkbidr_spin);
702         rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
703         if (!rv)
704                 lkb->lkb_id = id;
705         spin_unlock(&ls->ls_lkbidr_spin);
706
707         if (rv == -EAGAIN)
708                 goto retry;
709
710         if (rv < 0) {
711                 log_error(ls, "create_lkb idr error %d", rv);
712                 return rv;
713         }
714
715         *lkb_ret = lkb;
716         return 0;
717 }
718
719 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
720 {
721         struct dlm_lkb *lkb;
722
723         spin_lock(&ls->ls_lkbidr_spin);
724         lkb = idr_find(&ls->ls_lkbidr, lkid);
725         if (lkb)
726                 kref_get(&lkb->lkb_ref);
727         spin_unlock(&ls->ls_lkbidr_spin);
728
729         *lkb_ret = lkb;
730         return lkb ? 0 : -ENOENT;
731 }
732
733 static void kill_lkb(struct kref *kref)
734 {
735         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
736
737         /* All work is done after the return from kref_put() so we
738            can release the write_lock before the detach_lkb */
739
740         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
741 }
742
743 /* __put_lkb() is used when an lkb may not have an rsb attached to
744    it so we need to provide the lockspace explicitly */
745
746 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
747 {
748         uint32_t lkid = lkb->lkb_id;
749
750         spin_lock(&ls->ls_lkbidr_spin);
751         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
752                 idr_remove(&ls->ls_lkbidr, lkid);
753                 spin_unlock(&ls->ls_lkbidr_spin);
754
755                 detach_lkb(lkb);
756
757                 /* for local/process lkbs, lvbptr points to caller's lksb */
758                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
759                         dlm_free_lvb(lkb->lkb_lvbptr);
760                 dlm_free_lkb(lkb);
761                 return 1;
762         } else {
763                 spin_unlock(&ls->ls_lkbidr_spin);
764                 return 0;
765         }
766 }
767
768 int dlm_put_lkb(struct dlm_lkb *lkb)
769 {
770         struct dlm_ls *ls;
771
772         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
773         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
774
775         ls = lkb->lkb_resource->res_ls;
776         return __put_lkb(ls, lkb);
777 }
778
779 /* This is only called to add a reference when the code already holds
780    a valid reference to the lkb, so there's no need for locking. */
781
782 static inline void hold_lkb(struct dlm_lkb *lkb)
783 {
784         kref_get(&lkb->lkb_ref);
785 }
786
787 /* This is called when we need to remove a reference and are certain
788    it's not the last ref.  e.g. del_lkb is always called between a
789    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
790    put_lkb would work fine, but would involve unnecessary locking */
791
792 static inline void unhold_lkb(struct dlm_lkb *lkb)
793 {
794         int rv;
795         rv = kref_put(&lkb->lkb_ref, kill_lkb);
796         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
797 }
798
799 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
800                             int mode)
801 {
802         struct dlm_lkb *lkb = NULL;
803
804         list_for_each_entry(lkb, head, lkb_statequeue)
805                 if (lkb->lkb_rqmode < mode)
806                         break;
807
808         __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
809 }
810
811 /* add/remove lkb to rsb's grant/convert/wait queue */
812
813 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
814 {
815         kref_get(&lkb->lkb_ref);
816
817         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
818
819         lkb->lkb_timestamp = ktime_get();
820
821         lkb->lkb_status = status;
822
823         switch (status) {
824         case DLM_LKSTS_WAITING:
825                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
826                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
827                 else
828                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
829                 break;
830         case DLM_LKSTS_GRANTED:
831                 /* convention says granted locks kept in order of grmode */
832                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
833                                 lkb->lkb_grmode);
834                 break;
835         case DLM_LKSTS_CONVERT:
836                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
837                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
838                 else
839                         list_add_tail(&lkb->lkb_statequeue,
840                                       &r->res_convertqueue);
841                 break;
842         default:
843                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
844         }
845 }
846
847 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
848 {
849         lkb->lkb_status = 0;
850         list_del(&lkb->lkb_statequeue);
851         unhold_lkb(lkb);
852 }
853
854 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
855 {
856         hold_lkb(lkb);
857         del_lkb(r, lkb);
858         add_lkb(r, lkb, sts);
859         unhold_lkb(lkb);
860 }
861
862 static int msg_reply_type(int mstype)
863 {
864         switch (mstype) {
865         case DLM_MSG_REQUEST:
866                 return DLM_MSG_REQUEST_REPLY;
867         case DLM_MSG_CONVERT:
868                 return DLM_MSG_CONVERT_REPLY;
869         case DLM_MSG_UNLOCK:
870                 return DLM_MSG_UNLOCK_REPLY;
871         case DLM_MSG_CANCEL:
872                 return DLM_MSG_CANCEL_REPLY;
873         case DLM_MSG_LOOKUP:
874                 return DLM_MSG_LOOKUP_REPLY;
875         }
876         return -1;
877 }
878
879 static int nodeid_warned(int nodeid, int num_nodes, int *warned)
880 {
881         int i;
882
883         for (i = 0; i < num_nodes; i++) {
884                 if (!warned[i]) {
885                         warned[i] = nodeid;
886                         return 0;
887                 }
888                 if (warned[i] == nodeid)
889                         return 1;
890         }
891         return 0;
892 }
893
894 void dlm_scan_waiters(struct dlm_ls *ls)
895 {
896         struct dlm_lkb *lkb;
897         ktime_t zero = ktime_set(0, 0);
898         s64 us;
899         s64 debug_maxus = 0;
900         u32 debug_scanned = 0;
901         u32 debug_expired = 0;
902         int num_nodes = 0;
903         int *warned = NULL;
904
905         if (!dlm_config.ci_waitwarn_us)
906                 return;
907
908         mutex_lock(&ls->ls_waiters_mutex);
909
910         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
911                 if (ktime_equal(lkb->lkb_wait_time, zero))
912                         continue;
913
914                 debug_scanned++;
915
916                 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
917
918                 if (us < dlm_config.ci_waitwarn_us)
919                         continue;
920
921                 lkb->lkb_wait_time = zero;
922
923                 debug_expired++;
924                 if (us > debug_maxus)
925                         debug_maxus = us;
926
927                 if (!num_nodes) {
928                         num_nodes = ls->ls_num_nodes;
929                         warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
930                 }
931                 if (!warned)
932                         continue;
933                 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
934                         continue;
935
936                 log_error(ls, "waitwarn %x %lld %d us check connection to "
937                           "node %d", lkb->lkb_id, (long long)us,
938                           dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
939         }
940         mutex_unlock(&ls->ls_waiters_mutex);
941         kfree(warned);
942
943         if (debug_expired)
944                 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
945                           debug_scanned, debug_expired,
946                           dlm_config.ci_waitwarn_us, (long long)debug_maxus);
947 }
948
949 /* add/remove lkb from global waiters list of lkb's waiting for
950    a reply from a remote node */
951
952 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
953 {
954         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
955         int error = 0;
956
957         mutex_lock(&ls->ls_waiters_mutex);
958
959         if (is_overlap_unlock(lkb) ||
960             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
961                 error = -EINVAL;
962                 goto out;
963         }
964
965         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
966                 switch (mstype) {
967                 case DLM_MSG_UNLOCK:
968                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
969                         break;
970                 case DLM_MSG_CANCEL:
971                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
972                         break;
973                 default:
974                         error = -EBUSY;
975                         goto out;
976                 }
977                 lkb->lkb_wait_count++;
978                 hold_lkb(lkb);
979
980                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
981                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
982                           lkb->lkb_wait_count, lkb->lkb_flags);
983                 goto out;
984         }
985
986         DLM_ASSERT(!lkb->lkb_wait_count,
987                    dlm_print_lkb(lkb);
988                    printk("wait_count %d\n", lkb->lkb_wait_count););
989
990         lkb->lkb_wait_count++;
991         lkb->lkb_wait_type = mstype;
992         lkb->lkb_wait_time = ktime_get();
993         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
994         hold_lkb(lkb);
995         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
996  out:
997         if (error)
998                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
999                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
1000                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1001         mutex_unlock(&ls->ls_waiters_mutex);
1002         return error;
1003 }
1004
1005 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1006    list as part of process_requestqueue (e.g. a lookup that has an optimized
1007    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1008    set RESEND and dlm_recover_waiters_post() */
1009
1010 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1011                                 struct dlm_message *ms)
1012 {
1013         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1014         int overlap_done = 0;
1015
1016         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
1017                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1018                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
1019                 overlap_done = 1;
1020                 goto out_del;
1021         }
1022
1023         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
1024                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1025                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1026                 overlap_done = 1;
1027                 goto out_del;
1028         }
1029
1030         /* Cancel state was preemptively cleared by a successful convert,
1031            see next comment, nothing to do. */
1032
1033         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1034             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1035                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1036                           lkb->lkb_id, lkb->lkb_wait_type);
1037                 return -1;
1038         }
1039
1040         /* Remove for the convert reply, and premptively remove for the
1041            cancel reply.  A convert has been granted while there's still
1042            an outstanding cancel on it (the cancel is moot and the result
1043            in the cancel reply should be 0).  We preempt the cancel reply
1044            because the app gets the convert result and then can follow up
1045            with another op, like convert.  This subsequent op would see the
1046            lingering state of the cancel and fail with -EBUSY. */
1047
1048         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1049             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
1050             is_overlap_cancel(lkb) && ms && !ms->m_result) {
1051                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1052                           lkb->lkb_id);
1053                 lkb->lkb_wait_type = 0;
1054                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1055                 lkb->lkb_wait_count--;
1056                 goto out_del;
1057         }
1058
1059         /* N.B. type of reply may not always correspond to type of original
1060            msg due to lookup->request optimization, verify others? */
1061
1062         if (lkb->lkb_wait_type) {
1063                 lkb->lkb_wait_type = 0;
1064                 goto out_del;
1065         }
1066
1067         log_error(ls, "remwait error %x reply %d flags %x no wait_type",
1068                   lkb->lkb_id, mstype, lkb->lkb_flags);
1069         return -1;
1070
1071  out_del:
1072         /* the force-unlock/cancel has completed and we haven't recvd a reply
1073            to the op that was in progress prior to the unlock/cancel; we
1074            give up on any reply to the earlier op.  FIXME: not sure when/how
1075            this would happen */
1076
1077         if (overlap_done && lkb->lkb_wait_type) {
1078                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1079                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1080                 lkb->lkb_wait_count--;
1081                 lkb->lkb_wait_type = 0;
1082         }
1083
1084         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1085
1086         lkb->lkb_flags &= ~DLM_IFL_RESEND;
1087         lkb->lkb_wait_count--;
1088         if (!lkb->lkb_wait_count)
1089                 list_del_init(&lkb->lkb_wait_reply);
1090         unhold_lkb(lkb);
1091         return 0;
1092 }
1093
1094 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1095 {
1096         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1097         int error;
1098
1099         mutex_lock(&ls->ls_waiters_mutex);
1100         error = _remove_from_waiters(lkb, mstype, NULL);
1101         mutex_unlock(&ls->ls_waiters_mutex);
1102         return error;
1103 }
1104
1105 /* Handles situations where we might be processing a "fake" or "stub" reply in
1106    which we can't try to take waiters_mutex again. */
1107
1108 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1109 {
1110         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1111         int error;
1112
1113         if (ms->m_flags != DLM_IFL_STUB_MS)
1114                 mutex_lock(&ls->ls_waiters_mutex);
1115         error = _remove_from_waiters(lkb, ms->m_type, ms);
1116         if (ms->m_flags != DLM_IFL_STUB_MS)
1117                 mutex_unlock(&ls->ls_waiters_mutex);
1118         return error;
1119 }
1120
1121 static void dir_remove(struct dlm_rsb *r)
1122 {
1123         int to_nodeid;
1124
1125         if (dlm_no_directory(r->res_ls))
1126                 return;
1127
1128         to_nodeid = dlm_dir_nodeid(r);
1129         if (to_nodeid != dlm_our_nodeid())
1130                 send_remove(r);
1131         else
1132                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
1133                                      r->res_name, r->res_length);
1134 }
1135
1136 /* FIXME: make this more efficient */
1137
1138 static int shrink_bucket(struct dlm_ls *ls, int b)
1139 {
1140         struct rb_node *n;
1141         struct dlm_rsb *r;
1142         int count = 0, found;
1143
1144         for (;;) {
1145                 found = 0;
1146                 spin_lock(&ls->ls_rsbtbl[b].lock);
1147                 for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
1148                         r = rb_entry(n, struct dlm_rsb, res_hashnode);
1149                         if (!time_after_eq(jiffies, r->res_toss_time +
1150                                            dlm_config.ci_toss_secs * HZ))
1151                                 continue;
1152                         found = 1;
1153                         break;
1154                 }
1155
1156                 if (!found) {
1157                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1158                         break;
1159                 }
1160
1161                 if (kref_put(&r->res_ref, kill_rsb)) {
1162                         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1163                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1164
1165                         if (is_master(r))
1166                                 dir_remove(r);
1167                         dlm_free_rsb(r);
1168                         count++;
1169                 } else {
1170                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1171                         log_error(ls, "tossed rsb in use %s", r->res_name);
1172                 }
1173         }
1174
1175         return count;
1176 }
1177
1178 void dlm_scan_rsbs(struct dlm_ls *ls)
1179 {
1180         int i;
1181
1182         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1183                 shrink_bucket(ls, i);
1184                 if (dlm_locking_stopped(ls))
1185                         break;
1186                 cond_resched();
1187         }
1188 }
1189
1190 static void add_timeout(struct dlm_lkb *lkb)
1191 {
1192         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1193
1194         if (is_master_copy(lkb))
1195                 return;
1196
1197         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1198             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1199                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1200                 goto add_it;
1201         }
1202         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1203                 goto add_it;
1204         return;
1205
1206  add_it:
1207         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1208         mutex_lock(&ls->ls_timeout_mutex);
1209         hold_lkb(lkb);
1210         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1211         mutex_unlock(&ls->ls_timeout_mutex);
1212 }
1213
1214 static void del_timeout(struct dlm_lkb *lkb)
1215 {
1216         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1217
1218         mutex_lock(&ls->ls_timeout_mutex);
1219         if (!list_empty(&lkb->lkb_time_list)) {
1220                 list_del_init(&lkb->lkb_time_list);
1221                 unhold_lkb(lkb);
1222         }
1223         mutex_unlock(&ls->ls_timeout_mutex);
1224 }
1225
1226 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1227    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1228    and then lock rsb because of lock ordering in add_timeout.  We may need
1229    to specify some special timeout-related bits in the lkb that are just to
1230    be accessed under the timeout_mutex. */
1231
1232 void dlm_scan_timeout(struct dlm_ls *ls)
1233 {
1234         struct dlm_rsb *r;
1235         struct dlm_lkb *lkb;
1236         int do_cancel, do_warn;
1237         s64 wait_us;
1238
1239         for (;;) {
1240                 if (dlm_locking_stopped(ls))
1241                         break;
1242
1243                 do_cancel = 0;
1244                 do_warn = 0;
1245                 mutex_lock(&ls->ls_timeout_mutex);
1246                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1247
1248                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1249                                                         lkb->lkb_timestamp));
1250
1251                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1252                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1253                                 do_cancel = 1;
1254
1255                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1256                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1257                                 do_warn = 1;
1258
1259                         if (!do_cancel && !do_warn)
1260                                 continue;
1261                         hold_lkb(lkb);
1262                         break;
1263                 }
1264                 mutex_unlock(&ls->ls_timeout_mutex);
1265
1266                 if (!do_cancel && !do_warn)
1267                         break;
1268
1269                 r = lkb->lkb_resource;
1270                 hold_rsb(r);
1271                 lock_rsb(r);
1272
1273                 if (do_warn) {
1274                         /* clear flag so we only warn once */
1275                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1276                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1277                                 del_timeout(lkb);
1278                         dlm_timeout_warn(lkb);
1279                 }
1280
1281                 if (do_cancel) {
1282                         log_debug(ls, "timeout cancel %x node %d %s",
1283                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1284                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1285                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1286                         del_timeout(lkb);
1287                         _cancel_lock(r, lkb);
1288                 }
1289
1290                 unlock_rsb(r);
1291                 unhold_rsb(r);
1292                 dlm_put_lkb(lkb);
1293         }
1294 }
1295
1296 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1297    dlm_recoverd before checking/setting ls_recover_begin. */
1298
1299 void dlm_adjust_timeouts(struct dlm_ls *ls)
1300 {
1301         struct dlm_lkb *lkb;
1302         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1303
1304         ls->ls_recover_begin = 0;
1305         mutex_lock(&ls->ls_timeout_mutex);
1306         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1307                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1308         mutex_unlock(&ls->ls_timeout_mutex);
1309
1310         if (!dlm_config.ci_waitwarn_us)
1311                 return;
1312
1313         mutex_lock(&ls->ls_waiters_mutex);
1314         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1315                 if (ktime_to_us(lkb->lkb_wait_time))
1316                         lkb->lkb_wait_time = ktime_get();
1317         }
1318         mutex_unlock(&ls->ls_waiters_mutex);
1319 }
1320
1321 /* lkb is master or local copy */
1322
1323 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1324 {
1325         int b, len = r->res_ls->ls_lvblen;
1326
1327         /* b=1 lvb returned to caller
1328            b=0 lvb written to rsb or invalidated
1329            b=-1 do nothing */
1330
1331         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1332
1333         if (b == 1) {
1334                 if (!lkb->lkb_lvbptr)
1335                         return;
1336
1337                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1338                         return;
1339
1340                 if (!r->res_lvbptr)
1341                         return;
1342
1343                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1344                 lkb->lkb_lvbseq = r->res_lvbseq;
1345
1346         } else if (b == 0) {
1347                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1348                         rsb_set_flag(r, RSB_VALNOTVALID);
1349                         return;
1350                 }
1351
1352                 if (!lkb->lkb_lvbptr)
1353                         return;
1354
1355                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1356                         return;
1357
1358                 if (!r->res_lvbptr)
1359                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1360
1361                 if (!r->res_lvbptr)
1362                         return;
1363
1364                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1365                 r->res_lvbseq++;
1366                 lkb->lkb_lvbseq = r->res_lvbseq;
1367                 rsb_clear_flag(r, RSB_VALNOTVALID);
1368         }
1369
1370         if (rsb_flag(r, RSB_VALNOTVALID))
1371                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1372 }
1373
1374 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1375 {
1376         if (lkb->lkb_grmode < DLM_LOCK_PW)
1377                 return;
1378
1379         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1380                 rsb_set_flag(r, RSB_VALNOTVALID);
1381                 return;
1382         }
1383
1384         if (!lkb->lkb_lvbptr)
1385                 return;
1386
1387         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1388                 return;
1389
1390         if (!r->res_lvbptr)
1391                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1392
1393         if (!r->res_lvbptr)
1394                 return;
1395
1396         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1397         r->res_lvbseq++;
1398         rsb_clear_flag(r, RSB_VALNOTVALID);
1399 }
1400
1401 /* lkb is process copy (pc) */
1402
1403 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1404                             struct dlm_message *ms)
1405 {
1406         int b;
1407
1408         if (!lkb->lkb_lvbptr)
1409                 return;
1410
1411         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1412                 return;
1413
1414         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1415         if (b == 1) {
1416                 int len = receive_extralen(ms);
1417                 if (len > DLM_RESNAME_MAXLEN)
1418                         len = DLM_RESNAME_MAXLEN;
1419                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1420                 lkb->lkb_lvbseq = ms->m_lvbseq;
1421         }
1422 }
1423
1424 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1425    remove_lock -- used for unlock, removes lkb from granted
1426    revert_lock -- used for cancel, moves lkb from convert to granted
1427    grant_lock  -- used for request and convert, adds lkb to granted or
1428                   moves lkb from convert or waiting to granted
1429
1430    Each of these is used for master or local copy lkb's.  There is
1431    also a _pc() variation used to make the corresponding change on
1432    a process copy (pc) lkb. */
1433
1434 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1435 {
1436         del_lkb(r, lkb);
1437         lkb->lkb_grmode = DLM_LOCK_IV;
1438         /* this unhold undoes the original ref from create_lkb()
1439            so this leads to the lkb being freed */
1440         unhold_lkb(lkb);
1441 }
1442
1443 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1444 {
1445         set_lvb_unlock(r, lkb);
1446         _remove_lock(r, lkb);
1447 }
1448
1449 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1450 {
1451         _remove_lock(r, lkb);
1452 }
1453
1454 /* returns: 0 did nothing
1455             1 moved lock to granted
1456            -1 removed lock */
1457
1458 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1459 {
1460         int rv = 0;
1461
1462         lkb->lkb_rqmode = DLM_LOCK_IV;
1463
1464         switch (lkb->lkb_status) {
1465         case DLM_LKSTS_GRANTED:
1466                 break;
1467         case DLM_LKSTS_CONVERT:
1468                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1469                 rv = 1;
1470                 break;
1471         case DLM_LKSTS_WAITING:
1472                 del_lkb(r, lkb);
1473                 lkb->lkb_grmode = DLM_LOCK_IV;
1474                 /* this unhold undoes the original ref from create_lkb()
1475                    so this leads to the lkb being freed */
1476                 unhold_lkb(lkb);
1477                 rv = -1;
1478                 break;
1479         default:
1480                 log_print("invalid status for revert %d", lkb->lkb_status);
1481         }
1482         return rv;
1483 }
1484
1485 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1486 {
1487         return revert_lock(r, lkb);
1488 }
1489
1490 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1491 {
1492         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1493                 lkb->lkb_grmode = lkb->lkb_rqmode;
1494                 if (lkb->lkb_status)
1495                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1496                 else
1497                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1498         }
1499
1500         lkb->lkb_rqmode = DLM_LOCK_IV;
1501 }
1502
1503 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1504 {
1505         set_lvb_lock(r, lkb);
1506         _grant_lock(r, lkb);
1507         lkb->lkb_highbast = 0;
1508 }
1509
1510 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1511                           struct dlm_message *ms)
1512 {
1513         set_lvb_lock_pc(r, lkb, ms);
1514         _grant_lock(r, lkb);
1515 }
1516
1517 /* called by grant_pending_locks() which means an async grant message must
1518    be sent to the requesting node in addition to granting the lock if the
1519    lkb belongs to a remote node. */
1520
1521 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1522 {
1523         grant_lock(r, lkb);
1524         if (is_master_copy(lkb))
1525                 send_grant(r, lkb);
1526         else
1527                 queue_cast(r, lkb, 0);
1528 }
1529
1530 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1531    change the granted/requested modes.  We're munging things accordingly in
1532    the process copy.
1533    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1534    conversion deadlock
1535    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1536    compatible with other granted locks */
1537
1538 static void munge_demoted(struct dlm_lkb *lkb)
1539 {
1540         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1541                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1542                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1543                 return;
1544         }
1545
1546         lkb->lkb_grmode = DLM_LOCK_NL;
1547 }
1548
1549 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1550 {
1551         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1552             ms->m_type != DLM_MSG_GRANT) {
1553                 log_print("munge_altmode %x invalid reply type %d",
1554                           lkb->lkb_id, ms->m_type);
1555                 return;
1556         }
1557
1558         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1559                 lkb->lkb_rqmode = DLM_LOCK_PR;
1560         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1561                 lkb->lkb_rqmode = DLM_LOCK_CW;
1562         else {
1563                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1564                 dlm_print_lkb(lkb);
1565         }
1566 }
1567
1568 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1569 {
1570         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1571                                            lkb_statequeue);
1572         if (lkb->lkb_id == first->lkb_id)
1573                 return 1;
1574
1575         return 0;
1576 }
1577
1578 /* Check if the given lkb conflicts with another lkb on the queue. */
1579
1580 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1581 {
1582         struct dlm_lkb *this;
1583
1584         list_for_each_entry(this, head, lkb_statequeue) {
1585                 if (this == lkb)
1586                         continue;
1587                 if (!modes_compat(this, lkb))
1588                         return 1;
1589         }
1590         return 0;
1591 }
1592
1593 /*
1594  * "A conversion deadlock arises with a pair of lock requests in the converting
1595  * queue for one resource.  The granted mode of each lock blocks the requested
1596  * mode of the other lock."
1597  *
1598  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1599  * convert queue from being granted, then deadlk/demote lkb.
1600  *
1601  * Example:
1602  * Granted Queue: empty
1603  * Convert Queue: NL->EX (first lock)
1604  *                PR->EX (second lock)
1605  *
1606  * The first lock can't be granted because of the granted mode of the second
1607  * lock and the second lock can't be granted because it's not first in the
1608  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1609  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1610  * flag set and return DEMOTED in the lksb flags.
1611  *
1612  * Originally, this function detected conv-deadlk in a more limited scope:
1613  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1614  * - if lkb1 was the first entry in the queue (not just earlier), and was
1615  *   blocked by the granted mode of lkb2, and there was nothing on the
1616  *   granted queue preventing lkb1 from being granted immediately, i.e.
1617  *   lkb2 was the only thing preventing lkb1 from being granted.
1618  *
1619  * That second condition meant we'd only say there was conv-deadlk if
1620  * resolving it (by demotion) would lead to the first lock on the convert
1621  * queue being granted right away.  It allowed conversion deadlocks to exist
1622  * between locks on the convert queue while they couldn't be granted anyway.
1623  *
1624  * Now, we detect and take action on conversion deadlocks immediately when
1625  * they're created, even if they may not be immediately consequential.  If
1626  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1627  * mode that would prevent lkb1's conversion from being granted, we do a
1628  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1629  * I think this means that the lkb_is_ahead condition below should always
1630  * be zero, i.e. there will never be conv-deadlk between two locks that are
1631  * both already on the convert queue.
1632  */
1633
1634 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1635 {
1636         struct dlm_lkb *lkb1;
1637         int lkb_is_ahead = 0;
1638
1639         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1640                 if (lkb1 == lkb2) {
1641                         lkb_is_ahead = 1;
1642                         continue;
1643                 }
1644
1645                 if (!lkb_is_ahead) {
1646                         if (!modes_compat(lkb2, lkb1))
1647                                 return 1;
1648                 } else {
1649                         if (!modes_compat(lkb2, lkb1) &&
1650                             !modes_compat(lkb1, lkb2))
1651                                 return 1;
1652                 }
1653         }
1654         return 0;
1655 }
1656
1657 /*
1658  * Return 1 if the lock can be granted, 0 otherwise.
1659  * Also detect and resolve conversion deadlocks.
1660  *
1661  * lkb is the lock to be granted
1662  *
1663  * now is 1 if the function is being called in the context of the
1664  * immediate request, it is 0 if called later, after the lock has been
1665  * queued.
1666  *
1667  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1668  */
1669
1670 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1671 {
1672         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1673
1674         /*
1675          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1676          * a new request for a NL mode lock being blocked.
1677          *
1678          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1679          * request, then it would be granted.  In essence, the use of this flag
1680          * tells the Lock Manager to expedite theis request by not considering
1681          * what may be in the CONVERTING or WAITING queues...  As of this
1682          * writing, the EXPEDITE flag can be used only with new requests for NL
1683          * mode locks.  This flag is not valid for conversion requests.
1684          *
1685          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1686          * conversion or used with a non-NL requested mode.  We also know an
1687          * EXPEDITE request is always granted immediately, so now must always
1688          * be 1.  The full condition to grant an expedite request: (now &&
1689          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1690          * therefore be shortened to just checking the flag.
1691          */
1692
1693         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1694                 return 1;
1695
1696         /*
1697          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1698          * added to the remaining conditions.
1699          */
1700
1701         if (queue_conflict(&r->res_grantqueue, lkb))
1702                 goto out;
1703
1704         /*
1705          * 6-3: By default, a conversion request is immediately granted if the
1706          * requested mode is compatible with the modes of all other granted
1707          * locks
1708          */
1709
1710         if (queue_conflict(&r->res_convertqueue, lkb))
1711                 goto out;
1712
1713         /*
1714          * 6-5: But the default algorithm for deciding whether to grant or
1715          * queue conversion requests does not by itself guarantee that such
1716          * requests are serviced on a "first come first serve" basis.  This, in
1717          * turn, can lead to a phenomenon known as "indefinate postponement".
1718          *
1719          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1720          * the system service employed to request a lock conversion.  This flag
1721          * forces certain conversion requests to be queued, even if they are
1722          * compatible with the granted modes of other locks on the same
1723          * resource.  Thus, the use of this flag results in conversion requests
1724          * being ordered on a "first come first servce" basis.
1725          *
1726          * DCT: This condition is all about new conversions being able to occur
1727          * "in place" while the lock remains on the granted queue (assuming
1728          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1729          * doesn't _have_ to go onto the convert queue where it's processed in
1730          * order.  The "now" variable is necessary to distinguish converts
1731          * being received and processed for the first time now, because once a
1732          * convert is moved to the conversion queue the condition below applies
1733          * requiring fifo granting.
1734          */
1735
1736         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1737                 return 1;
1738
1739         /*
1740          * The NOORDER flag is set to avoid the standard vms rules on grant
1741          * order.
1742          */
1743
1744         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1745                 return 1;
1746
1747         /*
1748          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1749          * granted until all other conversion requests ahead of it are granted
1750          * and/or canceled.
1751          */
1752
1753         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1754                 return 1;
1755
1756         /*
1757          * 6-4: By default, a new request is immediately granted only if all
1758          * three of the following conditions are satisfied when the request is
1759          * issued:
1760          * - The queue of ungranted conversion requests for the resource is
1761          *   empty.
1762          * - The queue of ungranted new requests for the resource is empty.
1763          * - The mode of the new request is compatible with the most
1764          *   restrictive mode of all granted locks on the resource.
1765          */
1766
1767         if (now && !conv && list_empty(&r->res_convertqueue) &&
1768             list_empty(&r->res_waitqueue))
1769                 return 1;
1770
1771         /*
1772          * 6-4: Once a lock request is in the queue of ungranted new requests,
1773          * it cannot be granted until the queue of ungranted conversion
1774          * requests is empty, all ungranted new requests ahead of it are
1775          * granted and/or canceled, and it is compatible with the granted mode
1776          * of the most restrictive lock granted on the resource.
1777          */
1778
1779         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1780             first_in_list(lkb, &r->res_waitqueue))
1781                 return 1;
1782  out:
1783         return 0;
1784 }
1785
1786 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1787                           int *err)
1788 {
1789         int rv;
1790         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1791         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1792
1793         if (err)
1794                 *err = 0;
1795
1796         rv = _can_be_granted(r, lkb, now);
1797         if (rv)
1798                 goto out;
1799
1800         /*
1801          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1802          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1803          * cancels one of the locks.
1804          */
1805
1806         if (is_convert && can_be_queued(lkb) &&
1807             conversion_deadlock_detect(r, lkb)) {
1808                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1809                         lkb->lkb_grmode = DLM_LOCK_NL;
1810                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1811                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1812                         if (err)
1813                                 *err = -EDEADLK;
1814                         else {
1815                                 log_print("can_be_granted deadlock %x now %d",
1816                                           lkb->lkb_id, now);
1817                                 dlm_dump_rsb(r);
1818                         }
1819                 }
1820                 goto out;
1821         }
1822
1823         /*
1824          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1825          * to grant a request in a mode other than the normal rqmode.  It's a
1826          * simple way to provide a big optimization to applications that can
1827          * use them.
1828          */
1829
1830         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1831                 alt = DLM_LOCK_PR;
1832         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1833                 alt = DLM_LOCK_CW;
1834
1835         if (alt) {
1836                 lkb->lkb_rqmode = alt;
1837                 rv = _can_be_granted(r, lkb, now);
1838                 if (rv)
1839                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1840                 else
1841                         lkb->lkb_rqmode = rqmode;
1842         }
1843  out:
1844         return rv;
1845 }
1846
1847 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1848    for locks pending on the convert list.  Once verified (watch for these
1849    log_prints), we should be able to just call _can_be_granted() and not
1850    bother with the demote/deadlk cases here (and there's no easy way to deal
1851    with a deadlk here, we'd have to generate something like grant_lock with
1852    the deadlk error.) */
1853
1854 /* Returns the highest requested mode of all blocked conversions; sets
1855    cw if there's a blocked conversion to DLM_LOCK_CW. */
1856
1857 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1858 {
1859         struct dlm_lkb *lkb, *s;
1860         int hi, demoted, quit, grant_restart, demote_restart;
1861         int deadlk;
1862
1863         quit = 0;
1864  restart:
1865         grant_restart = 0;
1866         demote_restart = 0;
1867         hi = DLM_LOCK_IV;
1868
1869         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1870                 demoted = is_demoted(lkb);
1871                 deadlk = 0;
1872
1873                 if (can_be_granted(r, lkb, 0, &deadlk)) {
1874                         grant_lock_pending(r, lkb);
1875                         grant_restart = 1;
1876                         continue;
1877                 }
1878
1879                 if (!demoted && is_demoted(lkb)) {
1880                         log_print("WARN: pending demoted %x node %d %s",
1881                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1882                         demote_restart = 1;
1883                         continue;
1884                 }
1885
1886                 if (deadlk) {
1887                         log_print("WARN: pending deadlock %x node %d %s",
1888                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1889                         dlm_dump_rsb(r);
1890                         continue;
1891                 }
1892
1893                 hi = max_t(int, lkb->lkb_rqmode, hi);
1894
1895                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1896                         *cw = 1;
1897         }
1898
1899         if (grant_restart)
1900                 goto restart;
1901         if (demote_restart && !quit) {
1902                 quit = 1;
1903                 goto restart;
1904         }
1905
1906         return max_t(int, high, hi);
1907 }
1908
1909 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1910 {
1911         struct dlm_lkb *lkb, *s;
1912
1913         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1914                 if (can_be_granted(r, lkb, 0, NULL))
1915                         grant_lock_pending(r, lkb);
1916                 else {
1917                         high = max_t(int, lkb->lkb_rqmode, high);
1918                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
1919                                 *cw = 1;
1920                 }
1921         }
1922
1923         return high;
1924 }
1925
1926 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1927    on either the convert or waiting queue.
1928    high is the largest rqmode of all locks blocked on the convert or
1929    waiting queue. */
1930
1931 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1932 {
1933         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1934                 if (gr->lkb_highbast < DLM_LOCK_EX)
1935                         return 1;
1936                 return 0;
1937         }
1938
1939         if (gr->lkb_highbast < high &&
1940             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1941                 return 1;
1942         return 0;
1943 }
1944
1945 static void grant_pending_locks(struct dlm_rsb *r)
1946 {
1947         struct dlm_lkb *lkb, *s;
1948         int high = DLM_LOCK_IV;
1949         int cw = 0;
1950
1951         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1952
1953         high = grant_pending_convert(r, high, &cw);
1954         high = grant_pending_wait(r, high, &cw);
1955
1956         if (high == DLM_LOCK_IV)
1957                 return;
1958
1959         /*
1960          * If there are locks left on the wait/convert queue then send blocking
1961          * ASTs to granted locks based on the largest requested mode (high)
1962          * found above.
1963          */
1964
1965         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1966                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1967                         if (cw && high == DLM_LOCK_PR &&
1968                             lkb->lkb_grmode == DLM_LOCK_PR)
1969                                 queue_bast(r, lkb, DLM_LOCK_CW);
1970                         else
1971                                 queue_bast(r, lkb, high);
1972                         lkb->lkb_highbast = high;
1973                 }
1974         }
1975 }
1976
1977 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1978 {
1979         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1980             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1981                 if (gr->lkb_highbast < DLM_LOCK_EX)
1982                         return 1;
1983                 return 0;
1984         }
1985
1986         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1987                 return 1;
1988         return 0;
1989 }
1990
1991 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1992                             struct dlm_lkb *lkb)
1993 {
1994         struct dlm_lkb *gr;
1995
1996         list_for_each_entry(gr, head, lkb_statequeue) {
1997                 /* skip self when sending basts to convertqueue */
1998                 if (gr == lkb)
1999                         continue;
2000                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2001                         queue_bast(r, gr, lkb->lkb_rqmode);
2002                         gr->lkb_highbast = lkb->lkb_rqmode;
2003                 }
2004         }
2005 }
2006
2007 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2008 {
2009         send_bast_queue(r, &r->res_grantqueue, lkb);
2010 }
2011
2012 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2013 {
2014         send_bast_queue(r, &r->res_grantqueue, lkb);
2015         send_bast_queue(r, &r->res_convertqueue, lkb);
2016 }
2017
2018 /* set_master(r, lkb) -- set the master nodeid of a resource
2019
2020    The purpose of this function is to set the nodeid field in the given
2021    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2022    known, it can just be copied to the lkb and the function will return
2023    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2024    before it can be copied to the lkb.
2025
2026    When the rsb nodeid is being looked up remotely, the initial lkb
2027    causing the lookup is kept on the ls_waiters list waiting for the
2028    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2029    on the rsb's res_lookup list until the master is verified.
2030
2031    Return values:
2032    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2033    1: the rsb master is not available and the lkb has been placed on
2034       a wait queue
2035 */
2036
2037 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2038 {
2039         struct dlm_ls *ls = r->res_ls;
2040         int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
2041
2042         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2043                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2044                 r->res_first_lkid = lkb->lkb_id;
2045                 lkb->lkb_nodeid = r->res_nodeid;
2046                 return 0;
2047         }
2048
2049         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2050                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2051                 return 1;
2052         }
2053
2054         if (r->res_nodeid == 0) {
2055                 lkb->lkb_nodeid = 0;
2056                 return 0;
2057         }
2058
2059         if (r->res_nodeid > 0) {
2060                 lkb->lkb_nodeid = r->res_nodeid;
2061                 return 0;
2062         }
2063
2064         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
2065
2066         dir_nodeid = dlm_dir_nodeid(r);
2067
2068         if (dir_nodeid != our_nodeid) {
2069                 r->res_first_lkid = lkb->lkb_id;
2070                 send_lookup(r, lkb);
2071                 return 1;
2072         }
2073
2074         for (i = 0; i < 2; i++) {
2075                 /* It's possible for dlm_scand to remove an old rsb for
2076                    this same resource from the toss list, us to create
2077                    a new one, look up the master locally, and find it
2078                    already exists just before dlm_scand does the
2079                    dir_remove() on the previous rsb. */
2080
2081                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
2082                                        r->res_length, &ret_nodeid);
2083                 if (!error)
2084                         break;
2085                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
2086                 schedule();
2087         }
2088         if (error && error != -EEXIST)
2089                 return error;
2090
2091         if (ret_nodeid == our_nodeid) {
2092                 r->res_first_lkid = 0;
2093                 r->res_nodeid = 0;
2094                 lkb->lkb_nodeid = 0;
2095         } else {
2096                 r->res_first_lkid = lkb->lkb_id;
2097                 r->res_nodeid = ret_nodeid;
2098                 lkb->lkb_nodeid = ret_nodeid;
2099         }
2100         return 0;
2101 }
2102
2103 static void process_lookup_list(struct dlm_rsb *r)
2104 {
2105         struct dlm_lkb *lkb, *safe;
2106
2107         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2108                 list_del_init(&lkb->lkb_rsb_lookup);
2109                 _request_lock(r, lkb);
2110                 schedule();
2111         }
2112 }
2113
2114 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2115
2116 static void confirm_master(struct dlm_rsb *r, int error)
2117 {
2118         struct dlm_lkb *lkb;
2119
2120         if (!r->res_first_lkid)
2121                 return;
2122
2123         switch (error) {
2124         case 0:
2125         case -EINPROGRESS:
2126                 r->res_first_lkid = 0;
2127                 process_lookup_list(r);
2128                 break;
2129
2130         case -EAGAIN:
2131         case -EBADR:
2132         case -ENOTBLK:
2133                 /* the remote request failed and won't be retried (it was
2134                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2135                    lkb the first_lkid */
2136
2137                 r->res_first_lkid = 0;
2138
2139                 if (!list_empty(&r->res_lookup)) {
2140                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2141                                          lkb_rsb_lookup);
2142                         list_del_init(&lkb->lkb_rsb_lookup);
2143                         r->res_first_lkid = lkb->lkb_id;
2144                         _request_lock(r, lkb);
2145                 }
2146                 break;
2147
2148         default:
2149                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2150         }
2151 }
2152
2153 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2154                          int namelen, unsigned long timeout_cs,
2155                          void (*ast) (void *astparam),
2156                          void *astparam,
2157                          void (*bast) (void *astparam, int mode),
2158                          struct dlm_args *args)
2159 {
2160         int rv = -EINVAL;
2161
2162         /* check for invalid arg usage */
2163
2164         if (mode < 0 || mode > DLM_LOCK_EX)
2165                 goto out;
2166
2167         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2168                 goto out;
2169
2170         if (flags & DLM_LKF_CANCEL)
2171                 goto out;
2172
2173         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2174                 goto out;
2175
2176         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2177                 goto out;
2178
2179         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2180                 goto out;
2181
2182         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2183                 goto out;
2184
2185         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2186                 goto out;
2187
2188         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2189                 goto out;
2190
2191         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2192                 goto out;
2193
2194         if (!ast || !lksb)
2195                 goto out;
2196
2197         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2198                 goto out;
2199
2200         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2201                 goto out;
2202
2203         /* these args will be copied to the lkb in validate_lock_args,
2204            it cannot be done now because when converting locks, fields in
2205            an active lkb cannot be modified before locking the rsb */
2206
2207         args->flags = flags;
2208         args->astfn = ast;
2209         args->astparam = astparam;
2210         args->bastfn = bast;
2211         args->timeout = timeout_cs;
2212         args->mode = mode;
2213         args->lksb = lksb;
2214         rv = 0;
2215  out:
2216         return rv;
2217 }
2218
2219 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2220 {
2221         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2222                       DLM_LKF_FORCEUNLOCK))
2223                 return -EINVAL;
2224
2225         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2226                 return -EINVAL;
2227
2228         args->flags = flags;
2229         args->astparam = astarg;
2230         return 0;
2231 }
2232
2233 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2234                               struct dlm_args *args)
2235 {
2236         int rv = -EINVAL;
2237
2238         if (args->flags & DLM_LKF_CONVERT) {
2239                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2240                         goto out;
2241
2242                 if (args->flags & DLM_LKF_QUECVT &&
2243                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2244                         goto out;
2245
2246                 rv = -EBUSY;
2247                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2248                         goto out;
2249
2250                 if (lkb->lkb_wait_type)
2251                         goto out;
2252
2253                 if (is_overlap(lkb))
2254                         goto out;
2255         }
2256
2257         lkb->lkb_exflags = args->flags;
2258         lkb->lkb_sbflags = 0;
2259         lkb->lkb_astfn = args->astfn;
2260         lkb->lkb_astparam = args->astparam;
2261         lkb->lkb_bastfn = args->bastfn;
2262         lkb->lkb_rqmode = args->mode;
2263         lkb->lkb_lksb = args->lksb;
2264         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2265         lkb->lkb_ownpid = (int) current->pid;
2266         lkb->lkb_timeout_cs = args->timeout;
2267         rv = 0;
2268  out:
2269         if (rv)
2270                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2271                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2272                           lkb->lkb_status, lkb->lkb_wait_type,
2273                           lkb->lkb_resource->res_name);
2274         return rv;
2275 }
2276
2277 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2278    for success */
2279
2280 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2281    because there may be a lookup in progress and it's valid to do
2282    cancel/unlockf on it */
2283
2284 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2285 {
2286         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2287         int rv = -EINVAL;
2288
2289         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2290                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2291                 dlm_print_lkb(lkb);
2292                 goto out;
2293         }
2294
2295         /* an lkb may still exist even though the lock is EOL'ed due to a
2296            cancel, unlock or failed noqueue request; an app can't use these
2297            locks; return same error as if the lkid had not been found at all */
2298
2299         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2300                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2301                 rv = -ENOENT;
2302                 goto out;
2303         }
2304
2305         /* an lkb may be waiting for an rsb lookup to complete where the
2306            lookup was initiated by another lock */
2307
2308         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2309                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2310                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2311                         list_del_init(&lkb->lkb_rsb_lookup);
2312                         queue_cast(lkb->lkb_resource, lkb,
2313                                    args->flags & DLM_LKF_CANCEL ?
2314                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2315                         unhold_lkb(lkb); /* undoes create_lkb() */
2316                 }
2317                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2318                 rv = -EBUSY;
2319                 goto out;
2320         }
2321
2322         /* cancel not allowed with another cancel/unlock in progress */
2323
2324         if (args->flags & DLM_LKF_CANCEL) {
2325                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2326                         goto out;
2327
2328                 if (is_overlap(lkb))
2329                         goto out;
2330
2331                 /* don't let scand try to do a cancel */
2332                 del_timeout(lkb);
2333
2334                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2335                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2336                         rv = -EBUSY;
2337                         goto out;
2338                 }
2339
2340                 /* there's nothing to cancel */
2341                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2342                     !lkb->lkb_wait_type) {
2343                         rv = -EBUSY;
2344                         goto out;
2345                 }
2346
2347                 switch (lkb->lkb_wait_type) {
2348                 case DLM_MSG_LOOKUP:
2349                 case DLM_MSG_REQUEST:
2350                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2351                         rv = -EBUSY;
2352                         goto out;
2353                 case DLM_MSG_UNLOCK:
2354                 case DLM_MSG_CANCEL:
2355                         goto out;
2356                 }
2357                 /* add_to_waiters() will set OVERLAP_CANCEL */
2358                 goto out_ok;
2359         }
2360
2361         /* do we need to allow a force-unlock if there's a normal unlock
2362            already in progress?  in what conditions could the normal unlock
2363            fail such that we'd want to send a force-unlock to be sure? */
2364
2365         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2366                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2367                         goto out;
2368
2369                 if (is_overlap_unlock(lkb))
2370                         goto out;
2371
2372                 /* don't let scand try to do a cancel */
2373                 del_timeout(lkb);
2374
2375                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2376                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2377                         rv = -EBUSY;
2378                         goto out;
2379                 }
2380
2381                 switch (lkb->lkb_wait_type) {
2382                 case DLM_MSG_LOOKUP:
2383                 case DLM_MSG_REQUEST:
2384                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2385                         rv = -EBUSY;
2386                         goto out;
2387                 case DLM_MSG_UNLOCK:
2388                         goto out;
2389                 }
2390                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2391                 goto out_ok;
2392         }
2393
2394         /* normal unlock not allowed if there's any op in progress */
2395         rv = -EBUSY;
2396         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2397                 goto out;
2398
2399  out_ok:
2400         /* an overlapping op shouldn't blow away exflags from other op */
2401         lkb->lkb_exflags |= args->flags;
2402         lkb->lkb_sbflags = 0;
2403         lkb->lkb_astparam = args->astparam;
2404         rv = 0;
2405  out:
2406         if (rv)
2407                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2408                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2409                           args->flags, lkb->lkb_wait_type,
2410                           lkb->lkb_resource->res_name);
2411         return rv;
2412 }
2413
2414 /*
2415  * Four stage 4 varieties:
2416  * do_request(), do_convert(), do_unlock(), do_cancel()
2417  * These are called on the master node for the given lock and
2418  * from the central locking logic.
2419  */
2420
2421 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2422 {
2423         int error = 0;
2424
2425         if (can_be_granted(r, lkb, 1, NULL)) {
2426                 grant_lock(r, lkb);
2427                 queue_cast(r, lkb, 0);
2428                 goto out;
2429         }
2430
2431         if (can_be_queued(lkb)) {
2432                 error = -EINPROGRESS;
2433                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2434                 add_timeout(lkb);
2435                 goto out;
2436         }
2437
2438         error = -EAGAIN;
2439         queue_cast(r, lkb, -EAGAIN);
2440  out:
2441         return error;
2442 }
2443
2444 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2445                                int error)
2446 {
2447         switch (error) {
2448         case -EAGAIN:
2449                 if (force_blocking_asts(lkb))
2450                         send_blocking_asts_all(r, lkb);
2451                 break;
2452         case -EINPROGRESS:
2453                 send_blocking_asts(r, lkb);
2454                 break;
2455         }
2456 }
2457
2458 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2459 {
2460         int error = 0;
2461         int deadlk = 0;
2462
2463         /* changing an existing lock may allow others to be granted */
2464
2465         if (can_be_granted(r, lkb, 1, &deadlk)) {
2466                 grant_lock(r, lkb);
2467                 queue_cast(r, lkb, 0);
2468                 goto out;
2469         }
2470
2471         /* can_be_granted() detected that this lock would block in a conversion
2472            deadlock, so we leave it on the granted queue and return EDEADLK in
2473            the ast for the convert. */
2474
2475         if (deadlk) {
2476                 /* it's left on the granted queue */
2477                 revert_lock(r, lkb);
2478                 queue_cast(r, lkb, -EDEADLK);
2479                 error = -EDEADLK;
2480                 goto out;
2481         }
2482
2483         /* is_demoted() means the can_be_granted() above set the grmode
2484            to NL, and left us on the granted queue.  This auto-demotion
2485            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2486            now grantable.  We have to try to grant other converting locks
2487            before we try again to grant this one. */
2488
2489         if (is_demoted(lkb)) {
2490                 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2491                 if (_can_be_granted(r, lkb, 1)) {
2492                         grant_lock(r, lkb);
2493                         queue_cast(r, lkb, 0);
2494                         goto out;
2495                 }
2496                 /* else fall through and move to convert queue */
2497         }
2498
2499         if (can_be_queued(lkb)) {
2500                 error = -EINPROGRESS;
2501                 del_lkb(r, lkb);
2502                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2503                 add_timeout(lkb);
2504                 goto out;
2505         }
2506
2507         error = -EAGAIN;
2508         queue_cast(r, lkb, -EAGAIN);
2509  out:
2510         return error;
2511 }
2512
2513 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2514                                int error)
2515 {
2516         switch (error) {
2517         case 0:
2518                 grant_pending_locks(r);
2519                 /* grant_pending_locks also sends basts */
2520                 break;
2521         case -EAGAIN:
2522                 if (force_blocking_asts(lkb))
2523                         send_blocking_asts_all(r, lkb);
2524                 break;
2525         case -EINPROGRESS:
2526                 send_blocking_asts(r, lkb);
2527                 break;
2528         }
2529 }
2530
2531 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2532 {
2533         remove_lock(r, lkb);
2534         queue_cast(r, lkb, -DLM_EUNLOCK);
2535         return -DLM_EUNLOCK;
2536 }
2537
2538 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2539                               int error)
2540 {
2541         grant_pending_locks(r);
2542 }
2543
2544 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2545  
2546 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2547 {
2548         int error;
2549
2550         error = revert_lock(r, lkb);
2551         if (error) {
2552                 queue_cast(r, lkb, -DLM_ECANCEL);
2553                 return -DLM_ECANCEL;
2554         }
2555         return 0;
2556 }
2557
2558 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2559                               int error)
2560 {
2561         if (error)
2562                 grant_pending_locks(r);
2563 }
2564
2565 /*
2566  * Four stage 3 varieties:
2567  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2568  */
2569
2570 /* add a new lkb to a possibly new rsb, called by requesting process */
2571
2572 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2573 {
2574         int error;
2575
2576         /* set_master: sets lkb nodeid from r */
2577
2578         error = set_master(r, lkb);
2579         if (error < 0)
2580                 goto out;
2581         if (error) {
2582                 error = 0;
2583                 goto out;
2584         }
2585
2586         if (is_remote(r)) {
2587                 /* receive_request() calls do_request() on remote node */
2588                 error = send_request(r, lkb);
2589         } else {
2590                 error = do_request(r, lkb);
2591                 /* for remote locks the request_reply is sent
2592                    between do_request and do_request_effects */
2593                 do_request_effects(r, lkb, error);
2594         }
2595  out:
2596         return error;
2597 }
2598
2599 /* change some property of an existing lkb, e.g. mode */
2600
2601 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2602 {
2603         int error;
2604
2605         if (is_remote(r)) {
2606                 /* receive_convert() calls do_convert() on remote node */
2607                 error = send_convert(r, lkb);
2608         } else {
2609                 error = do_convert(r, lkb);
2610                 /* for remote locks the convert_reply is sent
2611                    between do_convert and do_convert_effects */
2612                 do_convert_effects(r, lkb, error);
2613         }
2614
2615         return error;
2616 }
2617
2618 /* remove an existing lkb from the granted queue */
2619
2620 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2621 {
2622         int error;
2623
2624         if (is_remote(r)) {
2625                 /* receive_unlock() calls do_unlock() on remote node */
2626                 error = send_unlock(r, lkb);
2627         } else {
2628                 error = do_unlock(r, lkb);
2629                 /* for remote locks the unlock_reply is sent
2630                    between do_unlock and do_unlock_effects */
2631                 do_unlock_effects(r, lkb, error);
2632         }
2633
2634         return error;
2635 }
2636
2637 /* remove an existing lkb from the convert or wait queue */
2638
2639 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2640 {
2641         int error;
2642
2643         if (is_remote(r)) {
2644                 /* receive_cancel() calls do_cancel() on remote node */
2645                 error = send_cancel(r, lkb);
2646         } else {
2647                 error = do_cancel(r, lkb);
2648                 /* for remote locks the cancel_reply is sent
2649                    between do_cancel and do_cancel_effects */
2650                 do_cancel_effects(r, lkb, error);
2651         }
2652
2653         return error;
2654 }
2655
2656 /*
2657  * Four stage 2 varieties:
2658  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2659  */
2660
2661 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2662                         int len, struct dlm_args *args)
2663 {
2664         struct dlm_rsb *r;
2665         int error;
2666
2667         error = validate_lock_args(ls, lkb, args);
2668         if (error)
2669                 goto out;
2670
2671         error = find_rsb(ls, name, len, R_CREATE, &r);
2672         if (error)
2673                 goto out;
2674
2675         lock_rsb(r);
2676
2677         attach_lkb(r, lkb);
2678         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2679
2680         error = _request_lock(r, lkb);
2681
2682         unlock_rsb(r);
2683         put_rsb(r);
2684
2685  out:
2686         return error;
2687 }
2688
2689 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2690                         struct dlm_args *args)
2691 {
2692         struct dlm_rsb *r;
2693         int error;
2694
2695         r = lkb->lkb_resource;
2696
2697         hold_rsb(r);
2698         lock_rsb(r);
2699
2700         error = validate_lock_args(ls, lkb, args);
2701         if (error)
2702                 goto out;
2703
2704         error = _convert_lock(r, lkb);
2705  out:
2706         unlock_rsb(r);
2707         put_rsb(r);
2708         return error;
2709 }
2710
2711 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2712                        struct dlm_args *args)
2713 {
2714         struct dlm_rsb *r;
2715         int error;
2716
2717         r = lkb->lkb_resource;
2718
2719         hold_rsb(r);
2720         lock_rsb(r);
2721
2722         error = validate_unlock_args(lkb, args);
2723         if (error)
2724                 goto out;
2725
2726         error = _unlock_lock(r, lkb);
2727  out:
2728         unlock_rsb(r);
2729         put_rsb(r);
2730         return error;
2731 }
2732
2733 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2734                        struct dlm_args *args)
2735 {
2736         struct dlm_rsb *r;
2737         int error;
2738
2739         r = lkb->lkb_resource;
2740
2741         hold_rsb(r);
2742         lock_rsb(r);
2743
2744         error = validate_unlock_args(lkb, args);
2745         if (error)
2746                 goto out;
2747
2748         error = _cancel_lock(r, lkb);
2749  out:
2750         unlock_rsb(r);
2751         put_rsb(r);
2752         return error;
2753 }
2754
2755 /*
2756  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2757  */
2758
2759 int dlm_lock(dlm_lockspace_t *lockspace,
2760              int mode,
2761              struct dlm_lksb *lksb,
2762              uint32_t flags,
2763              void *name,
2764              unsigned int namelen,
2765              uint32_t parent_lkid,
2766              void (*ast) (void *astarg),
2767              void *astarg,
2768              void (*bast) (void *astarg, int mode))
2769 {
2770         struct dlm_ls *ls;
2771         struct dlm_lkb *lkb;
2772         struct dlm_args args;
2773         int error, convert = flags & DLM_LKF_CONVERT;
2774
2775         ls = dlm_find_lockspace_local(lockspace);
2776         if (!ls)
2777                 return -EINVAL;
2778
2779         dlm_lock_recovery(ls);
2780
2781         if (convert)
2782                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2783         else
2784                 error = create_lkb(ls, &lkb);
2785
2786         if (error)
2787                 goto out;
2788
2789         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2790                               astarg, bast, &args);
2791         if (error)
2792                 goto out_put;
2793
2794         if (convert)
2795                 error = convert_lock(ls, lkb, &args);
2796         else
2797                 error = request_lock(ls, lkb, name, namelen, &args);
2798
2799         if (error == -EINPROGRESS)
2800                 error = 0;
2801  out_put:
2802         if (convert || error)
2803                 __put_lkb(ls, lkb);
2804         if (error == -EAGAIN || error == -EDEADLK)
2805                 error = 0;
2806  out:
2807         dlm_unlock_recovery(ls);
2808         dlm_put_lockspace(ls);
2809         return error;
2810 }
2811
2812 int dlm_unlock(dlm_lockspace_t *lockspace,
2813                uint32_t lkid,
2814                uint32_t flags,
2815                struct dlm_lksb *lksb,
2816                void *astarg)
2817 {
2818         struct dlm_ls *ls;
2819         struct dlm_lkb *lkb;
2820         struct dlm_args args;
2821         int error;
2822
2823         ls = dlm_find_lockspace_local(lockspace);
2824         if (!ls)
2825                 return -EINVAL;
2826
2827         dlm_lock_recovery(ls);
2828
2829         error = find_lkb(ls, lkid, &lkb);
2830         if (error)
2831                 goto out;
2832
2833         error = set_unlock_args(flags, astarg, &args);
2834         if (error)
2835                 goto out_put;
2836
2837         if (flags & DLM_LKF_CANCEL)
2838                 error = cancel_lock(ls, lkb, &args);
2839         else
2840                 error = unlock_lock(ls, lkb, &args);
2841
2842         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2843                 error = 0;
2844         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2845                 error = 0;
2846  out_put:
2847         dlm_put_lkb(lkb);
2848  out:
2849         dlm_unlock_recovery(ls);
2850         dlm_put_lockspace(ls);
2851         return error;
2852 }
2853
2854 /*
2855  * send/receive routines for remote operations and replies
2856  *
2857  * send_args
2858  * send_common
2859  * send_request                 receive_request
2860  * send_convert                 receive_convert
2861  * send_unlock                  receive_unlock
2862  * send_cancel                  receive_cancel
2863  * send_grant                   receive_grant
2864  * send_bast                    receive_bast
2865  * send_lookup                  receive_lookup
2866  * send_remove                  receive_remove
2867  *
2868  *                              send_common_reply
2869  * receive_request_reply        send_request_reply
2870  * receive_convert_reply        send_convert_reply
2871  * receive_unlock_reply         send_unlock_reply
2872  * receive_cancel_reply         send_cancel_reply
2873  * receive_lookup_reply         send_lookup_reply
2874  */
2875
2876 static int _create_message(struct dlm_ls *ls, int mb_len,
2877                            int to_nodeid, int mstype,
2878                            struct dlm_message **ms_ret,
2879                            struct dlm_mhandle **mh_ret)
2880 {
2881         struct dlm_message *ms;
2882         struct dlm_mhandle *mh;
2883         char *mb;
2884
2885         /* get_buffer gives us a message handle (mh) that we need to
2886            pass into lowcomms_commit and a message buffer (mb) that we
2887            write our data into */
2888
2889         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2890         if (!mh)
2891                 return -ENOBUFS;
2892
2893         memset(mb, 0, mb_len);
2894
2895         ms = (struct dlm_message *) mb;
2896
2897         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2898         ms->m_header.h_lockspace = ls->ls_global_id;
2899         ms->m_header.h_nodeid = dlm_our_nodeid();
2900         ms->m_header.h_length = mb_len;
2901         ms->m_header.h_cmd = DLM_MSG;
2902
2903         ms->m_type = mstype;
2904
2905         *mh_ret = mh;
2906         *ms_ret = ms;
2907         return 0;
2908 }
2909
2910 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2911                           int to_nodeid, int mstype,
2912                           struct dlm_message **ms_ret,
2913                           struct dlm_mhandle **mh_ret)
2914 {
2915         int mb_len = sizeof(struct dlm_message);
2916
2917         switch (mstype) {
2918         case DLM_MSG_REQUEST:
2919         case DLM_MSG_LOOKUP:
2920         case DLM_MSG_REMOVE:
2921                 mb_len += r->res_length;
2922                 break;
2923         case DLM_MSG_CONVERT:
2924         case DLM_MSG_UNLOCK:
2925         case DLM_MSG_REQUEST_REPLY:
2926         case DLM_MSG_CONVERT_REPLY:
2927         case DLM_MSG_GRANT:
2928                 if (lkb && lkb->lkb_lvbptr)
2929                         mb_len += r->res_ls->ls_lvblen;
2930                 break;
2931         }
2932
2933         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2934                                ms_ret, mh_ret);
2935 }
2936
2937 /* further lowcomms enhancements or alternate implementations may make
2938    the return value from this function useful at some point */
2939
2940 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2941 {
2942         dlm_message_out(ms);
2943         dlm_lowcomms_commit_buffer(mh);
2944         return 0;
2945 }
2946
2947 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2948                       struct dlm_message *ms)
2949 {
2950         ms->m_nodeid   = lkb->lkb_nodeid;
2951         ms->m_pid      = lkb->lkb_ownpid;
2952         ms->m_lkid     = lkb->lkb_id;
2953         ms->m_remid    = lkb->lkb_remid;
2954         ms->m_exflags  = lkb->lkb_exflags;
2955         ms->m_sbflags  = lkb->lkb_sbflags;
2956         ms->m_flags    = lkb->lkb_flags;
2957         ms->m_lvbseq   = lkb->lkb_lvbseq;
2958         ms->m_status   = lkb->lkb_status;
2959         ms->m_grmode   = lkb->lkb_grmode;
2960         ms->m_rqmode   = lkb->lkb_rqmode;
2961         ms->m_hash     = r->res_hash;
2962
2963         /* m_result and m_bastmode are set from function args,
2964            not from lkb fields */
2965
2966         if (lkb->lkb_bastfn)
2967                 ms->m_asts |= DLM_CB_BAST;
2968         if (lkb->lkb_astfn)
2969                 ms->m_asts |= DLM_CB_CAST;
2970
2971         /* compare with switch in create_message; send_remove() doesn't
2972            use send_args() */
2973
2974         switch (ms->m_type) {
2975         case DLM_MSG_REQUEST:
2976         case DLM_MSG_LOOKUP:
2977                 memcpy(ms->m_extra, r->res_name, r->res_length);
2978                 break;
2979         case DLM_MSG_CONVERT:
2980         case DLM_MSG_UNLOCK:
2981         case DLM_MSG_REQUEST_REPLY:
2982         case DLM_MSG_CONVERT_REPLY:
2983         case DLM_MSG_GRANT:
2984                 if (!lkb->lkb_lvbptr)
2985                         break;
2986                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2987                 break;
2988         }
2989 }
2990
2991 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2992 {
2993         struct dlm_message *ms;
2994         struct dlm_mhandle *mh;
2995         int to_nodeid, error;
2996
2997         to_nodeid = r->res_nodeid;
2998
2999         error = add_to_waiters(lkb, mstype, to_nodeid);
3000         if (error)
3001                 return error;
3002
3003         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3004         if (error)
3005                 goto fail;
3006
3007         send_args(r, lkb, ms);
3008
3009         error = send_message(mh, ms);
3010         if (error)
3011                 goto fail;
3012         return 0;
3013
3014  fail:
3015         remove_from_waiters(lkb, msg_reply_type(mstype));
3016         return error;
3017 }
3018
3019 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3020 {
3021         return send_common(r, lkb, DLM_MSG_REQUEST);
3022 }
3023
3024 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3025 {
3026         int error;
3027
3028         error = send_common(r, lkb, DLM_MSG_CONVERT);
3029
3030         /* down conversions go without a reply from the master */
3031         if (!error && down_conversion(lkb)) {
3032                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3033                 r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
3034                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3035                 r->res_ls->ls_stub_ms.m_result = 0;
3036                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3037         }
3038
3039         return error;
3040 }
3041
3042 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3043    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3044    that the master is still correct. */
3045
3046 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3047 {
3048         return send_common(r, lkb, DLM_MSG_UNLOCK);
3049 }
3050
3051 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3052 {
3053         return send_common(r, lkb, DLM_MSG_CANCEL);
3054 }
3055
3056 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3057 {
3058         struct dlm_message *ms;
3059         struct dlm_mhandle *mh;
3060         int to_nodeid, error;
3061
3062         to_nodeid = lkb->lkb_nodeid;
3063
3064         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3065         if (error)
3066                 goto out;
3067
3068         send_args(r, lkb, ms);
3069
3070         ms->m_result = 0;
3071
3072         error = send_message(mh, ms);
3073  out:
3074         return error;
3075 }
3076
3077 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3078 {
3079         struct dlm_message *ms;
3080         struct dlm_mhandle *mh;
3081         int to_nodeid, error;
3082
3083         to_nodeid = lkb->lkb_nodeid;
3084
3085         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3086         if (error)
3087                 goto out;
3088
3089         send_args(r, lkb, ms);
3090
3091         ms->m_bastmode = mode;
3092
3093         error = send_message(mh, ms);
3094  out:
3095         return error;
3096 }
3097
3098 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3099 {
3100         struct dlm_message *ms;
3101         struct dlm_mhandle *mh;
3102         int to_nodeid, error;
3103
3104         to_nodeid = dlm_dir_nodeid(r);
3105
3106         error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3107         if (error)
3108                 return error;
3109
3110         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3111         if (error)
3112                 goto fail;
3113
3114         send_args(r, lkb, ms);
3115
3116         error = send_message(mh, ms);
3117         if (error)
3118                 goto fail;
3119         return 0;
3120
3121  fail:
3122         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3123         return error;
3124 }
3125
3126 static int send_remove(struct dlm_rsb *r)
3127 {
3128         struct dlm_message *ms;
3129         struct dlm_mhandle *mh;
3130         int to_nodeid, error;
3131
3132         to_nodeid = dlm_dir_nodeid(r);
3133
3134         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3135         if (error)
3136                 goto out;
3137
3138         memcpy(ms->m_extra, r->res_name, r->res_length);
3139         ms->m_hash = r->res_hash;
3140
3141         error = send_message(mh, ms);
3142  out:
3143         return error;
3144 }
3145
3146 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3147                              int mstype, int rv)
3148 {
3149         struct dlm_message *ms;
3150         struct dlm_mhandle *mh;
3151         int to_nodeid, error;
3152
3153         to_nodeid = lkb->lkb_nodeid;
3154
3155         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3156         if (error)
3157                 goto out;
3158
3159         send_args(r, lkb, ms);
3160
3161         ms->m_result = rv;
3162
3163         error = send_message(mh, ms);
3164  out:
3165         return error;
3166 }
3167
3168 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3169 {
3170         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3171 }
3172
3173 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3174 {
3175         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3176 }
3177
3178 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3179 {
3180         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3181 }
3182
3183 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3184 {
3185         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3186 }
3187
3188 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3189                              int ret_nodeid, int rv)
3190 {
3191         struct dlm_rsb *r = &ls->ls_stub_rsb;
3192         struct dlm_message *ms;
3193         struct dlm_mhandle *mh;
3194         int error, nodeid = ms_in->m_header.h_nodeid;
3195
3196         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3197         if (error)
3198                 goto out;
3199
3200         ms->m_lkid = ms_in->m_lkid;
3201         ms->m_result = rv;
3202         ms->m_nodeid = ret_nodeid;
3203
3204         error = send_message(mh, ms);
3205  out:
3206         return error;
3207 }
3208
3209 /* which args we save from a received message depends heavily on the type
3210    of message, unlike the send side where we can safely send everything about
3211    the lkb for any type of message */
3212
3213 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3214 {
3215         lkb->lkb_exflags = ms->m_exflags;
3216         lkb->lkb_sbflags = ms->m_sbflags;
3217         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3218                          (ms->m_flags & 0x0000FFFF);
3219 }
3220
3221 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3222 {
3223         if (ms->m_flags == DLM_IFL_STUB_MS)
3224                 return;
3225
3226         lkb->lkb_sbflags = ms->m_sbflags;
3227         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3228                          (ms->m_flags & 0x0000FFFF);
3229 }
3230
3231 static int receive_extralen(struct dlm_message *ms)
3232 {
3233         return (ms->m_header.h_length - sizeof(struct dlm_message));
3234 }
3235
3236 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3237                        struct dlm_message *ms)
3238 {
3239         int len;
3240
3241         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3242                 if (!lkb->lkb_lvbptr)
3243                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3244                 if (!lkb->lkb_lvbptr)
3245                         return -ENOMEM;
3246                 len = receive_extralen(ms);
3247                 if (len > DLM_RESNAME_MAXLEN)
3248                         len = DLM_RESNAME_MAXLEN;
3249                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3250         }
3251         return 0;
3252 }
3253
3254 static void fake_bastfn(void *astparam, int mode)
3255 {
3256         log_print("fake_bastfn should not be called");
3257 }
3258
3259 static void fake_astfn(void *astparam)
3260 {
3261         log_print("fake_astfn should not be called");
3262 }
3263
3264 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3265                                 struct dlm_message *ms)
3266 {
3267         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3268         lkb->lkb_ownpid = ms->m_pid;
3269         lkb->lkb_remid = ms->m_lkid;
3270         lkb->lkb_grmode = DLM_LOCK_IV;
3271         lkb->lkb_rqmode = ms->m_rqmode;
3272
3273         lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3274         lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3275
3276         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3277                 /* lkb was just created so there won't be an lvb yet */
3278                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3279                 if (!lkb->lkb_lvbptr)
3280                         return -ENOMEM;
3281         }
3282
3283         return 0;
3284 }
3285
3286 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3287                                 struct dlm_message *ms)
3288 {
3289         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3290                 return -EBUSY;
3291
3292         if (receive_lvb(ls, lkb, ms))
3293                 return -ENOMEM;
3294
3295         lkb->lkb_rqmode = ms->m_rqmode;
3296         lkb->lkb_lvbseq = ms->m_lvbseq;
3297
3298         return 0;
3299 }
3300
3301 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3302                                struct dlm_message *ms)
3303 {
3304         if (receive_lvb(ls, lkb, ms))
3305                 return -ENOMEM;
3306         return 0;
3307 }
3308
3309 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3310    uses to send a reply and that the remote end uses to process the reply. */
3311
3312 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3313 {
3314         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3315         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3316         lkb->lkb_remid = ms->m_lkid;
3317 }
3318
3319 /* This is called after the rsb is locked so that we can safely inspect
3320    fields in the lkb. */
3321
3322 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3323 {
3324         int from = ms->m_header.h_nodeid;
3325         int error = 0;
3326
3327         switch (ms->m_type) {
3328         case DLM_MSG_CONVERT:
3329         case DLM_MSG_UNLOCK:
3330         case DLM_MSG_CANCEL:
3331                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3332                         error = -EINVAL;
3333                 break;
3334
3335         case DLM_MSG_CONVERT_REPLY:
3336         case DLM_MSG_UNLOCK_REPLY:
3337         case DLM_MSG_CANCEL_REPLY:
3338         case DLM_MSG_GRANT:
3339         case DLM_MSG_BAST:
3340                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3341                         error = -EINVAL;
3342                 break;
3343
3344         case DLM_MSG_REQUEST_REPLY:
3345                 if (!is_process_copy(lkb))
3346                         error = -EINVAL;
3347                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3348                         error = -EINVAL;
3349                 break;
3350
3351         default:
3352                 error = -EINVAL;
3353         }
3354
3355         if (error)
3356                 log_error(lkb->lkb_resource->res_ls,
3357                           "ignore invalid message %d from %d %x %x %x %d",
3358                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3359                           lkb->lkb_flags, lkb->lkb_nodeid);
3360         return error;
3361 }
3362
3363 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3364 {
3365         struct dlm_lkb *lkb;
3366         struct dlm_rsb *r;
3367         int error, namelen;
3368
3369         error = create_lkb(ls, &lkb);
3370         if (error)
3371                 goto fail;
3372
3373         receive_flags(lkb, ms);
3374         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3375         error = receive_request_args(ls, lkb, ms);
3376         if (error) {
3377                 __put_lkb(ls, lkb);
3378                 goto fail;
3379         }
3380
3381         namelen = receive_extralen(ms);
3382
3383         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3384         if (error) {
3385                 __put_lkb(ls, lkb);
3386                 goto fail;
3387         }
3388
3389         lock_rsb(r);
3390
3391         attach_lkb(r, lkb);
3392         error = do_request(r, lkb);
3393         send_request_reply(r, lkb, error);
3394         do_request_effects(r, lkb, error);
3395
3396         unlock_rsb(r);
3397         put_rsb(r);
3398
3399         if (error == -EINPROGRESS)
3400                 error = 0;
3401         if (error)
3402                 dlm_put_lkb(lkb);
3403         return;
3404
3405  fail:
3406         setup_stub_lkb(ls, ms);
3407         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3408 }
3409
3410 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3411 {
3412         struct dlm_lkb *lkb;
3413         struct dlm_rsb *r;
3414         int error, reply = 1;
3415
3416         error = find_lkb(ls, ms->m_remid, &lkb);
3417         if (error)
3418                 goto fail;
3419
3420         r = lkb->lkb_resource;
3421
3422         hold_rsb(r);
3423         lock_rsb(r);
3424
3425         error = validate_message(lkb, ms);
3426         if (error)
3427                 goto out;
3428
3429         receive_flags(lkb, ms);
3430
3431         error = receive_convert_args(ls, lkb, ms);
3432         if (error) {
3433                 send_convert_reply(r, lkb, error);
3434                 goto out;
3435         }
3436
3437         reply = !down_conversion(lkb);
3438
3439         error = do_convert(r, lkb);
3440         if (reply)
3441                 send_convert_reply(r, lkb, error);
3442         do_convert_effects(r, lkb, error);
3443  out:
3444         unlock_rsb(r);
3445         put_rsb(r);
3446         dlm_put_lkb(lkb);
3447         return;
3448
3449  fail:
3450         setup_stub_lkb(ls, ms);
3451         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3452 }
3453
3454 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3455 {
3456         struct dlm_lkb *lkb;
3457         struct dlm_rsb *r;
3458         int error;
3459
3460         error = find_lkb(ls, ms->m_remid, &lkb);
3461         if (error)
3462                 goto fail;
3463
3464         r = lkb->lkb_resource;
3465
3466         hold_rsb(r);
3467         lock_rsb(r);
3468
3469         error = validate_message(lkb, ms);
3470         if (error)
3471                 goto out;
3472
3473         receive_flags(lkb, ms);
3474
3475         error = receive_unlock_args(ls, lkb, ms);
3476         if (error) {
3477                 send_unlock_reply(r, lkb, error);
3478                 goto out;
3479         }
3480
3481         error = do_unlock(r, lkb);
3482         send_unlock_reply(r, lkb, error);
3483         do_unlock_effects(r, lkb, error);
3484  out:
3485         unlock_rsb(r);
3486         put_rsb(r);
3487         dlm_put_lkb(lkb);
3488         return;
3489
3490  fail:
3491         setup_stub_lkb(ls, ms);
3492         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3493 }
3494
3495 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3496 {
3497         struct dlm_lkb *lkb;
3498         struct dlm_rsb *r;
3499         int error;
3500
3501         error = find_lkb(ls, ms->m_remid, &lkb);
3502         if (error)
3503                 goto fail;
3504
3505         receive_flags(lkb, ms);
3506
3507         r = lkb->lkb_resource;
3508
3509         hold_rsb(r);
3510         lock_rsb(r);
3511
3512         error = validate_message(lkb, ms);
3513         if (error)
3514                 goto out;
3515
3516         error = do_cancel(r, lkb);
3517         send_cancel_reply(r, lkb, error);
3518         do_cancel_effects(r, lkb, error);
3519  out:
3520         unlock_rsb(r);
3521         put_rsb(r);
3522         dlm_put_lkb(lkb);
3523         return;
3524
3525  fail:
3526         setup_stub_lkb(ls, ms);
3527         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3528 }
3529
3530 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3531 {
3532         struct dlm_lkb *lkb;
3533         struct dlm_rsb *r;
3534         int error;
3535
3536         error = find_lkb(ls, ms->m_remid, &lkb);
3537         if (error) {
3538                 log_debug(ls, "receive_grant from %d no lkb %x",
3539                           ms->m_header.h_nodeid, ms->m_remid);
3540                 return;
3541         }
3542
3543         r = lkb->lkb_resource;
3544
3545         hold_rsb(r);
3546         lock_rsb(r);
3547
3548         error = validate_message(lkb, ms);
3549         if (error)
3550                 goto out;
3551
3552         receive_flags_reply(lkb, ms);
3553         if (is_altmode(lkb))
3554                 munge_altmode(lkb, ms);
3555         grant_lock_pc(r, lkb, ms);
3556         queue_cast(r, lkb, 0);
3557  out:
3558         unlock_rsb(r);
3559         put_rsb(r);
3560         dlm_put_lkb(lkb);
3561 }
3562
3563 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3564 {
3565         struct dlm_lkb *lkb;
3566         struct dlm_rsb *r;
3567         int error;
3568
3569         error = find_lkb(ls, ms->m_remid, &lkb);
3570         if (error) {
3571                 log_debug(ls, "receive_bast from %d no lkb %x",
3572                           ms->m_header.h_nodeid, ms->m_remid);
3573                 return;
3574         }
3575
3576         r = lkb->lkb_resource;
3577
3578         hold_rsb(r);
3579         lock_rsb(r);
3580
3581         error = validate_message(lkb, ms);
3582         if (error)
3583                 goto out;
3584
3585         queue_bast(r, lkb, ms->m_bastmode);
3586  out:
3587         unlock_rsb(r);
3588         put_rsb(r);
3589         dlm_put_lkb(lkb);
3590 }
3591
3592 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3593 {
3594         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3595
3596         from_nodeid = ms->m_header.h_nodeid;
3597         our_nodeid = dlm_our_nodeid();
3598
3599         len = receive_extralen(ms);
3600
3601         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3602         if (dir_nodeid != our_nodeid) {
3603                 log_error(ls, "lookup dir_nodeid %d from %d",
3604                           dir_nodeid, from_nodeid);
3605                 error = -EINVAL;
3606                 ret_nodeid = -1;
3607                 goto out;
3608         }
3609
3610         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3611
3612         /* Optimization: we're master so treat lookup as a request */
3613         if (!error && ret_nodeid == our_nodeid) {
3614                 receive_request(ls, ms);
3615                 return;
3616         }
3617  out:
3618         send_lookup_reply(ls, ms, ret_nodeid, error);
3619 }
3620
3621 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3622 {
3623         int len, dir_nodeid, from_nodeid;
3624
3625         from_nodeid = ms->m_header.h_nodeid;
3626
3627         len = receive_extralen(ms);
3628
3629         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3630         if (dir_nodeid != dlm_our_nodeid()) {
3631                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3632                           dir_nodeid, from_nodeid);
3633                 return;
3634         }
3635
3636         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3637 }
3638
3639 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3640 {
3641         do_purge(ls, ms->m_nodeid, ms->m_pid);
3642 }
3643
3644 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3645 {
3646         struct dlm_lkb *lkb;
3647         struct dlm_rsb *r;
3648         int error, mstype, result;
3649
3650         error = find_lkb(ls, ms->m_remid, &lkb);
3651         if (error) {
3652                 log_debug(ls, "receive_request_reply from %d no lkb %x",
3653                           ms->m_header.h_nodeid, ms->m_remid);
3654                 return;
3655         }
3656
3657         r = lkb->lkb_resource;
3658         hold_rsb(r);
3659         lock_rsb(r);
3660
3661         error = validate_message(lkb, ms);
3662         if (error)
3663                 goto out;
3664
3665         mstype = lkb->lkb_wait_type;
3666         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3667         if (error)
3668                 goto out;
3669
3670         /* Optimization: the dir node was also the master, so it took our
3671            lookup as a request and sent request reply instead of lookup reply */
3672         if (mstype == DLM_MSG_LOOKUP) {
3673                 r->res_nodeid = ms->m_header.h_nodeid;
3674                 lkb->lkb_nodeid = r->res_nodeid;
3675         }
3676
3677         /* this is the value returned from do_request() on the master */
3678         result = ms->m_result;
3679
3680         switch (result) {
3681         case -EAGAIN:
3682                 /* request would block (be queued) on remote master */
3683                 queue_cast(r, lkb, -EAGAIN);
3684                 confirm_master(r, -EAGAIN);
3685                 unhold_lkb(lkb); /* undoes create_lkb() */
3686                 break;
3687
3688         case -EINPROGRESS:
3689         case 0:
3690                 /* request was queued or granted on remote master */
3691                 receive_flags_reply(lkb, ms);
3692                 lkb->lkb_remid = ms->m_lkid;
3693                 if (is_altmode(lkb))
3694                         munge_altmode(lkb, ms);
3695                 if (result) {
3696                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3697                         add_timeout(lkb);
3698                 } else {
3699                         grant_lock_pc(r, lkb, ms);
3700                         queue_cast(r, lkb, 0);
3701                 }
3702                 confirm_master(r, result);
3703                 break;
3704
3705         case -EBADR:
3706         case -ENOTBLK:
3707                 /* find_rsb failed to find rsb or rsb wasn't master */
3708                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3709                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3710                 r->res_nodeid = -1;
3711                 lkb->lkb_nodeid = -1;
3712
3713                 if (is_overlap(lkb)) {
3714                         /* we'll ignore error in cancel/unlock reply */
3715                         queue_cast_overlap(r, lkb);
3716                         confirm_master(r, result);
3717                         unhold_lkb(lkb); /* undoes create_lkb() */
3718                 } else
3719                         _request_lock(r, lkb);
3720                 break;
3721
3722         default:
3723                 log_error(ls, "receive_request_reply %x error %d",
3724                           lkb->lkb_id, result);
3725         }
3726
3727         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3728                 log_debug(ls, "receive_request_reply %x result %d unlock",
3729                           lkb->lkb_id, result);
3730                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3731                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3732                 send_unlock(r, lkb);
3733         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3734                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3735                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3736                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3737                 send_cancel(r, lkb);
3738         } else {
3739                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3740                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3741         }
3742  out:
3743         unlock_rsb(r);
3744         put_rsb(r);
3745         dlm_put_lkb(lkb);
3746 }
3747
3748 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3749                                     struct dlm_message *ms)
3750 {
3751         /* this is the value returned from do_convert() on the master */
3752         switch (ms->m_result) {
3753         case -EAGAIN:
3754                 /* convert would block (be queued) on remote master */
3755                 queue_cast(r, lkb, -EAGAIN);
3756                 break;
3757
3758         case -EDEADLK:
3759                 receive_flags_reply(lkb, ms);
3760                 revert_lock_pc(r, lkb);
3761                 queue_cast(r, lkb, -EDEADLK);
3762                 break;
3763
3764         case -EINPROGRESS:
3765                 /* convert was queued on remote master */
3766                 receive_flags_reply(lkb, ms);
3767                 if (is_demoted(lkb))
3768                         munge_demoted(lkb);
3769                 del_lkb(r, lkb);
3770                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3771                 add_timeout(lkb);
3772                 break;
3773
3774         case 0:
3775                 /* convert was granted on remote master */
3776                 receive_flags_reply(lkb, ms);
3777                 if (is_demoted(lkb))
3778                         munge_demoted(lkb);
3779                 grant_lock_pc(r, lkb, ms);
3780                 queue_cast(r, lkb, 0);
3781                 break;
3782
3783         default:
3784                 log_error(r->res_ls, "receive_convert_reply %x error %d",
3785                           lkb->lkb_id, ms->m_result);
3786         }
3787 }
3788
3789 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3790 {
3791         struct dlm_rsb *r = lkb->lkb_resource;
3792         int error;
3793
3794         hold_rsb(r);
3795         lock_rsb(r);
3796
3797         error = validate_message(lkb, ms);
3798         if (error)
3799                 goto out;
3800
3801         /* stub reply can happen with waiters_mutex held */
3802         error = remove_from_waiters_ms(lkb, ms);
3803         if (error)
3804                 goto out;
3805
3806         __receive_convert_reply(r, lkb, ms);
3807  out:
3808         unlock_rsb(r);
3809         put_rsb(r);
3810 }
3811
3812 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3813 {
3814         struct dlm_lkb *lkb;
3815         int error;
3816
3817         error = find_lkb(ls, ms->m_remid, &lkb);
3818         if (error) {
3819                 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3820                           ms->m_header.h_nodeid, ms->m_remid);
3821                 return;
3822         }
3823
3824         _receive_convert_reply(lkb, ms);
3825         dlm_put_lkb(lkb);
3826 }
3827
3828 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3829 {
3830         struct dlm_rsb *r = lkb->lkb_resource;
3831         int error;
3832
3833         hold_rsb(r);
3834         lock_rsb(r);
3835
3836         error = validate_message(lkb, ms);
3837         if (error)
3838                 goto out;
3839
3840         /* stub reply can happen with waiters_mutex held */
3841         error = remove_from_waiters_ms(lkb, ms);
3842         if (error)
3843                 goto out;
3844
3845         /* this is the value returned from do_unlock() on the master */
3846
3847         switch (ms->m_result) {
3848         case -DLM_EUNLOCK:
3849                 receive_flags_reply(lkb, ms);
3850                 remove_lock_pc(r, lkb);
3851                 queue_cast(r, lkb, -DLM_EUNLOCK);
3852                 break;
3853         case -ENOENT:
3854                 break;
3855         default:
3856                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3857                           lkb->lkb_id, ms->m_result);
3858         }
3859  out:
3860         unlock_rsb(r);
3861         put_rsb(r);
3862 }
3863
3864 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3865 {
3866         struct dlm_lkb *lkb;
3867         int error;
3868
3869         error = find_lkb(ls, ms->m_remid, &lkb);
3870         if (error) {
3871                 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3872                           ms->m_header.h_nodeid, ms->m_remid);
3873                 return;
3874         }
3875
3876         _receive_unlock_reply(lkb, ms);
3877         dlm_put_lkb(lkb);
3878 }
3879
3880 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3881 {
3882         struct dlm_rsb *r = lkb->lkb_resource;
3883         int error;
3884
3885         hold_rsb(r);
3886         lock_rsb(r);
3887
3888         error = validate_message(lkb, ms);
3889         if (error)
3890                 goto out;
3891
3892         /* stub reply can happen with waiters_mutex held */
3893         error = remove_from_waiters_ms(lkb, ms);
3894         if (error)
3895                 goto out;
3896
3897         /* this is the value returned from do_cancel() on the master */
3898
3899         switch (ms->m_result) {
3900         case -DLM_ECANCEL:
3901                 receive_flags_reply(lkb, ms);
3902                 revert_lock_pc(r, lkb);
3903                 queue_cast(r, lkb, -DLM_ECANCEL);
3904                 break;
3905         case 0:
3906                 break;
3907         default:
3908                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3909                           lkb->lkb_id, ms->m_result);
3910         }
3911  out:
3912         unlock_rsb(r);
3913         put_rsb(r);
3914 }
3915
3916 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3917 {
3918         struct dlm_lkb *lkb;
3919         int error;
3920
3921         error = find_lkb(ls, ms->m_remid, &lkb);
3922         if (error) {
3923                 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3924                           ms->m_header.h_nodeid, ms->m_remid);
3925                 return;
3926         }
3927
3928         _receive_cancel_reply(lkb, ms);
3929         dlm_put_lkb(lkb);
3930 }
3931
3932 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3933 {
3934         struct dlm_lkb *lkb;
3935         struct dlm_rsb *r;
3936         int error, ret_nodeid;
3937
3938         error = find_lkb(ls, ms->m_lkid, &lkb);
3939         if (error) {
3940                 log_error(ls, "receive_lookup_reply no lkb");
3941                 return;
3942         }
3943
3944         /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3945            FIXME: will a non-zero error ever be returned? */
3946
3947         r = lkb->lkb_resource;
3948         hold_rsb(r);
3949         lock_rsb(r);
3950
3951         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3952         if (error)
3953                 goto out;
3954
3955         ret_nodeid = ms->m_nodeid;
3956         if (ret_nodeid == dlm_our_nodeid()) {
3957                 r->res_nodeid = 0;
3958                 ret_nodeid = 0;
3959                 r->res_first_lkid = 0;
3960         } else {
3961                 /* set_master() will copy res_nodeid to lkb_nodeid */
3962                 r->res_nodeid = ret_nodeid;
3963         }
3964
3965         if (is_overlap(lkb)) {
3966                 log_debug(ls, "receive_lookup_reply %x unlock %x",
3967                           lkb->lkb_id, lkb->lkb_flags);
3968                 queue_cast_overlap(r, lkb);
3969                 unhold_lkb(lkb); /* undoes create_lkb() */
3970                 goto out_list;
3971         }
3972
3973         _request_lock(r, lkb);
3974
3975  out_list:
3976         if (!ret_nodeid)
3977                 process_lookup_list(r);
3978  out:
3979         unlock_rsb(r);
3980         put_rsb(r);
3981         dlm_put_lkb(lkb);
3982 }
3983
3984 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3985 {
3986         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3987                 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3988                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3989                           ms->m_remid, ms->m_result);
3990                 return;
3991         }
3992
3993         switch (ms->m_type) {
3994
3995         /* messages sent to a master node */
3996
3997         case DLM_MSG_REQUEST:
3998                 receive_request(ls, ms);
3999                 break;
4000
4001         case DLM_MSG_CONVERT:
4002                 receive_convert(ls, ms);
4003                 break;
4004
4005         case DLM_MSG_UNLOCK:
4006                 receive_unlock(ls, ms);
4007                 break;
4008
4009         case DLM_MSG_CANCEL:
4010                 receive_cancel(ls, ms);
4011                 break;
4012
4013         /* messages sent from a master node (replies to above) */
4014
4015         case DLM_MSG_REQUEST_REPLY:
4016                 receive_request_reply(ls, ms);
4017                 break;
4018
4019         case DLM_MSG_CONVERT_REPLY:
4020                 receive_convert_reply(ls, ms);
4021                 break;
4022
4023         case DLM_MSG_UNLOCK_REPLY:
4024                 receive_unlock_reply(ls, ms);
4025                 break;
4026
4027         case DLM_MSG_CANCEL_REPLY:
4028                 receive_cancel_reply(ls, ms);
4029                 break;
4030
4031         /* messages sent from a master node (only two types of async msg) */
4032
4033         case DLM_MSG_GRANT:
4034                 receive_grant(ls, ms);
4035                 break;
4036
4037         case DLM_MSG_BAST:
4038                 receive_bast(ls, ms);
4039                 break;
4040
4041         /* messages sent to a dir node */
4042
4043         case DLM_MSG_LOOKUP:
4044                 receive_lookup(ls, ms);
4045                 break;
4046
4047         case DLM_MSG_REMOVE:
4048                 receive_remove(ls, ms);
4049                 break;
4050
4051         /* messages sent from a dir node (remove has no reply) */
4052
4053         case DLM_MSG_LOOKUP_REPLY:
4054                 receive_lookup_reply(ls, ms);
4055                 break;
4056
4057         /* other messages */
4058
4059         case DLM_MSG_PURGE:
4060                 receive_purge(ls, ms);
4061                 break;
4062
4063         default:
4064                 log_error(ls, "unknown message type %d", ms->m_type);
4065         }
4066 }
4067
4068 /* If the lockspace is in recovery mode (locking stopped), then normal
4069    messages are saved on the requestqueue for processing after recovery is
4070    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4071    messages off the requestqueue before we process new ones. This occurs right
4072    after recovery completes when we transition from saving all messages on
4073    requestqueue, to processing all the saved messages, to processing new
4074    messages as they arrive. */
4075
4076 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4077                                 int nodeid)
4078 {
4079         if (dlm_locking_stopped(ls)) {
4080                 dlm_add_requestqueue(ls, nodeid, ms);
4081         } else {
4082                 dlm_wait_requestqueue(ls);
4083                 _receive_message(ls, ms);
4084         }
4085 }
4086
4087 /* This is called by dlm_recoverd to process messages that were saved on
4088    the requestqueue. */
4089
4090 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
4091 {
4092         _receive_message(ls, ms);
4093 }
4094
4095 /* This is called by the midcomms layer when something is received for
4096    the lockspace.  It could be either a MSG (normal message sent as part of
4097    standard locking activity) or an RCOM (recovery message sent as part of
4098    lockspace recovery). */
4099
4100 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4101 {
4102         struct dlm_header *hd = &p->header;
4103         struct dlm_ls *ls;
4104         int type = 0;
4105
4106         switch (hd->h_cmd) {
4107         case DLM_MSG:
4108                 dlm_message_in(&p->message);
4109                 type = p->message.m_type;
4110                 break;
4111         case DLM_RCOM:
4112                 dlm_rcom_in(&p->rcom);
4113                 type = p->rcom.rc_type;
4114                 break;
4115         default:
4116                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4117                 return;
4118         }
4119
4120         if (hd->h_nodeid != nodeid) {
4121                 log_print("invalid h_nodeid %d from %d lockspace %x",
4122                           hd->h_nodeid, nodeid, hd->h_lockspace);
4123                 return;
4124         }
4125
4126         ls = dlm_find_lockspace_global(hd->h_lockspace);
4127         if (!ls) {
4128                 if (dlm_config.ci_log_debug)
4129                         log_print("invalid lockspace %x from %d cmd %d type %d",
4130                                   hd->h_lockspace, nodeid, hd->h_cmd, type);
4131
4132                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4133                         dlm_send_ls_not_ready(nodeid, &p->rcom);
4134                 return;
4135         }
4136
4137         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4138            be inactive (in this ls) before transitioning to recovery mode */
4139
4140         down_read(&ls->ls_recv_active);
4141         if (hd->h_cmd == DLM_MSG)
4142                 dlm_receive_message(ls, &p->message, nodeid);
4143         else
4144                 dlm_receive_rcom(ls, &p->rcom, nodeid);
4145         up_read(&ls->ls_recv_active);
4146
4147         dlm_put_lockspace(ls);
4148 }
4149
4150 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4151                                    struct dlm_message *ms_stub)
4152 {
4153         if (middle_conversion(lkb)) {
4154                 hold_lkb(lkb);
4155                 memset(ms_stub, 0, sizeof(struct dlm_message));
4156                 ms_stub->m_flags = DLM_IFL_STUB_MS;
4157                 ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
4158                 ms_stub->m_result = -EINPROGRESS;
4159                 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4160                 _receive_convert_reply(lkb, ms_stub);
4161
4162                 /* Same special case as in receive_rcom_lock_args() */
4163                 lkb->lkb_grmode = DLM_LOCK_IV;
4164                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4165                 unhold_lkb(lkb);
4166
4167         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4168                 lkb->lkb_flags |= DLM_IFL_RESEND;
4169         }
4170
4171         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4172            conversions are async; there's no reply from the remote master */
4173 }
4174
4175 /* A waiting lkb needs recovery if the master node has failed, or
4176    the master node is changing (only when no directory is used) */
4177
4178 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
4179 {
4180         if (dlm_is_removed(ls, lkb->lkb_nodeid))
4181                 return 1;
4182
4183         if (!dlm_no_directory(ls))
4184                 return 0;
4185
4186         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
4187                 return 1;
4188
4189         return 0;
4190 }
4191
4192 /* Recovery for locks that are waiting for replies from nodes that are now
4193    gone.  We can just complete unlocks and cancels by faking a reply from the
4194    dead node.  Requests and up-conversions we flag to be resent after
4195    recovery.  Down-conversions can just be completed with a fake reply like
4196    unlocks.  Conversions between PR and CW need special attention. */
4197
4198 void dlm_recover_waiters_pre(struct dlm_ls *ls)
4199 {
4200         struct dlm_lkb *lkb, *safe;
4201         struct dlm_message *ms_stub;
4202         int wait_type, stub_unlock_result, stub_cancel_result;
4203
4204         ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
4205         if (!ms_stub) {
4206                 log_error(ls, "dlm_recover_waiters_pre no mem");
4207                 return;
4208         }
4209
4210         mutex_lock(&ls->ls_waiters_mutex);
4211
4212         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4213
4214                 /* exclude debug messages about unlocks because there can be so
4215                    many and they aren't very interesting */
4216
4217                 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4218                         log_debug(ls, "recover_waiter %x nodeid %d "
4219                                   "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
4220                                   lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
4221                 }
4222
4223                 /* all outstanding lookups, regardless of destination  will be
4224                    resent after recovery is done */
4225
4226                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4227                         lkb->lkb_flags |= DLM_IFL_RESEND;
4228                         continue;
4229                 }
4230
4231                 if (!waiter_needs_recovery(ls, lkb))
4232                         continue;
4233
4234                 wait_type = lkb->lkb_wait_type;
4235                 stub_unlock_result = -DLM_EUNLOCK;
4236                 stub_cancel_result = -DLM_ECANCEL;
4237
4238                 /* Main reply may have been received leaving a zero wait_type,
4239                    but a reply for the overlapping op may not have been
4240                    received.  In that case we need to fake the appropriate
4241                    reply for the overlap op. */
4242
4243                 if (!wait_type) {
4244                         if (is_overlap_cancel(lkb)) {
4245                                 wait_type = DLM_MSG_CANCEL;
4246                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4247                                         stub_cancel_result = 0;
4248                         }
4249                         if (is_overlap_unlock(lkb)) {
4250                                 wait_type = DLM_MSG_UNLOCK;
4251                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4252                                         stub_unlock_result = -ENOENT;
4253                         }
4254
4255                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
4256                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
4257                                   stub_cancel_result, stub_unlock_result);
4258                 }
4259
4260                 switch (wait_type) {
4261
4262                 case DLM_MSG_REQUEST:
4263                         lkb->lkb_flags |= DLM_IFL_RESEND;
4264                         break;
4265
4266                 case DLM_MSG_CONVERT:
4267                         recover_convert_waiter(ls, lkb, ms_stub);
4268                         break;
4269
4270                 case DLM_MSG_UNLOCK:
4271                         hold_lkb(lkb);
4272                         memset(ms_stub, 0, sizeof(struct dlm_message));
4273                         ms_stub->m_flags = DLM_IFL_STUB_MS;
4274                         ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
4275                         ms_stub->m_result = stub_unlock_result;
4276                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4277                         _receive_unlock_reply(lkb, ms_stub);
4278                         dlm_put_lkb(lkb);
4279                         break;
4280
4281                 case DLM_MSG_CANCEL:
4282                         hold_lkb(lkb);
4283                         memset(ms_stub, 0, sizeof(struct dlm_message));
4284                         ms_stub->m_flags = DLM_IFL_STUB_MS;
4285                         ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
4286                         ms_stub->m_result = stub_cancel_result;
4287                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4288                         _receive_cancel_reply(lkb, ms_stub);
4289                         dlm_put_lkb(lkb);
4290                         break;
4291
4292                 default:
4293                         log_error(ls, "invalid lkb wait_type %d %d",
4294                                   lkb->lkb_wait_type, wait_type);
4295                 }
4296                 schedule();
4297         }
4298         mutex_unlock(&ls->ls_waiters_mutex);
4299         kfree(ms_stub);
4300 }
4301
4302 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4303 {
4304         struct dlm_lkb *lkb;
4305         int found = 0;
4306
4307         mutex_lock(&ls->ls_waiters_mutex);
4308         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4309                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
4310                         hold_lkb(lkb);
4311                         found = 1;
4312                         break;
4313                 }
4314         }
4315         mutex_unlock(&ls->ls_waiters_mutex);
4316
4317         if (!found)
4318                 lkb = NULL;
4319         return lkb;
4320 }
4321
4322 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4323    master or dir-node for r.  Processing the lkb may result in it being placed
4324    back on waiters. */
4325
4326 /* We do this after normal locking has been enabled and any saved messages
4327    (in requestqueue) have been processed.  We should be confident that at
4328    this point we won't get or process a reply to any of these waiting
4329    operations.  But, new ops may be coming in on the rsbs/locks here from
4330    userspace or remotely. */
4331
4332 /* there may have been an overlap unlock/cancel prior to recovery or after
4333    recovery.  if before, the lkb may still have a pos wait_count; if after, the
4334    overlap flag would just have been set and nothing new sent.  we can be
4335    confident here than any replies to either the initial op or overlap ops
4336    prior to recovery have been received. */
4337
4338 int dlm_recover_waiters_post(struct dlm_ls *ls)
4339 {
4340         struct dlm_lkb *lkb;
4341         struct dlm_rsb *r;
4342         int error = 0, mstype, err, oc, ou;
4343
4344         while (1) {
4345                 if (dlm_locking_stopped(ls)) {
4346                         log_debug(ls, "recover_waiters_post aborted");
4347                         error = -EINTR;
4348                         break;
4349                 }
4350
4351                 lkb = find_resend_waiter(ls);
4352                 if (!lkb)
4353                         break;
4354
4355                 r = lkb->lkb_resource;
4356                 hold_rsb(r);
4357                 lock_rsb(r);
4358
4359                 mstype = lkb->lkb_wait_type;
4360                 oc = is_overlap_cancel(lkb);
4361                 ou = is_overlap_unlock(lkb);
4362                 err = 0;
4363
4364                 log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
4365                           lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);
4366
4367                 /* At this point we assume that we won't get a reply to any
4368                    previous op or overlap op on this lock.  First, do a big
4369                    remove_from_waiters() for all previous ops. */
4370
4371                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
4372                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4373                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4374                 lkb->lkb_wait_type = 0;
4375                 lkb->lkb_wait_count = 0;
4376                 mutex_lock(&ls->ls_waiters_mutex);
4377                 list_del_init(&lkb->lkb_wait_reply);
4378                 mutex_unlock(&ls->ls_waiters_mutex);
4379                 unhold_lkb(lkb); /* for waiters list */
4380
4381                 if (oc || ou) {
4382                         /* do an unlock or cancel instead of resending */
4383                         switch (mstype) {
4384                         case DLM_MSG_LOOKUP:
4385                         case DLM_MSG_REQUEST:
4386                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4387                                                         -DLM_ECANCEL);
4388                                 unhold_lkb(lkb); /* undoes create_lkb() */
4389                                 break;
4390                         case DLM_MSG_CONVERT:
4391                                 if (oc) {
4392                                         queue_cast(r, lkb, -DLM_ECANCEL);
4393                                 } else {
4394                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4395                                         _unlock_lock(r, lkb);
4396                                 }
4397                                 break;
4398                         default:
4399                                 err = 1;
4400                         }
4401                 } else {
4402                         switch (mstype) {
4403                         case DLM_MSG_LOOKUP:
4404                         case DLM_MSG_REQUEST:
4405                                 _request_lock(r, lkb);
4406                                 if (is_master(r))
4407                                         confirm_master(r, 0);
4408                                 break;
4409                         case DLM_MSG_CONVERT:
4410                                 _convert_lock(r, lkb);
4411                                 break;
4412                         default:
4413                                 err = 1;
4414                         }
4415                 }
4416
4417                 if (err)
4418                         log_error(ls, "recover_waiters_post %x %d %x %d %d",
4419                                   lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4420                 unlock_rsb(r);
4421                 put_rsb(r);
4422                 dlm_put_lkb(lkb);
4423         }
4424
4425         return error;
4426 }
4427
4428 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4429                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4430 {
4431         struct dlm_ls *ls = r->res_ls;
4432         struct dlm_lkb *lkb, *safe;
4433
4434         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4435                 if (test(ls, lkb)) {
4436                         rsb_set_flag(r, RSB_LOCKS_PURGED);
4437                         del_lkb(r, lkb);
4438                         /* this put should free the lkb */
4439                         if (!dlm_put_lkb(lkb))
4440                                 log_error(ls, "purged lkb not released");
4441                 }
4442         }
4443 }
4444
4445 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4446 {
4447         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4448 }
4449
4450 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4451 {
4452         return is_master_copy(lkb);
4453 }
4454
4455 static void purge_dead_locks(struct dlm_rsb *r)
4456 {
4457         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4458         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4459         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4460 }
4461
4462 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4463 {
4464         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4465         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4466         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4467 }
4468
4469 /* Get rid of locks held by nodes that are gone. */
4470
4471 int dlm_purge_locks(struct dlm_ls *ls)
4472 {
4473         struct dlm_rsb *r;
4474
4475         log_debug(ls, "dlm_purge_locks");
4476
4477         down_write(&ls->ls_root_sem);
4478         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4479                 hold_rsb(r);
4480                 lock_rsb(r);
4481                 if (is_master(r))
4482                         purge_dead_locks(r);
4483                 unlock_rsb(r);
4484                 unhold_rsb(r);
4485
4486                 schedule();
4487         }
4488         up_write(&ls->ls_root_sem);
4489
4490         return 0;
4491 }
4492
4493 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4494 {
4495         struct rb_node *n;
4496         struct dlm_rsb *r, *r_ret = NULL;
4497
4498         spin_lock(&ls->ls_rsbtbl[bucket].lock);
4499         for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
4500                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
4501                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4502                         continue;
4503                 hold_rsb(r);
4504                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4505                 r_ret = r;
4506                 break;
4507         }
4508         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4509         return r_ret;
4510 }
4511
4512 void dlm_grant_after_purge(struct dlm_ls *ls)
4513 {
4514         struct dlm_rsb *r;
4515         int bucket = 0;
4516
4517         while (1) {
4518                 r = find_purged_rsb(ls, bucket);
4519                 if (!r) {
4520                         if (bucket == ls->ls_rsbtbl_size - 1)
4521                                 break;
4522                         bucket++;
4523                         continue;
4524                 }
4525                 lock_rsb(r);
4526                 if (is_master(r)) {
4527                         grant_pending_locks(r);
4528                         confirm_master(r, 0);
4529                 }
4530                 unlock_rsb(r);
4531                 put_rsb(r);
4532                 schedule();
4533         }
4534 }
4535
4536 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4537                                          uint32_t remid)
4538 {
4539         struct dlm_lkb *lkb;
4540
4541         list_for_each_entry(lkb, head, lkb_statequeue) {
4542                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4543                         return lkb;
4544         }
4545         return NULL;
4546 }
4547
4548 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4549                                     uint32_t remid)
4550 {
4551         struct dlm_lkb *lkb;
4552
4553         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4554         if (lkb)
4555                 return lkb;
4556         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4557         if (lkb)
4558                 return lkb;
4559         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4560         if (lkb)
4561                 return lkb;
4562         return NULL;
4563 }
4564
4565 /* needs at least dlm_rcom + rcom_lock */
4566 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4567                                   struct dlm_rsb *r, struct dlm_rcom *rc)
4568 {
4569         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4570
4571         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4572         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4573         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4574         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4575         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4576         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4577         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4578         lkb->lkb_rqmode = rl->rl_rqmode;
4579         lkb->lkb_grmode = rl->rl_grmode;
4580         /* don't set lkb_status because add_lkb wants to itself */
4581
4582         lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
4583         lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
4584
4585         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4586                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4587                          sizeof(struct rcom_lock);
4588                 if (lvblen > ls->ls_lvblen)
4589                         return -EINVAL;
4590                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4591                 if (!lkb->lkb_lvbptr)
4592                         return -ENOMEM;
4593                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4594         }
4595
4596         /* Conversions between PR and CW (middle modes) need special handling.
4597            The real granted mode of these converting locks cannot be determined
4598            until all locks have been rebuilt on the rsb (recover_conversion) */
4599
4600         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4601             middle_conversion(lkb)) {
4602                 rl->rl_status = DLM_LKSTS_CONVERT;
4603                 lkb->lkb_grmode = DLM_LOCK_IV;
4604                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4605         }
4606
4607         return 0;
4608 }
4609
4610 /* This lkb may have been recovered in a previous aborted recovery so we need
4611    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4612    If so we just send back a standard reply.  If not, we create a new lkb with
4613    the given values and send back our lkid.  We send back our lkid by sending
4614    back the rcom_lock struct we got but with the remid field filled in. */
4615
4616 /* needs at least dlm_rcom + rcom_lock */
4617 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4618 {
4619         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4620         struct dlm_rsb *r;
4621         struct dlm_lkb *lkb;
4622         int error;
4623
4624         if (rl->rl_parent_lkid) {
4625                 error = -EOPNOTSUPP;
4626                 goto out;
4627         }
4628
4629         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4630                          R_MASTER, &r);
4631         if (error)
4632                 goto out;
4633
4634         lock_rsb(r);
4635
4636         lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4637         if (lkb) {
4638                 error = -EEXIST;
4639                 goto out_remid;
4640         }
4641
4642         error = create_lkb(ls, &lkb);
4643         if (error)
4644                 goto out_unlock;
4645
4646         error = receive_rcom_lock_args(ls, lkb, r, rc);
4647         if (error) {
4648                 __put_lkb(ls, lkb);
4649                 goto out_unlock;
4650         }
4651
4652         attach_lkb(r, lkb);
4653         add_lkb(r, lkb, rl->rl_status);
4654         error = 0;
4655
4656  out_remid:
4657         /* this is the new value returned to the lock holder for
4658            saving in its process-copy lkb */
4659         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4660
4661  out_unlock:
4662         unlock_rsb(r);
4663         put_rsb(r);
4664  out:
4665         if (error)
4666                 log_debug(ls, "recover_master_copy %d %x", error,
4667                           le32_to_cpu(rl->rl_lkid));
4668         rl->rl_result = cpu_to_le32(error);
4669         return error;
4670 }
4671
4672 /* needs at least dlm_rcom + rcom_lock */
4673 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4674 {
4675         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4676         struct dlm_rsb *r;
4677         struct dlm_lkb *lkb;
4678         int error;
4679
4680         error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4681         if (error) {
4682                 log_error(ls, "recover_process_copy no lkid %x",
4683                                 le32_to_cpu(rl->rl_lkid));
4684                 return error;
4685         }
4686
4687         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4688
4689         error = le32_to_cpu(rl->rl_result);
4690
4691         r = lkb->lkb_resource;
4692         hold_rsb(r);
4693         lock_rsb(r);
4694
4695         switch (error) {
4696         case -EBADR:
4697                 /* There's a chance the new master received our lock before
4698                    dlm_recover_master_reply(), this wouldn't happen if we did
4699                    a barrier between recover_masters and recover_locks. */
4700                 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4701                           (unsigned long)r, r->res_name);
4702                 dlm_send_rcom_lock(r, lkb);
4703                 goto out;
4704         case -EEXIST:
4705                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4706                 /* fall through */
4707         case 0:
4708                 lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4709                 break;
4710         default:
4711                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4712                           error, lkb->lkb_id);
4713         }
4714
4715         /* an ack for dlm_recover_locks() which waits for replies from
4716            all the locks it sends to new masters */
4717         dlm_recovered_lock(r);
4718  out:
4719         unlock_rsb(r);
4720         put_rsb(r);
4721         dlm_put_lkb(lkb);
4722
4723         return 0;
4724 }
4725
4726 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4727                      int mode, uint32_t flags, void *name, unsigned int namelen,
4728                      unsigned long timeout_cs)
4729 {
4730         struct dlm_lkb *lkb;
4731         struct dlm_args args;
4732         int error;
4733
4734         dlm_lock_recovery(ls);
4735
4736         error = create_lkb(ls, &lkb);
4737         if (error) {
4738                 kfree(ua);
4739                 goto out;
4740         }
4741
4742         if (flags & DLM_LKF_VALBLK) {
4743                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4744                 if (!ua->lksb.sb_lvbptr) {
4745                         kfree(ua);
4746                         __put_lkb(ls, lkb);
4747                         error = -ENOMEM;
4748                         goto out;
4749                 }
4750         }
4751
4752         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4753            When DLM_IFL_USER is set, the dlm knows that this is a userspace
4754            lock and that lkb_astparam is the dlm_user_args structure. */
4755
4756         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4757                               fake_astfn, ua, fake_bastfn, &args);
4758         lkb->lkb_flags |= DLM_IFL_USER;
4759
4760         if (error) {
4761                 __put_lkb(ls, lkb);
4762                 goto out;
4763         }
4764
4765         error = request_lock(ls, lkb, name, namelen, &args);
4766
4767         switch (error) {
4768         case 0:
4769                 break;
4770         case -EINPROGRESS:
4771                 error = 0;
4772                 break;
4773         case -EAGAIN:
4774                 error = 0;
4775                 /* fall through */
4776         default:
4777                 __put_lkb(ls, lkb);
4778                 goto out;
4779         }
4780
4781         /* add this new lkb to the per-process list of locks */
4782         spin_lock(&ua->proc->locks_spin);
4783         hold_lkb(lkb);
4784         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4785         spin_unlock(&ua->proc->locks_spin);
4786  out:
4787         dlm_unlock_recovery(ls);
4788         return error;
4789 }
4790
4791 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4792                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4793                      unsigned long timeout_cs)
4794 {
4795         struct dlm_lkb *lkb;
4796         struct dlm_args args;
4797         struct dlm_user_args *ua;
4798         int error;
4799
4800         dlm_lock_recovery(ls);
4801
4802         error = find_lkb(ls, lkid, &lkb);
4803         if (error)
4804                 goto out;
4805
4806         /* user can change the params on its lock when it converts it, or
4807            add an lvb that didn't exist before */
4808
4809         ua = lkb->lkb_ua;
4810
4811         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4812                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4813                 if (!ua->lksb.sb_lvbptr) {
4814                         error = -ENOMEM;
4815                         goto out_put;
4816                 }
4817         }
4818         if (lvb_in && ua->lksb.sb_lvbptr)
4819                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4820
4821         ua->xid = ua_tmp->xid;
4822         ua->castparam = ua_tmp->castparam;
4823         ua->castaddr = ua_tmp->castaddr;
4824         ua->bastparam = ua_tmp->bastparam;
4825         ua->bastaddr = ua_tmp->bastaddr;
4826         ua->user_lksb = ua_tmp->user_lksb;
4827
4828         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4829                               fake_astfn, ua, fake_bastfn, &args);
4830         if (error)
4831                 goto out_put;
4832
4833         error = convert_lock(ls, lkb, &args);
4834
4835         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4836                 error = 0;
4837  out_put:
4838         dlm_put_lkb(lkb);
4839  out:
4840         dlm_unlock_recovery(ls);
4841         kfree(ua_tmp);
4842         return error;
4843 }
4844
4845 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4846                     uint32_t flags, uint32_t lkid, char *lvb_in)
4847 {
4848         struct dlm_lkb *lkb;
4849         struct dlm_args args;
4850         struct dlm_user_args *ua;
4851         int error;
4852
4853         dlm_lock_recovery(ls);
4854
4855         error = find_lkb(ls, lkid, &lkb);
4856         if (error)
4857                 goto out;
4858
4859         ua = lkb->lkb_ua;
4860
4861         if (lvb_in && ua->lksb.sb_lvbptr)
4862                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4863         if (ua_tmp->castparam)
4864                 ua->castparam = ua_tmp->castparam;
4865         ua->user_lksb = ua_tmp->user_lksb;
4866
4867         error = set_unlock_args(flags, ua, &args);
4868         if (error)
4869                 goto out_put;
4870
4871         error = unlock_lock(ls, lkb, &args);
4872
4873         if (error == -DLM_EUNLOCK)
4874                 error = 0;
4875         /* from validate_unlock_args() */
4876         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4877                 error = 0;
4878         if (error)
4879                 goto out_put;
4880
4881         spin_lock(&ua->proc->locks_spin);
4882         /* dlm_user_add_cb() may have already taken lkb off the proc list */
4883         if (!list_empty(&lkb->lkb_ownqueue))
4884                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4885         spin_unlock(&ua->proc->locks_spin);
4886  out_put:
4887         dlm_put_lkb(lkb);
4888  out:
4889         dlm_unlock_recovery(ls);
4890         kfree(ua_tmp);
4891         return error;
4892 }
4893
4894 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4895                     uint32_t flags, uint32_t lkid)
4896 {
4897         struct dlm_lkb *lkb;
4898         struct dlm_args args;
4899         struct dlm_user_args *ua;
4900         int error;
4901
4902         dlm_lock_recovery(ls);
4903
4904         error = find_lkb(ls, lkid, &lkb);
4905         if (error)
4906                 goto out;
4907
4908         ua = lkb->lkb_ua;
4909         if (ua_tmp->castparam)
4910                 ua->castparam = ua_tmp->castparam;
4911         ua->user_lksb = ua_tmp->user_lksb;
4912
4913         error = set_unlock_args(flags, ua, &args);
4914         if (error)
4915                 goto out_put;
4916
4917         error = cancel_lock(ls, lkb, &args);
4918
4919         if (error == -DLM_ECANCEL)
4920                 error = 0;
4921         /* from validate_unlock_args() */
4922         if (error == -EBUSY)
4923                 error = 0;
4924  out_put:
4925         dlm_put_lkb(lkb);
4926  out:
4927         dlm_unlock_recovery(ls);
4928         kfree(ua_tmp);
4929         return error;
4930 }
4931
4932 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4933 {
4934         struct dlm_lkb *lkb;
4935         struct dlm_args args;
4936         struct dlm_user_args *ua;
4937         struct dlm_rsb *r;
4938         int error;
4939
4940         dlm_lock_recovery(ls);
4941
4942         error = find_lkb(ls, lkid, &lkb);
4943         if (error)
4944                 goto out;
4945
4946         ua = lkb->lkb_ua;
4947
4948         error = set_unlock_args(flags, ua, &args);
4949         if (error)
4950                 goto out_put;
4951
4952         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4953
4954         r = lkb->lkb_resource;
4955         hold_rsb(r);
4956         lock_rsb(r);
4957
4958         error = validate_unlock_args(lkb, &args);
4959         if (error)
4960                 goto out_r;
4961         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4962
4963         error = _cancel_lock(r, lkb);
4964  out_r:
4965         unlock_rsb(r);
4966         put_rsb(r);
4967
4968         if (error == -DLM_ECANCEL)
4969                 error = 0;
4970         /* from validate_unlock_args() */
4971         if (error == -EBUSY)
4972                 error = 0;
4973  out_put:
4974         dlm_put_lkb(lkb);
4975  out:
4976         dlm_unlock_recovery(ls);
4977         return error;
4978 }
4979
4980 /* lkb's that are removed from the waiters list by revert are just left on the
4981    orphans list with the granted orphan locks, to be freed by purge */
4982
4983 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4984 {
4985         struct dlm_args args;
4986         int error;
4987
4988         hold_lkb(lkb);
4989         mutex_lock(&ls->ls_orphans_mutex);
4990         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4991         mutex_unlock(&ls->ls_orphans_mutex);
4992
4993         set_unlock_args(0, lkb->lkb_ua, &args);
4994
4995         error = cancel_lock(ls, lkb, &args);
4996         if (error == -DLM_ECANCEL)
4997                 error = 0;
4998         return error;
4999 }
5000
5001 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
5002    Regardless of what rsb queue the lock is on, it's removed and freed. */
5003
5004 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5005 {
5006         struct dlm_args args;
5007         int error;
5008
5009         set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
5010
5011         error = unlock_lock(ls, lkb, &args);
5012         if (error == -DLM_EUNLOCK)
5013                 error = 0;
5014         return error;
5015 }
5016
5017 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
5018    (which does lock_rsb) due to deadlock with receiving a message that does
5019    lock_rsb followed by dlm_user_add_cb() */
5020
5021 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
5022                                      struct dlm_user_proc *proc)
5023 {
5024         struct dlm_lkb *lkb = NULL;
5025
5026         mutex_lock(&ls->ls_clear_proc_locks);
5027         if (list_empty(&proc->locks))
5028                 goto out;
5029
5030         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
5031         list_del_init(&lkb->lkb_ownqueue);
5032
5033         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5034                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
5035         else
5036                 lkb->lkb_flags |= DLM_IFL_DEAD;
5037  out:
5038         mutex_unlock(&ls->ls_clear_proc_locks);
5039         return lkb;
5040 }
5041
5042 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
5043    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
5044    which we clear here. */
5045
5046 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
5047    list, and no more device_writes should add lkb's to proc->locks list; so we
5048    shouldn't need to take asts_spin or locks_spin here.  this assumes that
5049    device reads/writes/closes are serialized -- FIXME: we may need to serialize
5050    them ourself. */
5051
5052 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5053 {
5054         struct dlm_lkb *lkb, *safe;
5055
5056         dlm_lock_recovery(ls);
5057
5058         while (1) {
5059                 lkb = del_proc_lock(ls, proc);
5060                 if (!lkb)
5061                         break;
5062                 del_timeout(lkb);
5063                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5064                         orphan_proc_lock(ls, lkb);
5065                 else
5066                         unlock_proc_lock(ls, lkb);
5067
5068                 /* this removes the reference for the proc->locks list
5069                    added by dlm_user_request, it may result in the lkb
5070                    being freed */
5071
5072                 dlm_put_lkb(lkb);
5073         }
5074
5075         mutex_lock(&ls->ls_clear_proc_locks);
5076
5077         /* in-progress unlocks */
5078         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5079                 list_del_init(&lkb->lkb_ownqueue);
5080                 lkb->lkb_flags |= DLM_IFL_DEAD;
5081                 dlm_put_lkb(lkb);
5082         }
5083
5084         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5085                 memset(&lkb->lkb_callbacks, 0,
5086                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5087                 list_del_init(&lkb->lkb_cb_list);
5088                 dlm_put_lkb(lkb);
5089         }
5090
5091         mutex_unlock(&ls->ls_clear_proc_locks);
5092         dlm_unlock_recovery(ls);
5093 }
5094
5095 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5096 {
5097         struct dlm_lkb *lkb, *safe;
5098
5099         while (1) {
5100                 lkb = NULL;
5101                 spin_lock(&proc->locks_spin);
5102                 if (!list_empty(&proc->locks)) {
5103                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
5104                                          lkb_ownqueue);
5105                         list_del_init(&lkb->lkb_ownqueue);
5106                 }
5107                 spin_unlock(&proc->locks_spin);
5108
5109                 if (!lkb)
5110                         break;
5111
5112                 lkb->lkb_flags |= DLM_IFL_DEAD;
5113                 unlock_proc_lock(ls, lkb);
5114                 dlm_put_lkb(lkb); /* ref from proc->locks list */
5115         }
5116
5117         spin_lock(&proc->locks_spin);
5118         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5119                 list_del_init(&lkb->lkb_ownqueue);
5120                 lkb->lkb_flags |= DLM_IFL_DEAD;
5121                 dlm_put_lkb(lkb);
5122         }
5123         spin_unlock(&proc->locks_spin);
5124
5125         spin_lock(&proc->asts_spin);
5126         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5127                 memset(&lkb->lkb_callbacks, 0,
5128                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5129                 list_del_init(&lkb->lkb_cb_list);
5130                 dlm_put_lkb(lkb);
5131         }
5132         spin_unlock(&proc->asts_spin);
5133 }
5134
5135 /* pid of 0 means purge all orphans */
5136
5137 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
5138 {
5139         struct dlm_lkb *lkb, *safe;
5140
5141         mutex_lock(&ls->ls_orphans_mutex);
5142         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
5143                 if (pid && lkb->lkb_ownpid != pid)
5144                         continue;
5145                 unlock_proc_lock(ls, lkb);
5146                 list_del_init(&lkb->lkb_ownqueue);
5147                 dlm_put_lkb(lkb);
5148         }
5149         mutex_unlock(&ls->ls_orphans_mutex);
5150 }
5151
5152 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
5153 {
5154         struct dlm_message *ms;
5155         struct dlm_mhandle *mh;
5156         int error;
5157
5158         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
5159                                 DLM_MSG_PURGE, &ms, &mh);
5160         if (error)
5161                 return error;
5162         ms->m_nodeid = nodeid;
5163         ms->m_pid = pid;
5164
5165         return send_message(mh, ms);
5166 }
5167
5168 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
5169                    int nodeid, int pid)
5170 {
5171         int error = 0;
5172
5173         if (nodeid != dlm_our_nodeid()) {
5174                 error = send_purge(ls, nodeid, pid);
5175         } else {
5176                 dlm_lock_recovery(ls);
5177                 if (pid == current->pid)
5178                         purge_proc_locks(ls, proc);
5179                 else
5180                         do_purge(ls, nodeid, pid);
5181                 dlm_unlock_recovery(ls);
5182         }
5183         return error;
5184 }
5185